fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter #49286

Open
xinlian12 wants to merge 10 commits into Azure:main from xinlian12:fix/issue-49268-single-record-dlq

@xinlian12 xinlian12 commented May 27, 2026

Problem

When a single SinkRecord in a batch fails during transformation (ID generation or value conversion), the entire batch fails and all records are routed to the DLQ — not just the malformed record.

Root cause: SinkRecordTransformer.transform() lacked per-record error isolation. An exception from idStrategy.generateId() or StructToJsonMap.toJsonMap() would abort the entire transform() call before records reached the writer-level DLQ handling.

Additionally, CosmosWriterBase.sendToDlqIfConfigured() did not guard against ErrantRecordReporter.report() failures, meaning a DLQ error could mask the original write failure.

Solution

  1. Per-record try-catch in transformer: each record is processed independently; failures are isolated to the bad record only.
  2. DLQ report is fire-and-forget: always report to ErrantRecordReporter if available, guarded against reporter failures per Kafka Connect best practices.
  3. Tolerance level controls flow: ALL skips and continues, NONE throws (regardless of whether DLQ reporter is present).
  4. Writer DLQ guard: CosmosWriterBase.sendToDlqIfConfigured() now guards against reporter failures too.

| Scenario | ErrantRecordReporter | ToleranceOnErrorLevel | Behavior |
| --- | --- | --- | --- |
| DLQ configured | Available | ALL | Report to DLQ, skip bad record, continue |
| DLQ configured | Available | NONE | Report to DLQ, then throw (fail-fast) |
| DLQ not configured | null | ALL | Log warning, skip bad record, continue |
| DLQ not configured | null | NONE | Throw (fail-fast, preserves existing behavior) |
| DLQ reporter fails | Available (throws) | ALL | Log DLQ error, skip bad record, continue |
| DLQ reporter fails | Available (throws) | NONE | Log DLQ error, throw original exception |
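
The control flow in the table can be sketched roughly as follows. This is a simplified illustration, not the connector's code: `Tolerance` stands in for ToleranceOnErrorLevel, a `BiConsumer` stands in for ErrantRecordReporter.report(), and `perRecord` stands in for the per-record value conversion and ID generation. All of these names are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class PerRecordIsolationSketch {
    /** Hypothetical stand-in for ToleranceOnErrorLevel (only the two values named in the PR). */
    enum Tolerance { NONE, ALL }

    /**
     * Transforms each record independently.
     * reporter is a hypothetical stand-in for ErrantRecordReporter and may be null.
     */
    static <R, T> List<T> transform(List<R> records,
                                    Function<R, T> perRecord,
                                    BiConsumer<R, RuntimeException> reporter,
                                    Tolerance tolerance) {
        List<T> out = new ArrayList<>();
        for (R record : records) {
            try {
                out.add(perRecord.apply(record));
            } catch (RuntimeException e) {
                // Report first; a reporter failure is logged and never replaces e.
                if (reporter != null) {
                    try {
                        reporter.accept(record, e);
                    } catch (RuntimeException reporterFailure) {
                        System.err.println("DLQ report failed: " + reporterFailure.getMessage());
                    }
                }
                // Tolerance alone decides between fail-fast and skip.
                if (tolerance == Tolerance.NONE) {
                    throw e;
                }
                // Tolerance.ALL: skip this record and continue with the next one.
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        List<Integer> ok = transform(
            List.of("1", "oops", "3"),
            Integer::parseInt,
            (record, error) -> dlq.add(record),
            Tolerance.ALL);
        System.out.println(ok + " dlq=" + dlq); // prints "[1, 3] dlq=[oops]"
    }
}
```

With `Tolerance.NONE` the same call would still add "oops" to `dlq` and then rethrow the NumberFormatException, which corresponds to the "Report to DLQ, then throw" row.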

Changes

  • SinkRecordTransformer.java
    • Added ErrantRecordReporter and ToleranceOnErrorLevel fields
    • Wrapped per-record processing in try-catch with DLQ/tolerance handling
    • Guarded ErrantRecordReporter.report() against secondary failures
    • Added package-private constructor for testability
    • Made createIdStrategy() static for constructor chain
  • CosmosWriterBase.java
    • Added try-catch guard around errantRecordReporter.report() in sendToDlqIfConfigured() so DLQ failures do not mask original errors
  • CosmosSinkTask.java
    • Pass errantRecordReporter and toleranceOnErrorLevel to SinkRecordTransformer
    • Fix bookkeeping to count transformedRecords.size() (post-filter) instead of entry.getValue().size() (pre-filter)
  • SinkRecordTransformerTest.java — 10 unit tests:
    • T1: Mixed batch with reporter + tolerance ALL → bad→DLQ, good records survive
    • T2: Tolerance ALL without reporter → bad record skipped
    • T3: Tolerance NONE without reporter → exception thrown (fail-fast)
    • T4: All valid records → all in output, reporter never called (regression)
    • T5: All bad with reporter + tolerance ALL → all→DLQ, empty output
    • T6: Reporter throws + tolerance NONE → original exception rethrown
    • T7: Reporter throws + tolerance ALL → record skipped, continues
    • T8: Tolerance NONE with reporter → DLQ report AND exception thrown
    • T9: Struct conversion failure with reporter + tolerance ALL → DLQ + skip
    • T10: Struct conversion failure with tolerance NONE → exception thrown
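
The bookkeeping fix in CosmosSinkTask.java can be illustrated with a small sketch. It assumes a hypothetical `transform` helper that drops records it cannot process and a hypothetical `countWritten` loop; the point is only that the count comes from the post-filter list rather than from the input list.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class WrittenRecordBookkeepingSketch {
    /** Hypothetical stand-in for the transformer: drops records whose value starts with "bad". */
    static List<String> transform(List<String> records) {
        return records.stream()
            .filter(r -> !r.startsWith("bad"))
            .collect(Collectors.toList());
    }

    /** Counts records per container after transformation (post-filter). */
    static Map<String, Integer> countWritten(Map<String, List<String>> recordsByContainer) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : recordsByContainer.entrySet()) {
            List<String> transformedRecords = transform(entry.getValue());
            // Using entry.getValue().size() here would also count the skipped records.
            counts.put(entry.getKey(), transformedRecords.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> byContainer = new LinkedHashMap<>();
        byContainer.put("orders", List.of("a", "bad-1", "b"));
        System.out.println(countWritten(byContainer)); // prints "{orders=2}"
    }
}
```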

Fixes #49268

xinlian12 and others added 6 commits May 26, 2026 14:29
…mer (Azure#49268)

When a single record's ID strategy fails (e.g., ProvidedInStrategy JsonPath
parse error), only that record should be routed to DLQ — not the entire batch.

Previously, SinkRecordTransformer.transform() had no per-record error handling,
so one malformed record would abort transformation of all records in the batch.

Changes:
- SinkRecordTransformer: Add per-record try-catch in transform(). Accept
  ErrantRecordReporter and ToleranceOnErrorLevel. Report failing records to DLQ
  when available, skip when tolerance is ALL, rethrow when tolerance is NONE.
- CosmosSinkTask: Pass reporter and tolerance to SinkRecordTransformer. Fix
  written-record bookkeeping to count only successfully transformed records.

Fixes Azure#49268

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Covers: DLQ reporting, tolerance ALL skip, tolerance NONE rethrow,
all-valid regression, all-bad with reporter, reporter precedence over tolerance.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap errantRecordReporter.report() in its own try/catch to prevent DLQ
reporter failures from collapsing the entire batch. When the reporter
throws:
- ToleranceOnErrorLevel.ALL: log and continue (skip the bad record)
- ToleranceOnErrorLevel.NONE: rethrow the original transform exception

Add T6/T7 tests covering both scenarios. Renumber T6→T8.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Address review findings:
- Align DLQ/tolerance precedence with writer pattern: DLQ report is
  fire-and-forget side-effect, tolerance level controls continue-vs-throw.
  With tolerance=NONE + reporter, record is reported AND task fails.
- Guard context.errantRecordReporter() against older Kafka Connect runtimes
  that lack the API (catch NoClassDefFoundError/NoSuchMethodError).
- Add package-private constructor for testability (eliminates reflection).
- Consolidate double-logging: one log entry per failed record.
- Rewrite tests to use package-private constructor and align with new
  semantics. T8 now tests tolerance=NONE+reporter → DLQ+throw.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
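
For reference, the runtime guard described in this commit (and removed again in a later commit on this PR) follows a common pattern: call the lookup and treat NoSuchMethodError or NoClassDefFoundError as "reporter not available". The sketch below is an assumption-laden illustration that uses a generic `Supplier` in place of context.errantRecordReporter(); `reporterOrNull` is a hypothetical helper name.

```java
import java.util.function.Supplier;

final class ReporterLookupSketch {
    /**
     * Returns whatever the lookup yields, or null when the runtime lacks the API.
     * The lookup is a hypothetical stand-in for context.errantRecordReporter().
     */
    static <T> T reporterOrNull(Supplier<T> lookup) {
        try {
            return lookup.get();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime without the errant record reporter API: behave as if no DLQ is configured.
            return null;
        }
    }

    public static void main(String[] args) {
        Object present = reporterOrNull(() -> "reporter");
        Object missing = reporterOrNull(() -> {
            throw new NoSuchMethodError("errantRecordReporter");
        });
        System.out.println(present); // prints "reporter"
        System.out.println(missing); // prints "null"
    }
}
```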
Change catch clause from Exception (checked) to RuntimeException
(unchecked) since transform() doesn't declare throws Exception.
ConnectException and all other exceptions from ID strategies are
RuntimeException subclasses. This fixes the CI build failure.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…est compilation

- Remove try-catch guard around errantRecordReporter() in CosmosSinkTask,
  passing it directly like the writer pattern (fixes compilation error
  from missing ErrantRecordReporter import)
- Fix Future<?> to Future<Void> in test mocks to match
  ErrantRecordReporter.report() return type
- Add missing @SuppressWarnings("unchecked") annotations on test methods

Addresses review comment from xinlian12 and fixes CI build failure.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Copilot AI review requested due to automatic review settings May 27, 2026 20:33
@xinlian12 xinlian12 requested review from a team and kirankumarkolli as code owners May 27, 2026 20:33
Contributor

Copilot AI left a comment

Pull request overview

This PR updates the Cosmos DB Kafka Sink connector to isolate per-record ID-generation/transform failures so that a single malformed SinkRecord no longer aborts the entire batch and routes all records to the DLQ.

Changes:

  • Added per-record exception isolation in SinkRecordTransformer.transform() with DLQ reporting (when available) and tolerance-driven continue vs fail-fast behavior.
  • Wired ErrantRecordReporter and ToleranceOnErrorLevel into CosmosSinkTask and fixed written-record bookkeeping to count post-transform records.
  • Added unit tests covering mixed/invalid batches, reporter failures, and tolerance behaviors.
Summary per file:

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformer.java | Adds per-record try/catch, optional DLQ reporting, and tolerance-based control flow during record transformation/ID injection. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/main/java/com/azure/cosmos/kafka/connect/implementation/sink/CosmosSinkTask.java | Passes reporter/tolerance into the transformer and fixes per-container written record counting to use transformed output size. |
| sdk/cosmos/azure-cosmos-kafka-connect/src/test/java/com/azure/cosmos/kafka/connect/implementation/sink/SinkRecordTransformerTest.java | Introduces unit tests validating per-record isolation, DLQ reporting behavior, and tolerance handling. |
| .gitignore | Ignores .coding-harness/ directory. |

Copilot's findings

  • Files reviewed: 3/3 changed files
  • Comments generated: 2

Comment on lines +54 to +57
this.sinkRecordTransformer = new SinkRecordTransformer(
    this.sinkTaskConfig,
    this.context.errantRecordReporter(),
    this.sinkTaskConfig.getWriteConfig().getToleranceOnErrorLevel());
Member Author

will update the description

Comment on lines +88 to +92
// Report to DLQ if configured (fire-and-forget, guarded against reporter failures).
// This is consistent with the writer-level pattern in CosmosWriterBase.sendToDlqIfConfigured().
if (this.errantRecordReporter != null) {
    try {
        this.errantRecordReporter.report(record, e);
@xinlian12
Member Author

@sdkReviewAgent

@xinlian12 xinlian12 changed the title from "fix: isolate per-record ID-generation failures in Kafka sink transformer" to "[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer" May 27, 2026
* for records whose value map contains a field "fail" set to true,
* and returns a valid ID otherwise.
*/
private IdStrategy createSelectivelyFailingIdStrategy() {
Member Author

🟢 Suggestion | Test Coverage — Broader catch scope not exercised by tests

All 8 tests trigger DLQ/tolerance behavior through generateId() failures only (via createSelectivelyFailingIdStrategy()). However, the catch (RuntimeException e) in SinkRecordTransformer.transform() also covers failures from StructToJsonMap.toJsonMap() and handleMap() — for example, a malformed Struct whose schema triggers a NullPointerException during conversion.

Why it matters: The PR title says "isolate per-record ID-generation failures," but the implementation intentionally protects a broader scope (all of value conversion + ID generation). Without a test exercising the value conversion failure path, a future refactor could accidentally narrow the try block to only wrap maybeInsertId(), breaking the broader protection silently.

Suggestion: Add one test that uses a value type triggering a conversion failure (e.g., a mocked Struct that throws from toJsonMap(), or a Map with a nested value causing ClassCastException in handleMap()) to verify the same DLQ/tolerance behavior applies.
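
A rough sketch of the kind of check this suggestion describes, using stand-ins rather than the connector's classes: `convert` plays the role of value conversion and throws ClassCastException for a non-Map value, `generateId` plays the role of the ID strategy, and both sit inside one try block. All names here are hypothetical, and tolerance is fixed to skip-and-continue for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ConversionFailureScopeSketch {
    /** Hypothetical stand-in for value conversion; throws ClassCastException if value is not a Map. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> convert(Object value) {
        return (Map<String, Object>) value;
    }

    /** Hypothetical stand-in for the ID strategy. */
    static String generateId(Map<String, Object> document) {
        return String.valueOf(document.get("id"));
    }

    /** Wraps conversion AND ID generation in one try, so either failure is isolated per record. */
    static List<String> transformAll(List<Object> values, List<Object> dlq) {
        List<String> ids = new ArrayList<>();
        for (Object value : values) {
            try {
                ids.add(generateId(convert(value)));
            } catch (RuntimeException e) {
                dlq.add(value); // report and skip; the rest of the batch continues
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Object> values = List.of(Map.of("id", "a"), "not-a-map", Map.of("id", "b"));
        List<Object> dlq = new ArrayList<>();
        System.out.println(transformAll(values, dlq) + " dlq=" + dlq); // prints "[a, b] dlq=[not-a-map]"
    }
}
```

If the try block were narrowed to wrap only `generateId`, the ClassCastException from `convert` would escape the loop and abort the batch, which is the regression such a test is meant to catch.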

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

@xinlian12
Member Author

Review complete (37:40)

Posted 1 inline comment(s).

Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

@xinlian12 xinlian12 changed the title from "[NO REVIEW YET]fix: isolate per-record ID-generation failures in Kafka sink transformer" to "fix: isolate per-record failures in Kafka sink transformer and guard DLQ reporter" May 27, 2026
Development

Successfully merging this pull request may close these issues.

[BUG][Kafka Connector]Single record which can not parse id successfully would fail the whole batch and cause all batches be routed to DLQ