-
-
Notifications
You must be signed in to change notification settings - Fork 0
fix(python_adapter): add test for ArroyoDelegate in rust #200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| def rust_to_arroyo_msg( | ||
| message: RustMessage, committable: Committable | ||
| ) -> ArroyoMessage[RustMessage]: | ||
| arroyo_committable = { | ||
| Partition(Topic(partition[0]), partition[1]): offset | ||
| for partition, offset in committable.items() | ||
| } | ||
| return ArroyoMessage( | ||
| Value( | ||
| message, | ||
| arroyo_committable, | ||
| datetime.fromtimestamp(message.timestamp) if message.timestamp else None, | ||
| ) | ||
| ) | ||
|
|
||
|
|
||
| def arroyo_msg_to_rust( | ||
| message: ArroyoMessage[Union[FilteredPayload, RustMessage]], | ||
| ) -> Tuple[RustMessage, Committable] | None: | ||
| if isinstance(message.payload, FilteredPayload): | ||
| return None | ||
| committable = { | ||
| (partition.topic.name, partition.index): offset | ||
| for partition, offset in message.committable.items() | ||
| } | ||
| return (message.payload, committable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These seem unnecessary.
They seem 95% copies of these https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py#L46-L91
The difference seems toi be that your RunTask op takes RustMessage and return RustMessage rather than taking and returning a PyAnyMessage.
If you make your function take and return a PyAnyMessage you should be able to avoid these methods entirely and actually test the real logic we run in production instead, which would be recommended.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried originally doing what you're suggesting, but spent >30 min wrestling with mypy to make it happy regarding the TMapIn/TMapOut generics in the ArroyoStrategyDelegate.
I can spend more time on this to get the typing right, it was just a nightmare.
| assert!(res.is_ok()); | ||
|
|
||
| let commit_request = operator.poll(); | ||
| assert!(commit_request.is_ok()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this returning OK now that your fix has not been merged yet ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no idea
Followup to #202
This PR adds a test for using the ArroyoDelegate from the PythonAdapter