dlq #223
Conversation
```rust
crate::testutils::initialize_python();

let mut router = create_simple_router(c_str!("lambda x: {}[0]"), Noop {});
import_py_dep("sentry_streams.pipeline.exception", "InvalidMessageError");
```
I think this test was wrong previously; it tested the same thing as the test before it.
fpacifici left a comment
Some high-level questions.
```python
if self._dlq_config is not None:
    topic_override = self._dlq_config.get("topic")

    self._dlq_producer.produce(metadata, topic_override)
```
What about just producing the payload we got at the specific step directly on the DLQ as well? Metadata could be either a message wrapper or headers.
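A minimal sketch of that suggestion, assuming metadata travels as Kafka-style headers; the function name and `produce` keyword signature here are hypothetical, not the PR's actual API:

```python
def produce_to_dlq(dlq_producer, payload, metadata, topic):
    """Hypothetical: send the step's raw payload to the DLQ, with
    provenance metadata attached as headers instead of a wrapper."""
    headers = [(key, str(value).encode("utf-8")) for key, value in metadata.items()]
    dlq_producer.produce(
        topic=topic,
        value=payload,    # the exact bytes the failing step consumed
        headers=headers,  # original topic/partition/offset, step name, error
    )
```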
```python
    _metrics_wrapped_function, step.name, application_function
)
# Wrap with DLQ handling first, then metrics
dlq_wrapped = self.__wrap_with_dlq(step.name, application_function, stream)
```
High-level design questions.
- Would we have a DLQ for aggregate steps and batching steps?
- If we did not support DLQ for aggregate steps (which are lossy), that may be acceptable; still, what about invalid messages in the steps that follow?
- For batch scenarios, we should at least identify the specific invalid messages and DLQ them, as they still retain their original identity. If we did not support this, the DLQ would not be useful for SBC either, since we batch first and then process everything in multiprocess in batches.
I think the semantics of aggregate steps could be as follows (see the sketch after this list):
- An invalid message is DLQed when we try to add it to an aggregate and fail. This does not compromise the existing aggregate.
- DLQ for batched messages should still work as if the messages were not batched. Batching is generally an optimization; it should not change the semantics of the application, whereas other aggregations do.
- We should support DLQ even after a batch/aggregate step. At that point the original identity may be lost, so I think we may want to DLQ the aggregate payload entirely. This would be viable in the new architecture, as we could reprocess data from a specific step of the pipeline: the steps have identity and are statically connected.
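A minimal sketch of the first bullet, assuming a hypothetical accumulator whose per-message `add` raises the `InvalidMessageError` imported in the test above, and a hypothetical `dlq_produce` helper:

```python
from sentry_streams.pipeline.exception import InvalidMessageError

def add_to_aggregate(accumulator, message, dlq_produce):
    """Hypothetical: fold one message into the running aggregate,
    DLQing it on failure without compromising the aggregate."""
    try:
        accumulator.add(message)  # hypothetical per-message fold operation
    except InvalidMessageError as err:
        # The failed add did not mutate the accumulator, so the
        # existing aggregate survives and processing continues.
        dlq_produce(message, error=str(err))  # hypothetical DLQ producer call
```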
Configuration Schema
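The schema itself is not reproduced in this excerpt. As an illustration only, here is a minimal shape consistent with the diff above, where `topic` is the only key the code confirms (`self._dlq_config.get("topic")`); any other keys would be assumptions:

```python
# Hypothetical per-step DLQ configuration; only the "topic" key is
# confirmed by the diff above.
dlq_config = {
    "topic": "my-pipeline-dlq",  # override destination topic for DLQed messages
}
```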
DLQ Message Format
{ "original_topic": "source-topic", "original_partition": 5, "original_offset": 12345, "original_key": "user-123", "step_name": "my_parser", "consumer_group": "pipeline-my_source", "error": "Schema validation failed", "timestamp": 1704067200.123 }Implementation
The implementation will still change drastically. Right now, exceptions from userland are mostly caught in Python and forwarded directly to a DLQ producer, then re-raised to skip the message. The producer should move to Rust to avoid it becoming a bottleneck.
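A minimal sketch of that current Python-side flow, with hypothetical wrapper and producer names (the real `__wrap_with_dlq` may differ); the `produce(metadata, topic_override)` shape mirrors the diff above:

```python
import functools

from sentry_streams.pipeline.exception import InvalidMessageError

def wrap_with_dlq(step_name, application_function, dlq_producer, topic_override=None):
    """Hypothetical sketch: catch userland failures, forward them to the
    DLQ producer, then re-raise so the platform skips the message."""
    @functools.wraps(application_function)
    def wrapped(message):
        try:
            return application_function(message)
        except InvalidMessageError as err:
            metadata = {"step_name": step_name, "error": str(err)}
            dlq_producer.produce(metadata, topic_override)  # same call shape as the diff
            raise  # re-raise so the message is skipped downstream
    return wrapped
```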