feat(eap-items): Route partition-boundary stragglers to a late-arrivals topic #7972

Draft
onewland wants to merge 2 commits into master from ohn/eap-items-late-arrivals-topic

onewland commented May 27, 2026

Splits late-arriving eap-items messages off the regular DLQ into a dedicated snuba-eap-items-late-arrivals topic, so the DLQ can go back to meaning "genuinely invalid" and the late-arrivals stream becomes safe for automated replay.

Today the partition-boundary killswitch in the EAP items processor anyhow::bail!s on prior-week messages, and the same DLQ topic also collects protobuf-decode failures and other true-invalid cases. Any drain consumer pointed at the DLQ will faithfully re-attempt those, looping forever, so we can never automate replay of the partition-boundary stragglers without first manually scrubbing the DLQ.

The mechanism mirrors the existing ProduceReplacements pattern in errors. The processor now returns a typed InsertOrLateArrival<T> enum, and a new ProduceLateArrivals strategy in the pipeline fans out the variants: Insert(b) goes on to ClickHouse, while LateArrival(payload) is produced to the side topic with the original Kafka payload intact. A new late_arrivals_topic field on the storage stream-loader plumbs the topic name through the Python ConsumerConfig to Rust.
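The fan-out described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the enum mirrors the names used in the description, but `fan_out` is a hypothetical helper, and plain `Vec`s stand in for the ClickHouse writer and the Kafka producer.

```rust
/// Simplified stand-in for the processor's typed result.
/// The real type in the PR may carry more metadata (assumption).
#[derive(Debug, PartialEq)]
enum InsertOrLateArrival<T> {
    /// A batch of rows destined for ClickHouse.
    Insert(T),
    /// The original, unmodified Kafka payload of a prior-partition message.
    LateArrival(Vec<u8>),
}

/// Hypothetical helper: split processor results into rows to insert and
/// raw payloads to produce to the late-arrivals topic, preserving order.
fn fan_out<T>(results: Vec<InsertOrLateArrival<T>>) -> (Vec<T>, Vec<Vec<u8>>) {
    let mut inserts = Vec::new();
    let mut late_arrivals = Vec::new();
    for result in results {
        match result {
            InsertOrLateArrival::Insert(batch) => inserts.push(batch),
            InsertOrLateArrival::LateArrival(payload) => late_arrivals.push(payload),
        }
    }
    (inserts, late_arrivals)
}
```

Because the routing decision is carried in the type, the compiler forces every consumer of the processor output to handle both variants, which is the property the error-sentinel alternative lacks.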

Considered (and rejected) two other shapes:

  • A pre-processor router doing a partial protobuf decode (mirrors BLQRouter). Would duplicate the protobuf decode work the processor already does.
  • An error-routing wrapper that intercepts a sentinel anyhow::Error. Relies on fragile string/type matching on errors.

The typed-enum shape is the cleanest fit and lines up with what's already in the codebase for replacements.

Notes for reviewers

  • Runtime config key renamed: eap_items_dlq_grace_period_min:<storage> → eap_items_late_arrival_grace_period_min:<storage>. The metric is also renamed: eap_items.messages.dlqed_prior_partition → eap_items.messages.routed_late_arrival. Dashboards and alerts need to be updated in lockstep with the deploy. The killswitch code path no longer mentions "DLQ" anywhere, because it never routes there.

  • eap_items_dlq_replay.yaml is intentionally NOT changed. The existing DLQ topic still carries a backlog of partition-boundary stragglers from the previous regime; the replay consumer can still drain that backlog. After the DLQ is drained, repointing/renaming the replay storage to consume the new topic is a follow-up PR.

  • New Topic enum entry EAP_ITEMS_LATE_ARRIVALS (snuba-eap-items-late-arrivals) requires the Kafka topic to be provisioned in ops before this rolls out. If the topic is missing in any environment, the consumer will fail to start.

  • Storages without a late_arrivals_topic configured: the factory uses ProduceLateArrivals::disabled, which silently drops the LateArrival branch. Matches the existing "killswitch disabled when runtime config is unset" semantic. Today only eap_items configures the topic; eap_items_dlq_replay (and any future EAP storage) opt out by leaving it unset.

  • Order-safety for the rollout: the Rust ConsumerConfig.late_arrivals_topic has #[serde(default)], so the Rust side deserializes whether or not the Python side emits the field. The two halves can land independently.

  • Drive-by: added # type: ignore[attr-defined] to the pre-existing confluent_kafka.admin import in table_storage.py. The symbols resolve at runtime but the stubs don't explicitly re-export them; pre-commit's mypy started flagging the file once I modified it.
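To illustrate the "disabled" behaviour from the notes above, here is a rough sketch under simplifying assumptions. Only the names `ProduceLateArrivals`, `disabled`, and `InsertOrLateArrival` come from this PR; the `new` and `submit` signatures and the in-memory `produced` buffer (standing in for a Kafka producer) are hypothetical.

```rust
enum InsertOrLateArrival<T> {
    Insert(T),
    LateArrival(Vec<u8>),
}

/// Simplified model of the strategy. `produced` records what would have
/// been sent to Kafka as (topic, payload) pairs (assumption for the sketch).
struct ProduceLateArrivals {
    topic: Option<String>,
    produced: Vec<(String, Vec<u8>)>,
}

impl ProduceLateArrivals {
    /// Enabled: late arrivals are produced to `topic`.
    fn new(topic: &str) -> Self {
        Self { topic: Some(topic.to_string()), produced: Vec::new() }
    }

    /// Disabled: no topic configured, so late arrivals are dropped.
    fn disabled() -> Self {
        Self { topic: None, produced: Vec::new() }
    }

    /// Returns the batch to forward to the next step, if there is one.
    fn submit<T>(&mut self, message: InsertOrLateArrival<T>) -> Option<T> {
        match message {
            InsertOrLateArrival::Insert(batch) => Some(batch),
            InsertOrLateArrival::LateArrival(payload) => {
                // With no topic configured the payload is silently discarded.
                if let Some(topic) = &self.topic {
                    self.produced.push((topic.clone(), payload));
                }
                None
            }
        }
    }
}
```

In both modes the `LateArrival` branch never reaches the insert step; the only difference is whether the payload is produced or discarded.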

Tests

  • New processors::eap_items::tests::test_process_message_routes_late_arrival_when_armed and the row-binary variant cover the integration via patch_str_config_for_test.
  • New strategies::late_arrivals::tests::forwards_insert_diverts_late_arrival covers the strategy fan-out directly.
  • Existing partition-boundary unit tests were renamed from *_dlq_* to *_route_* and now assert should_route_late_arrival semantics.
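For readers unfamiliar with the killswitch, one plausible reading of the `should_route_late_arrival` check is sketched below. The exact semantics are assumptions, not taken from the PR: the sketch assumes Monday-aligned UTC weekly partitions, timestamps in Unix seconds, a grace period measured from the start of the current week, and a signature invented for illustration.

```rust
const WEEK_SECS: i64 = 7 * 24 * 60 * 60;
/// 1970-01-05 was the first Monday after the Unix epoch (a Thursday).
const MONDAY_OFFSET_SECS: i64 = 4 * 24 * 60 * 60;

/// Start (Monday 00:00 UTC) of the week containing `ts`.
fn week_start(ts: i64) -> i64 {
    ts - (ts - MONDAY_OFFSET_SECS).rem_euclid(WEEK_SECS)
}

/// Assumed semantics: route a message when its event timestamp falls in a
/// week before the current one and the grace period after the boundary has
/// elapsed. An unset grace period disables the killswitch entirely.
fn should_route_late_arrival(event_ts: i64, now: i64, grace_period_min: Option<i64>) -> bool {
    let grace_secs = match grace_period_min {
        Some(minutes) => minutes * 60,
        None => return false, // runtime config unset: killswitch disabled
    };
    let current_week = week_start(now);
    event_ts < current_week && now >= current_week + grace_secs
}
```

Under these assumptions, a message stamped one second before the weekly boundary is still inserted during the grace window and is routed to the late-arrivals topic afterwards.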

All 146 Rust tests pass; relevant Python storage/consumer tests pass.

onewland and others added 2 commits May 27, 2026 13:47
…ls topic

The eap-items partition-boundary killswitch currently DLQs messages whose
event_timestamp belongs to a prior weekly partition. The same DLQ topic
also collects genuinely invalid messages (protobuf decode failures, etc.),
which makes it unsafe to auto-replay: a drain consumer would faithfully
re-attempt the truly invalid ones and loop. Split these streams.

Late arrivals now flow to a dedicated `snuba-eap-items-late-arrivals`
topic (configured via a new `late_arrivals_topic` field on the stream
loader); the DLQ goes back to its original "genuinely invalid only"
meaning. The mechanism mirrors the existing ProduceReplacements pattern:
the processor returns a typed `InsertOrLateArrival<T>`, and a new
ProduceLateArrivals strategy in the pipeline fans out the variants —
Insert(b) to ClickHouse, LateArrival(payload) to the side topic.

Considered (and rejected) a pre-processor router doing a partial
protobuf decode (mirrors BLQRouter) and an error-routing wrapper using
sentinel anyhow errors; both either duplicate work the processor
already does or rely on fragile error-type matching. The typed-enum
shape is the cleanest fit in this codebase.

Notes for reviewers:

* Runtime config key renamed: `eap_items_dlq_grace_period_min:<storage>`
  -> `eap_items_late_arrival_grace_period_min:<storage>`. The metric
  is renamed similarly: `eap_items.messages.dlqed_prior_partition` ->
  `eap_items.messages.routed_late_arrival`. Dashboards/alerts need to
  be updated in lockstep with deploy. The killswitch's code path no
  longer mentions "DLQ" anywhere - it never routes there now.

* `eap_items_dlq_replay.yaml` is intentionally NOT changed in this PR.
  The existing DLQ topic carries a backlog of partition-boundary
  stragglers from the previous regime; the replay consumer can still
  drain that backlog while it lasts. After the DLQ is drained,
  repointing/renaming the replay storage to consume the new topic is
  a follow-up.

* New Topic enum entry
  `EAP_ITEMS_LATE_ARRIVALS = "snuba-eap-items-late-arrivals"` requires
  the Kafka topic to be provisioned in ops before this rolls out. If the
  topic is missing in any environment, the consumer will fail to start.

* For storages whose processor returns `InsertOrLateArrival` but which
  configure no `late_arrivals_topic`, the factory wires up a
  `ProduceLateArrivals::disabled` adapter that silently drops the
  LateArrival branch - matches the existing "killswitch disabled when
  runtime config is unset" semantic.

* The Rust ConsumerConfig field has `#[serde(default)]`, so the order
  in which the Rust and Python sides land is safe - Rust deserializes
  whether or not Python emits the field.

* Drive-by: added `# type: ignore[attr-defined]` to the existing
  `confluent_kafka.admin` import in `table_storage.py`. The symbols
  resolve at runtime; the library's type stubs don't explicitly
  re-export them. mypy in pre-commit flagged this on the modified
  file even though the imports are pre-existing.

Co-Authored-By: Claude <noreply@anthropic.com>

Two follow-ups to the previous commit that I missed because they were not
exercised by `cargo check --lib` or the targeted pytest run:

* `benches/processors.rs` constructs `ConsumerStrategyFactoryV2` directly;
  add the new `late_arrivals_concurrency` and `late_arrivals_config`
  fields. Picked up by the "Linting - Rust" job which runs clippy across
  all targets, including benches.

* `tests/utils/streams/test_topics.py::test_valid_topics` enforces that
  every Topic enum value has a registered schema in `sentry-kafka-schemas`.
  The new `EAP_ITEMS_LATE_ARRIVALS` doesn't yet — add a
  `pending_schema_registration` tuple parallel to `deprecated_topics`,
  with a note to remove the entry once the schema lands upstream.

Co-Authored-By: Claude <noreply@anthropic.com>