-
Notifications
You must be signed in to change notification settings - Fork 2.2k
[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268) #49269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer (#49268) #49269
Changes from all commits
672a8fe
cc8e58a
d49548c
82946c1
4ac387b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -129,4 +129,5 @@ TempTypeSpecFiles/ | |
|
|
||
| # Azure Artifacts Credential Provider runtime | ||
| .azure-artifacts/ | ||
| .coding-harness/ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.ProvidedInValueStrategy; | ||
| import com.azure.cosmos.kafka.connect.implementation.sink.idstrategy.TemplateStrategy; | ||
| import org.apache.kafka.connect.data.Struct; | ||
| import org.apache.kafka.connect.sink.ErrantRecordReporter; | ||
| import org.apache.kafka.connect.sink.SinkRecord; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -24,9 +25,24 @@ public class SinkRecordTransformer { | |
| private static final Logger LOGGER = LoggerFactory.getLogger(SinkRecordTransformer.class); | ||
|
|
||
| private final IdStrategy idStrategy; | ||
| private final ErrantRecordReporter errantRecordReporter; | ||
| private final ToleranceOnErrorLevel toleranceOnErrorLevel; | ||
|
|
||
| public SinkRecordTransformer( | ||
| CosmosSinkTaskConfig sinkTaskConfig, | ||
| ErrantRecordReporter errantRecordReporter, | ||
| ToleranceOnErrorLevel toleranceOnErrorLevel) { | ||
| this(createIdStrategy(sinkTaskConfig), errantRecordReporter, toleranceOnErrorLevel); | ||
| } | ||
|
|
||
| public SinkRecordTransformer(CosmosSinkTaskConfig sinkTaskConfig) { | ||
| this.idStrategy = this.createIdStrategy(sinkTaskConfig); | ||
| // Package-private constructor for unit testing without requiring CosmosSinkTaskConfig. | ||
| SinkRecordTransformer( | ||
| IdStrategy idStrategy, | ||
| ErrantRecordReporter errantRecordReporter, | ||
| ToleranceOnErrorLevel toleranceOnErrorLevel) { | ||
| this.idStrategy = idStrategy; | ||
| this.errantRecordReporter = errantRecordReporter; | ||
| this.toleranceOnErrorLevel = toleranceOnErrorLevel; | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
|
|
@@ -44,30 +60,68 @@ public List<SinkRecord> transform(String containerName, List<SinkRecord> sinkRec | |
| record.value() == null ? null : record.value().getClass().getName(), | ||
| record.value() == null ? null : record.valueSchema()); | ||
|
|
||
| Object recordValue; | ||
| if (record.value() instanceof Struct) { | ||
| recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); | ||
| } else if (record.value() instanceof Map) { | ||
| recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value()); | ||
| } else { | ||
| recordValue = record.value(); | ||
| try { | ||
| Object recordValue; | ||
| if (record.value() instanceof Struct) { | ||
| recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); | ||
| } else if (record.value() instanceof Map) { | ||
| recordValue = StructToJsonMap.handleMap((Map<String, Object>) record.value()); | ||
| } else { | ||
| recordValue = record.value(); | ||
| } | ||
|
|
||
| maybeInsertId(recordValue, record); | ||
|
|
||
| final SinkRecord updatedRecord = new SinkRecord(record.topic(), | ||
| record.kafkaPartition(), | ||
| record.keySchema(), | ||
| record.key(), | ||
| record.valueSchema(), | ||
| recordValue, | ||
| record.kafkaOffset(), | ||
| record.timestamp(), | ||
| record.timestampType(), | ||
| record.headers()); | ||
|
|
||
| toBeWrittenRecordList.add(updatedRecord); | ||
| } catch (RuntimeException e) { | ||
| // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). | ||
| // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). | ||
| if (this.errantRecordReporter != null) { | ||
| try { | ||
| this.errantRecordReporter.report(record, e); | ||
| } catch (Exception reportException) { | ||
| LOGGER.error( | ||
| "Failed to report errant record to DLQ for topic {}, partition {}, offset {}, container {}.", | ||
| record.topic(), | ||
| record.kafkaPartition(), | ||
| record.kafkaOffset(), | ||
| containerName, | ||
| reportException); | ||
|
Comment on lines
+96
to
+100
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 4ac387b — changed the outer catch to catch(RuntimeException e) since all exceptions from ID strategies (ConnectException, etc.) are RuntimeException subclasses. |
||
| } | ||
| } | ||
|
|
||
| // Use tolerance level to decide continue-vs-throw (consistent with writer pattern). | ||
| if (this.toleranceOnErrorLevel == ToleranceOnErrorLevel.ALL) { | ||
| LOGGER.warn( | ||
| "Skipping record from topic {}, partition {}, offset {}, container {} due to transform error.", | ||
| record.topic(), | ||
| record.kafkaPartition(), | ||
| record.kafkaOffset(), | ||
| containerName, | ||
| e); | ||
| } else { | ||
| LOGGER.error( | ||
| "Failing task due to transform error for record from topic {}, partition {}, offset {}, " | ||
| + "container {}.", | ||
| record.topic(), | ||
| record.kafkaPartition(), | ||
| record.kafkaOffset(), | ||
| containerName, | ||
| e); | ||
| throw e; | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 4ac387b — same fix as above, outer catch changed to catch(RuntimeException e). |
||
| } | ||
| } | ||
|
|
||
| maybeInsertId(recordValue, record); | ||
|
|
||
| // Create an updated record with from the current record and the updated record value | ||
| final SinkRecord updatedRecord = new SinkRecord(record.topic(), | ||
| record.kafkaPartition(), | ||
| record.keySchema(), | ||
| record.key(), | ||
| record.valueSchema(), | ||
| recordValue, | ||
| record.kafkaOffset(), | ||
| record.timestamp(), | ||
| record.timestampType(), | ||
| record.headers()); | ||
|
|
||
| toBeWrittenRecordList.add(updatedRecord); | ||
| } | ||
|
|
||
| return toBeWrittenRecordList; | ||
|
|
@@ -82,7 +136,7 @@ private void maybeInsertId(Object recordValue, SinkRecord sinkRecord) { | |
| recordMap.put("id", this.idStrategy.generateId(sinkRecord)); | ||
| } | ||
|
|
||
| private IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { | ||
| private static IdStrategy createIdStrategy(CosmosSinkTaskConfig sinkTaskConfig) { | ||
| IdStrategy idStrategyClass; | ||
| switch (sinkTaskConfig.getIdStrategy()) { | ||
| case FULL_KEY_STRATEGY: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need this check? from public API -> this.context.errantRecordReporter() returned ErrantRecordReporter. I think it should be safe to just pass in other SinkRecordTransformer, similar as the writer pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in e5f697d — removed the try-catch guard around
errantRecordReporter(). Now passesthis.context.errantRecordReporter()directly toSinkRecordTransformer, consistent with the writer pattern. This also fixes the CI compilation error (the guard required referencingErrantRecordReporteras a local variable type without the corresponding import).Additionally fixed test compilation errors:
Future<?>→Future<Void>to matchErrantRecordReporter.report()return type.