[FLINK-38959][postgres] Update split state's table schemas info and infer schema change event based on pgoutput plugin's relation message.#4316
1ea71bb to 51c2077
@ruanhang1993 @leonardBang @linjianchang @zml1206 @LYanquan @yuxiqian, would you like to help me review this PR?
My pleasure, will review soon.
...-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
 * with memoization. Available operations: rename column, add column at last, drop column, alter
 * column type. Recursion depth bounded by total column count.
 */
private static List<SchemaChangeEvent> inferMinimalSchemaChanges(
Thanks for the update. I have one question about the DP design.
It looks like the algorithm does not try to preserve the existing suffix in a head-insert case. For example:
before: [b, c]
after: [a, b, c]
the inferred result becomes Rename(b -> a), Rename(c -> b), Add(c), instead of treating this as adding a new leading column.
Is this intentional because the supported operation set only allows trailing AddColumnEvent, or is this just a limitation of the current implementation?
If this is intentional, could we document it more explicitly? Otherwise, it seems the "minimal" result here is only minimal under the current restricted cost model, but not necessarily the most natural schema change sequence.
I am still learning this part of the codebase, so I would really appreciate any correction or guidance.
@Daishuyuan
As far as I understand, PostgreSQL does not support inserting a column at the head of a table. ALTER TABLE ... ADD COLUMN only appends the new column to the end of the column list:
https://www.postgresql.org/docs/current/sql-altertable.html
So in a case like:
before: [b, c]
after: [a, b, c]
a true “head-insert” is not something PostgreSQL would normally produce through native schema evolution on the same table.
The reason I used this algorithm is that I am trying, as much as possible, to preserve the original DDL semantics that the source database can actually produce. I’ll add this explanation near the beginning of the method to make the assumption clearer.
In the current design, the main ambiguous case involves the last column: for example, distinguishing a rename of the last column from a drop followed by an add of a last column with the same name, when both happen before any data change. In production it is uncommon for users to make no data modification between such a drop and add.
So yes, this behavior is intentional under the current design constraints, rather than an attempt to find the most “natural” edit sequence in the abstract.
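To make the restricted cost model concrete, here is a minimal sketch of a memoized search over the same operation set (rename, drop, add only at the end). It is not the PR's `SchemaChangeUtil`: the class name `RestrictedSchemaDiffSketch`, the string-based events, and the uniform cost of 1 per operation are all assumptions made for illustration, and column types and rename name collisions are ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the PR's implementation. Columns are plain names,
// every operation costs 1, and types and rename collisions are ignored.
class RestrictedSchemaDiffSketch {

    /** Returns one cheapest edit script that turns `before` into `after`. */
    static List<String> infer(List<String> before, List<String> after) {
        return solve(before, after, 0, 0, new HashMap<>());
    }

    private static List<String> solve(
            List<String> before, List<String> after, int i, int j,
            Map<Long, List<String>> memo) {
        long key = ((long) i << 32) | j;
        List<String> cached = memo.get(key);
        if (cached != null) {
            return cached;
        }
        List<String> best;
        if (i == before.size()) {
            // Old columns are exhausted: the rest of `after` can only be appended.
            best = new ArrayList<>();
            for (int k = j; k < after.size(); k++) {
                best.add("Add(" + after.get(k) + ")");
            }
        } else {
            // Option 1: drop the old column at position i.
            best = prepend("Drop(" + before.get(i) + ")",
                    solve(before, after, i + 1, j, memo));
            if (j < after.size()) {
                // Option 2: pair old column i with new column j (keep or rename).
                List<String> rest = solve(before, after, i + 1, j + 1, memo);
                List<String> paired = before.get(i).equals(after.get(j))
                        ? rest
                        : prepend("Rename(" + before.get(i) + " -> " + after.get(j) + ")", rest);
                if (paired.size() <= best.size()) {
                    best = paired;
                }
            }
        }
        memo.put(key, best);
        return best;
    }

    // Builds a new list so that memoized results are never mutated.
    private static List<String> prepend(String head, List<String> tail) {
        List<String> out = new ArrayList<>(tail.size() + 1);
        out.add(head);
        out.addAll(tail);
        return out;
    }
}
```

Under these assumptions, `infer([b, c], [a, b, c])` yields `Rename(b -> a), Rename(c -> b), Add(c)`, the sequence discussed above, because no operation can insert a column anywhere except at the end. A mid-table drop such as `[a, b, c]` to `[a, c]` still resolves to a single `Drop(b)`.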
Thanks for the detailed explanation — this makes the assumption much clearer.
I understand the intent now: the algorithm is designed to preserve DDL semantics that the source database can actually produce, rather than infer the most abstract edit sequence.
Appreciate the clarification.
51c2077 to 199aa53
Pull request overview
This PR enhances the PostgreSQL CDC connectors to leverage pgoutput Relation messages for updating split-state table schemas and inferring schema change events without relying on intrusive database triggers.
Changes:
- Introduces a Relation-aware Debezium PostgresSchema extension that dispatches Relation-based schema updates into the change-event queue.
- Adds schema-diff inference utilities and updates the Postgres pipeline record emitter to emit CDC SchemaChangeEvents derived from Relation messages.
- Expands source-connector and pipeline-connector test coverage for schema evolution + state snapshot schema persistence.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java | Adds dispatcher abstraction for pushing schema updates into the queue. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java | Extends Debezium PostgresSchema to dispatch Relation-based schema updates. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java | Adds a custom SourceRecord wrapper to carry Debezium Table schemas. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java | Adds a Postgres-specific emitter hook for schema-change extraction. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java | Installs Relation-aware schema + dispatcher and handles table-id extraction for schema records. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java | Enqueues Relation-driven schema records into Debezium queue. |
| flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java | Adds schema bootstrap from split-state schemas (no refresh). |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java | Adds schema-change enablement flag to config factory. |
| flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java | Carries schemaChangeEnabled into runtime config. |
| flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java | Refactors schema-change parsing into an overridable method. |
| flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java | Fixes deserialization to respect per-entry useCatalogBeforeSchema. |
| flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java | Adds minimal-edit schema diff inference (add/drop/rename/alter-type). |
| flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java | Emits inferred SchemaChangeEvents for Relation schema updates. |
| flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java | Adds schema-change.enabled pipeline option. |
| flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java | Wires new option into source builder (currently problematic). |
| flink-connector-postgres-cdc/src/test/.../PostgresSourceReaderTest.java | Adds state-update test verifying schema persistence across snapshotState. |
| flink-connector-postgres-cdc/src/test/.../IncrementalSourceStreamFetcherTest.java | Adds ordering test ensuring schema record is enqueued around DML. |
| flink-connector-postgres-cdc/src/test/.../PostgresScanFetchTaskTest.java | Updates factory method signature usage. |
| flink-connector-postgres-cdc/src/test/java/.../PostgresTestBase.java | Adds optional slot-name plumbing for tests. |
| flink-cdc-pipeline-connector-postgres/src/test/.../PostgresPipelineITCaseTest.java | Adds end-to-end schema evolution IT case. |
199aa53 to 45d255e
As discussed in #4233, thanks for the hard work by @linjianchang in #4259 and @zml1206 in #4233. I have another design idea that leverages PostgreSQL’s native capabilities more fully.
PostgreSQL pgoutput's Relation message in logical replication
As the PostgreSQL 16 documentation says (Section 55.5.3, Logical Replication Protocol Message Flow):
When DDL changes are executed in PostgreSQL, no corresponding logs are generated. However, if the pgoutput sender is about to send the first DML message for a new schema, it will first send a Relation message.
The Relation message includes the schema of the table.
How does Debezium use it?
Depending on the decoder plug-in, schema updates take two completely different paths:
● pgoutput: Sends a Relation message before DML events, so you can use applySchemaChangesForTable to actively update the schema in advance. shouldSchemaBeSynchronized() returns false, so synchronizeTableSchema() is a no-op for DML events.
● decoderbufs: No Relation message is sent, so shouldSchemaBeSynchronized() returns true (the default value). The schema is synchronized reactively by comparing the message columns with the in-memory schema.
Thus, with pgoutput, can we send schema change events on demand without comparing each message?
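The pgoutput path can be illustrated with a small, self-contained sketch. Nothing here is a Debezium or Flink CDC API: `RelationDrivenSchemaSketch`, `onRelation`, and `onDml` are hypothetical names, and schemas are reduced to lists of column names. The sketch also assumes a Relation message may arrive with an unchanged schema, so it compares once per Relation message rather than once per DML message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these names are illustrative, not Debezium or Flink CDC APIs.
class RelationDrivenSchemaSketch {

    /** Last schema seen per table; stands in for the schemas kept in split state. */
    private final Map<String, List<String>> knownColumns = new HashMap<>();

    /** Everything sent downstream, in arrival order. */
    final List<String> emitted = new ArrayList<>();

    /** Called when the replication stream delivers a Relation message. */
    void onRelation(String table, List<String> columns) {
        List<String> previous = knownColumns.put(table, new ArrayList<>(columns));
        // Assumption: a Relation message can repeat an unchanged schema,
        // so only a real difference produces a schema change record.
        if (previous != null && !previous.equals(columns)) {
            emitted.add("SCHEMA_CHANGE " + table + " " + previous + " -> " + columns);
        }
    }

    /** Called for every DML message; no schema comparison happens here. */
    void onDml(String table, String row) {
        emitted.add("DML " + table + " " + row);
    }
}
```

Because the schema record is queued when the Relation message arrives, it lands ahead of the first DML message that uses the new schema, and the stored schema map is what a restored reader would start from.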
What does CDC need to do?
Therefore, my personal opinion:
In this way, we can avoid intrusive changes to PostgreSQL through triggers, and there is also no need to compare schemas for every message. More importantly, it updates the schemas stored in the split, so schema consistency can still be guaranteed after state recovery or restart.
Although this is only useful for pgoutput, I think that is enough, because pgoutput is the only official replication plugin.