Merged
@@ -47,6 +47,7 @@
 import com.dtstack.chunjun.util.MapUtil;

 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;

 import org.apache.commons.collections.CollectionUtils;
@@ -87,8 +88,8 @@ public class KafkaSyncConverter
     /** kafka sink out fields */
     protected List<String> outList;

-    public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
-        super(null, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType, List<String> keyTypeList) {
+        super(rowType, kafkaConfig);
         this.kafkaConfig = kafkaConfig;
         this.outList = keyTypeList;
         this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
@@ -99,8 +100,8 @@ public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
         }
     }

-    public KafkaSyncConverter(KafkaConfig kafkaConfig) {
-        super(null, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType) {
+        super(rowType, kafkaConfig);
         this.commonConfig = this.kafkaConfig = kafkaConfig;
         this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
         if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
@@ -25,6 +25,7 @@
 import com.dtstack.chunjun.util.MapUtil;

 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;

 import org.apache.commons.lang3.StringUtils;
@@ -41,15 +42,15 @@
  */
 public class KafkaSyncKeyConverter extends KafkaSyncConverter {

-    private final PartitionStrategy partitionStrategy;
+    private PartitionStrategy partitionStrategy;

-    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, List<String> keyTypeList) {
-        super(kafkaConf, keyTypeList);
+    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType, List<String> keyTypeList) {
+        super(kafkaConf, rowType, keyTypeList);
         this.partitionStrategy = PartitionStrategy.fromValue(kafkaConf.getPartitionStrategy());
     }

-    public KafkaSyncKeyConverter(KafkaConfig kafkaConf) {
-        this(kafkaConf, null);
+    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType) {
+        this(kafkaConf, rowType, null);
     }

     @Override
@@ -29,12 +29,14 @@
 import com.dtstack.chunjun.converter.RawTypeMapper;
 import com.dtstack.chunjun.sink.SinkFactory;
 import com.dtstack.chunjun.util.GsonUtil;
+import com.dtstack.chunjun.util.TableUtil;

 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;

@@ -96,6 +98,9 @@ protected DataStreamSink<RowData> createOutput(
                     "when kafka sink dataCompelOrder set true , Parallelism must 1.");
         }

+        RowType rowType =
+                TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
+
         RowSerializationSchema rowSerializationSchema;
         if (!CollectionUtil.isNullOrEmpty(kafkaConfig.getPartitionAssignColumns())) {
             Preconditions.checkState(
@@ -115,15 +120,15 @@ protected DataStreamSink<RowData> createOutput(
                             kafkaConfig,
                             new CustomerFlinkPartition<>(),
                             new KafkaSyncKeyConverter(
-                                    kafkaConfig, kafkaConfig.getPartitionAssignColumns()),
-                            new KafkaSyncConverter(kafkaConfig));
+                                    kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns()),
+                            new KafkaSyncConverter(kafkaConfig, rowType));
         } else {
             rowSerializationSchema =
                     new RowSerializationSchema(
                             kafkaConfig,
                             new CustomerFlinkPartition<>(),
-                            new KafkaSyncKeyConverter(kafkaConfig),
-                            new KafkaSyncConverter(kafkaConfig));
+                            new KafkaSyncKeyConverter(kafkaConfig, rowType),
+                            new KafkaSyncConverter(kafkaConfig, rowType));
         }

         KafkaProducer kafkaProducer =
@@ -118,7 +118,10 @@ public DataStream<RowData> createSource() {
         RowType rowType =
                 TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
         DynamicKafkaDeserializationSchema deserializationSchema =
-                new RowDeserializationSchema(kafkaConfig, new KafkaSyncConverter(kafkaConfig));
+                new RowDeserializationSchema(
+                        kafkaConfig,
+                        new KafkaSyncConverter(
+                                kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns()));
         KafkaConsumerWrapper consumer =
                 new KafkaConsumerWrapper(topics, deserializationSchema, props);
         switch (kafkaConfig.getMode()) {
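For reference, a minimal sketch of the constructor wiring this change introduces, pulled together from the hunks above (it assumes ChunJun's KafkaConfig, TableUtil, and KafkaRawTypeMapping exactly as they appear in the diff; the local variable names are illustrative, not part of the PR):

    // Build the RowType once from the configured columns, as the sink/source factories now do.
    RowType rowType =
            TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);

    // Key converter: also receives the partition-assign columns used to build the record key.
    KafkaSyncKeyConverter keyConverter =
            new KafkaSyncKeyConverter(
                    kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns());

    // Value converter: previously passed null to super(...), now carries the same RowType.
    KafkaSyncConverter valueConverter = new KafkaSyncConverter(kafkaConfig, rowType);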