
fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators#21170

Open
zhuqi-lucas wants to merge 3 commits into apache:main from zhuqi-lucas:fix/enforce-dist-preserve-fetch

Conversation


@zhuqi-lucas zhuqi-lucas commented Mar 26, 2026

Which issue does this PR close?

Rationale for this change

When LimitPushdown merges a GlobalLimitExec into a CoalescePartitionsExec or SortPreservingMergeExec as a fetch value, the EnforceDistribution optimizer rule strips and re-inserts these distribution-changing operators without preserving the fetch. This silently drops the LIMIT for queries over multi-partition sources, potentially returning duplicate/extra rows.

What changes are included in this PR?

  1. remove_dist_changing_operators now captures any fetch value and the LexOrdering from a stripped SortPreservingMergeExec (if present) before stripping operators. Only the ordering is captured — not the full plan node — so that a fresh SPM can be reconstructed later with the current (possibly rewritten) child plan, avoiding stale references.
  2. add_merge_on_top accepts an optional fetch and applies it to the newly created SortPreservingMergeExec or CoalescePartitionsExec.
  3. Fallback re-introduction: if the fetch was not consumed by add_merge_on_top (e.g., when the parent had UnspecifiedDistribution or the child already had a single partition), the limit is re-introduced as a wrapping operator so it is never silently lost. When the original operator was an SPM, a fresh SortPreservingMergeExec is reconstructed from the captured ordering to preserve sort semantics.
  4. SPM capture decoupled from fetch: the ordering capture uses spm_ordering.is_none() instead of gating on fetch.is_none(), so the SPM ordering is recorded even when an outer operator (e.g. CoalescePartitionsExec) already set fetch.
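The capture logic in changes (1) and (4) can be sketched with a toy plan model. This is a simplified stand-in, not the actual DataFusion types; `Plan` and `strip_dist_changing` are hypothetical names used only for illustration:

```rust
// Toy model of stripping distribution-changing operators while capturing
// the fetch and the SPM ordering independently of each other.
#[derive(Debug, PartialEq)]
enum Plan {
    Source,
    Coalesce { fetch: Option<usize>, child: Box<Plan> },
    Spm { ordering: String, fetch: Option<usize>, child: Box<Plan> },
}

/// Strip Coalesce/SPM nodes, returning (stripped child, outermost fetch,
/// ordering of the outermost stripped SPM). Note the SPM ordering is
/// recorded even when an outer Coalesce already supplied the fetch.
fn strip_dist_changing(mut plan: Plan) -> (Plan, Option<usize>, Option<String>) {
    let mut fetch: Option<usize> = None;
    let mut spm_ordering: Option<String> = None;
    loop {
        plan = match plan {
            Plan::Coalesce { fetch: f, child } => {
                fetch = fetch.or(f); // keep the outermost fetch
                *child
            }
            Plan::Spm { ordering, fetch: f, child } => {
                fetch = fetch.or(f);
                if spm_ordering.is_none() {
                    // gated on spm_ordering, NOT on fetch (change 4)
                    spm_ordering = Some(ordering);
                }
                *child
            }
            other => return (other, fetch, spm_ordering),
        };
    }
}

fn main() {
    // Nested Coalesce(fetch=5) over SPM(fetch=3), as in the regression test.
    let plan = Plan::Coalesce {
        fetch: Some(5),
        child: Box::new(Plan::Spm {
            ordering: "a ASC".to_string(),
            fetch: Some(3),
            child: Box::new(Plan::Source),
        }),
    };
    let (stripped, fetch, spm_ordering) = strip_dist_changing(plan);
    println!("{stripped:?} fetch={fetch:?} spm_ordering={spm_ordering:?}");
}
```

In this model the ordering survives even though the outer Coalesce already set the fetch, which is exactly the scenario the nested regression test exercises.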

Are these changes tested?

Yes, four tests are added:

  • coalesce_partitions_fetch_preserved_by_enforce_distribution — unsorted multi-partition source with CoalescePartitionsExec(fetch=1)
  • coalesce_partitions_fetch_preserved_sorted — sorted multi-partition source with CoalescePartitionsExec(fetch=5)
  • spm_fetch_preserved_by_enforce_distribution — sorted multi-partition source with SortPreservingMergeExec(fetch=3)
  • nested_coalesce_over_spm_preserves_spm_ordering — nested CoalescePartitionsExec(fetch=5) over SortPreservingMergeExec(fetch=3), verifying that SPM ordering is preserved even when an outer operator already set fetch

Are there any user-facing changes?

No API changes. Queries with LIMIT over multi-partition sources will now correctly preserve the limit through the EnforceDistribution optimizer pass.

Copilot AI review requested due to automatic review settings March 26, 2026 08:57
@github-actions github-actions bot added optimizer Optimizer rules core Core DataFusion crate labels Mar 26, 2026

Copilot AI left a comment


Pull request overview

Fixes a correctness bug in the physical optimizer where EnforceDistribution could drop a fetch (LIMIT) that had been pushed down into distribution-changing operators (CoalescePartitionsExec / SortPreservingMergeExec), leading to extra/duplicate rows for multi-partition inputs.

Changes:

  • Track and propagate fetch while stripping distribution-changing operators so it can be re-applied later.
  • Extend add_merge_on_top to optionally apply fetch when re-adding SortPreservingMergeExec.
  • Add regression tests ensuring fetch survives EnforceDistribution across coalesce and SPM cases.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

  • datafusion/physical-optimizer/src/enforce_distribution.rs: Captures fetch while stripping dist-changing ops; attempts to reapply it during merge insertion or via a fallback wrapper.
  • datafusion/core/tests/physical_optimizer/enforce_distribution.rs: Adds regression tests to ensure fetch is preserved through EnforceDistribution.
Comments suppressed due to low confidence (1)

datafusion/physical-optimizer/src/enforce_distribution.rs:992

  • add_merge_on_top only applies the stripped fetch to a newly created SortPreservingMergeExec, but not to the CoalescePartitionsExec branch. If the child has no ordering, this leaves fetch unconsumed and triggers the fallback wrapper later, potentially producing redundant CoalescePartitionsExec nodes and extra overhead. Consider applying fetch.take() to CoalescePartitionsExec::new(...).with_fetch(...) as well when adding the merge/coalesce on top.
        let new_plan = if let Some(req) = input.plan.output_ordering() {
            Arc::new(
                SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
                    .with_fetch(fetch.take()),
            ) as _
        } else {
            // If there is no input order, we can simply coalesce partitions:
            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
        };
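The suggestion above can be illustrated with a stand-alone toy model (simplified types, not the DataFusion API): consuming the captured fetch in both branches means the later fallback wrapper never fires for the unordered case either.

```rust
// Toy sketch of the reviewer's suggestion: add_merge_on_top takes the
// captured fetch in BOTH branches, so no unconsumed fetch is left over.
#[derive(Debug, PartialEq)]
enum Plan {
    Source { ordered: bool },
    Spm { fetch: Option<usize>, child: Box<Plan> },
    Coalesce { fetch: Option<usize>, child: Box<Plan> },
}

fn add_merge_on_top(child: Plan, fetch: &mut Option<usize>) -> Plan {
    if matches!(child, Plan::Source { ordered: true }) {
        // Ordered input: merge with a sort-preserving merge, applying fetch.
        Plan::Spm { fetch: fetch.take(), child: Box::new(child) }
    } else {
        // Unordered input: the suggestion is to apply fetch.take() here too,
        // instead of leaving it for the fallback path.
        Plan::Coalesce { fetch: fetch.take(), child: Box::new(child) }
    }
}

fn main() {
    let mut fetch = Some(1);
    let plan = add_merge_on_top(Plan::Source { ordered: false }, &mut fetch);
    println!("{plan:?}, leftover fetch = {fetch:?}");
}
```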



@rkrishn7 rkrishn7 left a comment


Thanks @zhuqi-lucas! Left a couple of comments.

if let Some(fetch_val) = fetch.take() {
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
// Re-insert the original SortPreservingMergeExec with fetch.
spm.with_fetch(Some(fetch_val)).unwrap()

Hmm, is spm actually stale here? It was captured prior to processing of the plan's children above. I think we might want to use the SortPreservingMergeExec constructor directly here with the new plan.

@zhuqi-lucas (Author) replied:

Good catch! Yes, spm was indeed stale — it was captured in remove_dist_changing_operators before the child subtree went through ensure_distribution, so it referenced the old (pre-rewrite) child plan.

Fixed in 8d2ee25: instead of capturing the entire Arc<dyn ExecutionPlan>, we now only capture the LexOrdering from the SPM. In the fallback path, we reconstruct a fresh SortPreservingMergeExec::new(ordering, dist_context.plan) using the current (rewritten) child. This also eliminates the unwrap().

Additionally, the spm capture was gated on fetch.is_none(), which meant it would be skipped when an outer operator (e.g. CoalescePartitionsExec) already set fetch. I decoupled it to use spm_ordering.is_none() instead. Added a regression test (nested_coalesce_over_spm_preserves_spm_ordering) that confirms the old code produced CoalescePartitionsExec (losing sort semantics) while the fix correctly produces SortPreservingMergeExec.
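The fixed fallback path can be modeled in isolation (toy types and hypothetical names, not the actual DataFusion code): rebuild a fresh SPM from only the captured ordering plus the current, already-rewritten child, instead of reusing the stale node.

```rust
// Toy sketch of the fixed fallback: reconstruct a FRESH SPM from the
// captured ordering and the CURRENT child plan. No stale node is reused
// and no unwrap() is needed.
#[derive(Debug, PartialEq)]
enum Plan {
    Rewritten, // stands in for the current, already-optimized child plan
    Spm { ordering: String, fetch: usize, child: Box<Plan> },
    Coalesce { fetch: usize, child: Box<Plan> },
}

fn reapply_fetch(current: Plan, fetch: usize, spm_ordering: Option<String>) -> Plan {
    match spm_ordering {
        // The fetch originally came from an SPM: rebuild it over the
        // rewritten child so sort semantics are preserved.
        Some(ordering) => Plan::Spm { ordering, fetch, child: Box::new(current) },
        // The fetch came from a CoalescePartitionsExec.
        None => Plan::Coalesce { fetch, child: Box::new(current) },
    }
}

fn main() {
    let plan = reapply_fetch(Plan::Rewritten, 3, Some("a ASC".to_string()));
    println!("{plan:?}");
}
```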

} else {
// The fetch came from a CoalescePartitionsExec. Re-introduce
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.
Arc::new(

Do we want to check if the new plan outputs more than a single partition? And if so, use GlobalLimitExec instead of CoalescePartitionsExec?

Not sure this one matters too much though.

@zhuqi-lucas (Author) replied:

Good point. In the current flow, when fetch is unconsumed it means add_merge_on_top didn't fire (either UnspecifiedDistribution or single partition). For single partition the CoalescePartitionsExec is essentially a no-op wrapper with fetch. For multi-partition cases, the fetch was already consumed by add_merge_on_top. So in practice I think the current behavior is correct, but I'll keep an eye on edge cases. Thanks for raising it!

@zhuqi-lucas zhuqi-lucas force-pushed the fix/enforce-dist-preserve-fetch branch from 8d2ee25 to 5b0a42e on March 27, 2026 06:41
zhuqi-lucas and others added 3 commits March 27, 2026 14:45
…ibution-changing operators

When `LimitPushdown` merges a `GlobalLimitExec` into a
`CoalescePartitionsExec` or `SortPreservingMergeExec` as a `fetch`
value, `EnforceDistribution` would strip and re-insert these operators
without preserving the fetch. This silently drops the LIMIT for queries
over multi-partition sources.

The fix captures the `fetch` in `remove_dist_changing_operators` and
threads it through `add_merge_on_top`, or re-introduces it as a
fallback when the merge operator is not re-inserted.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…re from fetch

- Capture only the LexOrdering from the stripped SortPreservingMergeExec
  instead of the entire plan node, then reconstruct a fresh SPM with the
  current (possibly rewritten) child plan in the fallback path. This
  avoids reusing a stale SPM that references an outdated subtree.
- Decouple spm_ordering capture from fetch — use `spm_ordering.is_none()`
  instead of gating on `fetch.is_none()`, so the SPM ordering is recorded
  even when an outer operator already set fetch.
- Remove unwrap() on with_fetch by constructing SPM directly.
- Fix test comment: fallback is CoalescePartitionsExec/SPM, not GlobalLimitExec.
- Add regression test: nested CoalescePartitionsExec(fetch=5) over
  SortPreservingMergeExec(fetch=3) now correctly preserves SPM ordering.
@zhuqi-lucas zhuqi-lucas force-pushed the fix/enforce-dist-preserve-fetch branch from 5b0a42e to d861c98 on March 27, 2026 06:47



Successfully merging this pull request may close these issues.

Bug: EnforceDistribution optimizer loses fetch (LIMIT) from CoalescePartitionsExec and SortPreservingMergeExec
