Conversation

@fpacifici
Collaborator

The Rust adapter chains map and filter steps together into one step in Rust.
This means that, when a map or filter is processed, it is not added to the
Rust consumer immediately. This only happens when the chain is finalized,
which is when we encounter a step that cannot be chained or a sink.

Problem: if there is a sequence of map operations and no sink, these were
never added to the Rust consumer.
See the new test added for a demonstration.

This PR adds a finalize method on the adapter that is called by the
code that iterates through the pipeline so that every pending data
structure is finalized before we start using the adapter.

A better option would be to make the translator able to understand when a
chain needs to be closed, but it does not know whether there is a sink or not.
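
The chaining behavior described above can be illustrated with a toy model. This is only a sketch, not the actual adapter: `ToyConsumer`, `ToyChainingAdapter`, and all method names are hypothetical stand-ins. It shows that buffered map steps reach the consumer only when a sink arrives or when the chain is explicitly finalized.

```python
from typing import List


class ToyConsumer:
    """Hypothetical stand-in for the Rust consumer: records the steps it receives."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def add_step(self, name: str) -> None:
        self.steps.append(name)


class ToyChainingAdapter:
    """Hypothetical adapter that buffers map steps and emits them as one chained step."""

    def __init__(self, consumer: ToyConsumer) -> None:
        self.consumer = consumer
        self.pending: List[str] = []

    def map(self, name: str) -> None:
        # Chainable step: buffered, not yet visible to the consumer.
        self.pending.append(name)

    def sink(self, name: str) -> None:
        # A sink closes the open chain before registering itself.
        self._close_chain()
        self.consumer.add_step(name)

    def finalize(self) -> None:
        # Called once after the whole pipeline has been visited.
        self._close_chain()

    def _close_chain(self) -> None:
        if self.pending:
            self.consumer.add_step("chain(" + ", ".join(self.pending) + ")")
            self.pending = []


consumer = ToyConsumer()
adapter = ToyChainingAdapter(consumer)
adapter.map("parse")
adapter.map("enrich")
steps_before_finalize = list(consumer.steps)  # the maps are still buffered here
adapter.finalize()
steps_after_finalize = list(consumer.steps)
```

Without the final `finalize()` call, the two map steps in this toy pipeline would never reach the consumer, which mirrors the problem described above.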

@fpacifici fpacifici requested a review from a team as a code owner January 6, 2026 02:12
@fpacifici fpacifici force-pushed the fpacifici/fix_last_step branch from a8739e0 to d24f21c Compare January 6, 2026 17:48
@fpacifici fpacifici force-pushed the fpacifici/fix_last_step branch from d24f21c to aecbbad Compare January 6, 2026 18:13
Comment on lines 1 to +10
from sentry_streams.adapters.arroyo.rust_arroyo import RustArroyoAdapter
from sentry_streams.adapters.stream_adapter import RuntimeTranslator
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.pipeline import (
Map,
Pipeline,
streaming_source,
)
from sentry_streams.runner import iterate_edges
from sentry_streams.rust_streams import (

This comment was marked as outdated.

args: --release --out dist --find-interpreter --features=extension-module
sccache: ${{ !startsWith(github.ref, 'refs/tags/') }}
manylinux: "2014"
manylinux: "2_28"
Collaborator Author

    Finished `release` profile [optimized + debuginfo] target(s) in 52.75s
💥 maturin failed
  Caused by: Error ensuring manylinux_2_17 compliance
  Caused by: Your library is not manylinux_2_17 (aka manylinux2014) compliant because it depends on black-listed symbols: ["libc.so.6: __cxa_thread_atexit_impl"]
Error: The process '/usr/bin/docker' failed with exit code 1
    at ExecState._setResult (/home/runner/work/_actions/PyO3/maturin-action/aef21716ff3dcae8a1c301d23ec3e4446972a6e3/dist/index.js:1823:25)
    at ExecState.CheckComplete (/home/runner/work/_actions/PyO3/maturin-action/aef21716ff3dcae8a1c301d23ec3e4446972a6e3/dist/index.js:1806:18)
    at ChildProcess.<anonymous> (/home/runner/work/_actions/PyO3/maturin-action/aef21716ff3dcae8a1c301d23ec3e4446972a6e3/dist/index.js:1700:27)
    at ChildProcess.emit (node:events:524:28)
    at maybeClose (node:internal/child_process:1104:16)
    at ChildProcess._handle.onexit (node:internal/child_process:304:5)
Error: The process '/usr/bin/docker' failed with exit code 1

Some symbols required by some of the Rust dependencies are not available in the libc version that manylinux2014 targets.
This upgrades the manylinux version to 2_28.

Member

@evanh evanh left a comment

Is this a feature we actually want to support? Shouldn't all pipelines have a sink?


def terminate(self, stream: Route) -> None:
"""
Performs all the operations to complete the construciton of the graph
Member

Suggested change
Performs all the operations to complete the construciton of the graph
Performs all the operations to complete the construction of the graph

@abstractmethod
def terminate(self, stream: Union[StreamT, StreamSinkT]) -> None:
"""
Performs all the operations to complete the construciton of the graph
Member

Suggested change
Performs all the operations to complete the construciton of the graph
Performs all the operations to complete the construction of the graph

Member

@untitaker untitaker left a comment

Why do both the cargo and uv lockfiles change? I don't see you intentionally adding any new dependencies. I think you might have an outdated uv and/or an outdated Rust.

This might also explain why the workflow needs changing?

@fpacifici
Collaborator Author

Is this a feature we actually want to support? Shouldn't all pipelines have a sink?

Either this, or the pipeline generation should fail if a sink is not present. @evanh what do you think? If we require a sink we need a NoopSink as it is not given that the pipeline should provide results. Though at that point, I am not sure the NoopSink provides great value, considering the current DSL makes it very clear when the pipeline ends.

@evanh
Member

evanh commented Jan 7, 2026

If we require a sink we need a NoopSink as it is not given that the pipeline should provide results.

I'm OK with that. I would prefer if pipelines always ended with Sinks that do something. The pipeline doesn't need to produce something, but ideally it should end with a TaskSink or ClickhouseSink or something equivalent.

@fpacifici
Collaborator Author

I'm OK with that. I would prefer if pipelines always ended with Sinks that do something. The pipeline doesn't need to produce something, but ideally it should end with a TaskSink or ClickhouseSink or something equivalent.

We have scenarios where none of this is applicable. Think about the subscription results consumer. It does trigger some tasks but they are in the middle of a custom strategy. Maybe one day that can be refactored into a task, but that is quite far away. So, in that case, asking them to add a sink would require them to add a NoopSink.

Then the question is: should the NoopSink be explicit?

@evanh
Member

evanh commented Jan 8, 2026

It does trigger some tasks but they are in the middle of a custom strategy.

What does it do after the tasks? I feel like we're missing some abstraction here: It doesn't make sense to me to have a Pipeline end with a Map or a Filter step. Those steps don't make sense to me if they have no output. So if someone wants to execute some code on every message, without outputting anything, then that feels like some kind of Sink. ExecutionSink?

Should the NoopSink be explicit?

I think so, yes. As I said above, I feel like there is an abstraction missing and it should be possible to model all our pipelines with a Sink. I think it's fine for now to have a NoopSink while we figure out the abstractions. Having the NoopSink be explicit also gives us an easy signal on pipelines that need to be improved in the future.

@untitaker
Member

I think that if we implicitly add NoopSinks I would question why we have sinks in the first place, and not just transform steps and sources (kind of like arroyo, just that some steps return ()/None)

@fpacifici
Collaborator Author

I think that if we implicitly add NoopSinks I would question why we have sinks in the first place, and not just transform steps and sources (kind of like arroyo, just that some steps return ()/None)

There are two reasons to have sinks and I believe we should keep them:

  • Semantics: a sink signals that nothing comes out of it. Maps transform 'a' into 'b' in a 1:1..n way. Reduce steps do the opposite. A sink is the only step that is 1:None. It also states that you cannot add anything to the pipeline afterwards. If we model sinks as transformations, we can be certain we will start seeing a proliferation of large steps that do both transformation and Kafka production, like we see today.
  • Sinks are generally going to be platform provided. Having this abstraction, which can accumulate data and decide when to act on it, allows us to embed a lot of common optimizations in the platform: batching data and writing to a file or over HTTP from multiple threads. We have plenty of flavors of these in the product code today. We can unify all of it.
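
As a rough illustration of the second point, here is a minimal sketch of a sink that accumulates messages and decides on its own when to write them out. `ToyBatchingSink`, its `submit` and `close_batch` methods, and the `flush` callback are hypothetical names, not part of sentry_streams.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class ToyBatchingSink(Generic[T]):
    """Hypothetical sink: takes messages in (1:None) and writes them out in batches."""

    def __init__(self, flush: Callable[[List[T]], None], max_batch_size: int) -> None:
        self._flush = flush
        self._max_batch_size = max_batch_size
        self._buffer: List[T] = []

    def submit(self, message: T) -> None:
        # Nothing is returned: a sink produces no output for downstream steps.
        self._buffer.append(message)
        if len(self._buffer) >= self._max_batch_size:
            self.close_batch()

    def close_batch(self) -> None:
        # Write out whatever has accumulated and start a fresh buffer.
        if self._buffer:
            self._flush(self._buffer)
            self._buffer = []


written: List[List[int]] = []
sink = ToyBatchingSink(flush=written.append, max_batch_size=2)
for value in (1, 2, 3):
    sink.submit(value)
sink.close_batch()  # flush the trailing partial batch
```

Because the batching policy lives inside the sink, the pipeline author only decides where the data ends up, not how writes are grouped.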

@evanh I think you convinced me. All pipelines should have a sink and it should be explicit, so we can tell when a user forgot it or where the pipeline is actually supposed to end. It makes no sense, semantically speaking, to have a pipeline that terminates with a filter. As long as that is allowed there should be a sink that terminates the pipeline.

@fpacifici fpacifici closed this Jan 23, 2026
fpacifici added a commit that referenced this pull request Jan 27, 2026
This is an alternative approach to #218.

The Rust adapter chains map and filter steps together into one step in
Rust. This means that, when a map or filter is processed, it is not added
to the Rust consumer immediately. This only happens when the chain is
finalized, which is when we encounter a step that cannot be chained or a
sink.

Problem: if there is a sequence of map operations and no sink, these
were never added to the Rust consumer.

This PR requires a Sink at the end of each branch.
This idea depends on the assumption that terminating a branch with
anything that can produce output is likely wrong.
If a step can produce an output and it is at the end of the branch,
what should we do with those messages?

Requiring a sink implies asking this question to the author of the
pipeline and letting them decide.
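
A check of this kind could look roughly like the sketch below. It is not the code from the referenced commit: the graph is modeled as a plain dictionary from step name to child step names, and `branches_missing_sink` is a hypothetical helper.

```python
from typing import Dict, List, Set


def branches_missing_sink(edges: Dict[str, List[str]], sinks: Set[str]) -> List[str]:
    """Return the steps that end a branch without being a sink, sorted by name."""
    # A step with no children is the last step of its branch.
    leaves = [step for step, children in edges.items() if not children]
    return sorted(step for step in leaves if step not in sinks)


# Hypothetical pipeline: one branch ends in a sink, the other ends in a map.
edges = {
    "source": ["parse"],
    "parse": ["to_kafka", "enrich"],
    "to_kafka": [],
    "enrich": [],
}
missing = branches_missing_sink(edges, sinks={"to_kafka"})
```

Here `missing` names the `enrich` step, so pipeline construction could fail with an error that points the author at the branch that still needs a sink.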

4 participants