feat(cpp): add messaging FFI functions for C++ SDK #3046

Open
seokjin0414 wants to merge 25 commits into apache:master from seokjin0414:2965-cpp-sdk-ffi-messaging

seokjin0414 commented Mar 29, 2026

Summary

Closes #2965 (Phase 1 of 2).

  • Add get_streams, send_messages, poll_messages FFI functions to C++ SDK
  • Add shared structs: Stream, IggyMessageToSend, IggyMessagePolled, PolledMessages
  • Add 6 e2e tests for message round-trip
  • All changes are purely additive; no existing FFI functions change

Discussed with @slbotbm on Discord — PR split and message struct design confirmed.

Test plan

  • cargo check / cargo clippy / cargo fmt clean
  • 6 e2e tests in tests/message/low_level_e2e.cpp (Bazel + iggy-server)

slbotbm commented Mar 29, 2026

@seokjin0414 Thanks for the PR!

Here is my first round of review:
The CI is failing. Please check which test is failing. Also, the C++ code is likely not linted properly, so please lint it as well.


Let's unify the Message struct, instead of having separate structs for sending and polling. The unified struct could look like:

struct Message {
    checksum: u64,
    id_lo: u64,
    id_hi: u64,
    offset: u64,
    timestamp: u64,
    origin_timestamp: u64,
    user_headers_length: u32,
    payload_length: u32,
    reserved: u64,
    payload: Vec<u8>,
    user_headers: Vec<u8>,
}

Define a function using impl Message that would allow the user to create a message to send. This has to be exported to the cpp side. If the bdd tests only require the payload to be set, you can just define a function called from(payload: Vec<u8>).


You are using map(Into::into) in many places. Let's use from to be more explicit about what is being converted to what.


let mut iggy_messages: Vec<IggyMessage> = messages
    .into_iter()
    .map(|m| {
        let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128);
        let payload = Bytes::from(m.payload);
        let msg = if id > 0 {
            IggyMessage::builder().id(id).payload(payload).build()
        } else {
            IggyMessage::builder().payload(payload).build()
        };
        msg.map_err(|error| format!("Could not build message: {error}"))
    })
    .collect::<Result<Vec<_>, _>>()?;

In the above code, please define TryFrom<ffi::Message> for IggyMessage for the conversion and use that.
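
A minimal sketch of what such a conversion could look like, offered only as an illustration. `FfiMessage` and `SdkMessage` are hypothetical stand-ins for `ffi::Message` and `IggyMessage`, and the empty-payload check merely stands in for whatever error the real builder may return. Only the joining of `id_hi` and `id_lo` into a `u128` comes from the snippet above.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for the FFI message struct (only the fields used here).
#[derive(Debug, Clone, PartialEq)]
pub struct FfiMessage {
    pub id_lo: u64,
    pub id_hi: u64,
    pub payload: Vec<u8>,
}

/// Hypothetical stand-in for the SDK-side message type.
#[derive(Debug, Clone, PartialEq)]
pub struct SdkMessage {
    pub id: u128,
    pub payload: Vec<u8>,
}

/// Joins the two 64-bit halves into one 128-bit id.
pub fn join_id(id_hi: u64, id_lo: u64) -> u128 {
    (u128::from(id_hi) << 64) | u128::from(id_lo)
}

/// Splits a 128-bit id into its (high, low) 64-bit halves.
pub fn split_id(id: u128) -> (u64, u64) {
    ((id >> 64) as u64, id as u64)
}

impl TryFrom<FfiMessage> for SdkMessage {
    type Error = String;

    fn try_from(message: FfiMessage) -> Result<Self, Self::Error> {
        // Assumption: stands in for a failure reported by the real builder.
        if message.payload.is_empty() {
            return Err("Could not build message: payload must not be empty".to_string());
        }
        Ok(SdkMessage {
            id: join_id(message.id_hi, message.id_lo),
            payload: message.payload,
        })
    }
}
```

With a conversion like this in place, the closure in the snippet above would reduce to a `map(SdkMessage::try_from)` followed by the same `collect::<Result<Vec<_>, _>>()`.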


Let's rename strategy_kind and strategy_value to partitioning_strategy_kind and partitioning_strategy_value respectively so that there is no ambiguity about their meaning.

@seokjin0414

@slbotbm Thanks for the review! Addressed all feedback in c8f9adf:

  1. CI fix: clang-format applied, fragile s.id > 0 assertion removed (75c2a28)
  2. Unified Message struct: Merged IggyMessageToSend/IggyMessagePolled into single Message with all header fields. Added new_message(payload) factory function exported to C++
  3. TryFrom: Defined TryFrom<ffi::Message> for IggyMessage and use it in send_messages
  4. Explicit from: Replaced map(Into::into) with map(ffi::Stream::from) / map(ffi::Message::from)
  5. Naming: Renamed to polling_strategy_kind/polling_strategy_value — these refer to the polling strategy (offset/timestamp/first/last/next), not partitioning. Partitioning params (partitioning_kind/partitioning_value) are on send_messages separately

slbotbm commented Mar 29, 2026

@seokjin0414 Thanks for the quick turn-around.

I may not have been clear in my earlier review, but I wanted you to implement something more like:

impl ffi::Message {
    pub fn new_message(&mut self, payload: Vec<u8>) -> Result<(), String> {
        if payload.is_empty() {
            return Err("Could not create message: payload must not be empty".to_string());
        }

        let payload_length = payload.len() as u32;

        *self = Self {
            checksum: 0,
            id_lo: 0,
            id_hi: 0,
            offset: 0,
            timestamp: 0,
            origin_timestamp: 0,
            user_headers_length: 0,
            payload_length,
            reserved: 0,
            payload,
            user_headers: Vec::new(),
        };

        Ok(())
    }
}

The above interface would allow us to call Message.new_message(...) from the cpp side while avoiding ambiguity.


GetStreamsReturnsEmptyAfterCleanup and GetStreamsReturnsStreamAfterCreation test the get_streams function, but they are in the messages/ folder. Please move them to their appropriate folders.


The amount of testing is insufficient for the three functions.
Please think about the following and add tests:

  • get_streams:

    • For GetStreamsReturnsStreamAfterCreation, verify everything that is verifiable, not just the name. Also create additional streams and topics inside them, and verify everything.
    • Does the sdk allow calling get_streams before login and connect?
    • Does calling get_stream and get_streams when there is only one stream return the same stream with the same details?
    • What happens when you call get_streams repeatedly?
    • Lastly, in GetStreamsReturnsEmptyAfterCleanup, you are deleting every stream without ever creating any. Instead of this, let's create some unique streams, delete those, and then verify whether 0 is returned or not. Ignore this please.
  • send_messages:

    • What happens when you call send_messages before login and connect?
    • What happens when you give invalid topic and / or stream identifiers to the function?
    • What happens when you give non-existent topic and / or stream identifiers to the function?
    • What happens when you give wrong partitioning kind and / or value to the function? (also test the relevant limits)
    • Verify that the messages are being stored in partitions according to the strategy we have chosen.
    • What happens when you supply an empty vector to the send_messages?
    • Is there a limit on how many messages can be sent in total, or in a single batch?
    • What happens when you try to send an invalid payload through send_messages (empty payload, payload size > 64,000,000 bytes)
    • When send_messages is called multiple times, is the order of the messages preserved?
    • You have written a test where one custom ID is set and sent to the server. What happens when you set the same ID for multiple messages?
    • Lastly, since we are working with bytes, I would like you to try sending a variety of payloads and check whether they succeed or not. (null bytes, utf-8 strings, emojis, etc. - consider using property-based testing)
  • poll_messages:

    • What happens when you call poll_messages before login and connect?
    • What happens when you give invalid topic, stream, and / or consumer identifiers to the function?
    • What happens when you give non-existent topic, stream, and / or consumer identifiers to the function?
    • What happens when you give a wrong consumer_kind to the function?
    • What happens when you give a wrong polling strategy kind and / or value to the function? (also test the relevant limits)
    • What happens when you give an invalid or non-existent partition ID to the function?
    • What happens when you supply count = 0 to the poll_messages?
    • Is there a limit on how many messages can be requested in a single poll?
    • When the requested count is smaller than the number of available messages, are only that many messages returned?
    • When the requested count is larger than the number of available messages, are only the available messages returned?
    • Verify that polling without specifying a partition behaves as expected.
    • Verify the behavior of each polling strategy: offset, first, last, next, and timestamp.
    • For offset, what happens when the offset is valid, too large, or points exactly to the end?
    • For timestamp, what happens when the timestamp is before the first message, between messages, and after the last message?
    • What happens when auto_commit = false and poll_messages is called repeatedly with next?
    • What happens when auto_commit = true and poll_messages is called repeatedly with next?
    • Are offsets tracked independently for different consumer IDs?
    • Are offsets tracked independently for consumer and consumer_group?
    • When multiple consumers in the same consumer group poll, is the behavior what we expect?
    • When poll_messages is called multiple times after multiple send_messages calls, is the order of the returned messages preserved?
    • Verify that the returned offsets are monotonic and correct across multiple polls.
    • Verify that the returned payloads are preserved exactly for a variety of byte payloads. (null bytes, utf-8 strings, emojis, arbitrary bytes, etc. - consider doing property-based testing)
    • Verify that custom message IDs, if present, are returned correctly for multiple messages, not just one.
    • What happens when you poll from an empty partition, and then poll again after new messages are sent?
    • What happens when you poll after the stream or topic has been deleted?

slbotbm commented Mar 29, 2026

I wrote the above in multiple sittings, so please tell me if I wrote overlapping things.

hubcio commented Mar 30, 2026

One comment from my side: remember that the Rust SDK (which you are calling) also performs these validations and returns the respective errors. Make sure that you are not double-checking them on the Rust wrapper side or the C++ side.

seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch from e6ba6e9 to 0a49445 on March 30, 2026 16:26
@seokjin0414

@slbotbm

slbotbm commented Mar 30, 2026

@seokjin0414 Thanks for the update. Could you please tell me which tests you have not added from the above list? Just copy pasting the questions I had written will do.

slbotbm commented Mar 30, 2026

@hubcio about your comment: would it make sense to add some of these tests to the Rust SDK instead? I ask because some of the e2e tests could become unit tests on the Rust side, and we would also not have to duplicate them for the Python SDK.

seokjin0414 commented Mar 31, 2026

@slbotbm
Tests not added:

send_messages:

  • Is there a limit on how many messages can be sent in total, or in a single batch? — need to investigate the actual server limit first
  • Property-based testing — used manual cases (null bytes, UTF-8, binary) instead

poll_messages:

  • Invalid or non-existent partition ID — overlooked, will add
  • count = 0 — overlooked, will add
  • Limit on how many messages can be requested in a single poll — need to investigate the actual server limit first
  • Polling without specifying a partition — overlooked, will add
  • Timestamp strategy (before/between/after) — overlooked, will add
  • Monotonic offset verification across multiple polls — overlooked, will add
  • Consumer vs consumer_group offset independence — pending feat(cpp): Add functions related to consumer groups #2988
  • Multiple consumers in same consumer group — pending feat(cpp): Add functions related to consumer groups #2988
  • Property-based testing — used manual cases instead

I can add the overlooked ones now.

slbotbm commented Mar 31, 2026

@seokjin0414 please go ahead and add the ones you overlooked. I'll review then

seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch from c8d885a to 8742221 on April 1, 2026 00:37

seokjin0414 commented Apr 1, 2026

@slbotbm
Done. Added the overlooked tests:

| Your question | Test added |
| --- | --- |
| Invalid or non-existent partition ID | PollMessagesWithInvalidPartitionId: partition 9999 on topic with 1 partition → throws |
| count = 0 | PollMessagesWithCountZero: returns success with 0 messages (server returns empty, not error) |
| Polling without specifying a partition | PollMessagesWithoutSpecifyingPartition: UINT32_MAX defaults to partition 0 |
| Timestamp strategy (before/between/after) | PollMessagesTimestampStrategy: 2 batches with 100ms gap, verifies >= semantics |
| Monotonic offset verification | PollMessagesMonotonicOffsets: 20 messages, 4 polls of 5, verifies continuous offsets 0-19 |
| Batch size limit | SendMessagesLargeBatch: 1000 messages in single batch, all retrievable |
| Poll request limit | PollMessagesLargeCount: count=UINT32_MAX, returns only available 10 messages |
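
The monotonic-offset check described for PollMessagesMonotonicOffsets can be sketched as follows. This is an illustration in Rust rather than the actual C++ test; the helper names `flatten_polls` and `check_contiguous_offsets` are hypothetical and do not come from the PR.

```rust
/// Concatenates the offsets returned by several consecutive polls.
pub fn flatten_polls(polls: &[Vec<u64>]) -> Vec<u64> {
    polls.iter().flatten().copied().collect()
}

/// Returns Ok(()) when `offsets` is exactly `start, start + 1, start + 2, ...`.
/// Otherwise reports the first position where the sequence breaks.
pub fn check_contiguous_offsets(start: u64, offsets: &[u64]) -> Result<(), String> {
    for (index, &offset) in offsets.iter().enumerate() {
        let expected = start + index as u64;
        if offset != expected {
            return Err(format!(
                "offset at position {index}: expected {expected}, got {offset}"
            ));
        }
    }
    Ok(())
}
```

For the scenario in the table (20 messages read in 4 polls of 5), the four per-poll offset lists would be flattened and checked against a start of 0. A gap or a repeated offset yields an error naming the first bad position.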

Still not added:

slbotbm commented Apr 1, 2026

@seokjin0414 Thanks for the update! I'll review this by Friday/Saturday. In the meantime, could you please:

  1. rebase this pr and fix conflicts, since feat(cpp): Add functions related to consumer groups #2988 was recently merged
  2. Fix the failing CI
  3. write unit tests for Message.new_message in tests/messages/unit_tests.cpp? You could refer to https://github.com/apache/iggy/blob/master/foreign/cpp/tests/identifier/unit_tests.cpp to think about what kind of unit tests to write.

seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch 3 times, most recently from 0d16d27 to 8b74d12, on April 1, 2026 22:33
Comment thread foreign/cpp/src/identifier.rs
Comment thread foreign/cpp/tests/message/low_level_e2e.cpp (9 threads, 7 outdated)
seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch 2 times, most recently from acb69ac to 472616e, on April 4, 2026 15:58
@seokjin0414

@slbotbm

  • Renamed 10 tests with Throws suffix
  • Added payload/offset verification:
    • PollMessagesFirstStrategy — payload + offset for all 3 messages
    • PollMessagesLastStrategy — payload for messages at indices 7-9
    • PollMessagesNextStrategyNoAutoCommit — payload + offset for both polls
    • PollMessagesNextStrategyAutoCommit — payload for polled1 (0-4) and polled2 (5-9)
    • PollMessagesMultipleSendsThenPollOrder — offset for all 10 messages
    • PollMessagesWithoutSpecifyingPartition — payload for 5 messages
    • PollMessagesTimestampStrategy — payload for batch2 messages
  • Deleted PollMessagesLargeCount
  • Removed accidental #[allow] from identifier.rs

slbotbm left a comment

@seokjin0414 I requested some small changes.
Other than this, I noticed

  • you were not testing cases for send_messages and poll_messages when invalid topic ID was passed to them.
  • No test for invalid consumer identifier for poll_messages

Also, since the consumer group PR was merged, could you add a test for creating a consumer group and polling messages after joining that?

Comment thread foreign/cpp/tests/stream/low_level_e2e.cpp (2 threads)
seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch 3 times, most recently from 2b22956 to 41a3f03, on April 7, 2026 15:09
seokjin0414 left a comment

Thank you for the review!

  • Added field assertions to GetStreamsReturnsStreamAfterCreation (created_at, size_bytes, messages_count, topics_count) and field comparison to GetStreamsConsistentWithGetStream (created_at, size_bytes). Note: id is excluded from both due to a known cxx Vec<SharedStruct> issue where id returns 0.
  • Added SendMessagesWithInvalidTopicIdThrows, PollMessagesWithInvalidTopicIdThrows, PollMessagesWithInvalidConsumerIdThrows
  • Added join_consumer_group and leave_consumer_group FFI functions
  • Added ConsumerGroupCreateJoinAndPollMessages test covering the full flow: create → join → verify member count → send → poll with consumer_group kind → verify payloads → leave → verify member count drops to 0

slbotbm left a comment

@seokjin0414 Thanks for the update!

Could you add the following comment in tests/client/low_level_e2e.cpp please:

TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group()

Also, you commented:

Note: id is excluded from both due to a known cxx Vec issue where id returns 0.

Do you have an issue you could link to for me to see?

Otherwise lgtm!

@seokjin0414

@slbotbm
I investigated further, and it's not a cxx bug.
Iggy uses a slab allocator for stream IDs (metadata/reader.rs:202, vacant_key()), so the first stream gets id=0.
The original ASSERT_GT(s.id, 0u) was simply wrong for that reason.
I've added EXPECT_EQ(list_id, single.id) to GetStreamsConsistentWithGetStream instead, which compares the two API results without assuming a specific value.

amlel-el-mahrouss left a comment

LGTM, Nothing to add.

slbotbm commented Apr 12, 2026

Part of #2100

hubcio left a comment

Besides the review comments below, I noticed:

  • foreign/cpp/src/client.rs:33-54 new_connection returns *mut Client via Box::into_raw. ownership contract is undocumented - who owns it, who calls delete_connection, what happens on double-free. add a doc-comment.
  • foreign/cpp/src/client.rs:586 the TODO about logout_user failing leaving a leak is misleading. Box::from_raw/drop runs unconditionally before the error is classified, so there's no leak path. either remove the TODO or rephrase what it actually warns about.
  • foreign/cpp/src/lib.rs:29 enable_all() works only because the SDK pulls time/net/io-util transitively. fragile - if SDK feature flags change, this breaks. enable explicit features in this crate's Cargo.toml.
  • foreign/cpp/src/identifier.rs not in this PR's diff but recommend audit for the same magic-number pattern (length: u8 0-sentinel for invalid, etc).
  • roundtrip tests do not assert polled.partition_id matches the partition the messages were sent to. that's a free regression detector for the u32::MAX-sentinel bug below - add it.

Comment thread foreign/cpp/Cargo.toml
crate-type = ["staticlib"]

[dependencies]
bytes = "1.11.1"
Contributor

bytes = "1.11.1" hard-pins this crate while the rest of the workspace uses bytes = { workspace = true } (see core/sdk/Cargo.toml:36, core/common/Cargo.toml:40). swap to workspace form so dep versions stay unified.

Contributor

The cpp sdk is excluded from the workspace, as is the python sdk. That is why this is hard-pinned.

Contributor Author

as @slbotbm noted — cpp crate is workspace-excluded (same as python sdk), so the pin stays.

Comment thread foreign/cpp/src/client.rs Outdated
let partitioning = match partitioning_kind.as_str() {
"balanced" => Partitioning::balanced(),
"partition_id" => {
if partitioning_value.len() < 4 {
Contributor

partitioning_value.len() < 4 accepts >4 bytes silently and slices [..4]. caller passing 8 bytes (mistaking u64) gets the low 4 bytes with no error. tighten to != 4 and reject.

Contributor Author

tightened to != 4 — 8-byte payloads now error explicitly.

Comment thread foreign/cpp/src/client.rs Outdated
})?);
Partitioning::partition_id(id)
}
"messages_key" => Partitioning::messages_key(&partitioning_value).map_err(|error| {
Contributor

Partitioning::messages_key(&partitioning_value) accepts an empty Vec<u8> at the FFI boundary; the SDK rejects it later. validate at the FFI boundary or at minimum document the constraint - right now the call shape is ambiguous.

Contributor Author

rejected at the ffi boundary now.
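
The two boundary checks agreed in this thread and in the preceding partition_id thread could sit together in one dispatch function, roughly as sketched below. The `Partitioning` enum here is a hypothetical stand-in for the SDK type, `parse_partitioning` is an invented name, and little-endian encoding of the partition id is an assumption. The three kind strings and the error prefix style are taken from the snippets quoted above.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for the SDK's partitioning type.
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    Balanced,
    PartitionId(u32),
    MessagesKey(Vec<u8>),
}

/// Validates the (kind, value) pair at the FFI boundary.
pub fn parse_partitioning(kind: &str, value: &[u8]) -> Result<Partitioning, String> {
    match kind {
        "balanced" => Ok(Partitioning::Balanced),
        "partition_id" => {
            // Exactly 4 bytes: an 8-byte value (a u64 passed by mistake)
            // is an error instead of being silently sliced.
            let bytes = <[u8; 4]>::try_from(value).map_err(|_| {
                format!(
                    "Could not send messages: partition_id must be 4 bytes, got {}",
                    value.len()
                )
            })?;
            // Assumption: the id is encoded little-endian.
            Ok(Partitioning::PartitionId(u32::from_le_bytes(bytes)))
        }
        "messages_key" => {
            // Reject the empty key here instead of deferring to the SDK.
            if value.is_empty() {
                return Err("Could not send messages: messages_key must not be empty".to_string());
            }
            Ok(Partitioning::MessagesKey(value.to_vec()))
        }
        other => Err(format!(
            "Could not send messages: unknown partitioning kind '{other}'"
        )),
    }
}
```

Converting the slice to `[u8; 4]` makes the length check and the byte extraction a single step, so there is no separate index expression that could drift out of sync with the check.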

Comment thread foreign/cpp/src/client.rs Outdated
let rust_consumer_id = RustIdentifier::try_from(consumer_id)
.map_err(|error| format!("Could not poll messages: {error}"))?;

let consumer = Consumer {
Contributor

use Consumer::new(rust_consumer_id) / Consumer::group(rust_consumer_id) instead of the struct literal. matches the SDK idiom (see core/sdk/src/clients/client.rs:138,157) and core/common/src/types/consumer/consumer_kind.rs:78,86 provides both constructors.

Contributor Author

switched to Consumer::new() / Consumer::group().

Comment thread foreign/cpp/src/client.rs Outdated
}
};

let opt_partition = if partition_id == u32::MAX {
Contributor

partition_id == u32::MAX is a magic number

Contributor Author

exposed as ANY_PARTITION_ID with a doc-comment.
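
A small sketch of how a named sentinel like this might be mapped to an `Option`, for illustration only. The constant name and its value (`u32::MAX`) come from this thread; the helper `to_optional_partition` is hypothetical.

```rust
/// Sentinel meaning "no partition specified".
/// Name and value (u32::MAX) are taken from the review thread.
pub const ANY_PARTITION_ID: u32 = u32::MAX;

/// Maps the FFI-level partition id to the optional form:
/// the sentinel becomes `None`, every other value is a concrete partition.
pub fn to_optional_partition(partition_id: u32) -> Option<u32> {
    if partition_id == ANY_PARTITION_ID {
        None
    } else {
        Some(partition_id)
    }
}
```

Keeping the comparison in one helper means the sentinel is interpreted in exactly one place, which is also what the polled.partition_id assertions suggested earlier would exercise.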

ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
}

TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
Contributor

this test (and PollMessagesBeforeLoginThrows at line 479) skip cleanup because they don't create a stream. fine for these specifically. the broader gap: the suite has no fixture for cleanup on early ASSERT failure. when an ASSERT_* trips mid-test, the delete_stream at the end never runs and orphan cpp-msg-* streams accumulate across runs, flaking subsequent suites. add a per-suite TearDown that deletes leftover cpp-msg-* streams. there's already a TODO(slbotbm) for a fixture in test_helpers.hpp:18 - this is what it's for.

Contributor

I'll do it in future PRs.

Contributor Author

leaving as-is — @slbotbm said he'd handle it in a follow-up.

"server_default");

rust::Vec<std::uint8_t> oversized_payload;
for (std::uint32_t i = 0; i < 64000001u; i++) {
Contributor

64MB+1-byte payload via 64M push_back calls is slow - rust::Vec::push_back does no reserve. either pre-reserve with oversized_payload.reserve(64000001) or send exactly MAX_PAYLOAD_SIZE+1 (whatever constant the SDK exposes) instead of a hardcoded 64M.

Contributor Author

tried reserve_total but cxx 1.0.194 has it private (caught in CI, reverted in 5d1f1e6). loop now relies on amortised growth — left a comment about it.

client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
partition_id_bytes(0), std::move(batch1));

std::this_thread::sleep_for(std::chrono::milliseconds(100));
Contributor

sleep_for(100ms) to ensure the two batches land in distinct timestamps is flaky. IggyTimestamp::now() is microseconds so 100ms should be safe, but on busy CI the boundary can collapse. assert batch2_timestamp - batch1_timestamp >= some_threshold explicitly so the test fails loudly if the gap closes, instead of silently degrading into a tautology.

Contributor Author

added an explicit >= 50ms gap assert so it fails loudly instead of silently degrading.
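
The explicit gap check can be pictured as below. This is a Rust illustration of the idea, not the C++ assertion that was added; `check_min_gap` is a hypothetical helper, and timestamps are assumed to be microseconds, as the review comment states for IggyTimestamp::now().

```rust
/// Checks that `second_us` is at least `min_gap_us` microseconds after `first_us`.
/// Returns the measured gap on success.
pub fn check_min_gap(first_us: u64, second_us: u64, min_gap_us: u64) -> Result<u64, String> {
    // checked_sub turns "second precedes first" into an error, not an underflow.
    let gap = second_us
        .checked_sub(first_us)
        .ok_or_else(|| format!("second timestamp {second_us} precedes first {first_us}"))?;
    if gap < min_gap_us {
        return Err(format!("gap {gap}us is below the required {min_gap_us}us"));
    }
    Ok(gap)
}
```

With a 100ms sleep between batches and a 50ms threshold (50_000 microseconds), a collapsed gap fails this check directly, before any payload comparison runs.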

auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
make_numeric_identifier(2), "timestamp", batch2_timestamp, 100, false);

ASSERT_GE(polled.count, 5u);
Contributor

ASSERT_GE(polled.count, 5u) then loops polled.messages.size() indexing expected = "batch2-" + i. if the server returns >5 (e.g. boundary inclusion of a batch1 message with equal-or-later timestamp), index 5+ payloads will not be "batch2-N" and the comparison fails with a confusing payload-mismatch error rather than a count error. tighten to ASSERT_EQ(polled.count, 5u) if the contract guarantees it, or assert each polled payload has prefix "batch2-" regardless of count.

Contributor Author

switched to prefix matching (batch1- or batch2-) so boundary inclusion doesn't cause confusing payload mismatches.
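
As an illustration of the prefix-matching approach, the sketch below checks every polled payload against a set of allowed prefixes. It is written in Rust for consistency with the other sketches, whereas the actual test is C++; `check_payload_prefixes` is a hypothetical helper name.

```rust
/// Returns Ok(()) when every payload starts with at least one allowed prefix.
/// Otherwise reports the index and (lossily decoded) content of the first offender.
pub fn check_payload_prefixes(payloads: &[Vec<u8>], allowed: &[&str]) -> Result<(), String> {
    for (index, payload) in payloads.iter().enumerate() {
        let matches = allowed
            .iter()
            .any(|prefix| payload.starts_with(prefix.as_bytes()));
        if !matches {
            return Err(format!(
                "payload {index} has none of the allowed prefixes: {:?}",
                String::from_utf8_lossy(payload)
            ));
        }
    }
    Ok(())
}
```

Because the check does not depend on the number of returned messages, a boundary message from the first batch is accepted when "batch1-" is in the allowed list and rejected with a clear message when it is not.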

EXPECT_EQ(msg.payload[2], 0x00);
}

TEST(MessageTest, NewMessageOverwritesPreviousState) {
Contributor

NewMessageOverwritesPreviousState literally tests "second new_message call zeroes id_lo". this is testing the smell. if new_message is reshaped into a constructor (see lib.rs:191 comment), this test goes away. fine to keep until the API is reshaped, but flagging - it's a test that's locking in awkward behavior.

Contributor Author

gone with the api reshape — no more "second call zeros fields" since make_message returns by value.

github-actions bot removed the stale label on Apr 30, 2026
seokjin0414 added 21 commits May 1, 2026 19:17
…d struct

Signed-off-by: shin <sars21@hanmail.net>
…PolledMessages

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…sages

Signed-off-by: shin <sars21@hanmail.net>
- fix error message format consistency in send_messages partitioning
- extract to_payload and partition_id_bytes helpers to test_helpers.hpp
- fix GetStreamsReturnsEmpty test to clean up before asserting

Signed-off-by: shin <sars21@hanmail.net>
…rove naming

- unify IggyMessageToSend/IggyMessagePolled into single Message struct
- add new_message(payload) factory function exported to C++
- add TryFrom<ffi::Message> for IggyMessage conversion
- replace map(Into::into) with explicit map(ffi::X::from)
- rename strategy_kind/value to polling_strategy_kind/value

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…method

Signed-off-by: shin <sars21@hanmail.net>
…, and poll_messages

Signed-off-by: shin <sars21@hanmail.net>
…t, timestamp, and offset verification

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
…ertions, remove redundant test

- Rename 10 error-throwing tests with `Throws` suffix for naming consistency
- Add payload/offset verification to 7 tests that only checked counts
- Delete PollMessagesLargeCount test (not explicitly testing limits)

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
- Add join_consumer_group and leave_consumer_group FFI functions
- Add field assertions to GetStreamsReturnsStreamAfterCreation test
- Add field comparison to GetStreamsConsistentWithGetStream test
- Add SendMessagesWithInvalidTopicIdThrows test
- Add PollMessagesWithInvalidTopicIdThrows test
- Add PollMessagesWithInvalidConsumerIdThrows test
- Add ConsumerGroupCreateJoinAndPollMessages e2e test

Signed-off-by: shin <sars21@hanmail.net>
Signed-off-by: shin <sars21@hanmail.net>
seokjin0414 force-pushed the 2965-cpp-sdk-ffi-messaging branch from 2721681 to 56a9bd6 on May 1, 2026 10:21
Rust 1.95 promoted wrong_self_convention to a clippy lint that fires on
`from_*` methods taking `&mut self`. The Identifier::from_string and
from_numeric methods are part of the C++ FFI surface used by every test
and downstream binding, so renaming would be an ABI-breaking change.
Restore the allow attribute with a comment explaining why it stays.

Signed-off-by: shin <sars21@hanmail.net>
Split the unified Message struct into IggyMessageToSend (input-only) and
IggyMessagePolled (output-only) per slbotbm's revised direction. The send
struct now exposes only fields the caller can set; server-managed fields
(checksum, offset, timestamp, etc.) live on the polled struct. Replace
the awkward new_message(&mut self) mutation pattern with a free-function
make_message(payload) that returns the input struct by value.

Harden the FFI surface:
- partition_id partitioning now rejects payloads of any size other than
  4 bytes instead of silently slicing
- messages_key partitioning now rejects empty payloads at the FFI
  boundary instead of deferring to the SDK
- poll_messages uses Consumer::new()/Consumer::group() factories instead
  of struct-literal construction
- the u32::MAX sentinel for 'no partition specified' is now exposed as
  the ANY_PARTITION_ID constant with a doc-comment

Rewrite TryFrom<IggyMessageToSend> for IggyMessage to drop the dead
'if id > 0' branch (IggyMessageBuilder treats Some(0) and None
identically) and to switch the u128 split to to_le_bytes() instead of
truncating casts. Reject non-empty user_headers explicitly — the C++
SDK does not yet support them, and silently dropping caller data is
worse than surfacing an error.

Document the raw-pointer ownership contract on new_connection and drop
the misleading leak TODO around delete_connection — Box::from_raw runs
unconditionally so there is no leak path. Pin the tokio features the
Runtime::enable_all() call relies on so this crate is insulated from
upstream SDK feature changes.

Update the 26 e2e tests to the new API. Reinforce a few that hubcio
flagged: oversized-payload now pre-reserves capacity, the timestamp
strategy test asserts an explicit minimum gap so it cannot silently
collapse into a tautology, and round-trip tests now assert that
polled.partition_id matches the partition the messages were sent to —
a free regression detector for the u32::MAX sentinel mapping.

Signed-off-by: shin <sars21@hanmail.net>
cxx 1.0.194's rust::Vec<T> has reserve_total declared private at the
generated-header level, so the call introduced when addressing T2
broke the C++ e2e build. Restore the simple push_back loop and call
out the limitation in a comment.

Signed-off-by: shin <sars21@hanmail.net>
The previous Message-split commit (0f69a69) updated tests/message/
but missed the GetStreamsFieldsVerification test in tests/stream/,
which also constructs messages to populate stream stats. Update the
declaration to use IggyMessageToSend and make_message() so the e2e
build no longer fails with 'Message is not a member of iggy::ffi'.

Signed-off-by: shin <sars21@hanmail.net>

seokjin0414 commented May 1, 2026

@hubcio @slbotbm

  • new_connection ownership: added # Ownership / # Safety doc sections covering double-free, use-after-free, and threading.
  • logout TODO: dropped — Box::from_raw runs unconditionally, no leak path. left a one-liner instead.
  • enable_all() fragility: pinned tokio features in foreign/cpp/Cargo.toml (rt-multi-thread, macros, time, net, io-util) so it doesn't ride on sdk transitives.
  • identifier magic-number audit: out of scope here, will open a separate PR.
  • polled.partition_id assertions: added to round-trip / specific-partition / no-partition tests — catches the u32::MAX sentinel regression.

@seokjin0414 seokjin0414 requested a review from hubcio May 2, 2026 05:51

Development

Successfully merging this pull request may close these issues.

[C++ SDK] implement bdd/scenarios/basic_messaging.feature test for C++ SDK

4 participants