3 changes: 3 additions & 0 deletions sentry_flink/sentry_flink/flink/flink_adapter.py
@@ -337,3 +337,6 @@ def shutdown(self) -> None:

def run(self) -> None:
self.env.execute()

def terminate(self, stream: Union[DataStream, DataStreamSink]) -> None:
pass
3 changes: 3 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/adapter.py
@@ -279,6 +279,9 @@ def router(

return build_branches(stream, step.routing_table.values())

def terminate(self, stream: Route) -> None:
pass

def get_processor(self, source: str) -> StreamProcessor[KafkaPayload]:
"""
Returns the stream processor for the given source
14 changes: 14 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -501,6 +501,14 @@ def routing_function(msg: Message[Any]) -> str:
)
return build_branches(stream, step.routing_table.values())

def terminate(self, stream: Route) -> None:
"""
Performs all the operations to complete the construction of the graph
before using the adapter.
"""
if self.__chains.exists(stream):
self.__close_chain(stream)

def run(self) -> None:
"""
Starts the pipeline
@@ -516,3 +524,9 @@ def shutdown(self) -> None:
work.
"""
raise NotImplementedError

def get_steps(self) -> dict[str, list[RuntimeOperator]]:
"""
Returns the list of steps in the pipeline.
"""
return {source: consumer.steps for source, consumer in self.__consumers.items()}
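
For context, a minimal sketch of how the new `get_steps` hook might be used as a debugging aid. It assumes an `adapter` and `pipeline` built exactly as in `test_get_steps` further down in this diff; the printed format is illustrative only.

```python
# Assumes `adapter = RustArroyoAdapter.build(...)` and that the pipeline has
# already been translated via iterate_edges(pipeline, RuntimeTranslator(adapter)),
# as in test_get_steps below. Purely a debugging aid.
for source, steps in adapter.get_steps().items():
    # Each entry maps a Kafka source name to the chain of RuntimeOperator
    # steps that was built for it.
    print(f"{source}: {len(steps)} runtime step(s)")
```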
@@ -119,3 +119,6 @@ def finalize(

def exists(self, route: Route) -> bool:
return _hashable_route(route) in self.__chains

def get_chains(self) -> list[Route]:
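"""
Returns a Route for each chain currently registered with this strategy.
"""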
return [Route(route[0], list(route[1])) for route in self.__chains.keys()]
17 changes: 17 additions & 0 deletions sentry_streams/sentry_streams/adapters/stream_adapter.py
@@ -144,6 +144,14 @@ def broadcast(
"""
raise NotImplementedError

@abstractmethod
def terminate(self, stream: Union[StreamT, StreamSinkT]) -> None:
"""
Performs all the operations to complete the construction of the graph
before using the adapter.
"""
raise NotImplementedError

@abstractmethod
def run(self) -> None:
"""
@@ -218,3 +226,12 @@ def translate_step(

else:
assert_never(step_type)

def terminate(self, stream: Union[StreamT, StreamSinkT]) -> None:
"""
Closes a stream. This signals to the adapter that a specific stream
has reached its end, whether or not it terminates in a sink.
The adapter can then perform any operation needed to complete the
construction of that branch of the graph.
"""
self.adapter.terminate(stream)
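
Because `terminate` is added as an abstract method, every `StreamAdapter` subclass must now implement it (as the Flink, Arroyo, Rust Arroyo, and dummy adapters do in this diff). Below is a minimal sketch of what a hypothetical out-of-tree adapter would add; the class name, import path, and generic parameters are assumptions, only the method signature comes from this diff.

```python
from typing import Any

# Import path assumed from the file header above; hedged sketch only.
from sentry_streams.adapters.stream_adapter import StreamAdapter


class MyCustomAdapter(StreamAdapter[Any, Any]):
    # ... source / sink / map / router / broadcast / run / shutdown
    # implementations elided; only the new hook is shown ...

    def terminate(self, stream: Any) -> None:
        # Nothing to finalize: this hypothetical adapter wires every step
        # eagerly while translating, so a branch that ends without a sink
        # needs no extra work here.
        pass
```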
3 changes: 3 additions & 0 deletions sentry_streams/sentry_streams/dummy/dummy_adapter.py
@@ -94,6 +94,9 @@ def router(self, step: Router[RoutingFuncReturnType, Any], stream: Any) -> Any:
ret[branch.root.name] = branch
return ret

def terminate(self, stream: Any) -> None:
pass

def run(self) -> None:
pass

1 change: 1 addition & 0 deletions sentry_streams/sentry_streams/runner.py
@@ -49,6 +49,7 @@ def iterate_edges(
input_stream = step_streams.pop(input_name)

if not output_steps:
translator.terminate(input_stream)
continue

for output in output_steps:
2 changes: 2 additions & 0 deletions sentry_streams/sentry_streams/rust_streams.pyi
@@ -108,6 +108,8 @@ class ArroyoConsumer:
def add_step(self, step: RuntimeOperator) -> None: ...
def run(self) -> None: ...
def shutdown(self) -> None: ...
@property
def steps(self) -> list[RuntimeOperator]: ...

class PyAnyMessage:
def __init__(
7 changes: 7 additions & 0 deletions sentry_streams/src/consumer.rs
@@ -93,6 +93,13 @@ impl ArroyoConsumer {
self.steps.push(step);
}

/// Exposes the steps attribute to Python.
/// Returns a list of RuntimeOperator steps that have been added to this consumer.
/// This is generally used for debugging purposes.
#[getter]
fn steps(&self, py: Python) -> PyResult<Vec<Py<RuntimeOperator>>> {
Ok(self.steps.iter().map(|step| step.clone_ref(py)).collect())
}
/// Runs the consumer.
/// This method is blocking and will run until the consumer
/// is stopped via SIGTERM or SIGINT.
113 changes: 113 additions & 0 deletions sentry_streams/tests/adapters/arroyo/test_rust_arroyo.py
@@ -1,9 +1,21 @@
from sentry_streams.adapters.arroyo.rust_arroyo import RustArroyoAdapter
from sentry_streams.adapters.stream_adapter import RuntimeTranslator
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.pipeline import (
Map,
Pipeline,
streaming_source,
)
from sentry_streams.runner import iterate_edges
from sentry_streams.rust_streams import (
ArroyoConsumer,
InitialOffset,
PyKafkaConsumerConfig,
)
from sentry_streams.rust_streams import Route as RustRoute
from sentry_streams.rust_streams import (
RuntimeOperator,
)


def test_rust_arroyo_adapter(
@@ -30,3 +42,104 @@ def test_rust_arroyo_adapter(
# The consumer that this adapter uses is a pyo3 wrapper around the Rust consumer,
# so it also can't be replaced with the in-memory broker or triggered manually.
assert adapter.get_consumer("myinput") is not None


def test_build_pipeline() -> None:
# Create a consumer
kafka_config = PyKafkaConsumerConfig(
bootstrap_servers=["localhost:9092"],
group_id="test-group",
auto_offset_reset=InitialOffset.earliest,
strict_offset_reset=False,
max_poll_interval_ms=60000,
override_params={},
)

consumer = ArroyoConsumer(
source="test_source",
kafka_config=kafka_config,
topic="test_topic",
schema="test_schema",
metric_config=None,
)

route = RustRoute("test_source", [])
consumer.add_step(RuntimeOperator.Map(route, lambda x: x.payload))
consumer.add_step(RuntimeOperator.Filter(route, lambda x: x.payload))

assert len(consumer.steps) == 2


def test_get_steps(
pipeline: Pipeline[bytes],
) -> None:
"""Test that get_steps returns the correct structure of steps per source."""
bootstrap_servers = ["localhost:9092"]

adapter = RustArroyoAdapter.build(
{
"steps_config": {
"myinput": {
"bootstrap_servers": bootstrap_servers,
"auto_offset_reset": "earliest",
"consumer_group": "test_group",
"additional_settings": {},
},
"kafkasink": {"bootstrap_servers": bootstrap_servers, "additional_settings": {}},
},
},
)
iterate_edges(pipeline, RuntimeTranslator(adapter))

# Get the steps
steps = adapter.get_steps()

# Verify the structure
assert isinstance(steps, dict), "get_steps should return a dictionary"
assert "myinput" in steps, "The source 'myinput' should be in the steps dict"
assert isinstance(steps["myinput"], list), "Each source should map to a list of steps"
assert len(steps["myinput"]) > 0, "There should be at least one step in the pipeline"
assert all(
isinstance(step, RuntimeOperator) for step in steps["myinput"]
), "All steps should be RuntimeOperator instances"


def process_function(msg: Message[bytes]) -> bytes:
return msg.payload


def test_map_steps_without_sink() -> None:
"""Test that Map steps without a terminal operation are not finalized in get_steps."""
bootstrap_servers = ["localhost:9092"]

# Create a pipeline with source -> map -> map (no sink)
pipeline = (
streaming_source("test_source", stream_name="test_topic")
.apply(Map("map1", function=process_function))
.apply(Map("map2", function=process_function))
)

adapter = RustArroyoAdapter.build(
{
"steps_config": {
"test_source": {
"bootstrap_servers": bootstrap_servers,
"auto_offset_reset": "earliest",
"consumer_group": "test_group",
"additional_settings": {},
},
},
},
)
iterate_edges(pipeline, RuntimeTranslator(adapter))

# Get the steps
steps = adapter.get_steps()

assert isinstance(steps, dict), "get_steps should return a dictionary"
assert "test_source" in steps, "The source 'test_source' should be in the steps dict"
assert isinstance(steps["test_source"], list), "Each source should map to a list of steps"
# Only one runtime step is added because the two consecutive Map steps are chained together.
assert (
len(steps["test_source"]) == 1
), "Map steps are added even if there is no Sink in the pipeline"