Skip to content

Conversation

@bmckerry
Copy link
Member

@bmckerry bmckerry commented Aug 27, 2025

Followup to #202

This PR adds a test for using the ArroyoDelegate from the PythonAdapter

@bmckerry bmckerry changed the title fix(python_adapter): retrieve messages as Sequence fix(python_adapter): add test for ArroyoAdapter in rust Aug 29, 2025
@bmckerry bmckerry changed the title fix(python_adapter): add test for ArroyoAdapter in rust fix(python_adapter): add test for ArroyoDelegate in rust Aug 29, 2025
@bmckerry bmckerry marked this pull request as ready for review August 29, 2025 20:38
@bmckerry bmckerry requested a review from a team as a code owner August 29, 2025 20:38
Comment on lines +18 to +43
def rust_to_arroyo_msg(
message: RustMessage, committable: Committable
) -> ArroyoMessage[RustMessage]:
arroyo_committable = {
Partition(Topic(partition[0]), partition[1]): offset
for partition, offset in committable.items()
}
return ArroyoMessage(
Value(
message,
arroyo_committable,
datetime.fromtimestamp(message.timestamp) if message.timestamp else None,
)
)


def arroyo_msg_to_rust(
message: ArroyoMessage[Union[FilteredPayload, RustMessage]],
) -> Tuple[RustMessage, Committable] | None:
if isinstance(message.payload, FilteredPayload):
return None
committable = {
(partition.topic.name, partition.index): offset
for partition, offset in message.committable.items()
}
return (message.payload, committable)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem unnecessary.
They seem 95% copies of these https://github.com/getsentry/streams/blob/main/sentry_streams/sentry_streams/adapters/arroyo/multi_process_delegate.py#L46-L91

The difference seems toi be that your RunTask op takes RustMessage and return RustMessage rather than taking and returning a PyAnyMessage.
If you make your function take and return a PyAnyMessage you should be able to avoid these methods entirely and actually test the real logic we run in production instead, which would be recommended.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried originally doing what you're suggesting, but spent >30 min wrestling with mypy to make it happy regarding the TMapIn/TMapOut generics in the ArroyoStrategyDelegate.

I can spend more time on this to get the typing right, it was just a nightmare.

assert!(res.is_ok());

let commit_request = operator.poll();
assert!(commit_request.is_ok());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this returning OK now that your fix has not been merged yet ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea

@getsentry getsentry deleted a comment from cursor bot Sep 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants