Skip to content

perf: batch reset invalid stream offsets in stream supervisor#19431

Open
jtuglu1 wants to merge 1 commit intoapache:masterfrom
jtuglu1:reset-offsets-in-batch-to-speed-up-supervisor-recovery
Open

perf: batch reset invalid stream offsets in stream supervisor#19431
jtuglu1 wants to merge 1 commit intoapache:masterfrom
jtuglu1:reset-offsets-in-batch-to-speed-up-supervisor-recovery

Conversation

@jtuglu1
Copy link
Copy Markdown
Contributor

@jtuglu1 jtuglu1 commented May 8, 2026

Description

Problem

I've noticed that recent Druid can be extremely slow to recover from a lagging supervisor (where offsets are invalid and tuningConfig.resetOffsetAutomatically=true). I have observed the following issue occurs:

  1. Kafka supervisor falls behind, offsets are invalid
  2. Supervisor starts creating a new task group
  3. Supervisor runs into an invalid offset for a particular partition, throwing an exception. This exception propagates up to the supervisor, forcing us to wait until another round of runInternal(), which can take O(XXs) depending on the supervisor run loop configuration. Since these partitions are evaluated serially, if multiple partitions are invalid offsets, this means we keep looping for every invalid offset for every partition for every taskGroup, leading to many minutes of downtime. This issue is exacerbated when ingesting from large Kafka topics with many partitions.

Solution

For each taskGroup, identify + reset all invalid partition offsets in one go. Once we have established the set of invalid offsets, perform an internal reset, then throw the exception. This reduces the time-to-recovery of a fatally-lagged supervisor from N runInternal() calls to 1 runInternal() call, where N is the # of invalid partition offsets.

Release note

Batch reset invalid stream offsets in stream supervisor to speed up stream supervisor recovery.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jtuglu1 jtuglu1 changed the title perf: reset invalid stream offsets in batch during recovery perf: batch reset invalid stream offsets in stream supervisor May 8, 2026
@jtuglu1 jtuglu1 requested review from abhishekrb19 and gianm May 8, 2026 01:52
@jtuglu1 jtuglu1 force-pushed the reset-offsets-in-batch-to-speed-up-supervisor-recovery branch from 594147e to 51a32eb Compare May 8, 2026 02:31
@jtuglu1 jtuglu1 force-pushed the reset-offsets-in-batch-to-speed-up-supervisor-recovery branch from 51a32eb to 657e28d Compare May 8, 2026 02:38
@jtuglu1 jtuglu1 requested review from FrankChen021 and kfaraz May 8, 2026 04:35
// Fetch metadata offsets once and collect all stale partitions across all groups before committing
// any state changes. New TaskGroups are staged locally and only written to activelyReadingTaskGroups
// if no reset is required, keeping state consistent: either all new groups are committed or none are.
final Map<PartitionIdType, SequenceOffsetType> metadataOffsets = getOffsetsFromMetadataStorage();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LMK what you think about this – I think this shouldn't impact task groups without invalid offsets as currently they'd just be added to the activelyReadingTaskGroups then be delayed in processing (see few lines down) since generateStartingSequencesForPartitionGroup would just throw an exception (and force a new runInternal()).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant