feat(cpp): add messaging FFI functions for C++ SDK #3046
seokjin0414 wants to merge 25 commits into apache:master from
@seokjin0414 Thanks for the PR! Here is my first round of review:
Let's unify the Message struct, instead of having separate structs for sending and polling. The unified struct could look like:
struct Message {
    checksum: u64,
    id_lo: u64,
    id_hi: u64,
    offset: u64,
    timestamp: u64,
    origin_timestamp: u64,
    user_headers_length: u32,
    payload_length: u32,
    reserved: u64,
    payload: Vec<u8>,
    user_headers: Vec<u8>,
}
Define a function using You are using
let mut iggy_messages: Vec<IggyMessage> = messages
    .into_iter()
    .map(|m| {
        let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128);
        let payload = Bytes::from(m.payload);
        let msg = if id > 0 {
            IggyMessage::builder().id(id).payload(payload).build()
        } else {
            IggyMessage::builder().payload(payload).build()
        };
        msg.map_err(|error| format!("Could not build message: {error}"))
    })
    .collect::<Result<Vec<_>, _>>()?;
In the above code, please define Let's rename
|
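As a side note on the snippet above: the id_lo / id_hi pair recombines into the 128-bit message ID with a widen-then-shift. A minimal standalone sketch follows; the helper name join_id is hypothetical and not part of the PR.

```rust
/// Recombine the two 64-bit halves of a message ID into one u128.
/// `join_id` is a hypothetical helper name used only for illustration.
fn join_id(id_lo: u64, id_hi: u64) -> u128 {
    // Widen to u128 before shifting so the high half is not shifted out of a u64.
    ((id_hi as u128) << 64) | (id_lo as u128)
}
```

With this layout, join_id(1, 0) is 1 and join_id(0, 1) is 1 << 64.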
@slbotbm Thanks for the review! Addressed all feedback in c8f9adf:
|
|
@seokjin0414 Thanks for the quick turn-around. I may not have been clear in my review previously, but I wanted you to implement something more like
impl ffi::Message {
    pub fn new_message(&mut self, payload: Vec<u8>) -> Result<(), String> {
        if payload.is_empty() {
            return Err("Could not create message: payload must not be empty".to_string());
        }
        let payload_length = payload.len() as u32;
        *self = Self {
            checksum: 0,
            id_lo: 0,
            id_hi: 0,
            offset: 0,
            timestamp: 0,
            origin_timestamp: 0,
            user_headers_length: 0,
            payload_length,
            reserved: 0,
            payload,
            user_headers: Vec::new(),
        };
        Ok(())
    }
}
The above interface would allow us to call
The amount of testing is insufficient for the three functions.
|
|
I wrote the above in multiple sittings, so please tell me if I wrote overlapping things. |
|
One comment from my side: remember that the Rust SDK (which you are calling) also performs these validations and returns the respective errors. Make sure you are not double-checking them on the Rust wrapper side or the C++ side.
e6ba6e9 to 0a49445
|
|
@seokjin0414 Thanks for the update. Could you please tell me which tests you have not added from the above list? Just copy pasting the questions I had written will do. |
|
@hubcio about your comment: would it make sense to add some of these tests to the Rust SDK instead?
|
@slbotbm send_messages:
poll_messages:
I can add the overlooked ones now. |
|
@seokjin0414 please go ahead and add the ones you overlooked. I'll review then |
c8d885a to 8742221
|
@slbotbm
Still not added:
|
|
@seokjin0414 Thanks for the update! I'll review this by Friday/Saturday. In the meantime, could you please:
|
0d16d27 to 8b74d12
acb69ac to 472616e
|
@seokjin0414 I requested some small changes.
Other than this, I noticed:
- you were not testing cases for send_messages and poll_messages when an invalid topic ID was passed to them.
- No test for an invalid consumer identifier for poll_messages
Also, since the consumer group PR was merged, could you add a test for creating a consumer group and polling messages after joining that?
2b22956 to 41a3f03
seokjin0414
left a comment
Thank you for the review!
- Added field assertions to GetStreamsReturnsStreamAfterCreation (created_at, size_bytes, messages_count, topics_count) and field comparison to GetStreamsConsistentWithGetStream (created_at, size_bytes). Note: id is excluded from both due to a known cxx Vec<SharedStruct> issue where id returns 0.
- Added SendMessagesWithInvalidTopicIdThrows, PollMessagesWithInvalidTopicIdThrows, PollMessagesWithInvalidConsumerIdThrows
- Added join_consumer_group and leave_consumer_group FFI functions
- Added ConsumerGroupCreateJoinAndPollMessages test covering the full flow: create → join → verify member count → send → poll with consumer_group kind → verify payloads → leave → verify member count drops to 0
@seokjin0414 Thanks for the update!
Could you add the following comment in tests/client/low_level_e2e.cpp please:
TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group()
Also, you commented:
Note: id is excluded from both due to a known cxx Vec issue where id returns 0.
Do you have an issue you could link to for me to see?
Otherwise lgtm!
|
@slbotbm |
|
Part of #2100 |
hubcio
left a comment
besides the review below I noticed:
- foreign/cpp/src/client.rs:33-54 new_connection returns *mut Client via Box::into_raw. ownership contract is undocumented - who owns it, who calls delete_connection, what happens on double-free. add a doc-comment.
- foreign/cpp/src/client.rs:586 the TODO about logout_user failing leaving a leak is misleading. Box::from_raw/drop runs unconditionally before the error is classified, so there's no leak path. either remove the TODO or rephrase what it actually warns about.
- foreign/cpp/src/lib.rs:29 enable_all() works only because the SDK pulls time/net/io-util transitively. fragile - if SDK feature flags change, this breaks. enable explicit features in this crate's Cargo.toml.
- foreign/cpp/src/identifier.rs not in this PR's diff but recommend audit for the same magic-number pattern (length: u8 0-sentinel for invalid, etc).
- roundtrip tests do not assert polled.partition_id matches the partition the messages were sent to. that's a free regression detector for the u32::MAX-sentinel bug below - add it.
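The ownership contract asked for in the first point can be sketched roughly as below. This is a standalone illustration under stated assumptions: the Client struct here is a stand-in with a made-up field, and the signatures are simplified rather than copied from foreign/cpp/src/client.rs.

```rust
/// Stand-in for the real client type; the field is illustrative only.
pub struct Client {
    pub address: String,
}

/// Allocates a client and hands ownership to the caller as a raw pointer.
/// The caller owns the allocation and must pass the pointer to
/// `delete_connection` exactly once.
pub fn new_connection(address: &str) -> *mut Client {
    Box::into_raw(Box::new(Client {
        address: address.to_string(),
    }))
}

/// Reclaims and drops a client created by `new_connection`.
///
/// # Safety
/// `client` must be null or a pointer returned by `new_connection` that has
/// not been deleted yet. Passing the same pointer twice is a double free.
pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
    if client.is_null() {
        return Err("Could not delete connection: null pointer".to_string());
    }
    // Ownership moves back into a Box, which is dropped immediately.
    drop(unsafe { Box::from_raw(client) });
    Ok(())
}
```

Because the Box is reclaimed before anything else can fail, there is no path on which a non-null pointer leaks, which matches the point made about the TODO.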
crate-type = ["staticlib"]

[dependencies]
bytes = "1.11.1"
bytes = "1.11.1" hard-pins this crate while the rest of the workspace uses bytes = { workspace = true } (see core/sdk/Cargo.toml:36, core/common/Cargo.toml:40). swap to workspace form so dep versions stay unified.
The cpp sdk is excluded from the workspace, as is the python sdk. That is why this is hard-pinned.
as @slbotbm noted — cpp crate is workspace-excluded (same as python sdk), so the pin stays.
let partitioning = match partitioning_kind.as_str() {
    "balanced" => Partitioning::balanced(),
    "partition_id" => {
        if partitioning_value.len() < 4 {
partitioning_value.len() < 4 accepts >4 bytes silently and slices [..4]. caller passing 8 bytes (mistaking u64) gets the low 4 bytes with no error. tighten to != 4 and reject.
tightened to != 4 — 8-byte payloads now error explicitly.
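A strict length check of this kind could look like the sketch below. The helper name parse_partition_id, the error wording, and the little-endian byte order are assumptions for illustration, not taken from the PR.

```rust
use std::convert::TryFrom;

/// Parse a partition ID from the raw partitioning value.
/// Little-endian byte order is an assumption made for this sketch.
fn parse_partition_id(value: &[u8]) -> Result<u32, String> {
    // Converting to a fixed-size array succeeds only for exactly 4 bytes,
    // so 3-byte and 8-byte inputs are both rejected instead of sliced.
    let bytes = <[u8; 4]>::try_from(value).map_err(|_| {
        format!(
            "Could not send messages: partition_id must be exactly 4 bytes, got {}",
            value.len()
        )
    })?;
    Ok(u32::from_le_bytes(bytes))
}
```

Going through a fixed-size array makes the length check and the conversion a single step, so there is no separate slice that could silently drop trailing bytes.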
        })?);
        Partitioning::partition_id(id)
    }
    "messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| {
Partitioning::messages_key(&partitioning_value) accepts an empty Vec<u8> at the FFI boundary; the SDK rejects it later. validate at the FFI boundary or at minimum document the constraint - right now the call shape is ambiguous.
rejected at the ffi boundary now.
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
    .map_err(|error| format!("Could not poll messages: {error}"))?;

let consumer = Consumer {
use Consumer::new(rust_consumer_id) / Consumer::group(rust_consumer_id) instead of the struct literal. matches the SDK idiom (see core/sdk/src/clients/client.rs:138,157) and core/common/src/types/consumer/consumer_kind.rs:78,86 provides both constructors.
switched to Consumer::new() / Consumer::group().
    }
};

let opt_partition = if partition_id == u32::MAX {
partition_id == u32::MAX is a magic number
exposed as ANY_PARTITION_ID with a doc-comment.
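For readers following along, a named sentinel of this shape might be declared and consumed as in the sketch below. Only the constant name and its u32::MAX value come from the thread; the helper to_optional_partition is a hypothetical name.

```rust
/// Sentinel meaning "no partition specified" at the FFI boundary.
pub const ANY_PARTITION_ID: u32 = u32::MAX;

/// Map the FFI-level partition ID onto an Option.
/// `to_optional_partition` is a hypothetical helper name.
fn to_optional_partition(partition_id: u32) -> Option<u32> {
    if partition_id == ANY_PARTITION_ID {
        None
    } else {
        Some(partition_id)
    }
}
```

Note that partition 0 stays a valid, explicit partition under this mapping; only u32::MAX is treated as "any".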
    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
}

TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
this test (and PollMessagesBeforeLoginThrows at line 479) skip cleanup because they don't create a stream. fine for these specifically. the broader gap: the suite has no fixture for cleanup on early ASSERT failure. when an ASSERT_* trips mid-test, the delete_stream at the end never runs and orphan cpp-msg-* streams accumulate across runs, flaking subsequent suites. add a per-suite TearDown that deletes leftover cpp-msg-* streams. there's already a TODO(slbotbm) for a fixture in test_helpers.hpp:18 - this is what it's for.
leaving as-is — @slbotbm said he'd handle it in a follow-up.
"server_default");

rust::Vec<std::uint8_t> oversized_payload;
for (std::uint32_t i = 0; i < 64000001u; i++) {
64MB+1-byte payload via 64M push_back calls is slow - rust::Vec::push_back does no reserve. either pre-reserve with oversized_payload.reserve(64000001) or send exactly MAX_PAYLOAD_SIZE+1 (whatever constant the SDK exposes) instead of a hardcoded 64M.
tried reserve_total but cxx 1.0.194 has it private (caught in CI, reverted in 5d1f1e6). loop now relies on amortised growth — left a comment about it.
client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
                      partition_id_bytes(0), std::move(batch1));

std::this_thread::sleep_for(std::chrono::milliseconds(100));
sleep_for(100ms) to ensure the two batches land in distinct timestamps is flaky. IggyTimestamp::now() is microseconds so 100ms should be safe, but on busy CI the boundary can collapse. assert batch2_timestamp - batch1_timestamp >= some_threshold explicitly so the test fails loudly if the gap closes, instead of silently degrading into a tautology.
added an explicit >= 50ms gap assert so it fails loudly instead of silently degrading.
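The actual assert lives in the C++ e2e test; the logic it encodes can be sketched in Rust as below. The helper name check_timestamp_gap is hypothetical, and timestamps are assumed to be microseconds, as stated in the review comment.

```rust
/// Returns the gap between two batch timestamps in microseconds, or an error
/// when the gap is below `min_gap_us` or the timestamps are out of order.
/// `check_timestamp_gap` is a hypothetical helper name.
fn check_timestamp_gap(batch1_us: u64, batch2_us: u64, min_gap_us: u64) -> Result<u64, String> {
    // checked_sub avoids an underflow panic if batch2 precedes batch1.
    let gap = batch2_us
        .checked_sub(batch1_us)
        .ok_or_else(|| format!("batch2 ({batch2_us}) is earlier than batch1 ({batch1_us})"))?;
    if gap < min_gap_us {
        return Err(format!("timestamp gap {gap}us is below the required {min_gap_us}us"));
    }
    Ok(gap)
}
```

A 50ms threshold corresponds to min_gap_us = 50_000 under the microsecond assumption.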
auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
                                    make_numeric_identifier(2), "timestamp", batch2_timestamp, 100, false);

ASSERT_GE(polled.count, 5u);
ASSERT_GE(polled.count, 5u) then loops polled.messages.size() indexing expected = "batch2-" + i. if the server returns >5 (e.g. boundary inclusion of a batch1 message with equal-or-later timestamp), index 5+ payloads will not be "batch2-N" and the comparison fails with a confusing payload-mismatch error rather than a count error. tighten to ASSERT_EQ(polled.count, 5u) if the contract guarantees it, or assert each polled payload has prefix "batch2-" regardless of count.
switched to prefix matching (batch1- or batch2-) so boundary inclusion doesn't cause confusing payload mismatches.
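The prefix check described here is written in C++ in the test suite; an equivalent check in Rust could be sketched as follows. The helper name all_have_prefix is hypothetical.

```rust
/// True when every payload starts with at least one of the accepted prefixes.
/// `all_have_prefix` is a hypothetical helper name.
fn all_have_prefix(payloads: &[Vec<u8>], prefixes: &[&str]) -> bool {
    payloads.iter().all(|payload| {
        prefixes
            .iter()
            .any(|prefix| payload.starts_with(prefix.as_bytes()))
    })
}
```

An empty payload list passes vacuously, so a check like this only makes sense alongside a separate assertion on the message count.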
    EXPECT_EQ(msg.payload[2], 0x00);
}

TEST(MessageTest, NewMessageOverwritesPreviousState) {
NewMessageOverwritesPreviousState literally tests "second new_message call zeroes id_lo". this is testing the smell. if new_message is reshaped into a constructor (see lib.rs:191 comment), this test goes away. fine to keep until the API is reshaped, but flagging - it's a test that's locking in awkward behavior.
gone with the api reshape — no more "second call zeros fields" since make_message returns by value.
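To illustrate why the by-value shape removes the "second call zeroes fields" behavior, here is a minimal sketch. The struct below is a stand-in whose field set is assumed, and the body of make_message is a guess at the simplest possible form; neither is copied from the PR.

```rust
/// Illustrative stand-in for the send-side struct; the field set is assumed.
#[derive(Debug, Clone, PartialEq)]
pub struct IggyMessageToSend {
    pub id_lo: u64,
    pub id_hi: u64,
    pub payload: Vec<u8>,
    pub user_headers: Vec<u8>,
}

/// Builds a fresh message by value; no existing message is mutated.
/// No payload validation is done here, on the assumption that the SDK
/// performs it later.
pub fn make_message(payload: Vec<u8>) -> IggyMessageToSend {
    IggyMessageToSend {
        id_lo: 0,
        id_hi: 0,
        payload,
        user_headers: Vec::new(),
    }
}
```

Each call produces an independent value, so setting id_lo on one message cannot be undone by constructing another.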
Signed-off-by: shin <sars21@hanmail.net>
…d struct Signed-off-by: shin <sars21@hanmail.net>
…PolledMessages Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…sages Signed-off-by: shin <sars21@hanmail.net>
- fix error message format consistency in send_messages partitioning
- extract to_payload and partition_id_bytes helpers to test_helpers.hpp
- fix GetStreamsReturnsEmpty test to clean up before asserting
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…rove naming
- unify IggyMessageToSend/IggyMessagePolled into single Message struct
- add new_message(payload) factory function exported to C++
- add TryFrom<ffi::Message> for IggyMessage conversion
- replace map(Into::into) with explicit map(ffi::X::from)
- rename strategy_kind/value to polling_strategy_kind/value
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…method Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…, and poll_messages Signed-off-by: shin <sars21@hanmail.net>
…t, timestamp, and offset verification Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…ertions, remove redundant test
- Rename 10 error-throwing tests with `Throws` suffix for naming consistency
- Add payload/offset verification to 7 tests that only checked counts
- Delete PollMessagesLargeCount test (not explicitly testing limits)
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
- Add join_consumer_group and leave_consumer_group FFI functions
- Add field assertions to GetStreamsReturnsStreamAfterCreation test
- Add field comparison to GetStreamsConsistentWithGetStream test
- Add SendMessagesWithInvalidTopicIdThrows test
- Add PollMessagesWithInvalidTopicIdThrows test
- Add PollMessagesWithInvalidConsumerIdThrows test
- Add ConsumerGroupCreateJoinAndPollMessages e2e test
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
2721681 to 56a9bd6
Rust 1.95 promoted wrong_self_convention to a clippy lint that fires on `from_*` methods taking `&mut self`. The Identifier::from_string and from_numeric methods are part of the C++ FFI surface used by every test and downstream binding, so renaming would be an ABI-breaking change. Restore the allow attribute with a comment explaining why it stays. Signed-off-by: shin <sars21@hanmail.net>
Split the unified Message struct into IggyMessageToSend (input-only) and IggyMessagePolled (output-only) per slbotbm's revised direction. The send struct now exposes only fields the caller can set; server-managed fields (checksum, offset, timestamp, etc.) live on the polled struct. Replace the awkward new_message(&mut self) mutation pattern with a free-function make_message(payload) that returns the input struct by value.

Harden the FFI surface:
- partition_id partitioning now rejects payloads of any size other than 4 bytes instead of silently slicing
- messages_key partitioning now rejects empty payloads at the FFI boundary instead of deferring to the SDK
- poll_messages uses Consumer::new()/Consumer::group() factories instead of struct-literal construction
- the u32::MAX sentinel for 'no partition specified' is now exposed as the ANY_PARTITION_ID constant with a doc-comment

Rewrite TryFrom<IggyMessageToSend> for IggyMessage to drop the dead 'if id > 0' branch (IggyMessageBuilder treats Some(0) and None identically) and to switch the u128 split to to_le_bytes() instead of truncating casts. Reject non-empty user_headers explicitly: the C++ SDK does not yet support them, and silently dropping caller data is worse than surfacing an error.

Document the raw-pointer ownership contract on new_connection and drop the misleading leak TODO around delete_connection: Box::from_raw runs unconditionally so there is no leak path. Pin the tokio features the Runtime::enable_all() call relies on so this crate is insulated from upstream SDK feature changes.

Update the 26 e2e tests to the new API. Reinforce a few that hubcio flagged: oversized-payload now pre-reserves capacity, the timestamp strategy test asserts an explicit minimum gap so it cannot silently collapse into a tautology, and round-trip tests now assert that polled.partition_id matches the partition the messages were sent to, a free regression detector for the u32::MAX sentinel mapping.
Signed-off-by: shin <sars21@hanmail.net>
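The to_le_bytes() split mentioned in this commit message can be sketched as below. This is an illustration only: the helper name split_id is hypothetical, and the code is not taken from the commit.

```rust
/// Split a u128 ID into (id_lo, id_hi) without truncating casts.
/// `split_id` is a hypothetical helper name.
fn split_id(id: u128) -> (u64, u64) {
    let bytes = id.to_le_bytes();
    let mut lo = [0u8; 8];
    let mut hi = [0u8; 8];
    // In little-endian order the low 64 bits occupy the first 8 bytes.
    lo.copy_from_slice(&bytes[..8]);
    hi.copy_from_slice(&bytes[8..]);
    (u64::from_le_bytes(lo), u64::from_le_bytes(hi))
}
```

Compared with `id as u64` and `(id >> 64) as u64`, this form never relies on a narrowing cast, which is presumably the motivation stated in the commit message.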
cxx 1.0.194's rust::Vec<T> has reserve_total declared private at the generated-header level, so the call introduced when addressing T2 broke the C++ e2e build. Restore the simple push_back loop and call out the limitation in a comment. Signed-off-by: shin <sars21@hanmail.net>
The previous Message-split commit (0f69a69) updated tests/message/ but missed the GetStreamsFieldsVerification test in tests/stream/, which also constructs messages to populate stream stats. Update the declaration to use IggyMessageToSend and make_message() so the e2e build no longer fails with 'Message is not a member of iggy::ffi'. Signed-off-by: shin <sars21@hanmail.net>
|
Summary
Closes #2965 (Phase 1 of 2).
- get_streams, send_messages, poll_messages FFI functions to C++ SDK
- Stream, IggyMessageToSend, IggyMessagePolled, PolledMessages
Discussed with @slbotbm on Discord; PR split and message struct design confirmed.
Test plan
- cargo check / cargo clippy / cargo fmt clean
- tests/message/low_level_e2e.cpp (Bazel + iggy-server)