Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,5 @@ TempTypeSpecFiles/

# Azure Artifacts Credential Provider runtime
.azure-artifacts/
.coding-harness/

Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ public void start(Map<String, String> props) {
this.sinkTaskConfig.getClientMetadataCachesSnapshot());
LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId());
this.throughputControlClientItem = this.getThroughputControlCosmosClient();
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);

ErrantRecordReporter errantRecordReporter = null;
try {
errantRecordReporter = this.context.errantRecordReporter();
Copy link
Copy Markdown
Member Author

@xinlian12 xinlian12 May 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this check? According to the public API, this.context.errantRecordReporter() returns an ErrantRecordReporter. I think it should be safe to pass it directly into SinkRecordTransformer, similar to the writer pattern.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e5f697d — removed the try-catch guard around errantRecordReporter(). Now passes this.context.errantRecordReporter() directly to SinkRecordTransformer, consistent with the writer pattern. This also fixes the CI compilation error (the guard required referencing ErrantRecordReporter as a local variable type without the corresponding import).

Additionally fixed test compilation errors: changed Future<?> to Future<Void> to match the return type of ErrantRecordReporter.report().

} catch (NoClassDefFoundError | NoSuchMethodError e) {
LOGGER.info(
"ErrantRecordReporter not available in this Kafka Connect runtime; "
+ "DLQ will not be used for transform errors.");
}

this.sinkRecordTransformer = new SinkRecordTransformer(
this.sinkTaskConfig,
errantRecordReporter,
this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel());

if (this.sinkTaskConfig.getWriteConfig().isBulkEnabled()) {
this.cosmosWriter =
Expand Down Expand Up @@ -129,7 +142,7 @@ record -> this.sinkTaskConfig
List<SinkRecord> transformedRecords = sinkRecordTransformer.transform(containerName, entry.getValue());
this.cosmosWriter.write(container, transformedRecords);

totalWrittenRecordsPerContainer.merge(containerName, (long) entry.getValue().size(), Long::sum);
totalWrittenRecordsPerContainer.merge(containerName, (long) transformedRecords.size(), Long::sum);
}

logWrittenRecordCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy;
import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,9 +25,24 @@ public class SinkRecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class);

private final IdStrategy idStrategy;
private final ErrantRecordReporter errantRecordReporter;
private final ToleranceOnErrorLevel toleranceOnErrorLevel;

public SinkRecordTransformer(
CosmosSinkTaskConfig sinkTaskConfig,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel);
}

public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) {
this.idStrategy = this.createIdStrategy(sinkTaskConfig);
// Package-private constructor for unit testing without requiring CosmosSinkTaskConfig.
SinkRecordTransformer(
IdStrategy idStrategy,
ErrantRecordReporter errantRecordReporter,
ToleranceOnErrorLevel toleranceOnErrorLevel) {
this.idStrategy = idStrategy;
this.errantRecordReporter = errantRecordReporter;
this.toleranceOnErrorLevel = toleranceOnErrorLevel;
}

@SuppressWarnings("unchecked")
Expand All @@ -44,30 +60,68 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec
record.value() == null ? null : record.value().getClass().getName(),
record.value() == null ? null : record.valueSchema());

Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
try {
Object recordValue;
if (record.value() instanceof Struct) {
recordValue = StructToJsonMap.toJsonMap((Struct) record.value());
} else if (record.value() instanceof Map) {
recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value());
} else {
recordValue = record.value();
}

maybeInsertId(recordValue, record);

final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
} catch (RuntimeException e) {
// Report to DLQ if configured (fire-and-forget, guarded against reporter failures).
// This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured().
if (this.errantRecordReporter != null) {
try {
this.errantRecordReporter.report(record, e);
} catch (Exception reportException) {
LOGGER.error(
"Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
reportException);
Comment on lines +96 to +100
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4ac387b — changed the outer catch to catch(RuntimeException e) since all exceptions from ID strategies (ConnectException, etc.) are RuntimeException subclasses.

}
}

// Use tolerance level to decide continue-vs-throw (consistent with writer pattern).
if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) {
LOGGER.warn(
"Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
} else {
LOGGER.error(
"Failing task due to transform error for record from topic {}, partition {}, offset {}, "
+ "container {}.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset(),
containerName,
e);
throw e;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 4ac387b — same fix as above, outer catch changed to catch(RuntimeException e).

}
}

maybeInsertId(recordValue, record);

// Create an updated record with from the current record and the updated record value
final SinkRecord updatedRecord = new SinkRecord(record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
recordValue,
record.kafkaOffset(),
record.timestamp(),
record.timestampType(),
record.headers());

toBeWrittenRecordList.add(updatedRecord);
}

return toBeWrittenRecordList;
Expand All @@ -82,7 +136,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) {
recordMap.put("id", this.idStrategy.generateId(sinkRecord));
}

private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) {
IdStrategy idStrategyClass;
switch (sinkTaskConfig.getIdStrategy()) {
case FULL_KEY_STRATEGY:
Expand Down
Loading
Loading