Conversation

untitaker (Member) commented Jul 9, 2025

Make it possible to call Rust functions from pipeline steps. So far only Map and Filter are supported.

In order to make a Rust function referenceable from our pipeline DSL (which is always Python), the user has to package their functions as a Python package. We provide macros to do that for the user, and a basic framework for converting input/output types.

For now we provide only one "builtin" way to convert types using convert_via_json!. But the user can choose to implement FromPythonObject etc themselves, and pick protobuf, or something entirely handwritten using pyo3 APIs.
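
For illustration, here is a minimal sketch of what a JSON-based conversion could look like, assuming a recent pyo3 and hypothetical trait shapes (the actual trait definitions and the convert_via_json! expansion in this PR may differ):

    use pyo3::prelude::*;

    // Hypothetical trait shapes; the real ones in this PR may differ.
    pub trait FromPythonObject: Sized {
        fn from_python(obj: &Bound<'_, PyAny>) -> PyResult<Self>;
    }

    pub trait IntoPythonObject {
        fn into_python(self, py: Python<'_>) -> PyResult<Py<PyAny>>;
    }

    // Sketch of what a JSON-roundtrip macro could expand to for one type.
    // Assumes `$t: serde::Serialize + serde::de::DeserializeOwned`.
    macro_rules! convert_via_json_sketch {
        ($t:ty) => {
            impl FromPythonObject for $t {
                fn from_python(obj: &Bound<'_, PyAny>) -> PyResult<Self> {
                    // Dump the Python object to a JSON string with the stdlib json module...
                    let py = obj.py();
                    let json: String = py
                        .import("json")?
                        .call_method1("dumps", (obj,))?
                        .extract()?;
                    // ...then parse it into the Rust type with serde.
                    serde_json::from_str(&json)
                        .map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))
                }
            }

            impl IntoPythonObject for $t {
                fn into_python(self, py: Python<'_>) -> PyResult<Py<PyAny>> {
                    // Serialize the Rust value to JSON, then load it as a Python object.
                    let json = serde_json::to_string(&self)
                        .map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))?;
                    Ok(py.import("json")?.call_method1("loads", (json,))?.unbind())
                }
            }
        };
    }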

The user is responsible for writing their own .pyi files to make the pipeline type-safe. Ideally this would be automatically generated too, but we can add it later. It also depends on how they choose to convert types. We probably need additional features in our schema repos (sentry-kafka-schemas and sentry-protos) to provide a mapping from generated Rust to generated Python types, so that users can't accidentally pick the wrong pair of rust/python types.

The rust_simple_map_filter example should show the envisioned API end-to-end.

Under the hood, we call the Rust function through the GIL, but release the GIL ASAP. We have a few alternatives:

  • We can do unsafe pointer casting. This was the initial version but I don't think it is actually sound. Different versions of the Rust compiler can generate different function calling conventions (in theory) -- it does however completely eliminate the GIL.
  • We can perform the function call in batches to amortize GIL overhead. TBD as to whether this is actually needed for performance.
  • We can use GRPC, but I think this just adds overhead and solves no problems right now.

The macro hides all of this from the API, so we can implement these improvements at a later point. The important thing is that we have a stable API for now, and we can worry about performance later.

entry: mypy
files: ^sentry_streams/.*\.py$
types: [python]
language: system
Member Author

run mypy in surrounding venv so that it picks up installed dependencies properly. this is also how we run it in sentry (although @asottile-sentry at some point said he doesn't want to use language: system that often in the future IIRC)


// Cast function pointer and call it
let rust_fn: extern "C" fn(*const crate::ffi::Message<serde_json::Value>) -> bool =
unsafe { std::mem::transmute(self.function_ptr) };
john-z-yang (Contributor) commented Jul 9, 2025

Pretty sure this is UB, I think we want to do a cast here before transmute, i.e.

unsafe { std::mem::transmute(self.function_ptr as *const () ) };

as per https://doc.rust-lang.org/std/primitive.fn.html#casting-to-and-from-integers

Contributor

Here is a minimal reproducible example

    #[test]
    fn test_build_rust_map() {
        extern "C" fn extern_map_fn(
            _: *const crate::ffi::Message<serde_json::Value>,
        ) -> *const crate::ffi::Message<serde_json::Value> {
            message_to_ptr(crate::ffi::Message::new(
                serde_json::from_str(r#"{"b": 0}"#).unwrap(),
                vec![],
                1.0,
                None,
            ))
        }

        let fn_addr = extern_map_fn as usize;

        let msg: crate::ffi::Message<serde_json::Value> = crate::ffi::Message::new(
            serde_json::from_str(r#"{"a": 1}"#).unwrap(),
            vec![],
            0.0,
            None,
        );
        let msg_ptr = message_to_ptr(msg);

        let rust_fn: extern "C" fn(
            *const crate::ffi::Message<serde_json::Value>,
        ) -> *const crate::ffi::Message<serde_json::Value> =
            unsafe { std::mem::transmute(fn_addr) };

        let res = rust_fn(msg_ptr);
        let converted = unsafe { ptr_to_message(res) };
        let _ = unsafe { ptr_to_message(msg_ptr) };

        assert_eq!(
            converted.payload,
            serde_json::from_str::<serde_json::Value>(r#"{"b": 0}"#).unwrap()
        );
        assert_eq!(converted.headers, vec![]);
        assert_eq!(converted.timestamp, 1.0);
        assert_eq!(converted.schema, None);
    }

Running it under Miri gives

cargo +nightly miri test transformer::tests::test_build_rust_map
   Compiling rust_streams v0.1.0 (/Users/johnyang/code/streams/sentry_streams)
    Finished `test` profile [unoptimized + debuginfo] target(s) in 1.63s
     Running unittests src/lib.rs (target/miri/aarch64-apple-darwin/debug/deps/rust_streams-9cd112665fa1172e)

running 1 test
test transformer::tests::test_build_rust_map ... error: Undefined Behavior: pointer not dereferenceable: pointer must point to some allocation, but got 0x3edb5f[noalloc] which is a dangling pointer (it has no provenance)
   --> src/transformer.rs:369:19
    |
369 |         let res = rust_fn(msg_ptr);
    |                   ^^^^^^^^^^^^^^^^ pointer not dereferenceable: pointer must point to some allocation, but got 0x3edb5f[noalloc] which is a dangling pointer (it has no provenance)
    |
    = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
    = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information
    = note: BACKTRACE on thread `transformer::tests::test_build_rust_map`:
    = note: inside `transformer::tests::test_build_rust_map` at src/transformer.rs:369:19: 369:35
note: inside closure
   --> src/transformer.rs:342:29
    |
341 |     #[test]
    |     ------- in this procedural macro expansion
342 |     fn test_build_rust_map() {
    |                             ^

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 1 previous error

error: test failed, to rerun pass `--lib`

Contributor

I hope there's a way to do this without unsafe, it doesn't seem like it's even possible to run miri with python :/

Member Author

yeah i figured. seems like we can work around it by casting to a raw pointer first though

john-z-yang (Contributor) commented Jul 10, 2025

Right, but it's very easy to cause UB, and I don't think miri works given all the FFI calls that pyo3 does, so we don't have a good way to check other than reading our code.

Member Author

maybe we wrap this usize in a custom Pointer struct, so that all the unsafe code and casting is in one place and can be unit-tested
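
For illustration, a sketch of what such a wrapper could look like (hypothetical names), with the cast going through a raw pointer first as discussed above, plus a round-trip unit test:

    /// Hypothetical wrapper that keeps the usize <-> fn-pointer casting in one place.
    pub struct FilterFnPtr(usize);

    impl FilterFnPtr {
        pub fn new(f: extern "C" fn(*const crate::ffi::Message<serde_json::Value>) -> bool) -> Self {
            FilterFnPtr(f as usize)
        }

        /// Safety: `self.0` must have been produced by `new` in this same process.
        pub unsafe fn call(&self, msg: *const crate::ffi::Message<serde_json::Value>) -> bool {
            // Cast back through a raw pointer first, then transmute to the fn type,
            // per the std docs on fn-pointer casts.
            let ptr = self.0 as *const ();
            let f: extern "C" fn(*const crate::ffi::Message<serde_json::Value>) -> bool =
                std::mem::transmute(ptr);
            f(msg)
        }
    }

    #[cfg(test)]
    mod filter_fn_ptr_tests {
        use super::*;

        extern "C" fn always_true(_: *const crate::ffi::Message<serde_json::Value>) -> bool {
            true
        }

        #[test]
        fn roundtrip() {
            let ptr = FilterFnPtr::new(always_true);
            // The function never dereferences its argument, so null is fine here.
            assert!(unsafe { ptr.call(std::ptr::null()) });
        }
    }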

john-z-yang (Contributor)

You probably tried this before, but are we not able to represent our data with some kind of dynamic dispatch?

Like making rust_map_function a trait and have build_rust_map accept something like Arc<dyn rust_map_function>?

untitaker (Member Author)

Like making rust_map_function a trait and have build_rust_map accept something like Arc<dyn rust_map_function>?

so rust_map_function would generate impl RustFunction for X instead of impl X? I don't know that we gain much from this because we already have dynamic dispatch (and more) through Py<PyAny>. but feel free to play around with it, anything that does not require unsafe is probably a win

untitaker (Member Author)

I think if we want to make it return a typed function pointer, we need to figure out a way to impl FromPyObject for that typed function pointer, whether it's an Arc<dyn Fn> or a fn() -> ..

john-z-yang (Contributor) commented Jul 10, 2025

I think if we want to make it return a typed function pointer, we need to figure out a way to impl FromPyObject for that typed function pointer, whether it's an Arc<dyn Fn> or a fn() -> ..

What if we put all the functions in a central registry (a map of string → fn), and we just store the key in python? I really just want to avoid unsafe

untitaker (Member Author) commented Jul 10, 2025

What if we put all the functions in a central registry (a map of string → fn), and we just store the key in python? I really just want to avoid unsafe

you have two dynamic libraries loaded into the python interpreter:

  • rust-streams (or whatever the main crate is called)
  • user's crate with all user-defined functions

the user's crate does link back into streams, but it's a second copy of streams with its own globals. it's statically linked so you might as well pretend it's entirely inlined into the user's crate.

so when you make a global registry, you will have two of them. if you want to sync them, you are back to the problem of, how do I send a function pointer from one streams library into the other.


so i mean, i am not saying we have to do it this way. the API design is still entirely up in the air. for example we can abandon this FFI and pyo3 stuff and implement something based on grpc. but if we stick with this API design as a premise then i don't think we can avoid unsafe
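
To illustrate the problem, a registry would typically be a static like the sketch below (hypothetical names); every dylib that statically links this code gets its own copy of the static, so entries registered from the user's extension module are invisible to the runtime's copy:

    use std::collections::HashMap;
    use std::sync::{Mutex, OnceLock};

    type FilterFn = fn(&serde_json::Value) -> bool;

    // One of these statics exists per dylib that links this crate.
    static REGISTRY: OnceLock<Mutex<HashMap<String, FilterFn>>> = OnceLock::new();

    pub fn register(name: &str, f: FilterFn) {
        REGISTRY
            .get_or_init(|| Mutex::new(HashMap::new()))
            .lock()
            .unwrap()
            .insert(name.to_string(), f);
    }

    // A lookup from the other copy of the crate returns None for names
    // registered above, because it reads a different REGISTRY.
    pub fn lookup(name: &str) -> Option<FilterFn> {
        REGISTRY.get()?.lock().unwrap().get(name).copied()
    }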

john-z-yang (Contributor) commented Jul 10, 2025

I see. So what is the difference between using what we have, vs the users using pyo3 themselves to create a python function that is actually implemented in rust and supply it as a python map step?

Comment on lines +17 to +26
try:
from metrics_rust_transforms import RustFilterEvents, RustTransformMsg
except ImportError as e:
raise ImportError(
"Rust extension 'metrics_rust_transforms' not found. "
"You must build it first:\n"
" cd rust_transforms\n"
" maturin develop\n"
f"Original error: {e}"
) from e
Collaborator

Here, do we wrap the import in a try-except block just to provide a better error message, or for other reasons?

Member Author

yes it's just for the better error message.

}

// Rust equivalent of filter_events() from simple_map_filter.py
rust_filter_function!(RustFilterEvents, serde_json::Value, |msg: Message<
Collaborator

Why a macro ?

Member Author

I don't see another way. you want to define a function and wrap it in quite a bit of additional logic + export types

Comment on lines 99 to 106
let rust_fn_ptr = traced_with_gil!(|py| {
function
.bind(py)
.call_method0("get_rust_function_pointer")
.unwrap()
.extract::<usize>()
.unwrap()
});
Collaborator

The way I understand the need for the function pointer is that we need to give the rust runtime a way to call the function without importing the module statically. And that is because the runner imports the pyo3 version of the application code, and because the rust application does not import the runner. Is that correct?

So a few questions:

  • Let's pretend for a second the runtime was rust, calling into python to interpret the pipeline definition and produce a description of the steps to build so that the rust code could trivially build the rust arroyo consumer. That would allow a hybrid application or a rust-only application to be a rust crate, thus having its functions statically imported. Am I right?

Member Author

The way I understand the need for the function pointer is that we need to give the rust runtime a way to call the function without importing the module statically.

yes

And that is because the runner imports the pyo3 version of the application code, and because the rust application does not import the runner. Is that correct?

i don't follow this part

Let's pretend for a second the runtime was rust, calling into python to interpret the pipeline definition and produce a description of the steps to build so that the rust code could trivially build the rust arroyo consumer. That would allow a hybrid application or a rust-only application to be a rust crate, thus having its functions statically imported. Am I right?

this depends on what the runner and the runtime are packaged as. if they are still python packages independently compiled from the application code, then you will still have the same issue even if the entire runtime + api layer is written in rust.

if the runtime was packaged as a rust crate, then it could be imported from user code, and would therefore end up in the same dylib. this would open new possibilities for calling rust code.

but this unsafe function pointer casting is now gone with recent commits, at the expense of holding the GIL

untitaker marked this pull request as ready for review July 16, 2025 18:00
untitaker requested a review from a team as a code owner July 16, 2025 18:00
fpacifici (Collaborator) left a comment

Some high level comments, please let me know what you think about them.

.PHONY: tests-flink

typecheck:
. ./sentry_streams/.venv/bin/activate && cd ./sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms/ && maturin develop
Collaborator

What is this for ?

Member Author

typechecking the example requires the rust functions to be installed in the current virtualenv. otherwise mypy will not understand what rust_transforms is.

[lib]
name = "rust_streams"
-crate-type = ["cdylib"]
+crate-type = ["cdylib", "rlib"]
Collaborator

What do we need this for ?

Member Author

this is so that the sentry_streams crate can be imported as a normal crate.


# For this example, we'll include the dependencies directly
[dependencies.rust_streams]
path = "../../../../" # Point to the main rust_streams crate
Collaborator

What do we need the rust_streams crate for? I originally created it before we even started developing to set up the dev environment. It is not used today.

Member Author

the sentry_streams folder contains a crate called rust_streams (see sentry_streams/Cargo.toml), I can clean this up but probably in another PR


class RustFilterEvents(RustFunction[IngestMetric, bool]):
def __init__(self) -> None: ...
def __call__(self, msg: Message[IngestMetric]) -> bool: ...
Collaborator

I'd assume these are specific to the example you created (thus the message type is not generic but rather IngestMetric). Is that correct?

Member Author

yep

Collaborator

Aren't you inheriting this from the sentry_streams package ?

Member Author

no, the fact that the folders are nested in this repo won't be visible in the user's application (when they install sentry_streams and my_rust_transforms separately via pip)

Comment on lines +23 to +30
// Implement the FromPythonPayload and IntoPythonPayload traits. This decides how IngestMetric is
// going to be converted from the previous step's Python value.
//
// Currently, all values passed between steps are still Python objects, even between two Rust
// steps.
//
// This macro implements these traits by roundtripping values via JSON.
convert_via_json!(IngestMetric);
Collaborator

Just to ensure I understand this correctly, this conversion is needed only when the in-memory representation of the message is different between Python and Rust, right? If we were using, let's say, Arrow as message format we would just have to move memory between python and rust without parsing and serializing.

Also do we actually need to provide python-rust and rust-python conversion if the application is fully Rust ?

Member Author

If we were using, let's say, Arrow as message format we would just have to move memory between python and rust without parsing and serializing.

yes but the current design also allows for this + zerocopy. in this case one would have to implement the traits manually or we add another macro as alternative to convert_via_json.

In the case of arrow this would probably mean moving the rust struct into a Python class (with Py<> wrapped) without copying it.

Also do we actually need to provide python-rust and rust-python conversion if the application is fully Rust ?

Because I didn't touch the main PyMessage enum, this is indeed still necessary. One can achieve zero-copy between Rust steps by wrapping the Rust type in an opaque pyclass, then unwrapping it again in the next step. But the CPU overhead of acquiring and releasing the GIL is still there.
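
A rough sketch of that opaque-pyclass idea (hypothetical names, not code from this PR), assuming IngestMetric is a plain Send + Sync data struct: the producing Rust step moves its value into a Python-visible wrapper, and the consuming Rust step takes it back out without any serialization:

    use pyo3::prelude::*;

    // Hypothetical opaque wrapper around the Rust payload type `IngestMetric`.
    // Only the wrapper crosses the Python boundary; the payload is never converted.
    #[pyclass]
    pub struct OpaquePayload {
        inner: Option<IngestMetric>,
    }

    impl OpaquePayload {
        // Called by the producing Rust step: wrap the value into a Python object.
        pub fn wrap(py: Python<'_>, value: IngestMetric) -> PyResult<Py<OpaquePayload>> {
            Py::new(py, OpaquePayload { inner: Some(value) })
        }

        // Called by the consuming Rust step: move the value back out.
        pub fn take(&mut self) -> Option<IngestMetric> {
            self.inner.take()
        }
    }

The consuming step would downcast the incoming Python object back to OpaquePayload and call take(), paying only the GIL acquisition cost mentioned above.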

Collaborator

Let's pretend for a second the Runner was a rust binary you could import as a crate in your rust application, and your application was fully rust: we would not need the rust_function! macro, is that correct? Would we simply add the function to a map whose entries can be referenced in the DSL (like the snuba rust consumer)?

Member Author

yes, this design is possible, it certainly eliminates a lot of GIL overhead. but it would require you to pick between two entrypoints for the runner depending on your application, and once you add a third compiled language this also doesn't seem to work anymore.

john-z-yang (Contributor) commented Jul 23, 2025

but it would require you to pick between two entrypoints for the runner depending on your application, and once you add a third compiled language this also doesn't seem to work anymore.

Is this because we are actually defining the logical plan (pipeline stuff) in Python (within build_chain)?

If instead we define the logical plan in rust, and our python bindings are just wrappers on our rust logical plans, then if someone wants to create a pipeline written fully in rust it would be easier (as they can import the logical plans statically), while the python pipeline still works, right?

Member Author

yep we can do that. i would also prefer it.
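
A very rough sketch of that direction (all names hypothetical): the logical plan is a plain Rust data structure, and the Python DSL becomes a thin pyo3 wrapper over it, while a pure-Rust application could construct the plan directly:

    use pyo3::prelude::*;

    // Hypothetical Rust-side logical plan.
    pub enum Step {
        Map { function: String },
        Filter { function: String },
    }

    pub struct Plan {
        pub steps: Vec<Step>,
    }

    // Thin wrapper so the Python DSL can keep building plans, while a Rust
    // application would construct `Plan` directly and link the runtime statically.
    #[pyclass(name = "Pipeline")]
    pub struct PyPlan {
        inner: Plan,
    }

    #[pymethods]
    impl PyPlan {
        #[new]
        fn new() -> Self {
            PyPlan { inner: Plan { steps: Vec::new() } }
        }

        fn add_map(&mut self, function: String) {
            self.inner.steps.push(Step::Map { function });
        }

        fn add_filter(&mut self, function: String) {
            self.inner.steps.push(Step::Filter { function });
        }
    }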


/// The message type exposed to Rust functions, with typed payload.
#[derive(Debug, Clone)]
pub struct Message<T> {
Collaborator

Is there any reason other than the generic payload not to make this one an entry in StreamingMessage?
https://github.com/getsentry/streams/blob/main/sentry_streams/src/messages.rs#L312-L324

Member Author

mainly the generic payload but i am also not sure that PyStreamingMessage is supposed to be public API?

Collaborator

not sure that PyStreamingMessage is supposed to be public API?

That's a good point. I don't think we got in depth on that yet.

Comment on lines +173 to +178
#[pyo3(name = "__call__")]
pub fn call(
&self,
py: pyo3::Python<'_>,
py_msg: pyo3::Py<pyo3::PyAny>,
) -> pyo3::PyResult<pyo3::Py<pyo3::PyAny>> {
Collaborator

Please confirm my understanding:

  1. The user provides rust logic in the Python DSL by instantiating an object that represents the Rust function
  2. The Python class is actually a pyO3 exposed rust struct
    • The .pyi file allows us to statically type check the Rust function against the previous and next pipeline step in the DSL.
  3. The pyO3 exposed structure is actually generated by this macro from a rust function and exported in the module.
  4. The pyO3 exposed structure is a python callable so it can be embedded in the rust arroyo steps like we already do for any python function.
  5. Since the object containing the rust class is instantiated in Python memory it has to be called by taking the GIL.

Though if the above is correct I don't think we are releasing the GIL during the Rust call.
Don't we go through this code to execute a Rust Map like we do for a python map ?
https://github.com/getsentry/streams/blob/main/sentry_streams/src/transformer.rs#L37

If that is correct, we take the GIL before calling the python Callable that Arroyo thinks is a Python function, and release it when we have the transformed message.
Am I missing something ?

Collaborator

Also regarding the above points, let's pretend for a second we had a rust crate that contained the runner as a rust CLI and was able to run the pipeline, would I be correct in saying that:

point 1-2: A python class containing the rust function (pyo3) would not be necessary as the runner would be statically linked with the function. The python class could still be of some use to type check the pipeline in the python DSL

point 3-4: we would not need to make the rust function a python Callable. The DSL should just point the runner to some map that contains a reference to the function; the arroyo step would access that map and directly call the rust code without going through the GIL between steps.

Would this be correct ?

If yes, what if we decoupled the runtime from the pipeline definition a bit more:

  1. We could make the runner a rust CLI that takes a low level definition of the pipeline as input.
  2. The low level definition of the pipeline is a simple data structure that only contains a sequence of the parameters we pass to the add_step function on the consumer https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py#L280. All the parsing, transformation, routing, validation, settings, etc. are still managed by the python translator.
  3. This CLI script would be very simple and would be added to the python version, so we would be able to import streams both into a python application as a python package and into a rust application as a crate. This should work with minimal duplication (just the CLI and the parsing of this low level pipeline definition).

Does this make any sense? Basically, for a rust-only application we would import the crate in a rust application, for python-only we would do what we do today, and for hybrid applications we would likely do the python version.
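
As an illustration of point 2 above (hypothetical field names): the low-level definition could be a plain serde-deserializable structure that mirrors the add_step parameters, produced by the python translator and consumed by a Rust runner CLI:

    use serde::Deserialize;

    // Hypothetical low-level pipeline description; it only mirrors the
    // parameters passed to add_step, everything else stays in the translator.
    #[derive(Debug, Deserialize)]
    pub struct PipelineDefinition {
        pub steps: Vec<StepDefinition>,
    }

    #[derive(Debug, Deserialize)]
    #[serde(tag = "kind", rename_all = "snake_case")]
    pub enum StepDefinition {
        Source { stream: String },
        Map { function: String },
        Filter { function: String },
        Sink { stream: String },
    }

    // Sketch of the CLI entrypoint: `runner <definition.json>`.
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let path = std::env::args().nth(1).ok_or("usage: runner <definition>")?;
        let definition: PipelineDefinition =
            serde_json::from_str(&std::fs::read_to_string(path)?)?;
        println!("{definition:?}");
        Ok(())
    }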

Member Author

If that is correct, we take the GIL before calling the python Callable that Arroyo thinks is a Python function, and release it when we have the transformed message.

we use allow_threads from pyo3 to release the GIL from within the rust function. so the callstack is:

  • traced_with_gil
    • call the python function
      • call the rust function
        • allow_threads
          • we have no GIL here

essentially we acquire the GIL to start the function call, and re-acquire the GIL when returning from the callstack.

(this also makes the timings we collect in traced_with_gil wrong, but we can fix that later)


Does this make any sense? Basically, for a rust-only application we would import the crate in a rust application, for python-only we would do what we do today, and for hybrid applications we would likely do the python version.

high-level it does make sense to expose the entire runner CLI as a rust library so that the user can statically link the entire runtime into their application once, and launch it themselves.

I see some potential drawbacks with it:

  • the changes to API now go beyond a few steps, we have to "uproot" the entire runner framework to accommodate many languages
  • since the user statically links the entire runtime, they control the version
  • one cannot reference rust steps from multiple python libraries, instead they'd have to be linked into one crate first
  • a second statically linked, compiled language like Go cannot be mixed into the same pipeline as Rust, because one would have to choose whether to launch the runner "from Rust" or "from Go"

the upside is that GIL overhead is entirely gone for calling, and I think the approach would also work fine for Hybrid Python/Rust.

I don't think this would be too hard to prototype in a few days. I'd skip changing the DSL to YAML though, I don't think it's necessary.

fpacifici (Collaborator) commented Jul 17, 2025

I don't think this would be too hard to prototype in a few days. I'd skip changing the DSL to YAML though, I don't think it's necessary.

Let me try to break this down. I should have some time today. Then I may ask you to build the crate as I will be off tomorrow.

Collaborator

Actually no, I could not get to this today. Will not make changes till monday.

Member Author

What do you want to break down? I can get started on a prototype on monday, I think I know what to do

Comment on lines +186 to +193
// Release GIL and call Rust function
let result_msg = py.allow_threads(|| {
// clone metadata, but try very hard to avoid cloning the payload
let (payload, metadata) = rust_msg.take();
let metadata_clone = metadata.clone();
let result_payload = transform_fn(metadata.map(|()| payload));
metadata_clone.map(|()| result_payload)
});
Collaborator

Isn't transform_fn what executes the rust function? In that case aren't we releasing the GIL only after that?

Member Author

it's wrapped in allow_threads, so within this closure from 187 to 192 we do not hold the GIL and re-acquire it afterwards

Collaborator

ah, good point, thanks

Comment on lines +187 to +193
let result_msg = py.allow_threads(|| {
// clone metadata, but try very hard to avoid cloning the payload
let (payload, metadata) = rust_msg.take();
let metadata_clone = metadata.clone();
let result_payload = transform_fn(metadata.map(|()| payload));
metadata_clone.map(|()| result_payload)
});
Contributor

Hmm I think that traced_with_gil macro doesn't work with this, but this is a minor concern

Member Author

any ideas on how to fix? I was thinking of introducing traced_release_gil! and using a thread local to make the timings correct. but that's fairly heavyweight
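
One possible shape of that idea, sketched with hypothetical names (untested): a thread-local accumulator records how long the GIL was released, and traced_with_gil! subtracts it afterwards:

    use std::cell::Cell;
    use std::time::{Duration, Instant};

    thread_local! {
        // Time spent with the GIL released inside the current traced call.
        static GIL_RELEASED: Cell<Duration> = Cell::new(Duration::ZERO);
    }

    // Hypothetical counterpart to traced_with_gil!: run the closure via
    // allow_threads and record how long the GIL was released.
    macro_rules! traced_release_gil {
        ($py:expr, $body:expr) => {{
            let start = Instant::now();
            let result = $py.allow_threads($body);
            GIL_RELEASED.with(|d| d.set(d.get() + start.elapsed()));
            result
        }};
    }

    // traced_with_gil! would read and reset GIL_RELEASED after its closure
    // returns and subtract it from the GIL-held time it reports.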

fpacifici (Collaborator) left a comment

We discussed offline on merging this even if we are not fully set on the design to allow @john-z-yang and @untitaker to iterate on the idea.

I think there are two issues to solve in order to do this:

  1. Address the CI failure.
  2. The new infrastructure added for rust is not unit tested. There is a high level python integration test, but that is not enough, as this would be production code if merged and people will start using it. I would suggest one of these options: add unit tests for pipeline.py and ffi.rs, or move the rust integration code into an experimental package/module to signify it should not be used in production.

Up to you. I would add the unit tests

untitaker (Member Author)

I addressed both issues and I think this is ready for a final review. This actually also uncovered a breaking bug in the implementation.

There is now even a full pipeline test using arroyo's LocalBroker. I am going to reuse this code in a later PR so we can test our examples more efficiently (see #182)

untitaker merged commit 61b153e into main Aug 5, 2025
18 checks passed