[FLINK-38959][pipeline-connector][postgres]Added support for schema change in the postgres pipeline connector#4233
Conversation
| "SELECT a.attname attname,a.attnum AS age_oid \n" + | ||
| "FROM pg_attribute a\n" + | ||
| "JOIN pg_class c ON a.attrelid = c.oid\n" + |
There was a problem hiding this comment.
Is the official recommended way to obtain history schema of a Postgres table ?
There was a problem hiding this comment.
Is the official recommended way to obtain history schema of a Postgres table ?
Here, we are only obtaining the OID of the schema field in the current state of the Postgres table, and determining whether the field has undergone a rename operation based on whether the OID remains consistent
c9048cc to
595f299
Compare
|
Could a configuration option be added to control the on/off state? Also, I'm implementing a way to parse DDL SQL to support DDL, which requires adding an event table in Postgres. The two methods don't seem to conflict, is this approach acceptable to the community? |
…hange in the postgres pipeline connector.
595f299 to
01a67b4
Compare
Already added to control the on/off state,thanks for suggestion! |
There was a problem hiding this comment.
Pull request overview
This pull request adds schema change event support to the Flink CDC Postgres pipeline connector. The implementation enables the connector to detect and emit schema change events (ADD/DROP/RENAME columns, ALTER column types) during change data capture operations.
Changes:
- Added
includeSchemaChangesconfiguration option to control schema change event emission - Implemented schema change detection logic by comparing before/after schemas and tracking column OIDs
- Added comprehensive test coverage for various schema evolution scenarios (add, drop, rename, type changes)
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| PostgresSourceBuilder.java | Added includeSchemaChanges() method to builder API |
| PostgresDataSourceOptions.java | Added SCHEMA_CHANGE_ENABLED configuration option with default value true |
| PostgresDataSourceFactory.java | Integrated schema change configuration into data source factory |
| PostgresSchemaUtils.java | Added methods to retrieve column OIDs for schema change tracking |
| PostgresDataSource.java | Initialized column OID maps and passed to deserializer |
| PostgresEventDeserializer.java | Core implementation of schema change detection and event generation logic |
| PostgresPipelineITCaseTest.java | Added parameterized tests for various schema evolution scenarios |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| columns.stream() | ||
| .filter(e -> e.getName().equals(afterFieldName)) | ||
| .findFirst() | ||
| .get(); |
There was a problem hiding this comment.
NoSuchElementException risk: The code uses .get() on an Optional at line 349 without checking if the element is present. If no column matches the filter condition (e.getName().equals(afterFieldName)), this will throw a NoSuchElementException. Use .orElseThrow() with a descriptive error message or handle the absent case appropriately.
| .get(); | |
| .orElseThrow( | |
| () -> | |
| new IllegalStateException( | |
| "Failed to find column with name '" | |
| + afterFieldName | |
| + "' in table " | |
| + tableId)); |
| if ((newDelEntry.getValue() == null && newAddEntry.getValue() == null) | ||
| || (newDelEntry.getValue() == null && newAddEntry.getValue() != null)) { | ||
| int oldFieldIndex = oldSchema.field(newDelEntry.getKey()).index(); | ||
| int afterFieldIndex = afterSchema.field(newAddEntry.getKey()).index(); | ||
| if (oldFieldIndex == afterFieldIndex) { | ||
| renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey()); | ||
| } |
There was a problem hiding this comment.
Logic issue in rename detection: The condition at line 385-386 compares null values but has problematic logic. When newDelEntry.getValue() is null and newAddEntry.getValue() is not null, it treats this as a potential rename. However, this is counterintuitive - if the deleted column has no OID (null) and the added column has an OID, they are likely different columns, not a rename. The logic should be reconsidered to ensure accurate rename detection.
| jdbc.query( | ||
| "SELECT b.relname,a.attname attname,a.attnum AS oid FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname " | ||
| + inClause | ||
| + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum", |
There was a problem hiding this comment.
Performance concern: The query at line 144-146 uses a GROUP BY clause unnecessarily when retrieving column OIDs. Since we're already filtering by attnum > 0 and attisdropped = 'f', each (relname, attname, attnum) combination should be unique without needing GROUP BY. Removing the GROUP BY would improve query performance.
| + " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum", | |
| + " and a.attnum > 0 and a.attisdropped = 'f'", |
| @Override | ||
| public List<? extends Event> deserialize(SourceRecord record) throws Exception { | ||
| List<Event> result = new ArrayList<>(); | ||
| if (postgresSourceConfig.isIncludeSchemaChanges()) { |
There was a problem hiding this comment.
Potential NullPointerException: At line 118, postgresSourceConfig.isIncludeSchemaChanges() is called without checking if postgresSourceConfig is null. The constructor allows postgresSourceConfig to be null (see constructors at lines 78-93), which would cause a NullPointerException here. Add a null check before calling isIncludeSchemaChanges().
| if (postgresSourceConfig.isIncludeSchemaChanges()) { | |
| if (postgresSourceConfig != null && postgresSourceConfig.isIncludeSchemaChanges()) { |
| try { | ||
| jdbc.query( | ||
| "SELECT a.attname attname,a.attnum as oid \n" | ||
| + "FROM pg_attribute a\n" | ||
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | ||
| + "WHERE b.relname = '" | ||
| + tableId.getTableName() |
There was a problem hiding this comment.
Missing schema qualification in query: The query at lines 114-121 filters by table name (b.relname) only, without considering the schema/namespace. In PostgreSQL, multiple schemas can have tables with the same name. This could return incorrect column OIDs if tables with the same name exist in different schemas. The query should also filter by schema to ensure accurate results, especially since TableId includes schema information.
| try { | |
| jdbc.query( | |
| "SELECT a.attname attname,a.attnum as oid \n" | |
| + "FROM pg_attribute a\n" | |
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | |
| + "WHERE b.relname = '" | |
| + tableId.getTableName() | |
| String tableName = tableId.getTableName().replace("'", "''"); | |
| String schemaName = tableId.getSchemaName().replace("'", "''"); | |
| try { | |
| jdbc.query( | |
| "SELECT a.attname attname,a.attnum as oid \n" | |
| + "FROM pg_attribute a\n" | |
| + "JOIN pg_class b ON a.attrelid = b.oid\n" | |
| + "JOIN pg_namespace n ON b.relnamespace = n.oid\n" | |
| + "WHERE b.relname = '" | |
| + tableName | |
| + "' AND n.nspname = '" | |
| + schemaName |
| beforeTableColumnsOidMaps = | ||
| PostgresSchemaUtils.getAllTablesColumnOids( | ||
| postgresSourceConfig, postgresSourceConfig.getTableList()); |
There was a problem hiding this comment.
Resource leak risk: The getAllTablesColumnOids method at line 76 opens a database connection internally. If this method throws an exception, the connection may not be properly closed. While the method uses try-with-resources internally, if an exception occurs between line 75 and 85 during deserializer construction, the partially initialized state could cause issues. Consider wrapping this in proper error handling.
| while (rs.next()) { | ||
| TableId tableId = TableId.tableId(rs.getString(1)); |
There was a problem hiding this comment.
Inconsistent TableId construction: At line 149, TableId.tableId() is called with only the table name (relname), which creates a single-component TableId. However, elsewhere in the code (e.g., line 145 in PostgresEventDeserializer), TableId.tableId(tableId.getTableName()) is also used with just the table name. This inconsistency can lead to lookup failures if the beforeTableColumnsOidMaps uses different TableId formats. The TableId should be constructed consistently with schema information to match how it's used throughout the code.
| + "WHERE b.relname = '" | ||
| + tableId.getTableName() | ||
| + "' AND a.attname " | ||
| + inClause, | ||
| rs -> { | ||
| while (rs.next()) { | ||
| oidMaps.put(rs.getString(1), rs.getInt(2)); | ||
| } | ||
| }); |
There was a problem hiding this comment.
SQL injection vulnerability: The tableId.getTableName() is directly concatenated into the SQL query without proper escaping or use of prepared statements. An attacker could exploit this by crafting a malicious table name. Consider using parameterized queries or properly escaping the table name.
| + "WHERE b.relname = '" | |
| + tableId.getTableName() | |
| + "' AND a.attname " | |
| + inClause, | |
| rs -> { | |
| while (rs.next()) { | |
| oidMaps.put(rs.getString(1), rs.getInt(2)); | |
| } | |
| }); | |
| + "WHERE b.relname = ? AND a.attname " | |
| + inClause, | |
| rs -> { | |
| while (rs.next()) { | |
| oidMaps.put(rs.getString(1), rs.getInt(2)); | |
| } | |
| }, | |
| tableId.getTableName()); |
| Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>(); | ||
| String inClause = | ||
| tableList.stream() | ||
| .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'") |
There was a problem hiding this comment.
Potential ArrayIndexOutOfBoundsException: The code assumes that table.split("\.") will always have at least 2 elements (accessing index [1]). If a table name doesn't contain a dot, this will throw an exception. Add validation to ensure the split result has the expected number of elements before accessing index 1.
| .map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'") | |
| .map( | |
| table -> { | |
| String[] parts = table.split("\\."); | |
| String tableName = | |
| parts.length > 1 ? parts[1] : parts[0]; | |
| return "'" + tableName.replace("'", "''") + "'"; | |
| }) |
| afterField = | ||
| afterSchema.field(finalRenameColumnMaps.get(oldFieldName)); | ||
| } |
There was a problem hiding this comment.
Potential NullPointerException: At line 329, afterField could still be null if both afterSchema.field(oldFieldName) returns null and finalRenameColumnMaps.get(oldFieldName) also returns null or the field doesn't exist. Attempting to call afterField.name() will throw a NullPointerException. Add a null check before accessing afterField.
| afterField = | |
| afterSchema.field(finalRenameColumnMaps.get(oldFieldName)); | |
| } | |
| String renamedFieldName = finalRenameColumnMaps.get(oldFieldName); | |
| if (renamedFieldName != null) { | |
| afterField = afterSchema.field(renamedFieldName); | |
| } | |
| } | |
| if (afterField == null) { | |
| // No corresponding field in the after-schema (even after rename); skip. | |
| return; | |
| } |
|
Hi @Mrart CC as you may be interested in this. |
| beforeTableColumnsOidMaps = | ||
| PostgresSchemaUtils.getAllTablesColumnOids( | ||
| postgresSourceConfig, postgresSourceConfig.getTableList()); |
There was a problem hiding this comment.
If the parallelism of PostgresDataSource is greater than 1, will there be any inconsistency issues here?
| Map<String, Integer> beforeColumnsOidMaps) { | ||
| Map<String, String> renameColumnMaps = new HashMap<>(); | ||
| Map<String, Integer> newAddColumnsOidMaps = | ||
| PostgresSchemaUtils.getColumnOids(postgresSourceConfig, tableId, newAddColumnNames); |
There was a problem hiding this comment.
Since DDL is driven by DML, consider this scenario:
- DDL is executed at T1
- DML is executed at T2
- DDL is executed at T3
If a query is performed at T2, the obtained OID might be from T3, which could cause problems.
By the way, it seems difficult to distinguish between DROP+ADD and Rename?
loserwang1024
left a comment
There was a problem hiding this comment.
Thanks for your job which mean a lot for CDC. I have left some comments:
| private final PostgresSourceConfig postgresSourceConfig; | ||
|
|
||
| private final List<PostgreSQLReadableMetadata> readableMetadataList; | ||
| Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps = new HashMap<>(); |
| public List<? extends Event> deserialize(SourceRecord record) throws Exception { | ||
| List<Event> result = new ArrayList<>(); | ||
| if (postgresSourceConfig.isIncludeSchemaChanges()) { | ||
| handleSchemaChange(record, result); |
There was a problem hiding this comment.
the record is not schema change , we have isSchemaChangeRecord(record) in the bellow. I think method named inferSchemaChangeByRecord will be better
| private List<PostgreSQLReadableMetadata> readableMetadataList; | ||
| private final boolean includeDatabaseInTableId; | ||
| private final String databaseName; | ||
| private Map<TableId, Schema> schemaMap = new HashMap<>(); |
There was a problem hiding this comment.
The schemaMap can be restored from the StreamSplit, specifically from its tableSchemas field upon job restart. To ensure correctness, the tableSchemas in the StreamSplit should be updated during each checkpoint to reflect any changes in the schema.
Otherwise, after a job restart, the schemaMap will be empty, and the previous schema will be null. In your current design, it seems that even if the schema of newly arriving data changes, no schema change event will be generated.
You can address this by passing the schema information through org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent.
To be honest, I believe that DebeziumEventDeserializationSchema#createTableEventCache is not necessary for PostgreSQL CDC. We can achieve the same — and more powerful — functionality using the schemaMap.
✅ My Proposed Design for CreateTableEvent
-
Replace
PostgresPipelineRecordEmitter#createTableEventCachewithschemaMap:
UseschemaMapas the source of truth for table schemas instead of maintaining a separate cache. -
Remove the logic in
PostgresPipelineRecordEmitterthat generatesCreateTableEventfrom the table ID:// In rare cases, we may miss some CreateTableEvents before DataChangeEvents. // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema. if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) { CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId); output.collect((T) createTableEvent); createTableEventCache.put(tableId, createTableEvent); }
-
Move schema change handling to
PostgresEventDeserializer#handleSchemaChange:
If the previous schema isnull, emit aCreateTableEvent. This ensures that all schema change events are centralized within the deserializer rather than being scattered across different components.
✅ Summary
By centralizing schema change handling in PostgresEventDeserializer and leveraging schemaMap via StreamSplit, we can simplify the architecture, reduce redundancy, and improve the robustness of schema tracking in PostgreSQL CDC scenarios.
Let me know if you'd like help writing this as a pull request description, or need further clarification on any part!
@leonardBang , WDYT?
| List<String> afterColumnNames; | ||
| Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER); | ||
| List<Field> afterFields = afterSchema.fields(); | ||
| org.apache.flink.cdc.common.schema.Schema schema = |
There was a problem hiding this comment.
org.apache.flink.cdc.common.schema.Schema schema =
PostgresSchemaUtils.getTableSchema(postgresSourceConfig, tableId);It's too heavy that lookup current schema from database for each record. In production, it cannot work.
Why not just just afterSchema and beforeSchema? why stiil need to lookup current schema?
| private final PostgresSourceConfig postgresSourceConfig; | ||
|
|
||
| private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
| private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps; |
There was a problem hiding this comment.
problem
The beforeTableColumnsOidMaps is only populated during the YAML compilation phase. When the job is restarted without recompilation—such as in the case of a failover or other non-recompile-based restarts—any modifications to this map within PostgresEventDeserializer will not take effect.
As a result, two potential issues may arise:
- If the job has significant read lag, and it is later recompiled and restarted, the beforeTableColumnsOidMaps could be newer than the actual data being processed.
- If the job is simply restarted without recompilation, PostgresPipelineSource recovers from its serialized state and retrieves an outdated initial value that lags significantly behind the current log position.
suggestion
However, I question whether comparing OIDs is strictly necessary to determine if a table has been renamed or dropped and recreated. In the default lenient mode:
- Column Rename: In downstream systems, a renamed column is typically handled by omitting the old column and introducing a new one. This is functionally similar to removing the column (which is ignored) and then adding a new one with the same name.
- Drop and Recreate Same Column Name: In lenient mode, this scenario behaves identically to no change at all.
Therefore, I recommend not using beforeTableColumnsOidMaps for now, especially since the evolve mode is still immature and large-scale data scenarios generally avoid dropping or modifying downstream data structures. This would significantly simplify the current design.
@leonardBang , WDYT?
| List<Event> expected; | ||
| Map<String, String> renameMap = new HashMap<>(); | ||
| switch (testType) { | ||
| case "modifyType": |
There was a problem hiding this comment.
In your code,also includes change of oldField.schema().defaultValue() and oldField.schema().parameters(), please also test them.
|
@linjianchang if you don't have time, could I help to modified based on your CR? And I will reserve your commit. |
|
I have reviewd this PR(by @linjianchang )and #4259 by @zml1206 . Thanks for both hard work! I have also done some research on the Debezium codebase and PostgreSQL replication, and I have another design idea that prefers to leverage PostgreSQL’s native capabilities more fully. So I’d like to discuss it with you. PostgreSQL Pgoutput's relation message in logical replicationAs document of PostgreSQL 16 says: Section 55.5.3 - Logical Replication Protocol Message Flow:
When DDL changes are executed in PostgreSQL, no corresponding logs are generated. However, if the pgoutput sender is about to send the first DML message for a new schema, it will first send a Relation message. The relation message will include the schema of table. how Debezium use it?Depending on The decoder plug-in, schema updates take two completely different paths: What cdc need to do?Therefore, my personal opinion:
In this way, we can avoid intrusive changes to PostgreSQL through triggers, and there is also no need to compare schemas for every message. More importantly, it updates the schemas stored in the split, so schema consistency can still be guaranteed after state recovery or restart. Though only useful for pgoutput, but I think it's enough. Because, the pgoutput is the only official replication plugin. @linjianchang @zml1206 @leonardBang , WDYT? here is my poc code: https://github.com/loserwang1024/flink-cdc-connectors/tree/poc-schema-change-event |
Thanks @loserwang1024 for the detailed investigation!Here are my opinions: (1) Querying the PostgreSQL database table for every CDC event is not feasible — this approach will directly overwhelm and crash the business database once the data volume grows even slightly. (2) The trigger-based solution is also unacceptable, as it directly intrudes into the business write workflow on the database. Downstream CDC subscribers may not have the permission to implement this, and requiring all business teams to accept such triggers is not realistic. The approach of subscribing to the latest table schema based on OID in real-time is conceptually sound. It is non-intrusive to the business database, avoids frequent polling of the upstream DB as traffic increases, and ensures the ordering between schema events and data events. Debezium also uses this mechanism — +1 to implementing our solution based on this approach. |
+1 |
Sure, I haven't had time to make the revision recently because I've been busy with other things,thanks for review! |


Added support for schema change in the postgres pipeline connector