fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter#49286
fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter#49286xinlian12 wants to merge 10 commits into
Conversation
…mer (Azure#49268) When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath parse error), only that record should be routed to DLQ — not the entire batch. Previously, SinkRecordTransformer.transform() had no per-record error handling, so one malformed record would abort transformation of all records in the batch. Changes: - SinkRecordTransformer: Add per-record try-catch in transform(). Accept ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ when available, skip when tolerance is ALL, rethrow when tolerance is NONE. - CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix written-record bookkeeping to count only successfully transformed records. Fixes Azure#49268 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow, all-valid regression, all-bad with reporter, reporter precedence over tolerance. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ reporter failures from collapsing the entire batch. When the reporter throws: - ToleranceOnErrorLevel.ALL: log and continue (skip the bad record) - ToleranceOnErrorLevel.NONE: rethrow the original transform exception Add T6/T7 tests covering both scenarios. Renumber T6→T8. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Address review findings: - Align DLQ/tolerance precedence with writer pattern: DLQ report is fire-and-forget side-effect, tolerance level controls continue-vs-throw. With tolerance=NONE + reporter, record is reported AND task fails. - Guard context.errantRecordReporter() against older Kafka Connect runtimes that lack the API (catch NoClassDefFoundError/NoSuchMethodError). - Add package-private constructor for testability (eliminates reflection). - Consolidate double-logging: one log entry per failed record. - Rewrite tests to use package-private constructor and align with new semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Change catch clause from Exception (checked) to RuntimeException (unchecked) since transform() doesn't declare throws Exception. ConnectException and all other exceptions from ID strategies are RuntimeException subclasses. This fixes the CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…est compilation - Remove try-catch guard around errantRecordReporter() in CosmosSinkTask, passing it directly like the writer pattern (fixes compilation error from missing ErrantRecordReporter import) - Fix Future<?> to Future<Void> in test mocks to match ErrantRecordReporter.report() return type - Add missing @SuppressWarnings("unchecked") annotations on test methods Addresses review comment from xinlian12 and fixes CI build failure. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR updates the Cosmos DB Kafka Sink connector to isolate per-record ID-generation/transform failures so that a single malformed SinkRecord no longer aborts the entire batch and routes all records to the DLQ.
Changes:
- Added per-record exception isolation in
SinkRecordTransformer.transform()with DLQ reporting (when available) and tolerance-driven continue vs fail-fast behavior. - Wired
ErrantRecordReporterandToleranceOnErrorLevelintoCosmosSinkTaskand fixed written-record bookkeeping to count post-transform records. - Added unit tests covering mixed/invalid batches, reporter failures, and tolerance behaviors.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java | Adds per-record try/catch, optional DLQ reporting, and tolerance-based control flow during record transformation/ID injection. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java | Passes reporter/tolerance into the transformer and fixes per-container written record counting to use transformed output size. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java | Introduces unit tests validating per-record isolation, DLQ reporting behavior, and tolerance handling. |
| .gitignore | Ignores .coding-harness/ directory. |
Copilot's findings
- Files reviewed: 3/3 changed files
- Comments generated: 2
| this.sinkRecordTransformer = new SinkRecordTransformer( | ||
| this.sinkTaskConfig, | ||
| this.context.errantRecordReporter(), | ||
| this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel()); |
There was a problem hiding this comment.
will update the description
| // Report to DLQ if configured (fire-and-forget, guarded against reporter failures). | ||
| // This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured(). | ||
| if (this.errantRecordReporter != null) { | ||
| try { | ||
| this.errantRecordReporter.report(record, e); |
|
@sdkReviewAgent |
| * for records whose value map contains a field "fail" set to true, | ||
| * and returns a valid ID otherwise. | ||
| */ | ||
| private IdStrategy createSelectivelyFailingIdStrategy() { |
There was a problem hiding this comment.
🟢 Suggestion | Test Coverage — Broader catch scope not exercised by tests
All 8 tests trigger DLQ/tolerance behavior through generateId() failures only (via createSelectivelyFailingIdStrategy()). However, the catch (RuntimeException e) in SinkRecordTransformer.transform() also covers failures from StructToJsonMap.toJsonMap() and handleMap() — for example, a malformed Struct whose schema triggers a NullPointerException during conversion.
Why it matters: The PR title says "isolate per-record ID-generation failures," but the implementation intentionally protects a broader scope (all of value conversion + ID generation). Without a test exercising the value conversion failure path, a future refactor could accidentally narrow the try block to only wrap maybeInsertId(), breaking the broader protection silently.
Suggestion: Add one test that uses a value type triggering a conversion failure (e.g., a mocked Struct that throws from toJsonMap(), or a Map with a nested value causing ClassCastException in handleMap()) to verify the same DLQ/tolerance behavior applies.
|
✅ Review complete (37:40) Posted 1 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage |
Problem
When a single
SinkRecordin a batch fails during transformation (ID generation or value conversion), the entire batch fails and all records are routed to the DLQ — not just the malformed record.Root cause:
SinkRecordTransformer.transform()lacked per-record error isolation. An exception fromidStrategy.generateId()orStructToJsonMap.toJsonMap()would abort the entiretransform()call before records reached the writer-level DLQ handling.Additionally,
CosmosWriterBase.sendToDlqIfConfigured()did not guard againstErrantRecordReporter.report()failures, meaning a DLQ error could mask the original write failure.Solution
ErrantRecordReporterif available, guarded against reporter failures per Kafka Connect best practices.ALLskips and continues,NONEthrows (regardless of whether DLQ reporter is present).CosmosWriterBase.sendToDlqIfConfigured()now guards against reporter failures too.ErrantRecordReporterToleranceOnErrorLevelALLNONEnullALLnullNONEALLNONEChanges
SinkRecordTransformer.javaErrantRecordReporterandToleranceOnErrorLevelfieldsErrantRecordReporter.report()against secondary failurescreateIdStrategy()static for constructor chainCosmosWriterBase.javaerrantRecordReporter.report()insendToDlqIfConfigured()so DLQ failures do not mask original errorsCosmosSinkTask.javaerrantRecordReporterandtoleranceOnErrorLeveltoSinkRecordTransformertransformedRecords.size()(post-filter) instead ofentry.getValue().size()(pre-filter)SinkRecordTransformerTest.java— 10 unit tests:Fixes #49268