Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions sentry_streams/sentry_streams/adapters/arroyo/test_delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from datetime import datetime
from typing import Tuple, Union

from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import FilteredPayload
from arroyo.types import Message as ArroyoMessage
from arroyo.types import Partition, Topic, Value

from sentry_streams.adapters.arroyo.rust_step import (
ArroyoStrategyDelegate,
Committable,
OutputRetriever,
RustOperatorFactory,
)
from sentry_streams.pipeline.message import RustMessage


def rust_to_arroyo_msg(
message: RustMessage, committable: Committable
) -> ArroyoMessage[RustMessage]:
arroyo_committable = {
Partition(Topic(partition[0]), partition[1]): offset
for partition, offset in committable.items()
}
return ArroyoMessage(
Value(
message,
arroyo_committable,
datetime.fromtimestamp(message.timestamp) if message.timestamp else None,
)
)


def arroyo_msg_to_rust(
message: ArroyoMessage[Union[FilteredPayload, RustMessage]],
) -> Tuple[RustMessage, Committable] | None:
if isinstance(message.payload, FilteredPayload):
return None
committable = {
(partition.topic.name, partition.index): offset
for partition, offset in message.committable.items()
}
return (message.payload, committable)
Comment on lines +18 to +43
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem unnecessary.
They seem 95% copies of these https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py#L46-L91

The difference seems toi be that your RunTask op takes RustMessage and return RustMessage rather than taking and returning a PyAnyMessage.
If you make your function take and return a PyAnyMessage you should be able to avoid these methods entirely and actually test the real logic we run in production instead, which would be recommended.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried originally doing what you're suggesting, but spent >30 min wrestling with mypy to make it happy regarding the TMapIn/TMapOut generics in the ArroyoStrategyDelegate.

I can spend more time on this to get the typing right, it was just a nightmare.



def noop(
msg: ArroyoMessage[Union[FilteredPayload, RustMessage]],
) -> Union[FilteredPayload, RustMessage]:
return msg.payload


class TestDelegateFactory(RustOperatorFactory):
"""
A delegate used in rust_streams tests for the PythonAdapter step.
"""

def build(
self,
) -> ArroyoStrategyDelegate[FilteredPayload | RustMessage, FilteredPayload | RustMessage]:
retriever = OutputRetriever(out_transformer=arroyo_msg_to_rust)
inner = RunTask(noop, retriever)
return ArroyoStrategyDelegate(inner, rust_to_arroyo_msg, retriever)
30 changes: 30 additions & 0 deletions sentry_streams/src/python_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ mod tests {
use crate::testutils::build_routed_value;
use crate::testutils::make_committable;
use pyo3::ffi::c_str;
use pyo3::BoundObject;
use pyo3::IntoPyObjectExt;
use sentry_arroyo::processing::strategies::noop::Noop;
use std::collections::{BTreeMap, HashMap};
Expand Down Expand Up @@ -521,4 +522,33 @@ class RustOperatorDelegateFactory:
} // Unlock the MutexGuard around `actual_messages`
})
}

#[test]
fn test_arroyo_python_delegate() {
crate::testutils::initialize_python();
traced_with_gil!(|py| {
let delegate = c_str!(
r#"
from sentry_streams.adapters.arroyo.test_delegate import TestDelegateFactory
"#
);
let scope = PyModule::new(py, "test_scope").unwrap();
py.run(delegate, Some(&scope.dict()), None).unwrap();
let operator = scope.getattr("TestDelegateFactory").unwrap();
let instance = operator.call0().unwrap();

let mut operator = PythonAdapter::new(
Route::new("source1".to_string(), vec!["waypoint1".to_string()]),
instance.unbind(),
Box::new(Noop {}),
);

let message = make_msg(py, "ok");
let res = operator.submit(message);
assert!(res.is_ok());

let commit_request = operator.poll();
assert!(commit_request.is_ok());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this returning OK now that your fix has not been merged yet ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea

})
}
}