Skip to content

[FLINK-39027][pipeline-connector][postgres] Support DDL for postgres pipeline connector#4259

Draft
zml1206 wants to merge 10 commits intoapache:masterfrom
zml1206:pg_ddl
Draft

[FLINK-39027][pipeline-connector][postgres] Support DDL for postgres pipeline connector#4259
zml1206 wants to merge 10 commits intoapache:masterfrom
zml1206:pg_ddl

Conversation

@zml1206
Copy link
Contributor

@zml1206 zml1206 commented Feb 4, 2026

Postgres WAL log cannot parse table structure change records, we can support this through event triggers.

Preliminary work

Create an event listener table in the synchronized database and associate it with event triggers to listen for the ddl_command_end and sql_drop events.

CREATE SCHEMA IF NOT EXISTS audit;

CREATE TABLE audit.ddl_log (
  id BIGSERIAL PRIMARY KEY,          -- 使用BIGSERIAL防止长期运行后溢出
  ddl_tag TEXT NOT NULL,             -- DDL命令类型(CREATE, ALTER, DROP等)
  object_type TEXT,                  -- 对象类型(TABLE, INDEX, FUNCTION等)
  object_identity TEXT,              -- 对象身份
  command_text TEXT,                 -- 完整的DDL命令文本
  executing_user TEXT DEFAULT CURRENT_USER, -- 执行DDL的用户
  created_at TIMESTAMP DEFAULT clock_timestamp() -- 更精确的时间戳
);

-- 创建处理 DDL 操作的触发器函数
CREATE OR REPLACE FUNCTION audit.log_ddl_event()
RETURNS event_trigger
SECURITY DEFINER -- 以函数拥有者权限执行,确保有权限插入日志表
AS $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT * FROM pg_event_trigger_ddl_commands()
    LOOP
        -- 关键:防止递归触发!忽略对审计表自身的DDL操作。
        IF r.schema_name = 'audit' AND r.object_identity LIKE 'audit.ddl_log%' THEN
            RAISE DEBUG 'Ignoring DDL operation on audit table: %', r.object_identity;
            CONTINUE;
        END IF;

        INSERT INTO audit.ddl_log
            (ddl_tag, object_type, object_identity, command_text)
        VALUES
            (
              r.command_tag,
              r.object_type,
              r.object_identity,
              current_query()
            );
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 创建处理 DROP 操作的触发器函数
CREATE OR REPLACE FUNCTION audit.log_drop_event()
RETURNS event_trigger
SECURITY DEFINER -- 以函数拥有者权限执行,确保有权限插入日志表
AS $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN SELECT * FROM pg_event_trigger_dropped_objects()
    LOOP
        -- 关键:防止递归触发!忽略对审计表自身的DROP操作。
        IF r.schema_name = 'audit' AND r.object_identity LIKE 'audit.ddl_log%' THEN
            RAISE DEBUG 'Ignoring DROP operation on audit table: %', r.object_identity;
            CONTINUE;
        END IF;

        INSERT INTO audit.ddl_log
            (ddl_tag, object_type, object_identity, command_text)
        VALUES
            (
              'DROP', -- 对于 DROP 操作,手动设置 ddl_tag 为 'DROP'
              r.object_type,
              r.object_identity,
              current_query()
            );
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- 创建事件触发器
CREATE EVENT TRIGGER capture_ddl_trigger
    ON ddl_command_end
    EXECUTE FUNCTION audit.log_ddl_event();
    
CREATE EVENT TRIGGER log_drop
    ON sql_drop
    EXECUTE FUNCTION audit.log_drop_event();

Supported DDL statements

create table
drop table
alter add column
alter drop column
alter rename column
alter column datatype
alter column optional

Truncate table is not supported because PostgreSQL event triggers currently do not support the truncate table command.

fix

fix
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant