@@ -74,6 +74,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -131,6 +132,7 @@ public DataSource createDataSource(Context context) {
        boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
        int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
        boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
+        boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);

        validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
        validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -172,6 +174,7 @@ public DataSource createDataSource(Context context) {
                        .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
                        .includeDatabaseInTableId(tableIdIncludeDatabase)
+                        .includeSchemaChanges(includeSchemaChanges)
                        .getConfigFactory();

        List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -262,6 +265,7 @@ public Set<ConfigOption<?>> optionalOptions() {
        options.add(METADATA_LIST);
        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
        options.add(TABLE_ID_INCLUDE_DATABASE);
+        options.add(SCHEMA_CHANGE_ENABLED);
        return options;
    }

@@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
"Whether to include database in the generated Table ID. "
+ "If set to true, the Table ID will be in the format (database, schema, table). "
+ "If set to false, the Table ID will be in the format (schema, table). Defaults to false.");

@Experimental
public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
ConfigOptions.key("schema-change.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to infer CDC column types when processing pgoutput Relation messages.");
}
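For reference, a minimal sketch of turning the new option on when options arrive as strings, e.g. from a pipeline definition. It assumes the Configuration.fromMap helper of Flink CDC's common configuration API; the option map itself is illustrative:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;

public class SchemaChangeOptionExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // Illustrative: the option defaults to false, so it must be set explicitly.
        options.put("schema-change.enabled", "true");
        Configuration config = Configuration.fromMap(options);
        // createDataSource above reads the flag the same way.
        boolean includeSchemaChanges = config.get(PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED);
        System.out.println(includeSchemaChanges); // prints: true
    }
}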
@@ -19,41 +19,39 @@

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -62,24 +60,26 @@
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
+import static org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.toCreateTableEvent;

/** The {@link RecordEmitter} implementation for the PostgreSQL pipeline connector. */
-public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+public class PostgresPipelineRecordEmitter<T> extends PostgresSourceRecordEmitter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
    private final PostgresSourceConfig sourceConfig;
    private final PostgresDialect postgresDialect;

    // Used when startup mode is initial
-    private Set<TableId> alreadySendCreateTableTables;
+    private final Set<TableId> alreadySendCreateTableTables;
+    private final boolean isBounded;
+    private final boolean includeDatabaseInTableId;
+    private final Map<TableId, CreateTableEvent> createTableEventCache;

    // Used when startup mode is not initial
    private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
-    private boolean isBounded = false;
-    private boolean includeDatabaseInTableId = false;
-
-    private final Map<TableId, CreateTableEvent> createTableEventCache;

    public PostgresPipelineRecordEmitter(
-            DebeziumDeserializationSchema debeziumDeserializationSchema,
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
            SourceReaderMetrics sourceReaderMetrics,
            PostgresSourceConfig sourceConfig,
            OffsetFactory offsetFactory,
@@ -108,16 +108,11 @@ public void applySplit(SourceSplitBase split) {
        } else {
            for (Map.Entry<TableId, TableChanges.TableChange> entry :
                    split.getTableSchemas().entrySet()) {
-                TableId tableId =
-                        entry.getKey(); // Use the TableId from the map key which contains full info
                TableChanges.TableChange tableChange = entry.getValue();
+
+                Table table = tableChange.getTable();
                CreateTableEvent createTableEvent =
-                        new CreateTableEvent(
-                                toCdcTableId(
-                                        tableId,
-                                        sourceConfig.getDatabaseList().get(0),
-                                        includeDatabaseInTableId),
-                                buildSchemaFromTable(tableChange.getTable()));
+                        toCreateTableEvent(table, sourceConfig, postgresDialect);
                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
                        .applyChangeEvent(createTableEvent);
            }
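Note: toCreateTableEvent is a new helper in SchemaChangeUtil that is added elsewhere in this PR and is not shown in this diff. Judging by its call sites, it factors out the Debezium-column-to-CDC-schema mapping that the removed buildSchemaFromTable below performed inline through PostgresTypeUtils.fromDbzColumn.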
Expand All @@ -137,68 +132,63 @@ protected void processElement(
shouldEmitAllCreateTableEventsInSnapshotMode = false;
} else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
if (!alreadySendCreateTableTables.contains(tableId)) {
sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
alreadySendCreateTableTables.add(tableId);
}
} else {
boolean isDataChangeRecord = isDataChangeRecord(element);
if (isDataChangeRecord || isSchemaChangeEvent(element)) {
TableId tableId = getTableId(element);
if (!alreadySendCreateTableTables.contains(tableId)) {
CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
if (createTableEvent != null) {
output.collect((T) createTableEvent);
}
alreadySendCreateTableTables.add(tableId);
}
// In rare case, we may miss some CreateTableEvents before DataChangeEvents.
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
output.collect((T) createTableEvent);
createTableEventCache.put(tableId, createTableEvent);
}
}
maybeSendCreateTableEventFromCache(tableId, output);
} else if (isDataChangeRecord(element)) {
handleDataChangeRecord(element, output);
} else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) {
handleSchemaChangeRecord(element, output, splitState);
}
super.processElement(element, output, splitState);
}

-    private Schema buildSchemaFromTable(Table table) {
-        List<Column> columns = table.columns();
-        Schema.Builder tableBuilder = Schema.newBuilder();
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-
-            String colName = column.name();
-            DataType dataType;
-            try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
-                dataType =
-                        PostgresTypeUtils.fromDbzColumn(
-                                column,
-                                this.sourceConfig.getDbzConnectorConfig(),
-                                jdbc.getTypeRegistry());
-            }
-            if (!column.isOptional()) {
-                dataType = dataType.notNull();
-            }
-            tableBuilder.physicalColumn(
-                    colName,
-                    dataType,
-                    column.comment(),
-                    column.defaultValueExpression().orElse(null));
+    private void handleDataChangeRecord(SourceRecord element, SourceOutput<T> output) {
+        TableId tableId = getTableId(element);
+        maybeSendCreateTableEventFromCache(tableId, output);
+        // In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
+        // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
+        if (!createTableEventCache.containsKey(tableId)) {
+            CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+            sendCreateTableEvent(createTableEvent, output);
+            createTableEventCache.put(tableId, createTableEvent);
        }
+    }
+
+    private void handleSchemaChangeRecord(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) {
+        if (!(element instanceof PostgresSchemaRecord)) {
+            // Ignore non-Postgres schema change records; they may represent non-relation
+            // schema changes that are not handled via PostgresSchemaRecord.
+            LOG.warn("Ignoring non-PostgresSchemaRecord schema change event: {}", element);
+            return;
+        }
+        Map<TableId, TableChanges.TableChange> existedTableSchemas =
+                splitState.toSourceSplit().getTableSchemas();
+        PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+        Table schemaAfter = schemaRecord.getTable();
+        maybeSendCreateTableEventFromCache(schemaAfter.id(), output);
+        Table schemaBefore = null;
+        if (existedTableSchemas.containsKey(schemaAfter.id())) {
+            schemaBefore = existedTableSchemas.get(schemaAfter.id()).getTable();
+        }
-        tableBuilder.comment(table.comment());
+        List<SchemaChangeEvent> schemaChangeEvents =
+                inferSchemaChangeEvent(
+                        schemaAfter.id(), schemaBefore, schemaAfter, sourceConfig, postgresDialect);
+        LOG.info("Inferred schema change events: {}", schemaChangeEvents);
+        schemaChangeEvents.forEach(schemaChangeEvent -> output.collect((T) schemaChangeEvent));
+    }
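inferSchemaChangeEvent also lives in the new SchemaChangeUtil and is not part of this diff. As a rough orientation, a minimal sketch of the before/after diffing such a helper has to perform is shown below; it is a simplification under stated assumptions (event constructors as in recent Flink CDC releases, CDC column types hard-coded to STRING instead of being derived via the connector config and dialect):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.types.DataTypes;

import io.debezium.relational.Table;

public class SchemaDiffSketch {
    // Hypothetical: diff two Debezium relational schemas by column name and emit
    // AddColumnEvent / DropColumnEvent. Type changes and column positions are ignored.
    static List<SchemaChangeEvent> diffColumns(
            TableId cdcTableId,
            Table schemaBefore, // may be null if the table has not been seen before
            Table schemaAfter) {
        Set<String> before = new HashSet<>();
        if (schemaBefore != null) {
            schemaBefore.columns().forEach(c -> before.add(c.name()));
        }
        Set<String> after = new HashSet<>();
        schemaAfter.columns().forEach(c -> after.add(c.name()));

        List<SchemaChangeEvent> events = new ArrayList<>();
        List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
        for (String name : after) {
            if (!before.contains(name)) {
                addedColumns.add(
                        new AddColumnEvent.ColumnWithPosition(
                                org.apache.flink.cdc.common.schema.Column.physicalColumn(
                                        name, DataTypes.STRING())));
            }
        }
        if (!addedColumns.isEmpty()) {
            events.add(new AddColumnEvent(cdcTableId, addedColumns));
        }
        List<String> droppedColumns = new ArrayList<>();
        for (String name : before) {
            if (!after.contains(name)) {
                droppedColumns.add(name);
            }
        }
        if (!droppedColumns.isEmpty()) {
            events.add(new DropColumnEvent(cdcTableId, droppedColumns));
        }
        return events;
    }
}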

-        List<String> primaryKey = table.primaryKeyColumnNames();
-        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
-            tableBuilder.primaryKey(primaryKey);
+    private void maybeSendCreateTableEventFromCache(TableId tableId, SourceOutput<T> output) {
+        if (!alreadySendCreateTableTables.contains(tableId)) {
+            CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+            if (createTableEvent != null) {
+                sendCreateTableEvent(createTableEvent, output);
+            }
+            alreadySendCreateTableTables.add(tableId);
        }
-        return tableBuilder.build();
    }

-    private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> output) {
-        output.collect(getCreateTableEvent(sourceConfig, tableId));
+    private void sendCreateTableEvent(CreateTableEvent createTableEvent, SourceOutput<T> output) {
+        output.collect((T) createTableEvent);
    }

    private CreateTableEvent getCreateTableEvent(
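PostgresSchemaRecord is also introduced elsewhere in this PR, so its definition is not part of this diff. From the instanceof check and the getTable() call in handleSchemaChangeRecord, a plausible minimal shape is sketched below; the constructor and field layout are assumptions:

import io.debezium.relational.Table;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical shape inferred from its call sites above: a SourceRecord subclass
// carrying the relational schema parsed from a pgoutput Relation message.
public class PostgresSchemaRecord extends SourceRecord {
    private final Table table;

    public PostgresSchemaRecord(SourceRecord base, Table table) {
        super(base.sourcePartition(), base.sourceOffset(), base.topic(),
                base.valueSchema(), base.value());
        this.table = table;
    }

    public Table getTable() {
        return table;
    }
}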