Skip to content

fix(connectors): propagate sink consume FFI failures with batch context#3180

Open
Standing-Man wants to merge 6 commits intoapache:masterfrom
Standing-Man:handle-consume-error
Open

fix(connectors): propagate sink consume FFI failures with batch context#3180
Standing-Man wants to merge 6 commits intoapache:masterfrom
Standing-Man:handle-consume-error

Conversation

@Standing-Man
Copy link
Copy Markdown
Contributor

@Standing-Man Standing-Man commented Apr 26, 2026

Which issue does this PR close?

Closes #2927

Rationale

The sink runtime called the FFI consume callback but ignored its returned status code. As a result, sink implementations could return Err, the SDK would convert that failure into a non-zero FFI status, but the runtime would still treat the batch as successfully processed.

This made sink failures hard to observe at the runtime level and could incorrectly increment processed-message metrics. The previous workaround required sinks, such as the HTTP sink, to log errors internally before returning Err, but the runtime still had no structured failure signal for the failed batch.

What changed?

  • Updated process_messages to capture the FFI consume return value and return an error when the status is non-zero.
  • Added RuntimeError::SinkConsumeFailed with batch context:
  • Added a regression test proving process_messages returns SinkConsumeFailed when consume fails.
  • Updated HTTP sink comments, logs, README, and integration-test documentation to reflect that the runtime now observes non-zero consume status codes instead of ignoring them.

Local Execution

  • Passed / not passed
    Passed
  • Pre-commit hooks ran / not ran
    Ran

AI Usage

  1. Which tools? (e.g., GitHub Copilot, Claude, ChatGPT) Codex
  2. Scope of usage? (e.g., autocomplete, generated functions, entire implementation)
    Codex was used to draft the implementation/test/documentation updates. The final changes were reviewed and adjusted by me.
  3. How did you verify the generated code works correctly?
    Based on my understanding of the issue, the implementation has been verified, and all test cases generated by Codex are passing.
  4. Can you explain every line of the code if asked? Yes

Signed-off-by: StandingMan <jmtangcs@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 26, 2026

Codecov Report

❌ Patch coverage is 94.87179% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 52.48%. Comparing base (9a9254a) to head (849e50d).
⚠️ Report is 15 commits behind head on master.

Files with missing lines Patch % Lines
core/connectors/runtime/src/sink.rs 96.10% 3 Missing ⚠️
core/connectors/runtime/src/error.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #3180       +/-   ##
=============================================
- Coverage     74.09%   52.48%   -21.61%     
  Complexity      943      943               
=============================================
  Files          1159     1175       +16     
  Lines        102033    92891     -9142     
  Branches      79083    70067     -9016     
=============================================
- Hits          75597    48753    -26844     
- Misses        23768    41583    +17815     
+ Partials       2668     2555      -113     
Components Coverage Δ
Rust Core 46.29% <94.87%> (-29.03%) ⬇️
Java SDK 62.30% <ø> (+2.15%) ⬆️
C# SDK 69.42% <ø> (+0.04%) ⬆️
Python SDK 81.43% <ø> (ø)
Node SDK 91.53% <ø> (ø)
Go SDK 39.41% <ø> (-0.02%) ⬇️
Files with missing lines Coverage Δ
core/connectors/sinks/http_sink/src/lib.rs 85.70% <ø> (-0.58%) ⬇️
core/connectors/runtime/src/error.rs 0.00% <0.00%> (ø)
core/connectors/runtime/src/sink.rs 78.44% <96.10%> (+3.92%) ⬆️

... and 320 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@Standing-Man
Copy link
Copy Markdown
Contributor Author

Hi @hubcio, @spetz, @numinnex and @mmodzelewski, just a gentle ping — this PR is ready on my side. Could you please take a look when convenient?

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented Apr 27, 2026

We will check in upcoming days.

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented Apr 29, 2026

@mlevkov @atharvalade @kriti-sc please review this once you have some free time - you're doing most contributions in connectors.

@kriti-sc
Copy link
Copy Markdown
Contributor

kriti-sc commented May 2, 2026

Out of scope for this PR, but Iggy has an error count metric iggy_connector_errors_total that is not being incremented for sink failures. Might be worth addressing in this PR as it is already touching the error path.

Comment thread core/connectors/runtime/src/sink.rs Outdated
partition_id: messages_metadata.partition_id,
current_offset: messages_metadata.current_offset,
schema: messages_metadata.schema.to_string(),
messages_count: processed_count,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should aim for consistency here as messages_count & processed_count mean different things in Iggy. The field name should be renamed to processed_count.

@kriti-sc
Copy link
Copy Markdown
Contributor

kriti-sc commented May 2, 2026

Correct me if I am wrong, but does the sink exit after it throws an error? Exiting the sink if one message batch fails is too aggressive a strategy. This paradigm of data shipping from a streaming system to a target system sees a lot of transient failures.

@Standing-Man Standing-Man force-pushed the handle-consume-error branch from 6cfb36f to fd7976e Compare May 6, 2026 12:14
Signed-off-by: StandingMan <jmtangcs@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

bug(connectors): consume() return value discarded in sink runtime

3 participants