fix(connectors): propagate sink consume FFI failures with batch context #3180
Standing-Man wants to merge 6 commits into apache:master
Signed-off-by: StandingMan <jmtangcs@gmail.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3180 +/- ##
=============================================
- Coverage 74.09% 52.48% -21.61%
Complexity 943 943
=============================================
Files 1159 1175 +16
Lines 102033 92891 -9142
Branches 79083 70067 -9016
=============================================
- Hits 75597 48753 -26844
- Misses 23768 41583 +17815
+ Partials 2668 2555 -113
Signed-off-by: StandingMan <jmtangcs@gmail.com>
Hi @hubcio, @spetz, @numinnex and @mmodzelewski, just a gentle ping: this PR is ready on my side. Could you please take a look when convenient?
We will check in the upcoming days.
@mlevkov @atharvalade @kriti-sc please review this once you have some free time, since you're doing most of the contributions in connectors.
Out of scope for this PR, but Iggy has an error count metric
partition_id: messages_metadata.partition_id,
current_offset: messages_metadata.current_offset,
schema: messages_metadata.schema.to_string(),
messages_count: processed_count,
We should aim for consistency here as messages_count & processed_count mean different things in Iggy. The field name should be renamed to processed_count.
Correct me if I am wrong, but does the sink exit after it throws an error? Exiting the sink if one message batch fails is too aggressive a strategy. This paradigm of data shipping from a streaming system to a target system sees a lot of transient failures.
6cfb36f to fd7976e
Signed-off-by: StandingMan <jmtangcs@gmail.com>
Which issue does this PR close?
Closes #2927
Rationale
The sink runtime called the FFI consume callback but ignored its returned status code. As a result, sink implementations could return Err, the SDK would convert that failure into a non-zero FFI status, but the runtime would still treat the batch as successfully processed.
This made sink failures hard to observe at the runtime level and could incorrectly increment processed-message metrics. The previous workaround required sinks, such as the HTTP sink, to log errors internally before returning Err, but the runtime still had no structured failure signal for the failed batch.
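The failure mode described above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the `BatchContext` struct, the field types, the `ConsumeFn` callback signature, the `process_batch` helper, and the shape of the `SinkConsumeFailed` variant are all assumptions made for the example. Only the field names are taken from the snippet quoted in the review.

```rust
use std::fmt;

/// Hypothetical batch metadata; field names follow the review snippet,
/// field types are assumed for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchContext {
    pub partition_id: u32,
    pub current_offset: u64,
    pub schema: String,
    pub messages_count: usize,
}

/// Hypothetical error type; the real RuntimeError::SinkConsumeFailed
/// may carry different fields.
#[derive(Debug, PartialEq)]
pub enum RuntimeError {
    SinkConsumeFailed { status: i32, context: BatchContext },
}

impl fmt::Display for RuntimeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RuntimeError::SinkConsumeFailed { status, context } => write!(
                f,
                "sink consume failed with status {} (partition {}, offset {}, schema {}, {} messages)",
                status,
                context.partition_id,
                context.current_offset,
                context.schema,
                context.messages_count
            ),
        }
    }
}

impl std::error::Error for RuntimeError {}

/// Assumed callback shape: returns 0 on success, non-zero on failure.
pub type ConsumeFn = extern "C" fn(ptr: *const u8, len: usize) -> i32;

/// Calls the consume callback and turns a non-zero status into an error
/// instead of silently counting the batch as processed.
pub fn process_batch(
    consume: ConsumeFn,
    payload: &[u8],
    context: BatchContext,
) -> Result<usize, RuntimeError> {
    let status = consume(payload.as_ptr(), payload.len());
    if status != 0 {
        return Err(RuntimeError::SinkConsumeFailed { status, context });
    }
    // Only a successful batch contributes to the processed count.
    Ok(context.messages_count)
}

/// Example callback that always reports success.
pub extern "C" fn always_ok(_ptr: *const u8, _len: usize) -> i32 {
    0
}

/// Example callback that always reports failure.
pub extern "C" fn always_fail(_ptr: *const u8, _len: usize) -> i32 {
    1
}
```

With a check like this, a caller can decide whether to log, retry, or stop on `Err`, and metrics are only updated on the `Ok` path.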
What changed?
process_messages now captures the FFI consume return value and returns an error when the status is non-zero.
RuntimeError::SinkConsumeFailed with batch context:
Local Execution
Passed
Ran
AI Usage
Codex was used to draft the implementation/test/documentation updates. The final changes were reviewed and adjusted by me.
Based on my understanding of the issue, the implementation has been verified, and all test cases generated by Codex are passing.