feat: Barely functional version of Rust map and Rust filter #177
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,10 @@ tests-integration: | |
| ./sentry_streams/.venv/bin/pytest -vv sentry_streams/integration_tests | ||
| .PHONY: tests-integration | ||
|
|
||
| test-rust-streams: | ||
| test-rust-streams: tests-rust-streams | ||
| .PHONY: test-rust-streams | ||
|
|
||
| tests-rust-streams: | ||
| . sentry_streams/.venv/bin/activate && . scripts/rust-envvars && cd ./sentry_streams/ && cargo test | ||
| .PHONY: tests-rust-streams | ||
|
|
||
|
|
@@ -31,6 +34,8 @@ tests-flink: | |
| .PHONY: tests-flink | ||
|
|
||
| typecheck: | ||
| . ./sentry_streams/.venv/bin/activate && cd ./sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms/ && maturin develop | ||
|
Collaborator
What is this for?
Member
Author
typechecking the example requires the rust functions to be installed in the current virtualenv. otherwise mypy will not understand what |
||
| . ./sentry_streams/.venv/bin/activate && cd ./sentry_streams/tests/rust_test_functions/ && maturin develop | ||
| ./sentry_streams/.venv/bin/mypy --config-file sentry_streams/mypy.ini --strict sentry_streams/ | ||
| ./sentry_flink/.venv/bin/mypy --config-file sentry_flink/mypy.ini --strict sentry_flink/sentry_flink/ | ||
| .PHONY: typecheck | ||
|
|
||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ log = "0.4.27" | |
|
|
||
| [lib] | ||
| name = "rust_streams" | ||
| crate-type = ["cdylib"] | ||
| crate-type = ["cdylib", "rlib"] | ||
|
Collaborator
What do we need this for?
Member
Author
this is so that the |
||
|
|
||
| [features] | ||
| extension-module = ["pyo3/extension-module"] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,3 +12,4 @@ | |
| configure_pipeline | ||
| runtime/arroyo | ||
| deployment | ||
| rust | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| Rust applications | ||
| ================= | ||
|
|
||
| Hybrid applications | ||
| ------------------- | ||
|
|
||
| PR: https://github.com/getsentry/streams/pull/177 | ||
|
|
||
| **User story:** I want to rewrite a pipeline step in getsentry monolith | ||
| in Rust. | ||
|
|
||
| Currently Rust-ification within the monolith is being done by adding new | ||
| pyo3-based Python dependencies to getsentry’s requirements. We’ll go the | ||
| same path: users can define pipeline steps using pyo3, but with our | ||
| helper functions/“framework.” | ||
|
|
||
| Here is how a function definition works: | ||
|
|
||
| .. code:: rust | ||
|
|
||
| // mypackage/src/lib.rs | ||
| sentry_streams::rust_function!( | ||
| RustTransformMsg, | ||
| IngestMetric, | ||
| TransformedIngestMetric, | ||
| |msg: Message<IngestMetric>| -> TransformedIngestMetric { | ||
| let (payload, _) = msg.take(); | ||
| TransformedIngestMetric { | ||
| metric_type: payload.metric_type, | ||
| name: payload.name, | ||
| value: payload.value, | ||
| tags: payload.tags, | ||
| timestamp: payload.timestamp, | ||
| transformed: true, | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| This would be packaged up in a pyo3-based crate, and then can be | ||
| referenced from the regular pipeline definition like this: | ||
|
|
||
| .. code:: python | ||
|
|
||
| .apply(Map("transform", function=my_package.RustTransformMsg())) | ||
|
|
||
| Message payloads | ||
| ~~~~~~~~~~~~~~~~ | ||
|
|
||
| ``IngestMetric`` and ``TransformedIngestMetric`` types have to be | ||
| defined by the user in both Rust and Python. | ||
|
|
||
| .. code:: rust | ||
|
|
||
| // mypackage/src/lib.rs | ||
| #[derive(Serialize, Deserialize)] | ||
| struct IngestMetric { ... } | ||
|
|
||
| .. code:: python | ||
|
|
||
| class IngestMetric(TypedDict): ... | ||
|
|
||
| The user has to write their own Python ``.pyi`` stub file to declare | ||
| that ``RustTransformMsg`` takes ``IngestMetric`` and returns | ||
| ``TransformedIngestMetric``: | ||
|
|
||
| .. code:: python | ||
|
|
||
| # mypackage/mypackage.pyi | ||
| class RustTransformMsg(RustFunction[IngestMetric, Any]): | ||
| def __init__(self) -> None: ... | ||
| def __call__(self, msg: Message[IngestMetric]) -> Any: ... | ||
|
|
||
| Then, the user has to define how conversion works between these types. | ||
| They can implement this function manually, or use a builtin conversion | ||
| method provided by us. We currently only provide one builtin conversion | ||
| by round-tripping via JSON: | ||
|
|
||
| .. code:: rust | ||
|
|
||
| // mypackage/src/lib.rs | ||
| sentry_streams::convert_via_json!(IngestMetric); | ||
|
|
||
| …and the same procedure has to be repeated for the output type | ||
| ``TransformedIngestMetric``. | ||
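To make the JSON round trip concrete, here is a minimal Python-side sketch of what "converting via JSON" amounts to. It is not the code that ``convert_via_json!`` generates; the field names and the ``to_wire``/``from_wire`` helpers are hypothetical and exist only for illustration.

```python
import json
from typing import Dict, TypedDict


class IngestMetric(TypedDict):
    # Hypothetical subset of fields, chosen only for this illustration.
    name: str
    value: float
    tags: Dict[str, str]


def to_wire(payload: IngestMetric) -> bytes:
    """Serialize the Python-side payload to JSON bytes (hypothetical helper)."""
    return json.dumps(payload).encode("utf-8")


def from_wire(raw: bytes) -> IngestMetric:
    """Parse JSON bytes from the other side back into a dict (hypothetical helper)."""
    return json.loads(raw)


metric: IngestMetric = {"name": "c:requests", "value": 1.0, "tags": {"env": "prod"}}
# A round trip through JSON must give back an equal payload.
assert from_wire(to_wire(metric)) == metric
```

The cost of this approach is one serialization and one parse per message and direction, which is presumably why protobuf or msgpack are listed as possible alternatives further down.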
|
|
||
| What happens at runtime | ||
| ~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| The ``rust_function`` macro currently just generates a simple Python | ||
| function for the given Rust function. The GIL *is* released while the | ||
| user’s Rust code is running, but there is still some GIL overhead when | ||
| entering and exiting the function. | ||
|
|
||
| In the future we can transparently optimize this without users having to | ||
| change their applications. For example, batching function calls to | ||
| amortize GIL overhead. We would then only hold the GIL while entering | ||
| and exiting the batch. | ||
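The batching idea can be sketched in plain Python. This is only a model of the amortization argument, not the planned implementation: ``CountingBoundary`` is a hypothetical stand-in for the Python/Rust boundary and simply counts how often it is crossed.

```python
from typing import Callable, Iterable, Iterator, List


def batched(items: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch: List[int] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly shorter, batch
        yield batch


class CountingBoundary:
    """Hypothetical stand-in for the Python/Rust boundary."""

    def __init__(self, fn: Callable[[int], int]) -> None:
        self.fn = fn
        self.crossings = 0

    def call_batch(self, batch: List[int]) -> List[int]:
        self.crossings += 1  # one entry/exit per batch instead of per message
        return [self.fn(item) for item in batch]


def map_batched(boundary: CountingBoundary, items: Iterable[int], size: int) -> List[int]:
    out: List[int] = []
    for batch in batched(items, size):
        out.extend(boundary.call_batch(batch))
    return out


boundary = CountingBoundary(lambda x: x * 2)
result = map_batched(boundary, range(10), size=4)
assert result == [x * 2 for x in range(10)]
assert boundary.crossings == 3  # batches of 4, 4 and 2 items
```

With a batch size of 4, ten messages cross the boundary three times instead of ten; the per-message work is unchanged, only the fixed entry/exit cost is shared.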
|
|
||
| What we want to improve in the future | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| - improve performance of calling convention/reduce overhead | ||
|
|
||
| - take inspiration from | ||
| https://github.com/ealmloff/sledgehammer_bindgen | ||
|
|
||
| - automatically generate type stubs for user’s Rust code — pyo3 does | ||
| have something like that, but it doesn’t work perfectly (exposes | ||
| internals of our Rust macro) | ||
| - improve ergonomics of message types and their conversion, add protobuf | ||
| or msgpack as a way to roundtrip | ||
| - each team at sentry would have to maintain a new python package for | ||
| their Rust functions, set up pyo3 and CI from scratch, etc. we can | ||
| streamline this. | ||
|
|
||
| - we already have: ``sentry_relay`` (relay integration), ``ophio`` | ||
| (grouping engine), ``vroomrs`` (profiles), ``symbolic`` (stacktrace | ||
| processing) | ||
| - easiest: we provide a “monorepo” and “monopackage” where all rust | ||
| functions for getsentry go. we maintain CI for this monorepo. | ||
| - medium: repository template | ||
| - also, ideally this is aligned with devinfra’s “golden path” for | ||
| python devenv | ||
| - in practice some team will have to provide support for questions | ||
| about pyo3, since its entire API surface is exposed to product teams | ||
| (although we can templatize and abstract a lot) | ||
|
|
||
| Pure-Rust pipelines | ||
| ------------------- | ||
|
|
||
| A lot of the complexity mentioned above is only really necessary for | ||
| when you want to mix Python and Rust code. For pure-Rust applications, | ||
| we could do something entirely different: | ||
|
|
||
| - The runner does not have to be started from Python at all. If we | ||
| started it from Rust, we would have a much easier time optimizing | ||
| function calls. | ||
| - The pipeline definition does not have to be Python. We could have it | ||
| be YAML or even Rust as well. | ||
| - Type stubs are not really necessary. We can easily validate that the | ||
| types match during startup, or if the pipeline definition is in Rust, | ||
| let the compiler do that job for us. | ||
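As a rough illustration of validating types at startup, the sketch below checks that each step's output type matches the next step's input type before any message is processed. The ``Step`` record and ``validate_chain`` function are hypothetical and not part of sentry_streams; type names are compared as plain strings for simplicity.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Step:
    # Hypothetical description of a pipeline step for this sketch.
    name: str
    input_type: str
    output_type: str


def validate_chain(steps: List[Step]) -> None:
    """Raise TypeError if adjacent steps disagree on the message type."""
    for prev, nxt in zip(steps, steps[1:]):
        if prev.output_type != nxt.input_type:
            raise TypeError(
                f"step {nxt.name!r} expects {nxt.input_type}, "
                f"but step {prev.name!r} produces {prev.output_type}"
            )


steps = [
    Step("filter", "IngestMetric", "IngestMetric"),
    Step("transform", "IngestMetric", "TransformedIngestMetric"),
]
validate_chain(steps)  # passes: the types line up
```

A mismatch would then surface as a startup error rather than as a conversion failure on the first message.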
|
|
||
| Any of these will however split the ecosystem. I think we have plenty of | ||
| ergonomic improvements we can make even for hybrid applications, that | ||
| would benefit pure-Rust users as well. We should focus on those first. | ||
|
|
||
| Meeting notes July 24, 2025 | ||
| =========================== | ||
|
|
||
| - a better pure-rust story | ||
|
|
||
| - we have too much boilerplate, and now especially for pure rust apps | ||
| - build a rust runner, and try to get rid of as much pyo3 junk as | ||
| possible | ||
|
|
||
| - reference: `The rust arroyo | ||
| runtime <https://www.notion.so/The-rust-arroyo-runtime-2228b10e4b5d806dbe9ccd4e70c93aa2?pvs=21>`__ | ||
|
|
||
| - maybe hybrid will get better through this rearchitecture | ||
| - maybe denormalize Parse steps into Map (@Filippo Pacifici) | ||
|
|
||
| .. code:: rust | ||
|
|
||
|
|
||
| // mypackage/src/lib.rs as pyo3 | ||
| use sentry_streams; | ||
|
|
||
| sentry_streams::rust_function!(...); | ||
|
|
||
| sentry_streams::main_function!(); | ||
|
|
||
| // or, in bin target: | ||
| pub use sentry_streams::main; | ||
|
|
||
| .. code:: python | ||
|
|
||
|
|
||
| mypackage.run_streams() | ||
|
|
||
| concerns: | ||
|
|
||
| - user can freely downgrade/upgrade version, since they “own” the | ||
| runtime (as they are statically linking it) | ||
| - ability to opt out of message conversion trait requirements | ||
|
|
||
| - message type conversion | ||
|
|
||
| - boilerplate is an issue | ||
|
|
||
| - integration with existing schema repos, or copy schema-to-type | ||
| generation into streams for “inline schemas” | ||
|
|
||
| - better performance | ||
|
|
||
| - better runtime semantics for rust functions | ||
|
|
||
| - map chains, but in rust? | ||
| - no multiprocessing! |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| """ | ||
| Rust version of simple_map_filter.py example | ||
| """ | ||
|
|
||
| from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric | ||
|
|
||
| from sentry_streams.pipeline.pipeline import ( | ||
| Filter, | ||
| Map, | ||
| Parser, | ||
| Serializer, | ||
| StreamSink, | ||
| streaming_source, | ||
| ) | ||
|
|
||
| # Import the compiled Rust functions | ||
| try: | ||
| from metrics_rust_transforms import RustFilterEvents, RustTransformMsg | ||
| except ImportError as e: | ||
| raise ImportError( | ||
| "Rust extension 'metrics_rust_transforms' not found. " | ||
| "You must build it first:\n" | ||
| " cd rust_transforms\n" | ||
| " maturin develop\n" | ||
| f"Original error: {e}" | ||
| ) from e | ||
|
Comment on lines +17 to +26
Collaborator
Do we wrap the import in a try-except block here just to provide a better error message, or for other reasons?
Member
Author
yes it's just for the better error message. |
||
|
|
||
| # Same pipeline structure as simple_map_filter.py, but with Rust functions | ||
| # that run natively (some GIL overhead remains when entering and exiting them) | ||
| pipeline = streaming_source( | ||
| name="myinput", | ||
| stream_name="ingest-metrics", | ||
| ) | ||
|
|
||
| ( | ||
| pipeline.apply(Parser("parser", msg_type=IngestMetric)) | ||
| # This filter runs in native Rust; the GIL is released while it executes | ||
| .apply(Filter("filter", function=RustFilterEvents())) | ||
| # This transform runs in native Rust; the GIL is released while it executes | ||
| .apply(Map("transform", function=RustTransformMsg())) | ||
| .apply(Serializer("serializer")) | ||
| .sink(StreamSink("mysink", stream_name="transformed-events")) | ||
| ) | ||
Run mypy in the surrounding venv so that it picks up installed dependencies properly. This is also how we run it in sentry (although @asottile-sentry at some point said he doesn't want to use `language: system` that often in the future, IIRC).