[FLINK-38959][pipeline-connector][postgres]Added support for schema change in the postgres pipeline connector#4233
Conversation
| "SELECT a.attname attname,a.attnum AS age_oid \n" + | ||
| "FROM pg_attribute a\n" + | ||
| "JOIN pg_class c ON a.attrelid = c.oid\n" + |
There was a problem hiding this comment.
Is the official recommended way to obtain history schema of a Postgres table ?
There was a problem hiding this comment.
Is the official recommended way to obtain history schema of a Postgres table ?
Here, we are only obtaining the OID of the schema field in the current state of the Postgres table, and determining whether the field has undergone a rename operation based on whether the OID remains consistent
c9048cc to
595f299
Compare
|
Could a configuration option be added to control the on/off state? Also, I'm implementing a way to parse DDL SQL to support DDL, which requires adding an event table in Postgres. The two methods don't seem to conflict, is this approach acceptable to the community? |
…hange in the postgres pipeline connector.
595f299 to
01a67b4
Compare
Already added to control the on/off state,thanks for suggestion! |
There was a problem hiding this comment.
Pull request overview
This pull request adds schema change event support to the Flink CDC Postgres pipeline connector. The implementation enables the connector to detect and emit schema change events (ADD/DROP/RENAME columns, ALTER column types) during change data capture operations.
Changes:
- Added
includeSchemaChangesconfiguration option to control schema change event emission - Implemented schema change detection logic by comparing before/after schemas and tracking column OIDs
- Added comprehensive test coverage for various schema evolution scenarios (add, drop, rename, type changes)
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| PostgresSourceBuilder.java | Added includeSchemaChanges() method to builder API |
| PostgresDataSourceOptions.java | Added SCHEMA_CHANGE_ENABLED configuration option with default value true |
| PostgresDataSourceFactory.java | Integrated schema change configuration into data source factory |
| PostgresSchemaUtils.java | Added methods to retrieve column OIDs for schema change tracking |
| PostgresDataSource.java | Initialized column OID maps and passed to deserializer |
| PostgresEventDeserializer.java | Core implementation of schema change detection and event generation logic |
| PostgresPipelineITCaseTest.java | Added parameterized tests for various schema evolution scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| columns.stream() | ||
| .filter(e -> e.getName().equals(afterFieldName)) | ||
| .findFirst() | ||
| .get(); |
There was a problem hiding this comment.
NoSuchElementException risk: The code uses .get() on an Optional at line 349 without checking if the element is present. If no column matches the filter condition (e.getName().equals(afterFieldName)), this will throw a NoSuchElementException. Use .orElseThrow() with a descriptive error message or handle the absent case appropriately.
| .get(); | |
| .orElseThrow( | |
| () -> | |
| new IllegalStateException( | |
| "Failed to find column with name '" | |
| + afterFieldName | |
| + "' in table " | |
| + tableId)); |
| if ((newDelEntry.getValue() == null && newAddEntry.getValue() == null) | ||
| || (newDelEntry.getValue() == null && newAddEntry.getValue() != null)) { | ||
| int oldFieldIndex = oldSchema.field(newDelEntry.getKey()).index(); | ||
| int afterFieldIndex = afterSchema.field(newAddEntry.getKey()).index(); | ||
| if (oldFieldIndex == afterFieldIndex) { | ||
| renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey()); | ||
| } |
There was a problem hiding this comment.
Logic issue in rename detection: The condition at line 385-386 compares null values but has problematic logic. When newDelEntry.getValue() is null and newAddEntry.getValue() is not null, it treats this as a potential rename. However, this is counterintuitive - if the deleted column has no OID (null) and the added column has an OID, they are likely different columns, not a rename. The logic should be reconsidered to ensure accurate rename detection.
| jdbc.query( | ||
| "SELECT b.relname,a.attname attname,a.attnum AS oid FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname " | ||
| + inClause | ||
| + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum", |
There was a problem hiding this comment.
Performance concern: The query at line 144-146 uses a GROUP BY clause unnecessarily when retrieving column OIDs. Since we're already filtering by attnum > 0 and attisdropped = 'f', each (relname, attname, attnum) combination should be unique without needing GROUP BY. Removing the GROUP BY would improve query performance.
| + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum", | |
| + " and a.attnum > 0 and a.attisdropped = 'f'", |
| @Override | ||
| public List<? extends Event> deserialize(SourceRecord record) throws Exception { | ||
| List<Event> result = new ArrayList<>(); | ||
| if (postgresSourceConfig.isIncludeSchemaChanges()) { |
There was a problem hiding this comment.
Potential NullPointerException: At line 118, postgresSourceConfig.isIncludeSchemaChanges() is called without checking if postgresSourceConfig is null. The constructor allows postgresSourceConfig to be null (see constructors at lines 78-93), which would cause a NullPointerException here. Add a null check before calling isIncludeSchemaChanges().
| if (postgresSourceConfig.isIncludeSchemaChanges()) { | |
| if (postgresSourceConfig != null && postgresSourceConfig.isIncludeSchemaChanges()) { |
| try { | ||
| jdbc.query( | ||
| "SELECT a.attname attname,a.attnum as oid \n" | ||
| + "FROM pg_attribute a\n" | ||
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | ||
| + "WHERE b.relname = '" | ||
| + tableId.getTableName() |
There was a problem hiding this comment.
Missing schema qualification in query: The query at lines 114-121 filters by table name (b.relname) only, without considering the schema/namespace. In PostgreSQL, multiple schemas can have tables with the same name. This could return incorrect column OIDs if tables with the same name exist in different schemas. The query should also filter by schema to ensure accurate results, especially since TableId includes schema information.
| try { | |
| jdbc.query( | |
| "SELECT a.attname attname,a.attnum as oid \n" | |
| + "FROM pg_attribute a\n" | |
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | |
| + "WHERE b.relname = '" | |
| + tableId.getTableName() | |
| String tableName = tableId.getTableName().replace("'", "''"); | |
| String schemaName = tableId.getSchemaName().replace("'", "''"); | |
| try { | |
| jdbc.query( | |
| "SELECT a.attname attname,a.attnum as oid \n" | |
| + "FROM pg_attribute a\n" | |
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | |
| + "JOIN pg_namespace n ON b.relnamespace = n.oid\n" | |
| + "WHERE b.relname = '" | |
| + tableName | |
| + "' AND n.nspname = '" | |
| + schemaName |
| beforeTableColumnsOidMaps = | ||
| PostgresSchemaUtils.getAllTablesColumnOids( | ||
| postgresSourceConfig, postgresSourceConfig.getTableList()); |
There was a problem hiding this comment.
Resource leak risk: The getAllTablesColumnOids method at line 76 opens a database connection internally. If this method throws an exception, the connection may not be properly closed. While the method uses try-with-resources internally, if an exception occurs between line 75 and 85 during deserializer construction, the partially initialized state could cause issues. Consider wrapping this in proper error handling.
| while (rs.next()) { | ||
| TableId tableId = TableId.tableId(rs.getString(1)); |
There was a problem hiding this comment.
Inconsistent TableId construction: At line 149, TableId.tableId() is called with only the table name (relname), which creates a single-component TableId. However, elsewhere in the code (e.g., line 145 in PostgresEventDeserializer), TableId.tableId(tableId.getTableName()) is also used with just the table name. This inconsistency can lead to lookup failures if the beforeTableColumnsOidMaps uses different TableId formats. The TableId should be constructed consistently with schema information to match how it's used throughout the code.
| + "WHERE b.relname = '" | ||
| + tableId.getTableName() | ||
| + "' AND a.attname " | ||
| + inClause, | ||
| rs -> { | ||
| while (rs.next()) { | ||
| oidMaps.put(rs.getString(1), rs.getInt(2)); | ||
| } | ||
| }); |
There was a problem hiding this comment.
SQL injection vulnerability: The tableId.getTableName() is directly concatenated into the SQL query without proper escaping or use of prepared statements. An attacker could exploit this by crafting a malicious table name. Consider using parameterized queries or properly escaping the table name.
| + "WHERE b.relname = '" | |
| + tableId.getTableName() | |
| + "' AND a.attname " | |
| + inClause, | |
| rs -> { | |
| while (rs.next()) { | |
| oidMaps.put(rs.getString(1), rs.getInt(2)); | |
| } | |
| }); | |
| + "WHERE b.relname = ? AND a.attname " | |
| + inClause, | |
| rs -> { | |
| while (rs.next()) { | |
| oidMaps.put(rs.getString(1), rs.getInt(2)); | |
| } | |
| }, | |
| tableId.getTableName()); |
| Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>(); | ||
| String inClause = | ||
| tableList.stream() | ||
| .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'") |
There was a problem hiding this comment.
Potential ArrayIndexOutOfBoundsException: The code assumes that table.split("\.") will always have at least 2 elements (accessing index [1]). If a table name doesn't contain a dot, this will throw an exception. Add validation to ensure the split result has the expected number of elements before accessing index 1.
| .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'") | |
| .map( | |
| table -> { | |
| String[] parts = table.split("\\."); | |
| String tableName = | |
| parts.length > 1 ? parts[1] : parts[0]; | |
| return "'" + tableName.replace("'", "''") + "'"; | |
| }) |
| afterField = | ||
| afterSchema.field(finalRenameColumnMaps.get(oldFieldName)); | ||
| } |
There was a problem hiding this comment.
Potential NullPointerException: At line 329, afterField could still be null if both afterSchema.field(oldFieldName) returns null and finalRenameColumnMaps.get(oldFieldName) also returns null or the field doesn't exist. Attempting to call afterField.name() will throw a NullPointerException. Add a null check before accessing afterField.
| afterField = | |
| afterSchema.field(finalRenameColumnMaps.get(oldFieldName)); | |
| } | |
| String renamedFieldName = finalRenameColumnMaps.get(oldFieldName); | |
| if (renamedFieldName != null) { | |
| afterField = afterSchema.field(renamedFieldName); | |
| } | |
| } | |
| if (afterField == null) { | |
| // No corresponding field in the after-schema (even after rename); skip. | |
| return; | |
| } |
|
Hi @Mrart CC as you may be interested in this. |
Added support for schema change in the postgres pipeline connector