Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sentry_streams/sentry_streams/pipeline/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,15 @@ class InvalidMessageError(Exception):
"""

pass


class InvalidPipelineError(Exception):
    """
    Signals that a pipeline definition is invalid or misconfigured.

    Typical causes:
    - A pipeline branch whose final step is not a Sink
    - A pipeline graph whose structure is malformed
    """

    pass
28 changes: 28 additions & 0 deletions sentry_streams/sentry_streams/pipeline/validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Any

from sentry_streams.pipeline.exception import InvalidPipelineError
from sentry_streams.pipeline.pipeline import Pipeline, Sink


def validate_all_branches_have_sinks(pipeline: Pipeline[Any]) -> None:
    """
    Validate that every branch of ``pipeline`` terminates with a Sink step.

    A step counts as a leaf when ``pipeline.outgoing_edges`` has no entry for
    it, or only an empty one. Every leaf must be an instance of ``Sink``.

    Args:
        pipeline: The pipeline whose ``steps`` and ``outgoing_edges`` are
            inspected.

    Raises:
        InvalidPipelineError: If at least one leaf step is not a Sink. The
            message lists the names of the offending steps.
    """
    # One pass is enough: report a step when it is a leaf (no outgoing edges)
    # and is not a Sink. Iterating items() avoids a second lookup per leaf.
    non_sink_leaves = [
        name
        for name, step in pipeline.steps.items()
        if not pipeline.outgoing_edges.get(name) and not isinstance(step, Sink)
    ]

    if non_sink_leaves:
        raise InvalidPipelineError(
            "All pipeline branches must terminate with a Sink step. "
            f"The following steps are leaves but not sinks: {', '.join(non_sink_leaves)}"
        )
2 changes: 2 additions & 0 deletions sentry_streams/sentry_streams/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Pipeline,
WithInput,
)
from sentry_streams.pipeline.validation import validate_all_branches_have_sinks

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -114,6 +115,7 @@ def load_runtime(

assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
validate_all_branches_have_sinks(pipeline)
runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
translator = RuntimeTranslator(runtime)

Expand Down
3 changes: 2 additions & 1 deletion sentry_streams/tests/pipeline/test_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sentry_streams.pipeline.pipeline import (
Batch,
ComplexStep,
DevNullSink,
Filter,
FlatMap,
Map,
Expand Down Expand Up @@ -181,7 +182,7 @@ def test_router() -> None:
def test_register_steps(step: Union[Transform[Any, Any], ComplexStep[Any, Any]]) -> None:
name = step.name
pipeline = Pipeline(StreamSource(name="mysource", stream_name="name"))
pipeline.apply(step)
pipeline.apply(step).sink(DevNullSink("test_sink"))
assert pipeline.steps[name] == step
assert pipeline.steps[name].name == name
assert pipeline.incoming_edges[name] == ["mysource"]
Expand Down
68 changes: 68 additions & 0 deletions sentry_streams/tests/pipeline/test_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Any

import pytest

from sentry_streams.pipeline.exception import InvalidPipelineError
from sentry_streams.pipeline.pipeline import (
Map,
StreamSink,
branch,
streaming_source,
)
from sentry_streams.pipeline.validation import validate_all_branches_have_sinks


def test_valid_pipeline_with_router() -> None:
    """A routed pipeline passes validation when every branch ends in a sink."""

    def routing_func(msg: Any) -> str:
        return "route1"

    # Build the shared upstream part first, then attach the routed branches.
    upstream = streaming_source("myinput", "events").apply(Map("transform1", lambda msg: msg))

    first_branch = (
        branch("route1")
        .apply(Map("transform2", lambda msg: msg))
        .sink(StreamSink("myoutput1", stream_name="transformed-events-2"))
    )
    second_branch = (
        branch("route2")
        .apply(Map("transform3", lambda msg: msg))
        .sink(StreamSink("myoutput2", stream_name="transformed-events-3"))
    )

    pipeline = upstream.route(
        "route_to_one",
        routing_function=routing_func,
        routing_table={"route1": first_branch, "route2": second_branch},
    )

    # Validation must complete without raising.
    validate_all_branches_have_sinks(pipeline)


def test_invalid_pipeline_with_router_missing_sink() -> None:
    """A routed pipeline fails validation when one branch does not end in a sink."""

    def routing_func(msg: Any) -> str:
        return "route1"

    upstream = streaming_source("myinput", "events").apply(Map("transform1", lambda msg: msg))

    terminated_branch = (
        branch("route1")
        .apply(Map("transform2", lambda msg: msg))
        .sink(StreamSink("myoutput1", stream_name="transformed-events-2"))
    )
    # Deliberately left without a sink, so "transform3" is a non-sink leaf.
    dangling_branch = branch("route2").apply(Map("transform3", lambda msg: msg))

    pipeline = upstream.route(
        "route_to_one",
        routing_function=routing_func,
        routing_table={"route1": terminated_branch, "route2": dangling_branch},
    )

    with pytest.raises(InvalidPipelineError) as exc_info:
        validate_all_branches_have_sinks(pipeline)

    # The error must name the offending step and explain the rule.
    message = str(exc_info.value)
    assert "transform3" in message
    assert "must terminate with a Sink step" in message
22 changes: 15 additions & 7 deletions sentry_streams/tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sentry_streams.dummy.dummy_adapter import DummyAdapter
from sentry_streams.pipeline import Filter, Map, branch, streaming_source
from sentry_streams.pipeline.pipeline import (
DevNullSink,
Pipeline,
)
from sentry_streams.runner import iterate_edges
Expand All @@ -27,16 +28,20 @@ def create_pipeline() -> Pipeline[bytes]:
"router1",
routing_function=lambda x: RouterBranch.BRANCH1.value,
routing_table={
RouterBranch.BRANCH1.value: branch("map4_segment").apply(
Map("map4", function=lambda x: x.payload)
),
RouterBranch.BRANCH2.value: branch("map5_segment").apply(
Map("map5", function=lambda x: x.payload)
),
RouterBranch.BRANCH1.value: branch("map4_segment")
.apply(Map("map4", function=lambda x: x.payload))
.sink(DevNullSink("sink_map4")),
RouterBranch.BRANCH2.value: branch("map5_segment")
.apply(Map("map5", function=lambda x: x.payload))
.sink(DevNullSink("sink_map5")),
},
)
)
broadcast_branch_2 = branch("branch2").apply(Map("map3", function=lambda x: x.payload))
broadcast_branch_2 = (
branch("branch2")
.apply(Map("map3", function=lambda x: x.payload))
.sink(DevNullSink("sink_map3"))
)

test_pipeline = (
streaming_source("source1", stream_name="foo")
Expand Down Expand Up @@ -67,8 +72,11 @@ def test_iterate_edges(create_pipeline: Pipeline[bytes]) -> None:
"map2",
"map3",
"router1",
"sink_map3",
"map4",
"map5",
"sink_map4",
"sink_map5",
]
assert runtime.branches == [
"branch1",
Expand Down