[FLINK-38959][pipeline-connector][postgres] Added support for schema change in the postgres pipeline connector #4233

Open
linjianchang wants to merge 1 commit into apache:master from linjianchang:master-pgschemachang

@linjianchang
Contributor

Added support for schema change in the postgres pipeline connector

lvyanquan self-assigned this Jan 22, 2026
lvyanquan added this to the V3.6.0 milestone Jan 22, 2026
Comment on lines 113 to 115
"SELECT a.attname attname,a.attnum AS age_oid \n" +
"FROM pg_attribute a\n" +
"JOIN pg_class c ON a.attrelid = c.oid\n" +
Contributor

Is this the officially recommended way to obtain the historical schema of a Postgres table?

Contributor Author

> Is this the officially recommended way to obtain the historical schema of a Postgres table?

Here we only read the OID of each column in the table's current state (the query aliases attnum as the OID). A column is treated as renamed when its name differs but its OID stays the same.
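
The comparison described above can be sketched as follows. This is a minimal illustration and not the code in this PR: the class ColumnDiffSketch and its diff method are hypothetical names, and the sketch assumes that every column has a non-null attnum and that Postgres keeps a column's attnum unchanged across RENAME COLUMN.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the PR. */
final class ColumnDiffSketch {

    /** Result of comparing two snapshots of one table's columns. */
    static final class Diff {
        final Map<String, Integer> added = new LinkedHashMap<>();   // name -> attnum
        final Map<String, Integer> dropped = new LinkedHashMap<>(); // name -> attnum
        final Map<String, String> renamed = new LinkedHashMap<>();  // old name -> new name
    }

    /**
     * Compares column-name -> attnum maps taken before and after a change.
     * The same attnum under a different name is reported as a rename.
     * Assumption: attnum values are non-null and unique within each map.
     */
    static Diff diff(Map<String, Integer> before, Map<String, Integer> after) {
        // Invert the "before" map so that columns can be looked up by attnum.
        Map<Integer, String> beforeByAttnum = new HashMap<>();
        before.forEach((name, attnum) -> beforeByAttnum.put(attnum, name));

        Diff diff = new Diff();
        for (Map.Entry<String, Integer> e : after.entrySet()) {
            String oldName = beforeByAttnum.remove(e.getValue());
            if (oldName == null) {
                // attnum was not present before: a new column.
                diff.added.put(e.getKey(), e.getValue());
            } else if (!oldName.equals(e.getKey())) {
                // Same attnum, different name: a rename.
                diff.renamed.put(oldName, e.getKey());
            }
        }
        // Any attnum that was not seen again belongs to a dropped column.
        beforeByAttnum.forEach((attnum, name) -> diff.dropped.put(name, attnum));
        return diff;
    }

    public static void main(String[] args) {
        Map<String, Integer> before = Map.of("id", 1, "name", 2, "age", 3);
        Map<String, Integer> after = Map.of("id", 1, "full_name", 2, "email", 4);
        Diff d = diff(before, after);
        System.out.println(
                "renamed=" + d.renamed + " added=" + d.added + " dropped=" + d.dropped);
    }
}
```

In this sketch a dropped column followed by an added column is never mistaken for a rename, because the new column receives a different attnum.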

linjianchang force-pushed the master-pgschemachang branch 2 times, most recently from c9048cc to 595f299 on January 23, 2026 08:43
@zml1206
Contributor

zml1206 commented Jan 26, 2026

Could a configuration option be added to control the on/off state? Also, I'm implementing a way to support DDL by parsing the DDL SQL, which requires adding an event table in Postgres. The two methods don't seem to conflict. Is this approach acceptable to the community?

@linjianchang
Contributor Author

> added to control the on/off state

A configuration option to control the on/off state has already been added. Thanks for the suggestion!

Copilot AI left a comment

Pull request overview

This pull request adds schema change event support to the Flink CDC Postgres pipeline connector. The implementation enables the connector to detect and emit schema change events (ADD/DROP/RENAME columns, ALTER column types) during change data capture operations.

Changes:

  • Added includeSchemaChanges configuration option to control schema change event emission
  • Implemented schema change detection logic by comparing before/after schemas and tracking column OIDs
  • Added comprehensive test coverage for various schema evolution scenarios (add, drop, rename, type changes)

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.

| File | Description |
| --- | --- |
| PostgresSourceBuilder.java | Added includeSchemaChanges() method to builder API |
| PostgresDataSourceOptions.java | Added SCHEMA_CHANGE_ENABLED configuration option with default value true |
| PostgresDataSourceFactory.java | Integrated schema change configuration into data source factory |
| PostgresSchemaUtils.java | Added methods to retrieve column OIDs for schema change tracking |
| PostgresDataSource.java | Initialized column OID maps and passed to deserializer |
| PostgresEventDeserializer.java | Core implementation of schema change detection and event generation logic |
| PostgresPipelineITCaseTest.java | Added parameterized tests for various schema evolution scenarios |

columns.stream()
        .filter(e -> e.getName().equals(afterFieldName))
        .findFirst()
        .get();
Copilot AI Jan 30, 2026

NoSuchElementException risk: The code uses .get() on an Optional at line 349 without checking if the element is present. If no column matches the filter condition (e.getName().equals(afterFieldName)), this will throw a NoSuchElementException. Use .orElseThrow() with a descriptive error message or handle the absent case appropriately.

Suggested change
-         .get();
+         .orElseThrow(
+                 () ->
+                         new IllegalStateException(
+                                 "Failed to find column with name '"
+                                         + afterFieldName
+                                         + "' in table "
+                                         + tableId));

Comment on lines +385 to +391
if ((newDelEntry.getValue() == null && newAddEntry.getValue() == null)
        || (newDelEntry.getValue() == null && newAddEntry.getValue() != null)) {
    int oldFieldIndex = oldSchema.field(newDelEntry.getKey()).index();
    int afterFieldIndex = afterSchema.field(newAddEntry.getKey()).index();
    if (oldFieldIndex == afterFieldIndex) {
        renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey());
    }
Copilot AI Jan 30, 2026

Logic issue in rename detection: The condition at lines 385-386 treats a pair of columns as a potential rename whenever newDelEntry.getValue() is null, regardless of whether newAddEntry.getValue() is null. This is counterintuitive: if the deleted column has no OID (null) and the added column has one, they are likely different columns, not a rename. The logic should be reconsidered to ensure accurate rename detection.

jdbc.query(
        "SELECT b.relname,a.attname attname,a.attnum AS oid FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname "
                + inClause
                + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum",
Copilot AI Jan 30, 2026

Performance concern: The query at line 144-146 uses a GROUP BY clause unnecessarily when retrieving column OIDs. Since we're already filtering by attnum > 0 and attisdropped = 'f', each (relname, attname, attnum) combination should be unique without needing GROUP BY. Removing the GROUP BY would improve query performance.

Suggested change
-                 + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum",
+                 + " and a.attnum > 0 and a.attisdropped = 'f'",

@Override
public List<? extends Event> deserialize(SourceRecord record) throws Exception {
    List<Event> result = new ArrayList<>();
    if (postgresSourceConfig.isIncludeSchemaChanges()) {
Copilot AI Jan 30, 2026

Potential NullPointerException: At line 118, postgresSourceConfig.isIncludeSchemaChanges() is called without checking if postgresSourceConfig is null. The constructor allows postgresSourceConfig to be null (see constructors at lines 78-93), which would cause a NullPointerException here. Add a null check before calling isIncludeSchemaChanges().

Suggested change
-     if (postgresSourceConfig.isIncludeSchemaChanges()) {
+     if (postgresSourceConfig != null && postgresSourceConfig.isIncludeSchemaChanges()) {

Comment on lines +113 to +119
try {
    jdbc.query(
            "SELECT a.attname attname,a.attnum as oid \n"
                    + "FROM pg_attribute a\n"
                    + "JOIN pg_class b ON a.attrelid = b.oid\n"
                    + "WHERE b.relname = '"
                    + tableId.getTableName()
Copilot AI Jan 30, 2026

Missing schema qualification in query: The query at lines 114-121 filters by table name (b.relname) only, without considering the schema/namespace. In PostgreSQL, multiple schemas can have tables with the same name. This could return incorrect column OIDs if tables with the same name exist in different schemas. The query should also filter by schema to ensure accurate results, especially since TableId includes schema information.

Suggested change
- try {
-     jdbc.query(
-             "SELECT a.attname attname,a.attnum as oid \n"
-                     + "FROM pg_attribute a\n"
-                     + "JOIN pg_class b ON a.attrelid = b.oid\n"
-                     + "WHERE b.relname = '"
-                     + tableId.getTableName()
+ String tableName = tableId.getTableName().replace("'", "''");
+ String schemaName = tableId.getSchemaName().replace("'", "''");
+ try {
+     jdbc.query(
+             "SELECT a.attname attname,a.attnum as oid \n"
+                     + "FROM pg_attribute a\n"
+                     + "JOIN pg_class b ON a.attrelid = b.oid\n"
+                     + "JOIN pg_namespace n ON b.relnamespace = n.oid\n"
+                     + "WHERE b.relname = '"
+                     + tableName
+                     + "' AND n.nspname = '"
+                     + schemaName

Comment on lines +75 to +77
beforeTableColumnsOidMaps =
        PostgresSchemaUtils.getAllTablesColumnOids(
                postgresSourceConfig, postgresSourceConfig.getTableList());
Copilot AI Jan 30, 2026

Resource leak risk: The getAllTablesColumnOids method at line 76 opens a database connection internally. If this method throws an exception, the connection may not be properly closed. While the method uses try-with-resources internally, if an exception occurs between line 75 and 85 during deserializer construction, the partially initialized state could cause issues. Consider wrapping this in proper error handling.

Comment on lines +148 to +149
while (rs.next()) {
    TableId tableId = TableId.tableId(rs.getString(1));
Copilot AI Jan 30, 2026

Inconsistent TableId construction: At line 149, TableId.tableId() is called with only the table name (relname), which creates a single-component TableId. However, elsewhere in the code (e.g., line 145 in PostgresEventDeserializer), TableId.tableId(tableId.getTableName()) is also used with just the table name. This inconsistency can lead to lookup failures if the beforeTableColumnsOidMaps uses different TableId formats. The TableId should be constructed consistently with schema information to match how it's used throughout the code.
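
One way to picture the concern: if the map key carries only the table name, two tables with the same name in different schemas collapse into one entry. The sketch below uses a hypothetical Key class as a stand-in for a schema-qualified identifier. It does not use the Flink CDC TableId API, and the schema and table names are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration of schema-qualified map keys; not part of the PR. */
final class TableKeySketch {

    /** Stand-in for a schema-qualified table identifier. */
    static final class Key {
        final String schema;
        final String table;

        Key(String schema, String table) {
            this.schema = Objects.requireNonNull(schema);
            this.table = Objects.requireNonNull(table);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return schema.equals(other.schema) && table.equals(other.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(schema, table);
        }
    }

    public static void main(String[] args) {
        // Keyed by table name only: the second put overwrites the first.
        Map<String, Integer> byName = new HashMap<>();
        byName.put("orders", 3);
        byName.put("orders", 5);

        // Keyed by (schema, table): both entries survive.
        Map<Key, Integer> byKey = new HashMap<>();
        byKey.put(new Key("public", "orders"), 3);
        byKey.put(new Key("archive", "orders"), 5);

        System.out.println(byName.size() + " vs " + byKey.size());
    }
}
```

Whatever key type is chosen, the same form has to be used where the map is filled and where it is read.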

Comment on lines +118 to +126
                + "WHERE b.relname = '"
                + tableId.getTableName()
                + "' AND a.attname "
                + inClause,
        rs -> {
            while (rs.next()) {
                oidMaps.put(rs.getString(1), rs.getInt(2));
            }
        });
Copilot AI Jan 30, 2026

SQL injection vulnerability: The tableId.getTableName() is directly concatenated into the SQL query without proper escaping or use of prepared statements. An attacker could exploit this by crafting a malicious table name. Consider using parameterized queries or properly escaping the table name.

Suggested change
-                 + "WHERE b.relname = '"
-                 + tableId.getTableName()
-                 + "' AND a.attname "
-                 + inClause,
-         rs -> {
-             while (rs.next()) {
-                 oidMaps.put(rs.getString(1), rs.getInt(2));
-             }
-         });
+                 + "WHERE b.relname = ? AND a.attname "
+                 + inClause,
+         rs -> {
+             while (rs.next()) {
+                 oidMaps.put(rs.getString(1), rs.getInt(2));
+             }
+         },
+         tableId.getTableName());
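
For comparison, here is a sketch of the same lookup written against plain java.sql with every value bound as a parameter, including the schema name and the column names. It is an illustration under assumptions, not the connector's code: ColumnAttnumQuerySketch, buildSql, and queryAttnums are hypothetical names, the original inClause is assumed to be an IN (...) list of column names, and the jdbc helper used in the PR may expose a different way to bind parameters.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a parameterized, schema-qualified lookup; not part of the PR. */
final class ColumnAttnumQuerySketch {

    /** Builds the SQL with one '?' per column name; no value is concatenated in. */
    static String buildSql(int columnCount) {
        String placeholders = String.join(", ", Collections.nCopies(columnCount, "?"));
        return "SELECT a.attname, a.attnum "
                + "FROM pg_attribute a "
                + "JOIN pg_class c ON a.attrelid = c.oid "
                + "JOIN pg_namespace n ON c.relnamespace = n.oid "
                + "WHERE n.nspname = ? AND c.relname = ? "
                + "AND a.attnum > 0 AND NOT a.attisdropped "
                + "AND a.attname IN (" + placeholders + ")";
    }

    /** Returns column name -> attnum for the given columns of schema.table. */
    static Map<String, Integer> queryAttnums(
            Connection conn, String schema, String table, List<String> columns)
            throws SQLException {
        Map<String, Integer> result = new HashMap<>();
        if (columns.isEmpty()) {
            return result; // "IN ()" is not valid SQL, so skip the query entirely
        }
        try (PreparedStatement ps = conn.prepareStatement(buildSql(columns.size()))) {
            ps.setString(1, schema);
            ps.setString(2, table);
            for (int i = 0; i < columns.size(); i++) {
                ps.setString(i + 3, columns.get(i)); // parameters 3..n are column names
            }
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    result.put(rs.getString(1), rs.getInt(2));
                }
            }
        }
        return result;
    }
}
```

Because the schema is bound alongside the table name, this form would also cover the schema-qualification point raised in the earlier comment.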

Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>();
String inClause =
        tableList.stream()
                .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'")
Copilot AI Jan 30, 2026

Potential ArrayIndexOutOfBoundsException: The code assumes that table.split("\\.") will always have at least 2 elements (accessing index [1]). If a table name doesn't contain a dot, this will throw an exception. Add validation to ensure the split result has the expected number of elements before accessing index 1.

Suggested change
-                 .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'")
+                 .map(
+                         table -> {
+                             String[] parts = table.split("\\.");
+                             String tableName =
+                                     parts.length > 1 ? parts[1] : parts[0];
+                             return "'" + tableName.replace("'", "''") + "'";
+                         })
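
An alternative to indexing into the split result is to take everything after the last dot. This returns the final component whether the entry has one, two, or three parts. The helper names below are hypothetical, and the sketch assumes table names do not themselves contain dots and that entries are plain names rather than patterns.

```java
/** Hypothetical helpers for illustration only; not part of the PR. */
final class TableNameSketch {

    /** Returns the part after the last dot, or the whole string when there is no dot. */
    static String tablePart(String qualified) {
        int dot = qualified.lastIndexOf('.');
        return dot < 0 ? qualified : qualified.substring(dot + 1);
    }

    /** Wraps a value in single quotes, doubling any embedded single quote. */
    static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        System.out.println(quoteLiteral(tablePart("public.orders")));
        System.out.println(quoteLiteral(tablePart("orders")));
    }
}
```

Binding the names as query parameters would remove the need for quoteLiteral altogether.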

Comment on lines +326 to +328
    afterField =
            afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
}
Copilot AI Jan 30, 2026

Potential NullPointerException: At line 329, afterField could still be null if afterSchema.field(oldFieldName) returns null and finalRenameColumnMaps.get(oldFieldName) either returns null or names a field that doesn't exist. Calling afterField.name() would then throw a NullPointerException. Add a null check before accessing afterField.

Suggested change
-     afterField =
-             afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
- }
+     String renamedFieldName = finalRenameColumnMaps.get(oldFieldName);
+     if (renamedFieldName != null) {
+         afterField = afterSchema.field(renamedFieldName);
+     }
+ }
+ if (afterField == null) {
+     // No corresponding field in the after-schema (even after rename); skip.
+     return;
+ }

@lvyanquan
Contributor

Hi @Mrart, CC'ing you as you may be interested in this.