feat(eap-items): Route partition-boundary stragglers to a late-arrivals topic#7972
Draft
onewland wants to merge 2 commits into
Draft
feat(eap-items): Route partition-boundary stragglers to a late-arrivals topic#7972onewland wants to merge 2 commits into
onewland wants to merge 2 commits into
Conversation
…ls topic The eap-items partition-boundary killswitch currently DLQs messages whose event_timestamp belongs to a prior weekly partition. The same DLQ topic also collects genuinely invalid messages (protobuf decode failures, etc.), which makes it unsafe to auto-replay: a drain consumer would faithfully re-attempt the truly invalid ones and loop. Split these streams. Late arrivals now flow to a dedicated `snuba-eap-items-late-arrivals` topic (configured via a new `late_arrivals_topic` field on the stream loader); the DLQ goes back to its original "genuinely invalid only" meaning. The mechanism mirrors the existing ProduceReplacements pattern: the processor returns a typed `InsertOrLateArrival<T>`, and a new ProduceLateArrivals strategy in the pipeline fans out the variants — Insert(b) to ClickHouse, LateArrival(payload) to the side topic. Considered (and rejected) a pre-processor router doing a partial protobuf decode (mirrors BLQRouter) and an error-routing wrapper using sentinel anyhow errors; both either duplicate work the processor already does or rely on fragile error-type matching. The typed-enum shape is the cleanest fit in this codebase. Notes for reviewers: * Runtime config key renamed: `eap_items_dlq_grace_period_min:<storage>` -> `eap_items_late_arrival_grace_period_min:<storage>`. The metric is renamed similarly: `eap_items.messages.dlqed_prior_partition` -> `eap_items.messages.routed_late_arrival`. Dashboards/alerts need to be updated in lockstep with deploy. The killswitch's code path no longer mentions "DLQ" anywhere - it never routes there now. * `eap_items_dlq_replay.yaml` is intentionally NOT changed in this PR. The existing DLQ topic carries a backlog of partition-boundary stragglers from the previous regime; the replay consumer can still drain that backlog while it lasts. After the DLQ is drained, repointing/renaming the replay storage to consume the new topic is a follow-up. * New Topic enum entry `EAP_ITEMS_LATE_ARRIVALS = "snuba-eap-items- late-arrivals"` requires the Kafka topic to be provisioned in ops before this rolls out. If the topic is missing in any environment the consumer will fail to start. * For storages whose processor returns `InsertOrLateArrival` but which configure no `late_arrivals_topic`, the factory wires up a `ProduceLateArrivals::disabled` adapter that silently drops the LateArrival branch - matches the existing "killswitch disabled when runtime config is unset" semantic. * The Rust ConsumerConfig field has `#[serde(default)]`, so the order in which the Rust and Python sides land is safe - Rust deserializes whether or not Python emits the field. * Drive-by: added `# type: ignore[attr-defined]` to the existing `confluent_kafka.admin` import in `table_storage.py`. The symbols resolve at runtime; the library's type stubs don't explicitly re-export them. mypy in pre-commit flagged this on the modified file even though the imports are pre-existing. Co-Authored-By: Claude <noreply@anthropic.com>
Two follow-ups to the previous commit that I missed because they were not exercised by `cargo check --lib` or the targeted pytest run: * `benches/processors.rs` constructs `ConsumerStrategyFactoryV2` directly; add the new `late_arrivals_concurrency` and `late_arrivals_config` fields. Picked up by the "Linting - Rust" job which runs clippy across all targets, including benches. * `tests/utils/streams/test_topics.py::test_valid_topics` enforces that every Topic enum value has a registered schema in `sentry-kafka-schemas`. The new `EAP_ITEMS_LATE_ARRIVALS` doesn't yet — add a `pending_schema_registration` tuple parallel to `deprecated_topics`, with a note to remove the entry once the schema lands upstream. Co-Authored-By: Claude <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Splits late-arriving eap-items messages off the regular DLQ into a dedicated
snuba-eap-items-late-arrivalstopic, so the DLQ can go back to meaning "genuinely invalid" and the late-arrivals stream becomes safe for automated replay.Today the partition-boundary killswitch in the EAP items processor
anyhow::bail!s on prior-week messages, and the same DLQ topic also collects protobuf-decode failures and other true-invalid cases. Any drain consumer pointed at the DLQ will faithfully re-attempt those, looping forever, so we can never automate replay of the partition-boundary stragglers without first manually scrubbing the DLQ.The mechanism mirrors the existing
ProduceReplacementspattern inerrors. The processor now returns a typedInsertOrLateArrival<T>enum; a newProduceLateArrivalsstrategy in the pipeline fans out the variants —Insert(b)to ClickHouse,LateArrival(payload)to the side topic carrying the original Kafka payload. A newlate_arrivals_topicfield on the storage stream-loader plumbs the topic name through PythonConsumerConfigto Rust.Considered (and rejected) two other shapes:
BLQRouter). Would duplicate the protobuf decode work the processor already does.anyhow::Error. Relies on fragile string/type matching on errors.The typed-enum shape is the cleanest fit and lines up with what's already in the codebase for replacements.
Notes for reviewers
Runtime config key renamed:
eap_items_dlq_grace_period_min:<storage>→eap_items_late_arrival_grace_period_min:<storage>. The metric is also renamed:eap_items.messages.dlqed_prior_partition→eap_items.messages.routed_late_arrival. Dashboards and alerts need to be updated in lockstep with the deploy. The killswitch code path no longer mentions "DLQ" anywhere — it never routes there.eap_items_dlq_replay.yamlis intentionally NOT changed. The existing DLQ topic still carries a backlog of partition-boundary stragglers from the previous regime; the replay consumer can still drain that backlog. After the DLQ is drained, repointing/renaming the replay storage to consume the new topic is a follow-up PR.New Topic enum entry
EAP_ITEMS_LATE_ARRIVALS(snuba-eap-items-late-arrivals) requires the Kafka topic to be provisioned in ops before this rolls out. If the topic is missing in any environment, the consumer will fail to start.Storages without a
late_arrivals_topicconfigured: the factory usesProduceLateArrivals::disabled, which silently drops the LateArrival branch. Matches the existing "killswitch disabled when runtime config is unset" semantic. Today onlyeap_itemsconfigures the topic;eap_items_dlq_replay(and any future EAP storage) opt out by leaving it unset.Order-safety for the rollout: the Rust
ConsumerConfig.late_arrivals_topichas#[serde(default)], so the Rust side deserializes whether or not the Python side emits the field. The two halves can land independently.Drive-by: added
# type: ignore[attr-defined]to the pre-existingconfluent_kafka.adminimport intable_storage.py. The symbols resolve at runtime but the stubs don't explicitly re-export them; pre-commit's mypy started flagging the file once I modified it.Tests
processors::eap_items::tests::test_process_message_routes_late_arrival_when_armedand the row-binary variant cover the integration viapatch_str_config_for_test.strategies::late_arrivals::tests::forwards_insert_diverts_late_arrivalcovers the strategy fan-out directly.*_dlq_*to*_route_*and now assertshould_route_late_arrivalsemantics.All 146 Rust tests pass; relevant Python storage/consumer tests pass.