Describe the bug
RepartitionExec's distribution channels (distributor_channels.rs) only throttle producers when every output channel has at least one buffered item ("Gate A"). With balanced hash partitioning and a single slow consumer, the slow consumer's channel grows linearly with the number of input batches.
To reproduce
Trace with N=4 partitions, where ch3's consumer is slower than the others:
- After the first input batch each channel has ≥1 item → Gate A closes.
- ch0 drains → Gate A opens.
- Producer pushes the next input batch's sub-batches: one take-materialized batch per partition. Push to ch0 succeeds (was empty); ch1, ch2, ch3 already had data, so the empty-counter never decrements during the round.
- After the round all channels are non-empty again → Gate A re-closes. ch3 has gained 1.
Each cycle adds another batch to ch3 — linear growth in the number of input batches, until OOM (or until the per-partition SharedMemoryReservation triggers spilling).
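The trace above can be reproduced with a small, self-contained Rust model. It is a sketch under stated assumptions, not DataFusion code: channel depths are plain batch counts, `gate_a_open` and `simulate_gate_a` are hypothetical names, the producer checks Gate A once per input batch and then emits one sub-batch per partition, ch0 to ch2 consume one batch per cycle, and ch3 is assumed to consume one batch every `SLOW_PERIOD` cycles.

```rust
//! Toy model of the "Gate A" rule: the producer may push only while at least
//! one channel is empty. Depths are batch counts; no DataFusion types are used.

const N: usize = 4;
/// Hypothetical slowness: ch3's consumer takes one batch every SLOW_PERIOD cycles.
const SLOW_PERIOD: usize = 4;

/// Gate A is open while any channel has nothing buffered.
fn gate_a_open(depths: &[usize; N]) -> bool {
    depths.iter().any(|&d| d == 0)
}

/// Runs `cycles` rounds and returns the final per-channel depths.
fn simulate_gate_a(cycles: usize) -> [usize; N] {
    let mut depths = [0usize; N];
    for cycle in 0..cycles {
        // Consumers: ch0..ch2 take one batch per cycle, ch3 one per SLOW_PERIOD.
        for (i, d) in depths.iter_mut().enumerate() {
            let consumes = i < N - 1 || cycle % SLOW_PERIOD == 0;
            if consumes && *d > 0 {
                *d -= 1;
            }
        }
        // Producer: if the gate is open, even hash partitioning emits one
        // sub-batch for every partition, including the slow one.
        if gate_a_open(&depths) {
            for d in depths.iter_mut() {
                *d += 1;
            }
        }
    }
    depths
}

fn main() {
    for cycles in [10, 100, 1000] {
        let depths = simulate_gate_a(cycles);
        println!("after {cycles:>4} cycles: {depths:?}");
    }
}
```

In this model the fast channels never hold more than one batch, while ch3 gains three batches every four cycles, so its depth keeps growing with the number of input batches even though every input batch is split evenly across partitions.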
Cause
Gate A's invariant is "no live channel is empty." A single fast channel oscillating empty/non-empty keeps the gate open forever, and every gate-open window lets the producer push to every partition — including the slow one. Skewed input data is not required; even hash distribution + one lagging consumer is enough.
Expected behavior
The producer should be throttled when total buffered memory crosses a configured threshold, regardless of how many channels are technically non-empty.
Proposed fix
Add a second gate condition — total buffered bytes across all channels ≥ a configured budget — and close the gate when either condition fires (A || B). Gate A still gives O(1) per-channel depth for balanced workloads with even consumer rates; Gate B caps total memory whenever some channel never drains. Velox's LocalExchangeMemoryManager is the design template.
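A minimal sketch of the proposed A || B rule, using the same toy workload as the trace in "To reproduce" (three fast consumers, one slow). The `Channels` struct, its methods, `simulate_a_or_b`, the 1 KiB sub-batch size and the 16 KiB budget are hypothetical illustration values; this is not the API of distributor_channels.rs or of Velox's LocalExchangeMemoryManager. Bytes are tracked in one shared counter that is incremented on push and decremented on pop.

```rust
//! Toy model of the proposed A || B gate. Each sub-batch is assumed to have
//! the same hypothetical byte size.

const N: usize = 4;
/// Hypothetical: ch3's consumer takes one batch every SLOW_PERIOD cycles.
const SLOW_PERIOD: usize = 4;
/// Hypothetical size of every sub-batch, in bytes.
const BATCH_BYTES: usize = 1024;
/// Hypothetical memory budget for all channels combined.
const BUDGET_BYTES: usize = 16 * 1024;

struct Channels {
    depths: [usize; N],
    buffered_bytes: usize,
    budget_bytes: usize,
}

impl Channels {
    fn new(budget_bytes: usize) -> Self {
        Self { depths: [0; N], buffered_bytes: 0, budget_bytes }
    }

    /// Gate A fires when no channel is empty (the existing rule).
    fn gate_a_fires(&self) -> bool {
        self.depths.iter().all(|&d| d > 0)
    }

    /// Gate B fires when total buffered bytes reach the budget.
    fn gate_b_fires(&self) -> bool {
        self.buffered_bytes >= self.budget_bytes
    }

    /// The producer may send only while neither gate fires.
    fn gate_open(&self) -> bool {
        !(self.gate_a_fires() || self.gate_b_fires())
    }

    fn push(&mut self, ch: usize, bytes: usize) {
        self.depths[ch] += 1;
        self.buffered_bytes += bytes;
    }

    fn pop(&mut self, ch: usize, bytes: usize) {
        if self.depths[ch] > 0 {
            self.depths[ch] -= 1;
            self.buffered_bytes -= bytes;
        }
    }
}

/// Returns the final depths and the peak buffered bytes observed.
fn simulate_a_or_b(cycles: usize, budget_bytes: usize) -> ([usize; N], usize) {
    let mut chans = Channels::new(budget_bytes);
    let mut peak: usize = 0;
    for cycle in 0..cycles {
        for ch in 0..N {
            if ch < N - 1 || cycle % SLOW_PERIOD == 0 {
                chans.pop(ch, BATCH_BYTES);
            }
        }
        // Checked once per input batch, so the budget can be overshot by at
        // most one round of N sub-batches.
        if chans.gate_open() {
            for ch in 0..N {
                chans.push(ch, BATCH_BYTES);
            }
        }
        peak = peak.max(chans.buffered_bytes);
    }
    (chans.depths, peak)
}

fn main() {
    let (depths, peak) = simulate_a_or_b(1000, BUDGET_BYTES);
    println!("depths = {depths:?}, peak buffered bytes = {peak}");
}
```

In this model ch3 stays at or below 16 batches and peak buffered bytes stay under the budget plus one round of N sub-batches. Passing `usize::MAX` as the budget effectively disables Gate B and brings back the unbounded growth. The cost, visible in the same run, is that the fast channels sit empty while Gate B is closed, which is the trade-off a hard memory cap implies.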
I'll send a PR shortly.