fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators#21170
zhuqi-lucas wants to merge 3 commits into apache:main
Conversation
Pull request overview
Fixes a correctness bug in the physical optimizer where EnforceDistribution could drop a fetch (LIMIT) that had been pushed down into distribution-changing operators (CoalescePartitionsExec / SortPreservingMergeExec), leading to extra/duplicate rows for multi-partition inputs.
Changes:
- Track and propagate `fetch` while stripping distribution-changing operators so it can be re-applied later.
- Extend `add_merge_on_top` to optionally apply `fetch` when re-adding `SortPreservingMergeExec`.
- Add regression tests ensuring `fetch` survives `EnforceDistribution` across coalesce and SPM cases.
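The correctness issue can be illustrated with a toy model in plain Rust (no DataFusion types; `coalesce` here is only a stand-in for `CoalescePartitionsExec` with a pushed-down fetch):

```rust
// Toy model, not DataFusion APIs: each partition is a Vec<i32>, and
// coalescing with a fetch concatenates partitions and truncates at `fetch`.
fn coalesce(parts: &[Vec<i32>], fetch: Option<usize>) -> Vec<i32> {
    let mut out: Vec<i32> = parts.iter().flatten().copied().collect();
    if let Some(n) = fetch {
        out.truncate(n);
    }
    out
}

fn main() {
    let parts = vec![vec![1, 2, 3], vec![4, 5, 6]];
    // LimitPushdown turned `LIMIT 2` into a coalesce carrying fetch=2.
    assert_eq!(coalesce(&parts, Some(2)), vec![1, 2]);
    // A buggy rewrite that rebuilds the operator without the fetch
    // returns all six rows: the LIMIT is silently lost.
    assert_eq!(coalesce(&parts, None).len(), 6);
}
```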
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `datafusion/physical-optimizer/src/enforce_distribution.rs` | Captures `fetch` while stripping distribution-changing operators; attempts to reapply it during merge insertion or via a fallback wrapper. |
| `datafusion/core/tests/physical_optimizer/enforce_distribution.rs` | Adds regression tests to ensure `fetch` is preserved through `EnforceDistribution`. |
Comments suppressed due to low confidence (1)
datafusion/physical-optimizer/src/enforce_distribution.rs:992
`add_merge_on_top` only applies the stripped `fetch` to a newly created `SortPreservingMergeExec`, but not to the `CoalescePartitionsExec` branch. If the child has no ordering, this leaves `fetch` unconsumed and triggers the fallback wrapper later, potentially producing redundant `CoalescePartitionsExec` nodes and extra overhead. Consider applying `fetch.take()` to `CoalescePartitionsExec::new(...).with_fetch(...)` as well when adding the merge/coalesce on top.
```rust
let new_plan = if let Some(req) = input.plan.output_ordering() {
    Arc::new(
        SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
            .with_fetch(fetch.take()),
    ) as _
} else {
    // If there is no input order, we can simply coalesce partitions:
    Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
};
```
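The suggestion above can be sketched as a toy model in plain Rust (hypothetical `Plan` enum, not DataFusion's API): consume the stripped `fetch` on both branches so the unordered path never leaves it for the fallback wrapper.

```rust
// Toy sketch of the reviewer's suggestion, not the DataFusion API.
#[derive(Debug, PartialEq)]
enum Plan {
    Spm { fetch: Option<usize> },
    Coalesce { fetch: Option<usize> },
}

fn add_merge_on_top(has_ordering: bool, fetch: &mut Option<usize>) -> Plan {
    if has_ordering {
        Plan::Spm { fetch: fetch.take() }
    } else {
        // Taking the fetch on this branch too means it is always consumed,
        // so no redundant wrapper is added later.
        Plan::Coalesce { fetch: fetch.take() }
    }
}

fn main() {
    let mut fetch = Some(3);
    let plan = add_merge_on_top(false, &mut fetch);
    assert_eq!(plan, Plan::Coalesce { fetch: Some(3) });
    assert!(fetch.is_none()); // nothing left for the fallback to wrap
}
```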
Force-pushed 836549e to c8cc0b3.
rkrishn7 left a comment:

Thanks @zhuqi-lucas! Left a couple of comments.
```rust
if let Some(fetch_val) = fetch.take() {
    let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
        // Re-insert the original SortPreservingMergeExec with fetch.
        spm.with_fetch(Some(fetch_val)).unwrap()
```
Hmm, is `spm` actually stale here? It was captured prior to processing of the plan's children above. I think we might want to use the `SortPreservingMergeExec` constructor directly here with the new plan.
Good catch! Yes, `spm` was indeed stale: it was captured in `remove_dist_changing_operators` before the child subtree went through `ensure_distribution`, so it referenced the old (pre-rewrite) child plan.

Fixed in 8d2ee25: instead of capturing the entire `Arc<dyn ExecutionPlan>`, we now only capture the `LexOrdering` from the SPM. In the fallback path, we reconstruct a fresh `SortPreservingMergeExec::new(ordering, dist_context.plan)` using the current (rewritten) child. This also eliminates the `unwrap()`.

Additionally, the `spm` capture was gated on `fetch.is_none()`, which meant it would be skipped when an outer operator (e.g. `CoalescePartitionsExec`) already set `fetch`. I decoupled it to use `spm_ordering.is_none()` instead. Added a regression test (`nested_coalesce_over_spm_preserves_spm_ordering`) that confirms the old code produced `CoalescePartitionsExec` (losing sort semantics) while the fix correctly produces `SortPreservingMergeExec`.
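The stale-capture problem can be sketched with hypothetical types in plain Rust (the `Spm` struct and `rewrite_child` are stand-ins, not DataFusion code): reusing the captured node keeps pointing at the pre-rewrite child, while capturing only the ordering lets a fresh node be rebuilt around the current child.

```rust
// Toy sketch (hypothetical types) of the stale-SPM fix.
#[derive(Clone, Debug, PartialEq)]
struct Spm {
    ordering: String,
    child: String,
}

fn rewrite_child(child: &str) -> String {
    // Stand-in for ensure_distribution rewriting the subtree.
    format!("{child}(rewritten)")
}

fn main() {
    let stripped = Spm { ordering: "a ASC".to_string(), child: "scan".to_string() };
    // Stale approach: reuse the captured node as-is.
    let stale = stripped.clone();
    // Fixed approach: keep only the ordering, rebuild with the current child.
    let fresh = Spm {
        ordering: stripped.ordering.clone(),
        child: rewrite_child(&stripped.child),
    };
    assert_eq!(stale.child, "scan");            // still the old subtree
    assert_eq!(fresh.child, "scan(rewritten)"); // the rewritten subtree
}
```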
```rust
} else {
    // The fetch came from a CoalescePartitionsExec. Re-introduce
    // it as a CoalescePartitionsExec(fetch=N) wrapping the output.
    Arc::new(
```
Do we want to check if the new plan outputs more than a single partition? And if so, use `GlobalLimitExec` instead of `CoalescePartitionsExec`?

Not sure this one matters too much though.
Good point. In the current flow, when `fetch` is unconsumed it means `add_merge_on_top` didn't fire (either `UnspecifiedDistribution` or single partition). For single partition the `CoalescePartitionsExec` is essentially a no-op wrapper with `fetch`. For multi-partition cases, the `fetch` was already consumed by `add_merge_on_top`. So in practice I think the current behavior is correct, but I'll keep an eye on edge cases. Thanks for raising it!
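The single-partition claim above can be checked with a toy model in plain Rust (not DataFusion operators): over a single partition, a coalesce carrying `fetch=N` returns exactly the same rows as a global LIMIT N, so the no-op wrapper is harmless.

```rust
// Toy check of the fallback-path claim.
fn coalesce_fetch(parts: &[Vec<i32>], n: usize) -> Vec<i32> {
    // Concatenate all partitions, then truncate at the fetch.
    let mut out: Vec<i32> = parts.iter().flatten().copied().collect();
    out.truncate(n);
    out
}

fn global_limit(rows: &[i32], n: usize) -> Vec<i32> {
    rows.iter().copied().take(n).collect()
}

fn main() {
    let single = vec![vec![10, 20, 30, 40]];
    // With one partition, coalesce(fetch=2) == LIMIT 2 on the same rows.
    assert_eq!(coalesce_fetch(&single, 2), global_limit(&single[0], 2));
}
```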
Force-pushed 8d2ee25 to 5b0a42e.
…ibution-changing operators

When `LimitPushdown` merges a `GlobalLimitExec` into a `CoalescePartitionsExec` or `SortPreservingMergeExec` as a `fetch` value, `EnforceDistribution` would strip and re-insert these operators without preserving the fetch. This silently drops the LIMIT for queries over multi-partition sources.

The fix captures the `fetch` in `remove_dist_changing_operators` and threads it through `add_merge_on_top`, or re-introduces it as a fallback when the merge operator is not re-inserted.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…re from fetch

- Capture only the `LexOrdering` from the stripped `SortPreservingMergeExec` instead of the entire plan node, then reconstruct a fresh SPM with the current (possibly rewritten) child plan in the fallback path. This avoids reusing a stale SPM that references an outdated subtree.
- Decouple `spm_ordering` capture from `fetch`: use `spm_ordering.is_none()` instead of gating on `fetch.is_none()`, so the SPM ordering is recorded even when an outer operator already set fetch.
- Remove `unwrap()` on `with_fetch` by constructing the SPM directly.
- Fix test comment: fallback is `CoalescePartitionsExec`/SPM, not `GlobalLimitExec`.
- Add regression test: nested `CoalescePartitionsExec(fetch=5)` over `SortPreservingMergeExec(fetch=3)` now correctly preserves SPM ordering.
Force-pushed 5b0a42e to d861c98.
Which issue does this PR close?
Rationale for this change

When `LimitPushdown` merges a `GlobalLimitExec` into a `CoalescePartitionsExec` or `SortPreservingMergeExec` as a `fetch` value, the `EnforceDistribution` optimizer rule strips and re-inserts these distribution-changing operators without preserving the `fetch`. This silently drops the LIMIT for queries over multi-partition sources, potentially returning duplicate/extra rows.

What changes are included in this PR?
- `remove_dist_changing_operators` now captures any `fetch` value and the `LexOrdering` from a stripped `SortPreservingMergeExec` (if present) before stripping operators. Only the ordering is captured, not the full plan node, so that a fresh SPM can be reconstructed later with the current (possibly rewritten) child plan, avoiding stale references.
- `add_merge_on_top` accepts an optional `fetch` and applies it to the newly created `SortPreservingMergeExec` or `CoalescePartitionsExec`.
- If `fetch` was not consumed by `add_merge_on_top` (e.g., when the parent had `UnspecifiedDistribution` or the child already had a single partition), the limit is re-introduced as a wrapping operator so it is never silently lost. When the original operator was an SPM, a fresh `SortPreservingMergeExec` is reconstructed from the captured ordering to preserve sort semantics.
- The SPM-ordering capture uses `spm_ordering.is_none()` instead of gating on `fetch.is_none()`, so the SPM ordering is recorded even when an outer operator (e.g. `CoalescePartitionsExec`) already set `fetch`.

Are these changes tested?
Yes, four tests are added:
- `coalesce_partitions_fetch_preserved_by_enforce_distribution`: unsorted multi-partition source with `CoalescePartitionsExec(fetch=1)`
- `coalesce_partitions_fetch_preserved_sorted`: sorted multi-partition source with `CoalescePartitionsExec(fetch=5)`
- `spm_fetch_preserved_by_enforce_distribution`: sorted multi-partition source with `SortPreservingMergeExec(fetch=3)`
- `nested_coalesce_over_spm_preserves_spm_ordering`: nested `CoalescePartitionsExec(fetch=5)` over `SortPreservingMergeExec(fetch=3)`, verifying that SPM ordering is preserved even when an outer operator already set `fetch`

Are there any user-facing changes?
No API changes. Queries with LIMIT over multi-partition sources will now correctly preserve the limit through the `EnforceDistribution` optimizer pass.