Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/fluss.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ Pipeline Connector Options
<td>String</td>
<td>用于建立与 Fluss 集群初始连接的主机/端口对列表。 </td>
</tr>
<tr>
<td>auto.create.table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>当目标表不存在时,是否自动创建 Fluss table。如果禁用该选项,Pipeline 启动前必须提前创建好目标 Fluss table,否则 Pipeline 会失败。</td>
</tr>
<tr>
<td>bucket.key</td>
<td>optional</td>
Expand Down Expand Up @@ -138,6 +145,7 @@ Pipeline Connector Options
* 支持 Fluss 主键表和日志表。

* 关于自动建表
* 默认启用自动建表。如需要求目标表提前创建,可将 `auto.create.table.enabled` 设置为 `false`。
* 没有分区键
* 桶数量由 `bucket.num` 选项控制
* 数据分布由 `bucket.key` 选项控制。对于主键表,若未指定分桶键,则分桶键默认为主键(不含分区键);对于无主键的日志表,若未指定分桶键,则数据将随机分配到各个桶中。
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/fluss.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ Pipeline Connector Options
<td>String</td>
<td>The bootstrap servers for the Fluss sink connection. </td>
</tr>
<tr>
<td>auto.create.table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to automatically create the Fluss table when the target table does not exist. If disabled, the target Fluss table must already exist before the pipeline starts, otherwise the pipeline will fail.</td>
</tr>
<tr>
<td>bucket.key</td>
<td>optional</td>
Expand Down Expand Up @@ -140,6 +147,7 @@ Pipeline Connector Options
* Support Fluss primary key table and log table.

* For automatic table creation
* Automatic table creation is enabled by default. To require pre-created target tables, set `auto.create.table.enabled` to `false`.
* There is no partition key
* The number of buckets is controlled by `bucket.num`
* The distribution keys are controlled by the `bucket.key` option. For a primary key table, if no bucket key is specified, the primary key (excluding the partition key) is used as the bucket key. For a log table, which has no primary key, if no bucket key is specified, the data is distributed randomly across the buckets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Set;

import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BUCKET_KEY;
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.BUCKET_NUMBER;
Expand All @@ -57,7 +58,12 @@ public DataSink createDataSink(Context context) {
parseBucketKeys(factoryConfiguration.get(BUCKET_KEY));
Map<String, Integer> bucketNumMap =
parseBucketNumber(factoryConfiguration.get(BUCKET_NUMBER));
return new FlussDataSink(flussClientConfig, tableProperties, bucketKeysMap, bucketNumMap);
return new FlussDataSink(
flussClientConfig,
tableProperties,
bucketKeysMap,
bucketNumMap,
factoryConfiguration.get(AUTO_CREATE_TABLE_ENABLED));
}

@Override
Expand All @@ -75,6 +81,7 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(AUTO_CREATE_TABLE_ENABLED);
options.add(BUCKET_KEY);
options.add(BUCKET_NUMBER);
return options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@ public class FlussDataSink implements DataSink {
private final Map<String, String> tableProperties;
private final Map<String, List<String>> bucketKeysMap;
private final Map<String, Integer> bucketNumMap;
private final boolean autoCreateTableEnabled;

public FlussDataSink(
Configuration flussClientConfig,
Map<String, String> tableProperties,
Map<String, List<String>> bucketKeysMap,
Map<String, Integer> bucketNumMap) {
Map<String, Integer> bucketNumMap,
boolean autoCreateTableEnabled) {
this.flussClientConfig = flussClientConfig;
this.tableProperties = tableProperties;
this.bucketKeysMap = bucketKeysMap;
this.bucketNumMap = bucketNumMap;
this.autoCreateTableEnabled = autoCreateTableEnabled;
}

@Override
Expand All @@ -58,7 +61,11 @@ public EventSinkProvider getEventSinkProvider() {
@Override
public MetadataApplier getMetadataApplier() {
return new FlussMetaDataApplier(
flussClientConfig, tableProperties, bucketKeysMap, bucketNumMap);
flussClientConfig,
tableProperties,
bucketKeysMap,
bucketNumMap,
autoCreateTableEnabled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public class FlussDataSinkOptions {
.noDefaultValue()
.withDescription("The bootstrap servers for the Fluss sink connection.");

/**
 * Boolean option {@code auto.create.table.enabled}, defaulting to {@code true}.
 *
 * <p>Decides whether a target Fluss table that does not exist yet is created automatically.
 * When set to {@code false}, applying a create-table event for a missing table fails instead
 * of creating the table.
 */
public static final ConfigOption<Boolean> AUTO_CREATE_TABLE_ENABLED =
ConfigOptions.key("auto.create.table.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether to automatically create the Fluss table when the target table does not exist.");

public static final ConfigOption<String> BUCKET_KEY =
ConfigOptions.key("bucket.key")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.connectors.fluss.sink.FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussTable;
import static org.apache.flink.cdc.connectors.fluss.utils.FlussConversions.toFlussType;

Expand All @@ -60,6 +61,7 @@ public class FlussMetaDataApplier implements MetadataApplier {
private final Map<String, String> tableProperties;
private final Map<String, List<String>> bucketKeysMap;
private final Map<String, Integer> bucketNumMap;
private final boolean autoCreateTableEnabled;
private Set<SchemaChangeEventType> enabledEventTypes =
new HashSet<>(Arrays.asList(CREATE_TABLE, DROP_TABLE));

Expand All @@ -68,10 +70,25 @@ public FlussMetaDataApplier(
Map<String, String> tableProperties,
Map<String, List<String>> bucketKeysMap,
Map<String, Integer> bucketNumMap) {
this(
flussClientConfig,
tableProperties,
bucketKeysMap,
bucketNumMap,
AUTO_CREATE_TABLE_ENABLED.defaultValue());
}

/**
 * Creates a metadata applier with explicit control over automatic table creation.
 *
 * @param flussClientConfig client configuration used to open connections to Fluss
 * @param tableProperties table properties handed to the Fluss table conversion
 * @param bucketKeysMap bucket keys per table (presumably keyed by table identifier; verify
 *     against the factory's parsing)
 * @param bucketNumMap bucket counts per table (presumably keyed by table identifier; verify
 *     against the factory's parsing)
 * @param autoCreateTableEnabled when {@code false}, a create-table event for a table that does
 *     not exist is rejected instead of creating the table
 */
public FlussMetaDataApplier(
        Configuration flussClientConfig,
        Map<String, String> tableProperties,
        Map<String, List<String>> bucketKeysMap,
        Map<String, Integer> bucketNumMap,
        boolean autoCreateTableEnabled) {
    // Behavioural switch first, then the connection and table-layout settings.
    this.autoCreateTableEnabled = autoCreateTableEnabled;
    this.flussClientConfig = flussClientConfig;
    this.tableProperties = tableProperties;
    this.bucketNumMap = bucketNumMap;
    this.bucketKeysMap = bucketKeysMap;
}

@Override
Expand Down Expand Up @@ -122,18 +139,32 @@ private void applyCreateTable(CreateTableEvent event) {
toFlussTable(event.getSchema(), bucketKeys, bucketNum, tableProperties);
admin.createDatabase(tablePath.getDatabaseName(), DatabaseDescriptor.EMPTY, true);
if (!admin.tableExists(tablePath).get()) {
if (!autoCreateTableEnabled) {
throwMissingTableException(tableIdentifier);
}
admin.createTable(tablePath, inferredFlussTable, false).get();
} else {
TableInfo currentTableInfo = admin.getTableInfo(tablePath).get();
// sanity check to prevent unexpected table schema evolution.
sanityCheck(inferredFlussTable, currentTableInfo);
}
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
LOG.error("Failed to apply schema change {}", event, e);
throw new RuntimeException(e);
}
}

/**
 * Rejects a schema change whose target table is absent while automatic creation is switched
 * off. This method never returns normally.
 *
 * @param tableIdentifier identifier of the missing table, embedded in the error message
 * @throws ValidationException always
 */
private void throwMissingTableException(String tableIdentifier) {
    final String optionKey = AUTO_CREATE_TABLE_ENABLED.key();
    final StringBuilder message = new StringBuilder("Target Fluss table ");
    message.append(tableIdentifier);
    message.append(" does not exist and ");
    message.append(optionKey);
    message.append(" is false.");
    throw new ValidationException(message.toString());
}

private void applyDropTable(DropTableEvent event) {
try (Connection connection = ConnectionFactory.createConnection(flussClientConfig);
Admin admin = connection.getAdmin()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private Configuration createValidConfiguration() {
return Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put(FlussDataSinkOptions.BOOTSTRAP_SERVERS.key(), "localhost:9123")
.put(FlussDataSinkOptions.AUTO_CREATE_TABLE_ENABLED.key(), "false")
.put(FlussDataSinkOptions.BUCKET_KEY.key(), "database1.table1:a,b")
.put(FlussDataSinkOptions.BUCKET_NUMBER.key(), "database1.table1:2")
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.IntType;
import org.apache.flink.table.api.ValidationException;

import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
Expand Down Expand Up @@ -605,6 +606,32 @@ void testRecreateTableWithDifferentSchema() throws Exception {
}
}

/**
 * With automatic table creation disabled, applying a create-table event for a table that does
 * not exist must fail with a {@link ValidationException} and must not create the table. The
 * database is still expected to exist afterwards.
 */
@Test
void testCreateTableWithAutoCreateTableDisabled() throws Exception {
    Schema twoColumnSchema =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .build();
    CreateTableEvent missingTableEvent =
            new CreateTableEvent(
                    TableId.tableId("default_namespace", DATABASE_NAME, "table1"),
                    twoColumnSchema);
    TablePath expectedPath = new TablePath(DATABASE_NAME, "table1");

    try (FlussMetaDataApplier applierUnderTest =
            new FlussMetaDataApplier(
                    FLUSS_CLUSTER_EXTENSION.getClientConfig(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    false)) {
        // The applier must refuse to create the table and surface the validation error as-is.
        assertThatThrownBy(() -> applierUnderTest.applySchemaChange(missingTableEvent))
                .isExactlyInstanceOf(ValidationException.class)
                .hasMessageContaining("auto.create.table.enabled is false");

        // The database is present after the failed attempt, but the table is not.
        assertThat(admin.databaseExists(DATABASE_NAME).get()).isTrue();
        assertThat(admin.tableExists(expectedPath).get()).isFalse();
    }
}

@Test
void testCreateTableWithTableOptions() throws Exception {
Schema schema =
Expand Down
Loading