[FLINK-38959][pipeline-connector][postgres] Added support for schema change in the postgres pipeline connector #4233

Open
linjianchang wants to merge 1 commit into apache:master from linjianchang:master-pgschemachang

Conversation

@linjianchang
Contributor

Added support for schema change in the postgres pipeline connector

@lvyanquan lvyanquan self-assigned this Jan 22, 2026
@lvyanquan lvyanquan added this to the V3.6.0 milestone Jan 22, 2026
Comment on lines +113 to +115
"SELECT a.attname attname,a.attnum AS age_oid \n" +
"FROM pg_attribute a\n" +
"JOIN pg_class c ON a.attrelid = c.oid\n" +
Contributor

Is this the officially recommended way to obtain the historical schema of a Postgres table?

Contributor Author

> Is this the officially recommended way to obtain the historical schema of a Postgres table?

Here we only obtain the OID of each column in the current state of the Postgres table, and determine whether a column has undergone a rename based on whether its OID remains consistent.
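A minimal, self-contained sketch of this OID-based check (names and map shapes are illustrative, not the connector's actual API; note the PR's SQL aliases pg_attribute.attnum as the per-column id):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: detect renamed columns by comparing column -> OID maps
// captured before and after a schema change.
class RenameDetector {
    /** Returns oldName -> newName for columns whose OID survived under a new name. */
    static Map<String, String> detectRenames(
            Map<String, Integer> beforeOids, Map<String, Integer> afterOids) {
        Map<String, String> renames = new HashMap<>();
        for (Map.Entry<String, Integer> before : beforeOids.entrySet()) {
            if (afterOids.containsKey(before.getKey())) {
                continue; // column name unchanged
            }
            for (Map.Entry<String, Integer> after : afterOids.entrySet()) {
                // Same id but a different name is treated as a rename.
                if (after.getValue().equals(before.getValue())
                        && !beforeOids.containsKey(after.getKey())) {
                    renames.put(before.getKey(), after.getKey());
                }
            }
        }
        return renames;
    }

    public static void main(String[] args) {
        Map<String, Integer> before = new HashMap<>();
        before.put("id", 1);
        before.put("name", 2);
        Map<String, Integer> after = new HashMap<>();
        after.put("id", 1);
        after.put("full_name", 2); // "name" renamed to "full_name"
        System.out.println(detectRenames(before, after)); // {name=full_name}
    }
}
```

As the review discussion notes, this heuristic cannot always distinguish DROP+ADD from a rename when positions line up.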

@linjianchang linjianchang force-pushed the master-pgschemachang branch 2 times, most recently from c9048cc to 595f299 Compare January 23, 2026 08:43
@zml1206
Contributor

zml1206 commented Jan 26, 2026

Could a configuration option be added to control the on/off state? Also, I'm implementing a way to parse DDL SQL to support DDL, which requires adding an event table in Postgres. The two methods don't seem to conflict; is this approach acceptable to the community?

@linjianchang
Contributor Author

> Could a configuration option be added to control the on/off state?

Already added a configuration option to control the on/off state, thanks for the suggestion!

Contributor

Copilot AI left a comment

Pull request overview

This pull request adds schema change event support to the Flink CDC Postgres pipeline connector. The implementation enables the connector to detect and emit schema change events (ADD/DROP/RENAME columns, ALTER column types) during change data capture operations.

Changes:

  • Added includeSchemaChanges configuration option to control schema change event emission
  • Implemented schema change detection logic by comparing before/after schemas and tracking column OIDs
  • Added comprehensive test coverage for various schema evolution scenarios (add, drop, rename, type changes)

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 14 comments.

Show a summary per file

  • PostgresSourceBuilder.java: Added includeSchemaChanges() method to builder API
  • PostgresDataSourceOptions.java: Added SCHEMA_CHANGE_ENABLED configuration option with default value true
  • PostgresDataSourceFactory.java: Integrated schema change configuration into data source factory
  • PostgresSchemaUtils.java: Added methods to retrieve column OIDs for schema change tracking
  • PostgresDataSource.java: Initialized column OID maps and passed them to the deserializer
  • PostgresEventDeserializer.java: Core implementation of schema change detection and event generation logic
  • PostgresPipelineITCaseTest.java: Added parameterized tests for various schema evolution scenarios


columns.stream()
.filter(e -> e.getName().equals(afterFieldName))
.findFirst()
.get();
Copilot AI Jan 30, 2026

NoSuchElementException risk: The code uses .get() on an Optional at line 349 without checking if the element is present. If no column matches the filter condition (e.getName().equals(afterFieldName)), this will throw a NoSuchElementException. Use .orElseThrow() with a descriptive error message or handle the absent case appropriately.

Suggested change
.get();
.orElseThrow(
() ->
new IllegalStateException(
"Failed to find column with name '"
+ afterFieldName
+ "' in table "
+ tableId));

Comment on lines +385 to +391
if ((newDelEntry.getValue() == null && newAddEntry.getValue() == null)
|| (newDelEntry.getValue() == null && newAddEntry.getValue() != null)) {
int oldFieldIndex = oldSchema.field(newDelEntry.getKey()).index();
int afterFieldIndex = afterSchema.field(newAddEntry.getKey()).index();
if (oldFieldIndex == afterFieldIndex) {
renameColumnMaps.put(newDelEntry.getKey(), newAddEntry.getKey());
}
Copilot AI Jan 30, 2026

Logic issue in rename detection: The condition at line 385-386 compares null values but has problematic logic. When newDelEntry.getValue() is null and newAddEntry.getValue() is not null, it treats this as a potential rename. However, this is counterintuitive - if the deleted column has no OID (null) and the added column has an OID, they are likely different columns, not a rename. The logic should be reconsidered to ensure accurate rename detection.

jdbc.query(
"SELECT b.relname,a.attname attname,a.attnum AS oid FROM pg_attribute a JOIN pg_class b ON a.attrelid = b.oid WHERE b.relname "
+ inClause
+ " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum",
Copilot AI Jan 30, 2026

Performance concern: The query at line 144-146 uses a GROUP BY clause unnecessarily when retrieving column OIDs. Since we're already filtering by attnum > 0 and attisdropped = 'f', each (relname, attname, attnum) combination should be unique without needing GROUP BY. Removing the GROUP BY would improve query performance.

Suggested change
+ " and a.attnum > 0 and a.attisdropped = 'f' group by b.relname,a.attname,a.attnum",
+ " and a.attnum > 0 and a.attisdropped = 'f'",

@Override
public List<? extends Event> deserialize(SourceRecord record) throws Exception {
List<Event> result = new ArrayList<>();
if (postgresSourceConfig.isIncludeSchemaChanges()) {
Copilot AI Jan 30, 2026

Potential NullPointerException: At line 118, postgresSourceConfig.isIncludeSchemaChanges() is called without checking if postgresSourceConfig is null. The constructor allows postgresSourceConfig to be null (see constructors at lines 78-93), which would cause a NullPointerException here. Add a null check before calling isIncludeSchemaChanges().

Suggested change
if (postgresSourceConfig.isIncludeSchemaChanges()) {
if (postgresSourceConfig != null && postgresSourceConfig.isIncludeSchemaChanges()) {

Comment on lines +113 to +119
try {
jdbc.query(
"SELECT a.attname attname,a.attnum as oid \n"
+ "FROM pg_attribute a\n"
+ "JOIN pg_class b ON a.attrelid = b.oid\n"
+ "WHERE b.relname = '"
+ tableId.getTableName()
Copilot AI Jan 30, 2026

Missing schema qualification in query: The query at lines 114-121 filters by table name (b.relname) only, without considering the schema/namespace. In PostgreSQL, multiple schemas can have tables with the same name. This could return incorrect column OIDs if tables with the same name exist in different schemas. The query should also filter by schema to ensure accurate results, especially since TableId includes schema information.

Suggested change
try {
jdbc.query(
"SELECT a.attname attname,a.attnum as oid \n"
+ "FROM pg_attribute a\n"
+ "JOIN pg_class b ON a.attrelid = b.oid\n"
+ "WHERE b.relname = '"
+ tableId.getTableName()
String tableName = tableId.getTableName().replace("'", "''");
String schemaName = tableId.getSchemaName().replace("'", "''");
try {
jdbc.query(
"SELECT a.attname attname,a.attnum as oid \n"
+ "FROM pg_attribute a\n"
+ "JOIN pg_class b ON a.attrelid = b.oid\n"
+ "JOIN pg_namespace n ON b.relnamespace = n.oid\n"
+ "WHERE b.relname = '"
+ tableName
+ "' AND n.nspname = '"
+ schemaName

Comment on lines +75 to +77
beforeTableColumnsOidMaps =
PostgresSchemaUtils.getAllTablesColumnOids(
postgresSourceConfig, postgresSourceConfig.getTableList());
Copilot AI Jan 30, 2026

Resource leak risk: The getAllTablesColumnOids method at line 76 opens a database connection internally. If this method throws an exception, the connection may not be properly closed. While the method uses try-with-resources internally, if an exception occurs between line 75 and 85 during deserializer construction, the partially initialized state could cause issues. Consider wrapping this in proper error handling.

Comment on lines +148 to +149
while (rs.next()) {
TableId tableId = TableId.tableId(rs.getString(1));
Copilot AI Jan 30, 2026

Inconsistent TableId construction: At line 149, TableId.tableId() is called with only the table name (relname), which creates a single-component TableId. However, elsewhere in the code (e.g., line 145 in PostgresEventDeserializer), TableId.tableId(tableId.getTableName()) is also used with just the table name. This inconsistency can lead to lookup failures if the beforeTableColumnsOidMaps uses different TableId formats. The TableId should be constructed consistently with schema information to match how it's used throughout the code.
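A tiny illustration of the lookup hazard this comment describes. SimpleTableId is a simplified stand-in for the connector's real TableId class, used only to show that map entries keyed by (schema, table) are missed by lookups built from the bare table name:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Stand-in for a schema-qualified table identifier.
final class SimpleTableId {
    final String schema; // null when constructed from relname only
    final String table;

    SimpleTableId(String schema, String table) {
        this.schema = schema;
        this.table = table;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTableId)) return false;
        SimpleTableId other = (SimpleTableId) o;
        return Objects.equals(schema, other.schema) && Objects.equals(table, other.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schema, table);
    }

    public static void main(String[] args) {
        Map<SimpleTableId, String> oidMaps = new HashMap<>();
        oidMaps.put(new SimpleTableId("public", "orders"), "cached-oids");
        // A lookup built from relname only misses the schema-qualified entry:
        System.out.println(oidMaps.get(new SimpleTableId(null, "orders"))); // null
        // The schema-qualified lookup succeeds:
        System.out.println(oidMaps.get(new SimpleTableId("public", "orders"))); // cached-oids
    }
}
```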

Comment on lines +118 to +126
+ "WHERE b.relname = '"
+ tableId.getTableName()
+ "' AND a.attname "
+ inClause,
rs -> {
while (rs.next()) {
oidMaps.put(rs.getString(1), rs.getInt(2));
}
});
Copilot AI Jan 30, 2026

SQL injection vulnerability: The tableId.getTableName() is directly concatenated into the SQL query without proper escaping or use of prepared statements. An attacker could exploit this by crafting a malicious table name. Consider using parameterized queries or properly escaping the table name.

Suggested change
+ "WHERE b.relname = '"
+ tableId.getTableName()
+ "' AND a.attname "
+ inClause,
rs -> {
while (rs.next()) {
oidMaps.put(rs.getString(1), rs.getInt(2));
}
});
+ "WHERE b.relname = ? AND a.attname "
+ inClause,
rs -> {
while (rs.next()) {
oidMaps.put(rs.getString(1), rs.getInt(2));
}
},
tableId.getTableName());

Map<TableId, Map<String, Integer>> tableOidMaps = new HashMap<>();
String inClause =
tableList.stream()
.map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'")
Copilot AI Jan 30, 2026

Potential ArrayIndexOutOfBoundsException: The code assumes that table.split("\.") will always have at least 2 elements (accessing index [1]). If a table name doesn't contain a dot, this will throw an exception. Add validation to ensure the split result has the expected number of elements before accessing index 1.

Suggested change
.map(table -> "'" + table.split("\\.")[1].replace("'", "''") + "'")
.map(
table -> {
String[] parts = table.split("\\.");
String tableName =
parts.length > 1 ? parts[1] : parts[0];
return "'" + tableName.replace("'", "''") + "'";
})

Comment on lines +326 to +328
afterField =
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
}
Copilot AI Jan 30, 2026

Potential NullPointerException: At line 329, afterField could still be null if both afterSchema.field(oldFieldName) returns null and finalRenameColumnMaps.get(oldFieldName) also returns null or the field doesn't exist. Attempting to call afterField.name() will throw a NullPointerException. Add a null check before accessing afterField.

Suggested change
afterField =
afterSchema.field(finalRenameColumnMaps.get(oldFieldName));
}
String renamedFieldName = finalRenameColumnMaps.get(oldFieldName);
if (renamedFieldName != null) {
afterField = afterSchema.field(renamedFieldName);
}
}
if (afterField == null) {
// No corresponding field in the after-schema (even after rename); skip.
return;
}

@lvyanquan
Contributor

Hi @Mrart CC as you may be interested in this.

Comment on lines +75 to +77
beforeTableColumnsOidMaps =
PostgresSchemaUtils.getAllTablesColumnOids(
postgresSourceConfig, postgresSourceConfig.getTableList());
Member

If the parallelism of PostgresDataSource is greater than 1, will there be any inconsistency issues here?

Map<String, Integer> beforeColumnsOidMaps) {
Map<String, String> renameColumnMaps = new HashMap<>();
Map<String, Integer> newAddColumnsOidMaps =
PostgresSchemaUtils.getColumnOids(postgresSourceConfig, tableId, newAddColumnNames);
Member

Since DDL is driven by DML, consider this scenario:

  1. DDL is executed at T1
  2. DML is executed at T2
  3. DDL is executed at T3

If a query is performed at T2, the obtained OID might be from T3, which could cause problems.

By the way, it seems difficult to distinguish between DROP+ADD and Rename?

Contributor

@loserwang1024 loserwang1024 left a comment

Thanks for your work, which means a lot for CDC. I have left some comments:

private final PostgresSourceConfig postgresSourceConfig;

private final List<PostgreSQLReadableMetadata> readableMetadataList;
Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps = new HashMap<>();
Contributor

typo: private final

public List<? extends Event> deserialize(SourceRecord record) throws Exception {
List<Event> result = new ArrayList<>();
if (postgresSourceConfig.isIncludeSchemaChanges()) {
handleSchemaChange(record, result);
Contributor

The record is not a schema change record; we have isSchemaChangeRecord(record) below. I think a method named inferSchemaChangeByRecord would be better.

private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
private Map<TableId, Schema> schemaMap = new HashMap<>();
Contributor

@loserwang1024 loserwang1024 Mar 10, 2026


The schemaMap can be restored from the StreamSplit, specifically from its tableSchemas field upon job restart. To ensure correctness, the tableSchemas in the StreamSplit should be updated during each checkpoint to reflect any changes in the schema.

Otherwise, after a job restart, the schemaMap will be empty, and the previous schema will be null. In your current design, it seems that even if the schema of newly arriving data changes, no schema change event will be generated.

You can address this by passing the schema information through org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent.


To be honest, I believe that DebeziumEventDeserializationSchema#createTableEventCache is not necessary for PostgreSQL CDC. We can achieve the same — and more powerful — functionality using the schemaMap.

✅ My Proposed Design for CreateTableEvent

  1. Replace PostgresPipelineRecordEmitter#createTableEventCache with schemaMap:
    Use schemaMap as the source of truth for table schemas instead of maintaining a separate cache.

  2. Remove the logic in PostgresPipelineRecordEmitter that generates CreateTableEvent from the table ID:

    // In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
    // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
    if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
        CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
        output.collect((T) createTableEvent);
        createTableEventCache.put(tableId, createTableEvent);
    }
  3. Move schema change handling to PostgresEventDeserializer#handleSchemaChange:
    If the previous schema is null, emit a CreateTableEvent. This ensures that all schema change events are centralized within the deserializer rather than being scattered across different components.


✅ Summary

By centralizing schema change handling in PostgresEventDeserializer and leveraging schemaMap via StreamSplit, we can simplify the architecture, reduce redundancy, and improve the robustness of schema tracking in PostgreSQL CDC scenarios.

Let me know if you'd like help writing this as a pull request description, or need further clarification on any part!

@leonardBang , WDYT?
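A rough sketch of step 3 of this proposal, with schemas and events reduced to plain strings purely for illustration (the real code would use the connector's Schema and event classes, and schemaMap would be restored from the StreamSplit's tableSchemas on restart):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of centralizing schema handling in the deserializer:
// emit a "CreateTableEvent" the first time a table is seen, and a
// "SchemaChangeEvent" only when the inferred schema differs from the cache.
class SchemaTracker {
    private final Map<String, String> schemaMap = new HashMap<>();

    List<String> onRecord(String tableId, String inferredSchema) {
        List<String> events = new ArrayList<>();
        String previous = schemaMap.get(tableId);
        if (previous == null) {
            events.add("CreateTableEvent(" + tableId + ")");
        } else if (!previous.equals(inferredSchema)) {
            events.add("SchemaChangeEvent(" + tableId + ")");
        }
        // Cache the latest schema; checkpointing it via StreamSplit keeps the
        // "previous == null" branch from firing again after a restart.
        schemaMap.put(tableId, inferredSchema);
        return events;
    }
}
```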

List<String> afterColumnNames;
Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER);
List<Field> afterFields = afterSchema.fields();
org.apache.flink.cdc.common.schema.Schema schema =
Contributor

@loserwang1024 loserwang1024 Mar 10, 2026

 org.apache.flink.cdc.common.schema.Schema schema =
                PostgresSchemaUtils.getTableSchema(postgresSourceConfig, tableId);

It's too heavy to look up the current schema from the database for each record; in production this cannot work.

Why not just use afterSchema and beforeSchema? Why do we still need to look up the current schema?

private final PostgresSourceConfig postgresSourceConfig;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
Contributor

problem

The beforeTableColumnsOidMaps is only populated during the YAML compilation phase. When the job is restarted without recompilation—such as in the case of a failover or other non-recompile-based restarts—any modifications to this map within PostgresEventDeserializer will not take effect.

As a result, two potential issues may arise:

  • If the job has significant read lag, and it is later recompiled and restarted, the beforeTableColumnsOidMaps could be newer than the actual data being processed.
  • If the job is simply restarted without recompilation, PostgresPipelineSource recovers from its serialized state and retrieves an outdated initial value that lags significantly behind the current log position.

suggestion

However, I question whether comparing OIDs is strictly necessary to determine if a table has been renamed or dropped and recreated. In the default lenient mode:

  • Column Rename: In downstream systems, a renamed column is typically handled by omitting the old column and introducing a new one. This is functionally similar to removing the column (which is ignored) and then adding a new one with the same name.
  • Drop and Recreate Same Column Name: In lenient mode, this scenario behaves identically to no change at all.

Therefore, I recommend not using beforeTableColumnsOidMaps for now, especially since the evolve mode is still immature and large-scale data scenarios generally avoid dropping or modifying downstream data structures. This would significantly simplify the current design.

@leonardBang , WDYT?

List<Event> expected;
Map<String, String> renameMap = new HashMap<>();
switch (testType) {
case "modifyType":
Contributor

Your code also handles changes to oldField.schema().defaultValue() and oldField.schema().parameters(); please test those as well.

@loserwang1024
Contributor

loserwang1024 commented Mar 12, 2026

@linjianchang if you don't have time, could I help by modifying this based on your CR? I will preserve your commit.

@loserwang1024
Contributor

loserwang1024 commented Mar 13, 2026

I have reviewed this PR (by @linjianchang) and #4259 by @zml1206. Thanks to both for the hard work!

I have also done some research on the Debezium codebase and PostgreSQL replication, and I have another design idea that leverages PostgreSQL's native capabilities more fully, so I'd like to discuss it with you.


PostgreSQL Pgoutput's relation message in logical replication

As the PostgreSQL 16 documentation says in Section 55.5.3, Logical Replication Protocol Message Flow:

"Every DML message contains a relation OID, identifying the publisher's relation that was acted on. Before the first DML message for a given relation OID, a Relation message will be sent, describing the schema of that relation. Subsequently, a new Relation message will be sent if the relation's definition has changed since the last Relation message was sent for it."

When DDL changes are executed in PostgreSQL, no corresponding logs are generated. However, if the pgoutput sender is about to send the first DML message for a new schema, it will first send a Relation message.

The Relation message includes the schema of the table.

How does Debezium use it?

Depending on the decoder plug-in, schema updates take two completely different paths:
● pgoutput: sends a Relation message before the first DML event for a changed schema. applySchemaChangesForTable can be used to actively update the schema in advance; shouldSchemaBeSynchronized() returns false, so synchronizeTableSchema() is a no-op for DML events.
● decoderbufs: no Relation message is sent, so shouldSchemaBeSynchronized() returns true (the default). The schema is synchronized reactively by comparing the message columns with the in-memory schema.
Thus, with pgoutput, we can send schema change events on demand without comparing each message?

What does CDC need to do?

Therefore, my personal opinion:

  1. Extend PostgresSchema and pass in a dispatcher. When a Relation message is received, put the schema into the event queue as a special message.
  2. When the Postgres RecordEmitter receives the schema event, it:
     • updates the schemas in the split for persistence (in the current master branch, since pg cdc has no schema change event, the information in schemas is never updated);
     • compares the table changes and sends schema DDL events in the YAML pipeline.

In this way, we can avoid intrusive changes to PostgreSQL through triggers, and there is also no need to compare schemas for every message. More importantly, it updates the schemas stored in the split, so schema consistency can still be guaranteed after state recovery or restart.

Though this is only useful for pgoutput, I think that's enough, because pgoutput is the only official replication plugin.

@linjianchang @zml1206 @leonardBang , WDYT?

here is my poc code: https://github.com/loserwang1024/flink-cdc-connectors/tree/poc-schema-change-event
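The per-Relation-message bookkeeping this design relies on can be sketched as follows. This is an illustrative stand-alone sketch, not Debezium's actual API: a Relation message carries the relation OID plus its current column list, so a schema change event is needed only when the received column list differs from the cached one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical handler for pgoutput Relation messages: returns true only when
// the relation's column list changed since the last Relation message for it.
class RelationMessageHandler {
    private final Map<Integer, List<String>> columnsByRelationOid = new HashMap<>();

    /** Returns true when this Relation message implies a schema change to emit. */
    boolean onRelationMessage(int relationOid, List<String> columns) {
        List<String> cached = columnsByRelationOid.put(relationOid, columns);
        return cached != null && !cached.equals(columns);
    }
}
```

The first Relation message for a relation would instead drive the CreateTableEvent path, since there is no cached schema to compare against.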

@leonardBang
Contributor

> (Quoting @loserwang1024's design proposal above in full.)

Thanks @loserwang1024 for the detailed investigation! Here are my opinions:

(1) Querying the PostgreSQL database table for every CDC event is not feasible — this approach will directly overwhelm and crash the business database once the data volume grows even slightly.

(2) The trigger-based solution is also unacceptable, as it directly intrudes into the business write workflow on the database. Downstream CDC subscribers may not have the permission to implement this, and requiring all business teams to accept such triggers is not realistic.

The approach of subscribing to the latest table schema based on OID in real-time is conceptually sound. It is non-intrusive to the business database, avoids frequent polling of the upstream DB as traffic increases, and ensures the ordering between schema events and data events. Debezium also uses this mechanism — +1 to implementing our solution based on this approach.

@zml1206
Contributor

zml1206 commented Mar 13, 2026

> The approach of subscribing to the latest table schema based on OID in real-time is conceptually sound. It is non-intrusive to the business database, avoids frequent polling of the upstream DB as traffic increases, and ensures the ordering between schema events and data events. Debezium also uses this mechanism — +1 to implementing our solution based on this approach.

+1

@linjianchang
Contributor Author

> @linjianchang if you don't have time, could I help by modifying this based on your CR? I will preserve your commit.

Sure. I haven't had time to make the revisions recently because I've been busy with other things. Thanks for the review!
