Skip to content

Commit c8cc0b3

Browse files
zhuqi-lucas authored and claude committed
fix: EnforceDistribution optimizer preserves fetch (LIMIT) from distribution-changing operators
When `LimitPushdown` merges a `GlobalLimitExec` into a `CoalescePartitionsExec` or `SortPreservingMergeExec` as a `fetch` value, `EnforceDistribution` would strip and re-insert these operators without preserving the fetch. This silently drops the LIMIT for queries over multi-partition sources. The fix captures the `fetch` in `remove_dist_changing_operators` and threads it through `add_merge_on_top`, or re-introduces it as a fallback when the merge operator is not re-inserted. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4e2e4e8 commit c8cc0b3

2 files changed

Lines changed: 147 additions & 19 deletions

File tree

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3915,6 +3915,85 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
39153915
Ok(())
39163916
}
39173917

3918+
/// When `LimitPushdown` merges a `GlobalLimitExec` into `CoalescePartitionsExec`
/// as `fetch`, `remove_dist_changing_operators` must preserve that fetch value.
/// Otherwise, queries with LIMIT over multi-partition sources silently lose
/// the limit and return duplicate/extra rows.
///
/// Regression test for: https://github.com/apache/datafusion/issues/21169
#[test]
fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
    // Build the shape that LimitPushdown leaves behind:
    //   CoalescePartitionsExec(fetch=1)
    //     DataSourceExec (2 partitions)
    let source = parquet_exec_multiple();
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(CoalescePartitionsExec::new(source).with_fetch(Some(1)));

    let optimized = ensure_distribution_helper(input, 10, false)?;

    // The fetch=1 must survive. It can appear either as:
    // - CoalescePartitionsExec: fetch=1 (re-inserted with fetch), or
    // - GlobalLimitExec: skip=0, fetch=1 (fallback when merge wasn't re-added)
    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    assert!(
        plan_str.contains("fetch=1"),
        "fetch=1 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}
3945+
3946+
/// Like the unsorted coalesce-partitions case, but over a sorted
/// multi-partition source: the fetch should be preserved on the
/// `SortPreservingMergeExec` that replaces the coalesce.
#[test]
fn coalesce_partitions_fetch_preserved_sorted() -> Result<()> {
    let schema = schema();
    let sort_key: LexOrdering = [PhysicalSortExpr {
        expr: col("c", &schema)?,
        options: SortOptions::default(),
    }]
    .into();

    // CoalescePartitionsExec(fetch=5) over sorted multi-partition source
    let source = parquet_exec_multiple_sorted(vec![sort_key]);
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(CoalescePartitionsExec::new(source).with_fetch(Some(5)));

    let optimized = ensure_distribution_helper(input, 10, false)?;

    // The limit must survive the optimizer pass in some form.
    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    assert!(
        plan_str.contains("fetch=5"),
        "fetch=5 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}
3971+
3972+
/// A `SortPreservingMergeExec` that already carries a fetch must keep it
/// through `EnforceDistribution`.
#[test]
fn spm_fetch_preserved_by_enforce_distribution() -> Result<()> {
    let schema = schema();
    let sort_key: LexOrdering = [PhysicalSortExpr {
        expr: col("c", &schema)?,
        options: SortOptions::default(),
    }]
    .into();

    // SortPreservingMergeExec(fetch=3) over sorted multi-partition source
    let source = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(sort_key, source).with_fetch(Some(3)));

    let optimized = ensure_distribution_helper(input, 10, false)?;

    // The limit must survive the optimizer pass in some form.
    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    assert!(
        plan_str.contains("fetch=3"),
        "fetch=3 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}
3996+
39183997
/// When a parent requires SinglePartition and maintains input order, order-preserving
39193998
/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
39203999
/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -967,9 +967,12 @@ fn preserving_order_enables_streaming(
967967

968968
/// # Returns
969969
///
970-
/// Updated node with an execution plan, where the desired single distribution
971-
/// requirement is satisfied.
972-
fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
970+
/// Updated node with an execution plan, where desired single
971+
/// distribution is satisfied by adding [`SortPreservingMergeExec`].
972+
fn add_merge_on_top(
973+
input: DistributionContext,
974+
fetch: &mut Option<usize>,
975+
) -> DistributionContext {
973976
// Apply only when the partition count is larger than one.
974977
if input.plan.output_partitioning().partition_count() > 1 {
975978
// When there is an existing ordering, we preserve ordering
@@ -979,13 +982,16 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
979982
// - Usage of order preserving variants is not desirable
980983
// (determined by flag `config.optimizer.prefer_existing_sort`)
981984
let new_plan = if let Some(req) = input.plan.output_ordering() {
982-
Arc::new(SortPreservingMergeExec::new(
983-
req.clone(),
984-
Arc::clone(&input.plan),
985-
)) as _
985+
Arc::new(
986+
SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
987+
.with_fetch(fetch.take()),
988+
) as _
986989
} else {
987990
// If there is no input order, we can simply coalesce partitions:
988-
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
991+
Arc::new(
992+
CoalescePartitionsExec::new(Arc::clone(&input.plan))
993+
.with_fetch(fetch.take()),
994+
) as _
989995
};
990996

991997
DistributionContext::new(new_plan, true, vec![input])
@@ -1011,20 +1017,39 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
10111017
/// ```text
10121018
/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
10131019
/// ```
1020+
#[expect(clippy::type_complexity)]
10141021
fn remove_dist_changing_operators(
10151022
mut distribution_context: DistributionContext,
1016-
) -> Result<DistributionContext> {
1023+
) -> Result<(
1024+
DistributionContext,
1025+
Option<usize>,
1026+
Option<Arc<dyn ExecutionPlan>>,
1027+
)> {
1028+
let mut fetch = None;
1029+
let mut spm: Option<Arc<dyn ExecutionPlan>> = None;
10171030
while is_repartition(&distribution_context.plan)
10181031
|| is_coalesce_partitions(&distribution_context.plan)
10191032
|| is_sort_preserving_merge(&distribution_context.plan)
10201033
{
1034+
// Preserve any `fetch` (limit) that was pushed into a
1035+
// `SortPreservingMergeExec` or `CoalescePartitionsExec` by
1036+
// `LimitPushdown`. Without this, the limit would be lost when
1037+
// the operator is stripped.
1038+
if let Some(child_fetch) = distribution_context.plan.fetch() {
1039+
if is_sort_preserving_merge(&distribution_context.plan)
1040+
&& fetch.is_none()
1041+
{
1042+
spm = Some(Arc::clone(&distribution_context.plan));
1043+
}
1044+
fetch = Some(fetch.map_or(child_fetch, |f: usize| f.min(child_fetch)));
1045+
}
10211046
// All of above operators have a single child. First child is only child.
10221047
// Remove any distribution changing operators at the beginning:
10231048
distribution_context = distribution_context.children.swap_remove(0);
10241049
// Note that they will be re-inserted later on if necessary or helpful.
10251050
}
10261051

1027-
Ok(distribution_context)
1052+
Ok((distribution_context, fetch, spm))
10281053
}
10291054

10301055
/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1220,11 +1245,15 @@ pub fn ensure_distribution(
12201245
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
12211246

12221247
// Remove unnecessary repartition from the physical plan if any
1223-
let DistributionContext {
1224-
mut plan,
1225-
data,
1226-
children,
1227-
} = remove_dist_changing_operators(dist_context)?;
1248+
let (
1249+
DistributionContext {
1250+
mut plan,
1251+
data,
1252+
children,
1253+
},
1254+
mut fetch,
1255+
spm,
1256+
) = remove_dist_changing_operators(dist_context)?;
12281257

12291258
if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
12301259
if let Some(updated_window) = get_best_fitting_window(
@@ -1354,7 +1383,7 @@ pub fn ensure_distribution(
13541383
// Satisfy the distribution requirement if it is unmet.
13551384
match &requirement {
13561385
Distribution::SinglePartition => {
1357-
child = add_merge_on_top(child);
1386+
child = add_merge_on_top(child, &mut fetch);
13581387
}
13591388
Distribution::HashPartitioned(exprs) => {
13601389
// See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
@@ -1483,9 +1512,29 @@ pub fn ensure_distribution(
14831512
plan.with_new_children(children_plans)?
14841513
};
14851514

1486-
Ok(Transformed::yes(DistributionContext::new(
1487-
plan, data, children,
1488-
)))
1515+
let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);
1516+
1517+
// If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
1518+
// had `UnspecifiedDistribution` or the child already had a single
1519+
// partition), the limit that was originally embedded in a distribution
1520+
// changing operator would be silently lost. Re-introduce it so the
1521+
// query still returns the correct number of rows.
1522+
if let Some(fetch_val) = fetch.take() {
1523+
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(spm) = spm {
1524+
// Re-insert the original SortPreservingMergeExec with fetch.
1525+
spm.with_fetch(Some(fetch_val)).unwrap()
1526+
} else {
1527+
// The fetch came from a CoalescePartitionsExec. Re-introduce
1528+
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.
1529+
Arc::new(
1530+
CoalescePartitionsExec::new(Arc::clone(&dist_context.plan))
1531+
.with_fetch(Some(fetch_val)),
1532+
)
1533+
};
1534+
dist_context = DistributionContext::new(limit_plan, data, vec![dist_context]);
1535+
}
1536+
1537+
Ok(Transformed::yes(dist_context))
14891538
}
14901539

14911540
/// Keeps track of distribution changing operators (like `RepartitionExec`,

0 commit comments

Comments
 (0)