
RepartitionExec channels grow unboundedly with one slow consumer #22090

@JanKaul

Description


Describe the bug

RepartitionExec's distribution channels (distributor_channels.rs) throttle the producer only when every output channel holds at least one buffered item ("Gate A"). With balanced hash partitioning and a single slow consumer, the slow channel's queue grows linearly with the number of input batches.

To reproduce

Trace with N=4 partitions, where ch3's consumer is slower than the others:

  1. After the first input batch each channel has ≥1 item → Gate A closes.
  2. ch0 drains → Gate A opens.
  3. Producer pushes the next input batch's sub-batches: one take-materialized batch per partition. The push to ch0 succeeds (ch0 was empty). ch1, ch2, and ch3 already had data, so pushing to them does not decrement the empty-channel counter.
  4. After the round all channels are non-empty again → Gate A re-closes. ch3 has gained one batch.

Each cycle adds another batch to ch3, so its depth grows linearly with the number of input batches until the process runs out of memory (or until the per-partition SharedMemoryReservation triggers spilling).
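The trace can be reproduced with a small deterministic simulation. This is a toy model under stated assumptions, not DataFusion's code: time advances in ticks; on each tick the consumers drain first and then, if Gate A is open, the producer pushes one sub-batch to every partition, as in step 3. The three fast consumers pop one batch per tick and the slow one pops only every `slow_period` ticks. `Channels`, `gate_a_open`, `push_round`, and `simulate` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Toy model of the distribution channels (hypothetical; not DataFusion's
/// actual `distributor_channels.rs`). Each queue entry is one buffered batch,
/// stored as its size in bytes.
struct Channels {
    queues: Vec<VecDeque<usize>>,
}

impl Channels {
    fn new(n: usize) -> Self {
        Self { queues: vec![VecDeque::new(); n] }
    }

    /// Gate A: the producer may proceed only while some channel is empty.
    fn gate_a_open(&self) -> bool {
        self.queues.iter().any(|q| q.is_empty())
    }

    /// Step 3 of the trace: one sub-batch per partition.
    fn push_round(&mut self, batch_bytes: usize) {
        for q in &mut self.queues {
            q.push_back(batch_bytes);
        }
    }
}

/// Runs `ticks` steps with `n` partitions. The last partition's consumer pops
/// only every `slow_period` ticks; all others pop every tick.
/// Returns the slow channel's depth (in batches) after each tick.
fn simulate(n: usize, ticks: usize, slow_period: usize) -> Vec<usize> {
    assert!(n >= 1 && slow_period >= 1);
    let mut ch = Channels::new(n);
    let mut slow_depths = Vec::with_capacity(ticks);
    for t in 0..ticks {
        // Consumers drain first (popping an empty queue is a no-op).
        for (i, q) in ch.queues.iter_mut().enumerate() {
            if i != n - 1 || t % slow_period == 0 {
                q.pop_front();
            }
        }
        // The producer pushes a full round whenever Gate A is open.
        if ch.gate_a_open() {
            ch.push_round(1024);
        }
        slow_depths.push(ch.queues[n - 1].len());
    }
    slow_depths
}

fn main() {
    let depths = simulate(4, 100, 4);
    println!("after 10 ticks: {} batches", depths[9]);
    println!("after 100 ticks: {} batches", depths[99]);
}
```

Running `main` prints 8 batches after 10 ticks and 76 after 100. After tick `t` the slow channel holds `t + 1 - t / 4` batches (integer division): the fast channels empty on every tick, so Gate A is open at every check and every tick pushes a full round.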

Cause

Gate A closes only when no live channel is empty. A single fast channel that oscillates between empty and non-empty keeps reopening the gate, and every gate-open window lets the producer push to every partition, including the slow one. Skewed input data is not required: an even hash distribution plus one lagging consumer is enough.

Expected behavior

The producer should be throttled when total buffered memory crosses a configured threshold, regardless of how many channels are technically non-empty.

Proposed fix

Add a second gate condition, total buffered bytes across all channels ≥ a configured budget ("Gate B"), and close the gate when either condition fires (A || B). Gate A still keeps per-channel depth at O(1) for balanced workloads with even consumer rates; Gate B caps total memory whenever some channel never drains. Velox's LocalExchangeMemoryManager is the design template.
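A minimal sketch of the proposed A || B gate, run against the fast/slow consumer workload from the trace. It assumes each batch's byte size is known when it is pushed and that each consumer drains independently of the others. `BoundedChannels`, `budget_bytes`, `gate_open`, and `simulate` are hypothetical names, not an existing DataFusion or Velox API.

```rust
use std::collections::VecDeque;

/// Hypothetical sketch of the proposed two-condition gate (A || B).
struct BoundedChannels {
    queues: Vec<VecDeque<usize>>, // buffered batch sizes in bytes, per partition
    buffered_bytes: usize,        // running total across all channels
    budget_bytes: usize,          // configured memory budget for Gate B
}

impl BoundedChannels {
    fn new(n: usize, budget_bytes: usize) -> Self {
        Self { queues: vec![VecDeque::new(); n], buffered_bytes: 0, budget_bytes }
    }

    /// Gate A fires when no channel is empty.
    fn gate_a_fires(&self) -> bool {
        self.queues.iter().all(|q| !q.is_empty())
    }

    /// Gate B fires when total buffered bytes reach the budget.
    fn gate_b_fires(&self) -> bool {
        self.buffered_bytes >= self.budget_bytes
    }

    /// The producer is throttled when either condition fires.
    fn gate_open(&self) -> bool {
        !(self.gate_a_fires() || self.gate_b_fires())
    }

    fn push_round(&mut self, batch_bytes: usize) {
        for q in &mut self.queues {
            q.push_back(batch_bytes);
            self.buffered_bytes += batch_bytes;
        }
    }

    fn pop(&mut self, i: usize) {
        if let Some(bytes) = self.queues[i].pop_front() {
            self.buffered_bytes -= bytes;
        }
    }
}

/// Fast/slow consumer workload from the trace; returns peak total buffered bytes.
fn simulate(n: usize, ticks: usize, slow_period: usize, batch_bytes: usize, budget: usize) -> usize {
    assert!(n >= 1 && slow_period >= 1);
    let mut ch = BoundedChannels::new(n, budget);
    let mut peak: usize = 0;
    for t in 0..ticks {
        // Fast consumers pop every tick; the last one only every `slow_period` ticks.
        for i in 0..n {
            if i != n - 1 || t % slow_period == 0 {
                ch.pop(i);
            }
        }
        if ch.gate_open() {
            ch.push_round(batch_bytes);
        }
        peak = peak.max(ch.buffered_bytes);
    }
    peak
}

fn main() {
    // 4 partitions, 1 KiB batches, 16 KiB budget.
    let peak = simulate(4, 10_000, 4, 1024, 16 * 1024);
    println!("peak buffered bytes: {peak}");
}
```

In this model the peak for the run in `main` is 19456 bytes (19 batches), and it does not grow with more ticks: the gate only opens while the total is below the budget, so with equal-size batches one round can overshoot it by at most `(n - 1) * batch_bytes`.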

I'll send a PR shortly.

Metadata

Assignees: No one assigned
Labels: No labels
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests