Skip to content
Open
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed per-record error isolation in Kafka sink connector to honor DLQ and tolerance settings, instead of failing the entire batch when a single record fails. - See [PR 49286](https://github.com/Azure/azure-sdk-for-java/pull/49286)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ public void start(Map<String, String> props) {
this.sinkTaskConfig.getClientMetadataCachesSnapshot());
LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId());
this.throughputControlClientItem = this.getThroughputControlCosmosClient();
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);

this.sinkRecordTransformer = new SinkRecordTransformer(
this.sinkTaskConfig,
this.context.errantRecordReporter(),
this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel());
Comment on lines +54 to +57
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update the description


if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) {
this.cosmosWriter =
Expand Down Expand Up @@ -129,7 +133,7 @@ record -> this.sinkTaskConfig
List<SinkRecord> transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue());
this.cosmosWriter.write(container, transformedRecords);

totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum);
totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum);
}

logWrittenRecordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy;
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,9 +25,24 @@ public class SinkRecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class);

private final IdStrategy idStrategy;
private final ErrantRecordReporter errantRecordReporter;
private final ToleranceOnErrorLevel toleranceOnErrorLevel;

/**
 * Creates a transformer whose id strategy is derived from the sink task configuration.
 *
 * <p>Resolves the strategy with {@code createIdStrategy(sinkTaskConfig)} and delegates to the
 * package-private constructor that accepts an {@code IdStrategy} directly.
 *
 * @param sinkTaskConfig sink task configuration used to select the id strategy
 * @param errantRecordReporter reporter that receives records failing transformation; may be
 *     {@code null}, in which case {@code transform} skips the reporting step
 * @param toleranceOnErrorLevel when {@code ToleranceOnErrorLevel.ALL}, {@code transform} logs and skips
 *     a record that fails; for any other value the exception is rethrown
 */
public SinkRecordTransformer(
CosmosSinkTaskConfig sinkTaskConfig,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel);
}

public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) {
this.idStrategy = this.createIdStrategy(sinkTaskConfig);
/**
 * Package-private constructor for unit testing without requiring CosmosSinkTaskConfig.
 *
 * <p>Stores the three collaborators as-is; no validation or copying is performed here.
 *
 * @param idStrategy strategy used by {@code maybeInsertId} to generate the {@code "id"} value
 * @param errantRecordReporter reporter that receives records failing transformation; may be
 *     {@code null}, in which case {@code transform} skips the reporting step
 * @param toleranceOnErrorLevel when {@code ToleranceOnErrorLevel.ALL}, {@code transform} logs and skips
 *     a record that fails; for any other value the exception is rethrown
 */
SinkRecordTransformer(
IdStrategy idStrategy,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this.idStrategy = idStrategy;
this.errantRecordReporter = errantRecordReporter;
this.toleranceOnErrorLevel = toleranceOnErrorLevel;
}

@SuppressWarnings("unchecked")
Expand All @@ -44,30 +60,68 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec
record.value() == null ? null : record.value().getClass().getName(),
record.value() == null ? null : record.valueSchema());

Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
try {
Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
}

maybeInsertId(recordValue, record);

final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
} catch (RuntimeException e) {
// Report to DLQ if configured (fire-and-forget, guarded against reporter failures).
// This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured().
if (this.errantRecordReporter != null) {
try {
this.errantRecordReporter.report(record, e);
Comment on lines +88 to +92
} catch (Exception reportException) {
LOGGER.error(
"Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
reportException);
}
}

// Use tolerance level to decide continue-vs-throw (consistent with writer pattern).
if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) {
LOGGER.warn(
"Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
} else {
LOGGER.error(
"Failing task due to transform error for record from topic {}, partition {}, offset {}, "
+ "container {}.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
throw e;
}
}

maybeInsertId(recordValue, record);

// Create an updated record from the current record and the updated record value
final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
}

return toBeWrittenRecordList;
Expand All @@ -82,7 +136,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) {
recordMap.put("id", this.idStrategy.generateId(sinkRecord));
}

private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
IdStrategy idStrategyClass;
switch (sinkTaskConfig.getIdStrategy()) {
case FULL_KEY_STRATEGY:
Expand Down
Loading
Loading