25 changes: 6 additions & 19 deletions .pre-commit-config.yaml
@@ -31,25 +31,6 @@ repos:
hooks:
- id: flake8
language_version: python3.11
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.14.1"
hooks:
- id: mypy
args: [--config-file, sentry_streams/mypy.ini, --strict]
additional_dependencies:
[
pytest==7.1.2,
types-requests,
responses,
click,
types-confluent-kafka,
"sentry-arroyo>=2.18.2",
types-pyYAML,
types-jsonschema,
"sentry-kafka-schemas>=1.2.0",
"polars==1.30.0",
]
files: ^sentry_streams/.+
- repo: https://github.com/pycqa/isort
rev: 6.0.0
hooks:
@@ -70,6 +51,12 @@ repos:
# https://github.com/rust-lang/rustfmt/issues/4485
entry: rustfmt --edition 2021
files: ^sentry_streams/.*\.rs$
- id: mypy
name: mypy
entry: mypy
files: ^sentry_streams/.*\.py$
types: [python]
language: system
Author (Member):

Run mypy in the surrounding venv so that it picks up installed dependencies properly. This is also how we run it in sentry (although @asottile-sentry said at some point that he doesn't want to use language: system that often in the future, IIRC).


default_language_version:
python: python3.11
7 changes: 6 additions & 1 deletion Makefile
@@ -22,7 +22,10 @@ tests-integration:
./sentry_streams/.venv/bin/pytest -vv sentry_streams/integration_tests
.PHONY: tests-integration

test-rust-streams:
test-rust-streams: tests-rust-streams
.PHONY: test-rust-streams

tests-rust-streams:
. sentry_streams/.venv/bin/activate && . scripts/rust-envvars && cd ./sentry_streams/ && cargo test
.PHONY: tests-rust-streams

@@ -31,6 +34,8 @@ tests-flink:
.PHONY: tests-flink

typecheck:
. ./sentry_streams/.venv/bin/activate && cd ./sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms/ && maturin develop
Collaborator:

What is this for?

Author (Member):

Typechecking the example requires the Rust functions to be installed in the current virtualenv. Otherwise mypy will not understand what rust_transforms is.
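
For illustration only, here is a minimal Python sketch of checking that an extension module is visible to the interpreter mypy runs under. The helper name `missing_modules` and the placeholder module name are hypothetical; in this repository the names of interest would be whatever the two `maturin develop` steps install. Being importable is necessary but may not be sufficient, since mypy also needs stubs or type information for the module.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names the current interpreter cannot locate."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is a deliberately bogus placeholder
# standing in for an extension that has not been built yet.
not_found = missing_modules(["json", "hypothetical_rust_ext_not_installed"])
if not_found:
    print("build these with maturin develop first:", not_found)
```

Running a check like this inside the venv before `mypy` would turn an opaque "cannot find module" type error into an explicit build hint.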

. ./sentry_streams/.venv/bin/activate && cd ./sentry_streams/tests/rust_test_functions/ && maturin develop
./sentry_streams/.venv/bin/mypy --config-file sentry_streams/mypy.ini --strict sentry_streams/
./sentry_flink/.venv/bin/mypy --config-file sentry_flink/mypy.ini --strict sentry_flink/sentry_flink/
.PHONY: typecheck
14 changes: 0 additions & 14 deletions rust-streams/src/lib.rs

This file was deleted.

2 changes: 1 addition & 1 deletion sentry_streams/Cargo.toml
@@ -19,7 +19,7 @@ log = "0.4.27"

[lib]
name = "rust_streams"
crate-type = ["cdylib"]
crate-type = ["cdylib", "rlib"]
Collaborator:

What do we need this for?

Author (Member):

This is so that the sentry_streams crate can be imported as a normal crate.


[features]
extension-module = ["pyo3/extension-module"]
1 change: 1 addition & 0 deletions sentry_streams/docs/source/index.rst
@@ -12,3 +12,4 @@
configure_pipeline
runtime/arroyo
deployment
rust
199 changes: 199 additions & 0 deletions sentry_streams/docs/source/rust.rst
@@ -0,0 +1,199 @@
Rust applications
=================

Hybrid applications
-------------------

PR: https://github.com/getsentry/streams/pull/177

**User story:** I want to rewrite a pipeline step in getsentry monolith
in Rust.

Currently, Rust-ification within the monolith is done by adding new
pyo3-based Python dependencies to getsentry’s requirements. We’ll follow
the same path: users define pipeline steps using pyo3, but through our
helper functions (our “framework”).

Here is how a function definition works:

.. code:: rust

// mypackage/src/lib.rs
sentry_streams::rust_function!(
RustTransformMsg,
IngestMetric,
TransformedIngestMetric,
|msg: Message<IngestMetric>| -> TransformedIngestMetric {
let (payload, _) = msg.take();
TransformedIngestMetric {
metric_type: payload.metric_type,
name: payload.name,
value: payload.value,
tags: payload.tags,
timestamp: payload.timestamp,
transformed: true,
}
}
);

This is packaged in a pyo3-based crate, which can then be referenced
from the regular pipeline definition like this:

.. code:: python

.apply(Map("transform", function=mypackage.RustTransformMsg()))

Message payloads
~~~~~~~~~~~~~~~~

``IngestMetric`` and ``TransformedIngestMetric`` types have to be
defined by the user in both Rust and Python.

.. code:: rust

// mypackage/src/lib.rs
#[derive(Serialize, Deserialize)]
struct IngestMetric { ... }

.. code:: python

class IngestMetric(TypedDict): ...

The user has to write their own Python ``.pyi`` stub file to declare
that ``RustTransformMsg`` takes ``IngestMetric`` and returns
``TransformedIngestMetric``:

.. code:: python

# mypackage/mypackage.pyi
class RustTransformMsg(RustFunction[IngestMetric, Any]):
def __init__(self) -> None: ...
def __call__(self, msg: Message[IngestMetric]) -> Any: ...
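
To make the shape of that stub concrete, here is a pure-Python sketch with no Rust involved. Everything in it is a hypothetical stand-in: the `Message` and `RustFunction` classes below only mimic the generic parameters used in the stub, the `IngestMetric` fields are a reduced assumption, and `FakeTransformMsg` is a placeholder for the compiled class.

```python
from typing import Any, Generic, TypedDict, TypeVar

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")


class Message(Generic[TIn]):
    """Hypothetical stand-in for the framework's message wrapper."""

    def __init__(self, payload: TIn) -> None:
        self.payload = payload


class RustFunction(Generic[TIn, TOut]):
    """Hypothetical stand-in for the base class named in the stub."""

    def __call__(self, msg: Message[TIn]) -> TOut:
        raise NotImplementedError


class IngestMetric(TypedDict):
    # Reduced field set, assumed for illustration only.
    name: str
    value: float


class FakeTransformMsg(RustFunction[IngestMetric, Any]):
    """Pure-Python placeholder with the same call shape as the stubbed class."""

    def __call__(self, msg: Message[IngestMetric]) -> Any:
        # Copy the payload and mark it, loosely mirroring the Rust closure.
        return {**msg.payload, "transformed": True}


result = FakeTransformMsg()(Message(IngestMetric(name="cpu", value=1.0)))
```

A type checker reading the real `.pyi` file sees the same two pieces of information as here: the payload type accepted by `__call__` and the declared return type.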

Then, the user has to define how conversion works between these types.
They can implement the conversion manually, or use a builtin conversion
method provided by us. We currently provide only one builtin conversion,
which round-trips via JSON:

.. code:: rust

// mypackage/src/lib.rs
sentry_streams::convert_via_json!(IngestMetric);

…and the same procedure has to be repeated for the output type
``TransformedIngestMetric``.
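
As a rough illustration of what round-tripping via JSON means, the sketch below does both halves in Python. This is an assumption about the general idea, not the macro's implementation, which is not shown here; the helper names `to_wire` and `from_wire` and the reduced `IngestMetric` fields are hypothetical.

```python
import json
from typing import TypedDict


class IngestMetric(TypedDict):
    # Reduced field set, assumed for illustration only.
    name: str
    value: float
    tags: dict[str, str]


def to_wire(payload: IngestMetric) -> bytes:
    """Serialize the payload to JSON bytes on one side of the boundary."""
    return json.dumps(payload).encode("utf-8")


def from_wire(data: bytes) -> IngestMetric:
    """Parse JSON bytes back into a payload on the other side."""
    return json.loads(data)


original = IngestMetric(name="cpu", value=0.5, tags={"host": "a"})
restored = from_wire(to_wire(original))
```

The cost of this approach is one serialization and one parse per direction, which is why faster formats are listed among the future improvements.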

What happens at runtime
~~~~~~~~~~~~~~~~~~~~~~~

The ``rust_function`` macro currently just generates a simple Python
function for the given Rust function. The GIL *is* released while the
user’s Rust code is running, but there is still some GIL overhead when
entering and exiting the function.

In the future we can transparently optimize this without users having to
change their applications. For example, we could batch function calls to
amortize GIL overhead, so that we only hold the GIL while entering and
exiting the batch.
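
The batching idea can be sketched as an analogy in plain Python. An ordinary lock that counts acquisitions stands in for the GIL here, and all names (`CountingLock`, `call_one_by_one`, `call_batched`) are hypothetical; the real optimization would live in the Rust runtime.

```python
import threading


class CountingLock:
    """Lock wrapper that counts acquisitions, standing in for GIL entry/exit."""

    def __init__(self):
        self._lock = threading.Lock()
        self.acquisitions = 0

    def __enter__(self):
        self._lock.acquire()
        self.acquisitions += 1
        return self

    def __exit__(self, *exc):
        self._lock.release()
        return False


def call_one_by_one(lock, func, items):
    """Take the lock on entry and exit of every single call."""
    results = []
    for item in items:
        with lock:  # entering: read the argument under the lock
            arg = item
        value = func(arg)  # the actual work runs without the lock
        with lock:  # exiting: hand the result back under the lock
            results.append(value)
    return results


def call_batched(lock, func, items):
    """Take the lock once on entry and once on exit of the whole batch."""
    with lock:
        batch = list(items)
    values = [func(x) for x in batch]  # all work runs without the lock
    with lock:
        return list(values)
```

For `n` items the first variant acquires the lock `2 * n` times and the second only twice, while both return the same results.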

What we want to improve in the future
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

- improve performance of calling convention/reduce overhead

- take inspiration from
https://github.com/ealmloff/sledgehammer_bindgen

- automatically generate type stubs for user’s Rust code — pyo3 does
have something like that, but it doesn’t work perfectly (exposes
internals of our Rust macro)
- improve ergonomics of message types and their conversion, add protobuf
or msgpack as a way to roundtrip
- each team at sentry would have to maintain a new python package for
their Rust functions, set up pyo3 and CI from scratch, etc. we can
streamline this.

- we already have: ``sentry_relay`` (relay integration), ``ophio``
(grouping engine), ``vroomrs`` (profiles), ``symbolic`` (stacktrace
processing)
- easiest: we provide a “monorepo” and “monopackage” where all rust
functions for getsentry go. we maintain CI for this monorepo.
- medium: repository template
- also, ideally this is aligned with devinfra’s “golden path” for
python devenv
- in practice some team will have to provide support for questions
about pyo3, since its entire API surface is exposed to product teams
(although we can templatize and abstract a lot)

Pure-Rust pipelines
-------------------

A lot of the complexity mentioned above is only really necessary when
you want to mix Python and Rust code. For pure-Rust applications, we
could do something entirely different:

- The runner does not have to be started from Python at all. If we
started it from Rust, we would have a much easier time optimizing
function calls.
- The pipeline definition does not have to be Python. We could have it
be YAML or even Rust as well.
- Type stubs are not really necessary. We can easily validate that the
types match during startup, or if the pipeline definition is in Rust,
let the compiler do that job for us.
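
Startup-time validation of the kind mentioned in the last bullet could look roughly like the sketch below. It is written in Python purely for illustration; `StepSpec` and `validate_chain` are hypothetical names, and comparing type names as strings is a simplifying assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StepSpec:
    """Hypothetical description of one pipeline step's input and output types."""

    name: str
    input_type: str
    output_type: str


def validate_chain(steps):
    """Raise TypeError if a step's input differs from the previous step's output."""
    for prev, cur in zip(steps, steps[1:]):
        if prev.output_type != cur.input_type:
            raise TypeError(
                f"step {cur.name!r} expects {cur.input_type}, "
                f"but {prev.name!r} produces {prev.output_type}"
            )


good = [
    StepSpec("filter", "IngestMetric", "IngestMetric"),
    StepSpec("transform", "IngestMetric", "TransformedIngestMetric"),
]
validate_chain(good)  # passes silently: every output matches the next input
```

A mismatched chain fails before any message is processed, which is the property type stubs would otherwise be providing.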

Any of these will, however, split the ecosystem. I think we have plenty
of ergonomic improvements we can make even for hybrid applications that
would benefit pure-Rust users as well. We should focus on those first.

Meeting notes July 24, 2025
===========================

- a better pure-rust story

- we have too much boilerplate, and now especially for pure rust apps
- build a rust runner, and try to get rid of as much pyo3 junk as
possible

- reference: `The rust arroyo
runtime <https://www.notion.so/The-rust-arroyo-runtime-2228b10e4b5d806dbe9ccd4e70c93aa2?pvs=21>`__

- maybe hybrid will get better through this rearchitecture
- maybe denormalize Parse steps into Map (@Filippo Pacifici)

.. code:: rust


// mypackage/src/lib.rs as pyo3
use sentry_streams;

sentry_streams::rust_function!(...);

sentry_streams::main_function!();

// or, in bin target:
pub use sentry_streams::main;

.. code:: python


mypackage.run_streams()

concerns:

- user can freely downgrade/upgrade version, since they “own” the
runtime (as they are statically linking it)
- ability to opt out of message conversion trait requirements

- message type conversion

- boilerplate is an issue

- integration with existing schema repos, or copy schema-to-type
generation into streams for “inline schemas”

- better performance

- better runtime semantics for rust functions

- map chains, but in rust?
- no multiprocessing!
Original file line number Diff line number Diff line change
@@ -272,12 +272,13 @@ def filter(self, step: Filter[Any], stream: Route) -> Route:
stream.source in self.__consumers
), f"Stream starting at source {stream.source} not found when adding a map"

def filter_msg(msg: Message[Any]) -> bool:
return step.resolved_function(msg)

route = RustRoute(stream.source, stream.waypoints)
logger.info(f"Adding filter: {step.name} to pipeline")
self.__consumers[stream.source].add_step(RuntimeOperator.Filter(route, filter_msg))

self.__consumers[stream.source].add_step(
RuntimeOperator.Filter(route, step.resolved_function)
)

return stream

def reduce(
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
Rust version of simple_map_filter.py example
"""

from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric

from sentry_streams.pipeline.pipeline import (
Filter,
Map,
Parser,
Serializer,
StreamSink,
streaming_source,
)

# Import the compiled Rust functions
try:
from metrics_rust_transforms import RustFilterEvents, RustTransformMsg
except ImportError as e:
raise ImportError(
"Rust extension 'metrics_rust_transforms' not found. "
"You must build it first:\n"
" cd rust_transforms\n"
" maturin develop\n"
f"Original error: {e}"
) from e
Comment on lines +17 to +26
Collaborator:

Here, are we wrapping the import in a try-except block just to provide a better error message, or for other reasons?

Author (Member):

Yes, it's just for the better error message.


# Same pipeline structure as simple_map_filter.py, but with Rust functions
# that will be called directly without Python overhead
pipeline = streaming_source(
name="myinput",
stream_name="ingest-metrics",
)

(
pipeline.apply(Parser("parser", msg_type=IngestMetric))
# This filter will run in native Rust with zero Python overhead
.apply(Filter("filter", function=RustFilterEvents()))
# This transform will run in native Rust with zero Python overhead
.apply(Map("transform", function=RustTransformMsg()))
.apply(Serializer("serializer"))
.sink(StreamSink("mysink", stream_name="transformed-events"))
)