Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 76 additions & 10 deletions bdd/go/tests/basic_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type basicMessagingCtx struct {
lastPollMessages *iggcon.PolledMessage
lastStreamID *uint32
lastStreamName *string
lastDeletedStreamID *uint32
lastTopicID *uint32
lastTopicName *string
lastTopicPartitions *uint32
Expand Down Expand Up @@ -209,6 +210,56 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex
return nil
}

func (s basicMessagingSteps) whenUpdateStreamName(ctx context.Context, newName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
if err := c.client.UpdateStream(streamIdentifier, newName); err != nil {
return fmt.Errorf("failed to update stream: %w", err)
}
c.lastStreamName = &newName
return nil
}

func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expectedName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
stream, err := c.client.GetStream(streamIdentifier)
if err != nil {
return fmt.Errorf("failed to get stream: %w", err)
}
if stream.Name != expectedName {
return fmt.Errorf("expected stream name %s, got %s", expectedName, stream.Name)
}
return nil
}

func (s basicMessagingSteps) whenDeleteStreamByName(ctx context.Context, name string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, err := iggcon.NewIdentifier(name)
if err != nil {
return fmt.Errorf("invalid stream name %q: %w", name, err)
}
if err := c.client.DeleteStream(streamIdentifier); err != nil {
return fmt.Errorf("failed to delete stream: %w", err)
}
c.lastDeletedStreamID = c.lastStreamID
c.lastStreamID = nil
return nil
}

func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
if c.lastDeletedStreamID == nil {
return errors.New("no stream was deleted in this scenario")
}
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastDeletedStreamID)
stream, err := c.client.GetStream(streamIdentifier)
if err == nil && stream != nil {
return fmt.Errorf("stream %d still exists after deletion", *c.lastDeletedStreamID)
}
return nil
}

func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {
client := getBasicMessagingCtx(ctx).client
streams, err := client.GetStreams()
Expand Down Expand Up @@ -305,27 +356,42 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) {
return context.WithValue(context.Background(), basicMessagingCtxKey{}, &basicMessagingCtx{}), nil
})
s := &basicMessagingSteps{}
sc.Step(`I have a running Iggy server`, s.givenRunningServer)
sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot)
sc.Step(`^I have a running Iggy server$`, s.givenRunningServer)
sc.Step(`^I am authenticated as the root user$`, s.givenAuthenticationAsRoot)
sc.Step(`^I send (\d+) messages to stream (\d+), topic (\d+), partition (\d+)$`, s.whenSendMessages)
sc.Step(`^I poll messages from stream (\d+), topic (\d+), partition (\d+) starting from offset (\d+)$`, s.whenPollMessages)
sc.Step(`all messages should be sent successfully`, s.thenMessageSentSuccessfully)
sc.Step(`^all messages should be sent successfully$`, s.thenMessageSentSuccessfully)
sc.Step(`^I should receive (\d+) messages$`, s.thenShouldReceiveMessages)
sc.Step(`^the messages should have sequential offsets from (\d+) to (\d+)$`, s.thenMessagesHaveSequentialOffsets)
sc.Step(`each message should have the expected payload content`, s.thenMessagesHaveExpectedPayload)
sc.Step(`the last polled message should match the last sent message`, s.thenLastPolledMessageMatchesSent)
sc.Step(`^each message should have the expected payload content$`, s.thenMessagesHaveExpectedPayload)
sc.Step(`^the last polled message should match the last sent message$`, s.thenLastPolledMessageMatchesSent)
sc.Step(`^the stream should have name "([^"]*)"$`, s.thenStreamHasName)
sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully)
sc.Step(`^the stream should be created successfully$`, s.thenStreamCreatedSuccessfully)
sc.Step(`^I create a stream with name "([^"]*)"$`, s.whenCreateStream)
sc.Step(`I have no streams in the system`, s.givenNoStreams)
sc.Step(`^I have no streams in the system$`, s.givenNoStreams)
sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+) partitions$`, s.whenCreateTopic)
sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should be created successfully$`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions)
sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName)
sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated)
sc.Step(`^I delete the stream with name "([^"]*)"$`, s.whenDeleteStreamByName)
sc.Step(`^the stream should be deleted successfully$`, s.thenStreamDeletedSuccessfully)
sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) {
c := getBasicMessagingCtx(ctx)
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
// Best-effort cleanup: if the scenario left a stream behind (e.g.
// failed before the explicit delete step), try to remove it so the
// next scenario starts clean. A failure here is intentionally
// ignored; for guaranteed teardown across all resource kinds, a
// global cleanup script remains the better long-term solution.
if c.client != nil && c.lastStreamID != nil {
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
_ = c.client.DeleteStream(streamIdentifier)
}
if c.client != nil {
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
}
}
return ctx, scErr
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cucumber.java.en.When;
import org.apache.iggy.client.blocking.IggyBaseClient;
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.message.Message;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PollingStrategy;
Expand All @@ -43,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BasicMessagingSteps {
Expand Down Expand Up @@ -209,6 +211,30 @@ public void lastPolledMessageMatchesSent() {
assertEquals(context.lastSentMessage, lastPayload, "Last message should match sent message");
}

@When("I update the stream name to {string}")
public void updateStreamName(String newName) {
getClient().streams().updateStream(context.lastStreamId, newName);
context.lastStreamName = newName;
}

@Then("the stream name should be updated to {string}")
public void streamNameUpdated(String expectedName) {
Optional<StreamDetails> stream = getClient().streams().getStream(context.lastStreamId);
assertTrue(stream.isPresent(), "Stream should exist");
assertEquals(expectedName, stream.get().name(), "Stream name should be updated");
}

@When("I delete the stream with name {string}")
public void deleteStream(String name) {
getClient().streams().deleteStream(StreamId.of(name));
context.lastStreamId = null;
}

@Then("the stream should be deleted successfully")
public void streamDeletedSuccessfully() {
assertNull(context.lastStreamId, "Stream should have been deleted");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertNull(context.lastStreamId) re-checks a local field the when-step at line 230 just nulled, so it can't fail. mirror bdd/go/tests/basic_messaging.go:250-261: call getClient().streams().getStream(...) against the deleted id and assert empty Optional.

}

private IggyBaseClient getClient() {
if (context.client == null) {
throw new IllegalStateException("Iggy client not initialized");
Expand Down
40 changes: 40 additions & 0 deletions bdd/python/tests/test_basic_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,43 @@ def verify_last_message_match(context):
last_polled_payload = last_polled.payload().decode("utf-8")

assert last_polled_payload == context.last_sent_message


@when(parsers.parse('I update the stream name to "{new_name}"'))
def update_stream_name(context, new_name):
"""Update the stream name"""

async def _update():
await context.client.update_stream(context.last_stream_id, new_name)
context.last_stream_name = new_name

asyncio.run(_update())


@then(parsers.parse('the stream name should be updated to "{expected_name}"'))
def verify_stream_name_updated(context, expected_name):
"""Verify stream name was updated"""

async def _verify():
stream = await context.client.get_stream(context.last_stream_id)
assert stream is not None
assert stream.name == expected_name

asyncio.run(_verify())


@when(parsers.parse('I delete the stream with name "{name}"'))
def delete_stream(context, name):
"""Delete the stream by name"""

async def _delete():
await context.client.delete_stream(name)
context.last_stream_id = None

asyncio.run(_delete())


@then("the stream should be deleted successfully")
def verify_stream_deleted(context):
"""Verify stream was deleted"""
assert context.last_stream_id is None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tautological - the when-step three lines above nulled context.last_stream_id, and this just asserts it's None. add a server roundtrip - await context.client.get_stream(...) against the deleted id and expect missing - to match bdd/go/tests/basic_messaging.go:250-261.

46 changes: 45 additions & 1 deletion bdd/rust/tests/steps/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::common::global_context::GlobalContext;
use cucumber::{given, then, when};
use iggy::prelude::StreamClient;
use iggy::prelude::{Identifier, StreamClient};

#[given("I have no streams in the system")]
pub async fn given_no_streams(world: &mut GlobalContext) {
Expand Down Expand Up @@ -64,3 +64,47 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri
"Stream should have expected name"
);
}

#[when(regex = r#"^I update the stream name to "([^"]*)"$"#)]
pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
client
.update_stream(&identifier, &new_name)
.await
.expect("Should be able to update stream");
world.last_stream_name = Some(new_name);
}

#[then(regex = r#"^the stream name should be updated to "([^"]*)"$"#)]
pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
let stream = client
.get_stream(&identifier)
.await
.expect("Should be able to get stream")
.expect("Stream should exist");
assert_eq!(stream.name, expected_name, "Stream name should be updated");
}

#[when(regex = r#"^I delete the stream with name "([^"]*)"$"#)]
pub async fn when_delete_stream(world: &mut GlobalContext, name: String) {
let client = world.client.as_ref().expect("Client should be available");
let identifier = Identifier::named(&name).expect("Stream name should be valid");
client
.delete_stream(&identifier)
.await
.expect("Should be able to delete stream");
world.last_stream_id = None;
}

#[then("the stream should be deleted successfully")]
pub async fn then_stream_deleted_successfully(world: &mut GlobalContext) {
assert!(
world.last_stream_id.is_none(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only checks world.last_stream_id, which the when-step at line 101 just set to None - it can never fail. for real coverage mirror the go pattern at bdd/go/tests/basic_messaging.go:250-261: re-fetch via client.get_stream(&identifier) against the deleted id and expect not-found / None.

"Stream should have been deleted"
);
}
6 changes: 6 additions & 0 deletions bdd/scenarios/basic_messaging.feature
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ Feature: Basic Messaging Operations
And the messages should have sequential offsets from 0 to 9
And each message should have the expected payload content
And the last polled message should match the last sent message

When I update the stream name to "test-stream-updated"
Then the stream name should be updated to "test-stream-updated"

When I delete the stream with name "test-stream-updated"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete-step uses the new name test-stream-updated. nothing in the scenario asserts the old name test-stream no longer resolves after the rename. minor gap - delete-by-new-name succeeding implies the rename happened, but an explicit Then stream "test-stream" should not exist would lock it down.

Then the stream should be deleted successfully
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@ public void ThenTheLastPolledMessageShouldMatchTheLastSentMessage()
lastPolled.Header.Id.ShouldBe(_context.LastSendMessage.Header.Id);
lastPolled.Payload.ShouldBe(_context.LastSendMessage.Payload);
}

[When("I update the stream name to {string}")]
public async Task WhenIUpdateTheStreamNameTo(string newName)
{
_context.CreatedStream.ShouldNotBeNull();
await _context.IggyClient.UpdateStreamAsync(
Identifier.Numeric(_context.CreatedStream!.Id), newName);
_context.CreatedStream = await _context.IggyClient.GetStreamByIdAsync(
Identifier.Numeric(_context.CreatedStream.Id));
}

[Then("the stream name should be updated to {string}")]
public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName)
{
_context.CreatedStream.ShouldNotBeNull();
_context.CreatedStream!.Name.ShouldBe(expectedName);
}

[When(@"I delete the stream with name ""(.*)""")]
public async Task WhenIDeleteTheStream(string name)
{
await _context.IggyClient.DeleteStreamAsync(Identifier.String(name));
_context.CreatedStream = null;
}

[Then(@"the stream should be deleted successfully")]
public void ThenTheStreamShouldBeDeletedSuccessfully()
{
_context.CreatedStream.ShouldBeNull();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the when-step at line 221 already nulled _context.CreatedStream, so this assertion can't fail. mirror bdd/go/tests/basic_messaging.go:250-261: await _context.IggyClient.GetStreamByIdAsync(...) against the deleted id and assert null.

}
}

// Test context for sharing data between steps
35 changes: 35 additions & 0 deletions foreign/node/src/bdd/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ Then(
}
);

When(
'I update the stream name to {string}',
async function (this: TestWorld, newName: string) {
assert.ok(await this.client.stream.update({
streamId: this.stream.id,
name: newName
}));
this.stream = { ...this.stream, name: newName };
}
);

Then(
'the stream name should be updated to {string}',
async function (this: TestWorld, expectedName: string) {
const stream = await this.client.stream.get({ streamId: this.stream.id });
assert.ok(stream, 'Stream should exist after update');
assert.equal(stream!.name, expectedName);
}
);

When(
'I delete the stream with name {string}',
async function (this: TestWorld, name: string) {
assert.ok(await this.client.stream.delete({ streamId: name }));
}
);

Then(
'the stream should be deleted successfully',
async function (this: TestWorld) {
// If we reached here without error, the stream was deleted successfully
assert.ok(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert.ok(true) is a no-op - it never fails regardless of server state. the when-step at line 71 already wraps the delete call in assert.ok(...), so this then-step adds zero coverage. re-fetch via this.client.stream.get({ streamId: name }) and assert it returns null / throws, matching bdd/go/tests/basic_messaging.go:250-261.

}
);

// Cleanup: delete stream after test
Then(
'I can delete stream with ID {int}',
Expand Down
16 changes: 16 additions & 0 deletions foreign/python/apache_iggy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,22 @@ class IggyClient:

Returns Option of stream details or a PyRuntimeError on failure.
"""
def update_stream(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apache_iggy.pyi is auto-generated by pyo3_stub_gen per the header at line 18. can you confirm this diff was produced by re-running the generator rather than hand-edited? hand edits get clobbered on next regen. the stub shape matches the surrounding methods so it's probably regenerated, just want to be sure.

self, stream_id: builtins.str | builtins.int, name: builtins.str
) -> collections.abc.Awaitable[None]:
r"""
Updates a stream's name.

Returns Ok(()) on successful stream update or a PyRuntimeError on failure.
"""
def delete_stream(
self, stream_id: builtins.str | builtins.int
) -> collections.abc.Awaitable[None]:
r"""
Deletes a stream by id.

Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure.
"""
def create_topic(
self,
stream: builtins.str | builtins.int,
Expand Down
Loading
Loading