
Conversation

@untitaker (Member) commented Jan 13, 2026

Configuration Schema

pipeline:
  dlq:
    topic: "default-dlq"           # Default DLQ topic
    bootstrap_servers: ["127.0.0.1:9092"]  # Optional, defaults to source
  segments:
    - steps_config:
        my_source:
          bootstrap_servers: ["127.0.0.1:9092"]
        my_parser:
          dlq:
            enabled: true          # Required opt-in
        my_map_step:
          dlq:
            enabled: true
            topic: "custom-dlq"    # Optional: override default
        my_reducer:
          # No dlq = crash on error (current behavior)

DLQ Message Format

{
  "original_topic": "source-topic",
  "original_partition": 5,
  "original_offset": 12345,
  "original_key": "user-123",
  "step_name": "my_parser",
  "consumer_group": "pipeline-my_source",
  "error": "Schema validation failed",
  "timestamp": 1704067200.123
}

Implementation

The implementation is still likely to change drastically. Right now, exceptions from userland are mostly caught in Python and forwarded directly to a DLQ producer, then re-raised to skip the message. The producer should move to Rust so it does not become a bottleneck.
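
A minimal sketch of that current Python-side flow, assuming a confluent-kafka-style producer and the message format above (the wrapper, producer, and metadata names are hypothetical, not the actual streams API):

import json
import time


def wrap_with_dlq(step_name, fn, dlq_producer, dlq_topic, msg_metadata, consumer_group):
    # Catch userland exceptions, forward a DLQ record, then re-raise so the
    # runtime skips this message instead of crashing the pipeline.
    def wrapped(message):
        try:
            return fn(message)
        except Exception as exc:
            payload = {
                "original_topic": msg_metadata["topic"],
                "original_partition": msg_metadata["partition"],
                "original_offset": msg_metadata["offset"],
                "original_key": msg_metadata.get("key"),
                "step_name": step_name,
                "consumer_group": consumer_group,
                "error": str(exc),
                "timestamp": time.time(),
            }
            dlq_producer.produce(dlq_topic, value=json.dumps(payload).encode("utf-8"))
            raise

    return wrapped

Moving the produce call into Rust would leave only the exception-to-payload translation on the Python side.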

crate::testutils::initialize_python();

let mut router = create_simple_router(c_str!("lambda x: {}[0]"), Noop {});
import_py_dep("sentry_streams.pipeline.exception", "InvalidMessageError");
@untitaker (Member Author):

I think this test was wrong previously; it tested the same thing as the test before it.

@fpacifici (Collaborator) left a comment:

Some high level questions.

if self._dlq_config is not None:
    topic_override = self._dlq_config.get("topic")

    self._dlq_producer.produce(metadata, topic_override)
@fpacifici (Collaborator):

What about just producing the payload we got at the specific step directly on the DLQ as well? Metadata could be either a message wrapper or headers.
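
A hedged sketch of that alternative, assuming a confluent-kafka-style producer: the raw payload is forwarded to the DLQ unchanged and the error metadata rides along as Kafka headers (the helper and header names are made up for illustration).

def produce_raw_to_dlq(dlq_producer, dlq_topic, raw_payload, step_name, error, original):
    # Keep the payload bytes untouched so the message can be replayed as-is;
    # attach routing/debugging metadata as headers instead of wrapping the value.
    headers = [
        ("original_topic", original["topic"].encode("utf-8")),
        ("original_partition", str(original["partition"]).encode("utf-8")),
        ("original_offset", str(original["offset"]).encode("utf-8")),
        ("step_name", step_name.encode("utf-8")),
        ("error", error.encode("utf-8")),
    ]
    dlq_producer.produce(dlq_topic, value=raw_payload, headers=headers)

The upside is that a replay tool could re-produce the value to the original topic without unwrapping anything.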

_metrics_wrapped_function, step.name, application_function
)
# Wrap with DLQ handling first, then metrics
dlq_wrapped = self.__wrap_with_dlq(step.name, application_function, stream)
@fpacifici (Collaborator):

High level design questions.

  • Would we have a DLQ for aggregate steps and batching steps?
  • If we did not support DLQ for aggregate steps (which are lossy), this may be OK; still, what about invalid messages for the steps that follow?
  • For batch scenarios, we should at least identify the specific invalid messages and DLQ them, as they still retain their original identity. If we did not support this, the DLQ would not be useful for SBC either, since we batch first and process everything in multiprocess in batches.

I think the semantics of aggregate steps could be this:

  • An invalid message is DLQed when we try to add it to an aggregate and fail. This does not compromise the existing aggregate (see the sketch after this list).
  • DLQ for batched messages should still work as if the messages were not batched. Batching is generally an optimization; it should not change the semantics of the application, whereas other aggregations do.
  • We should support DLQ even after a batch/aggregate step. At this point the original identity may be lost, so I think we may want to DLQ the aggregate payload entirely. This would be viable in the new architecture, as we could reprocess data from a specific step of the pipeline since the steps have identity and are statically connected.
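
A rough sketch of the first bullet, assuming a hypothetical accumulator interface rather than the actual streams API:

def add_to_aggregate(accumulator, message, dlq):
    try:
        # May raise (e.g. InvalidMessageError) if the message cannot be aggregated.
        accumulator.add(message)
    except Exception as exc:
        # DLQ only the offending message; the aggregate built so far is untouched.
        dlq(message, error=str(exc))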
