Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions datafusion/core/tests/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3915,6 +3915,129 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> {
Ok(())
}

/// When `LimitPushdown` merges a `GlobalLimitExec` into `CoalescePartitionsExec`
/// as `fetch`, `remove_dist_changing_operators` must preserve that fetch value.
/// Otherwise, queries with LIMIT over multi-partition sources silently lose
/// the limit and return duplicate/extra rows.
///
/// Regression test for: https://github.com/apache/datafusion/issues/21169
#[test]
fn coalesce_partitions_fetch_preserved_by_enforce_distribution() -> Result<()> {
    // Mirror the shape that LimitPushdown produces:
    //   CoalescePartitionsExec(fetch=1)
    //     DataSourceExec (2 partitions)
    let source = parquet_exec_multiple();
    let coalesce = CoalescePartitionsExec::new(source).with_fetch(Some(1));
    let input: Arc<dyn ExecutionPlan> = Arc::new(coalesce);

    let optimized = ensure_distribution_helper(input, 10, false)?;

    // The fetch=1 must survive. It can appear either as:
    // - CoalescePartitionsExec: fetch=1 (re-inserted with fetch), or
    // - SortPreservingMergeExec: fetch=1 (when a merge is re-added with fetch)
    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    let fetch_survived = plan_str.contains("fetch=1");
    assert!(
        fetch_survived,
        "fetch=1 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}

/// Variant of the previous scenario with a sorted multi-partition source:
/// the fetch should be preserved on `SortPreservingMergeExec`.
#[test]
fn coalesce_partitions_fetch_preserved_sorted() -> Result<()> {
    let schema = schema();
    // Single-column ordering on "c" with default sort options.
    let sort_expr = PhysicalSortExpr {
        expr: col("c", &schema)?,
        options: SortOptions::default(),
    };
    let sort_key: LexOrdering = [sort_expr].into();

    // CoalescePartitionsExec(fetch=5) over sorted multi-partition source
    let source = parquet_exec_multiple_sorted(vec![sort_key]);
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(CoalescePartitionsExec::new(source).with_fetch(Some(5)));

    let optimized = ensure_distribution_helper(input, 10, false)?;

    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    assert!(
        plan_str.contains("fetch=5"),
        "fetch=5 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}

/// A fetch carried directly by `SortPreservingMergeExec` must also survive
/// `EnforceDistribution`.
#[test]
fn spm_fetch_preserved_by_enforce_distribution() -> Result<()> {
    let schema = schema();
    // Ordering on column "c" (default sort options).
    let sort_expr = PhysicalSortExpr {
        expr: col("c", &schema)?,
        options: SortOptions::default(),
    };
    let sort_key: LexOrdering = [sort_expr].into();

    // SortPreservingMergeExec(fetch=3) over sorted multi-partition source
    let source = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
    let spm = SortPreservingMergeExec::new(sort_key, source).with_fetch(Some(3));
    let input: Arc<dyn ExecutionPlan> = Arc::new(spm);

    let optimized = ensure_distribution_helper(input, 10, false)?;

    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    assert!(
        plan_str.contains("fetch=3"),
        "fetch=3 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    Ok(())
}

/// When both a `CoalescePartitionsExec(fetch=N)` and an inner
/// `SortPreservingMergeExec(fetch=M)` are stripped, the SPM ordering must
/// still be captured even though fetch was already set when the SPM is
/// encountered. The reconstructed fallback should use a
/// `SortPreservingMergeExec` (not a plain `CoalescePartitionsExec`) to
/// preserve sort semantics, and must wrap the *rewritten* child plan.
///
/// Regression test for: the `spm` capture was gated on `fetch.is_none()`,
/// causing it to be skipped when an outer operator already set `fetch`.
#[test]
fn nested_coalesce_over_spm_preserves_spm_ordering() -> Result<()> {
    let schema = schema();
    // Ordering [c ASC] (default options) used by the inner merge.
    let sort_expr = PhysicalSortExpr {
        expr: col("c", &schema)?,
        options: SortOptions::default(),
    };
    let sort_key: LexOrdering = [sort_expr].into();

    // Build: CoalescePartitionsExec(fetch=5)
    //          -> SortPreservingMergeExec(fetch=3, [c ASC])
    //               -> sorted multi-partition parquet
    let source = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
    let inner_spm: Arc<dyn ExecutionPlan> =
        Arc::new(SortPreservingMergeExec::new(sort_key, source).with_fetch(Some(3)));
    let outer_coalesce: Arc<dyn ExecutionPlan> =
        Arc::new(CoalescePartitionsExec::new(inner_spm).with_fetch(Some(5)));

    let optimized = ensure_distribution_helper(outer_coalesce, 10, false)?;

    let plan_str = displayable(optimized.as_ref()).indent(true).to_string();
    // The minimum fetch (min(5,3)=3) must survive.
    assert!(
        plan_str.contains("fetch=3"),
        "fetch=3 was lost after EnforceDistribution!\nPlan:\n{plan_str}"
    );
    // The result should use SortPreservingMergeExec (not CoalescePartitionsExec)
    // to preserve the ordering semantics.
    assert!(
        plan_str.contains("SortPreservingMergeExec"),
        "Expected SortPreservingMergeExec to preserve ordering, but got:\n{plan_str}"
    );
    Ok(())
}

/// When a parent requires SinglePartition and maintains input order, order-preserving
/// variants (e.g. SortPreservingMergeExec) should be kept so that ordering can
/// propagate to ancestors. Replacing them with CoalescePartitionsExec would destroy
Expand Down
98 changes: 78 additions & 20 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use datafusion_expr::logical_plan::{Aggregate, JoinType};
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
EquivalenceProperties, PhysicalExpr, PhysicalExprRef, physical_exprs_equal,
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalExprRef,
physical_exprs_equal,
};
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::aggregates::{
Expand Down Expand Up @@ -967,9 +968,12 @@ fn preserving_order_enables_streaming(

/// # Returns
///
/// Updated node with an execution plan, where the desired single distribution
/// requirement is satisfied.
fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
/// Updated node with an execution plan, where desired single
/// distribution is satisfied by adding [`SortPreservingMergeExec`].
fn add_merge_on_top(
input: DistributionContext,
fetch: &mut Option<usize>,
) -> DistributionContext {
// Apply only when the partition count is larger than one.
if input.plan.output_partitioning().partition_count() > 1 {
// When there is an existing ordering, we preserve ordering
Expand All @@ -979,13 +983,16 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.prefer_existing_sort`)
let new_plan = if let Some(req) = input.plan.output_ordering() {
Arc::new(SortPreservingMergeExec::new(
req.clone(),
Arc::clone(&input.plan),
)) as _
Arc::new(
SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan))
.with_fetch(fetch.take()),
) as _
} else {
// If there is no input order, we can simply coalesce partitions:
Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
Arc::new(
CoalescePartitionsExec::new(Arc::clone(&input.plan))
.with_fetch(fetch.take()),
) as _
};

DistributionContext::new(new_plan, true, vec![input])
Expand Down Expand Up @@ -1013,18 +1020,39 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
/// ```
fn remove_dist_changing_operators(
mut distribution_context: DistributionContext,
) -> Result<DistributionContext> {
) -> Result<(DistributionContext, Option<usize>, Option<LexOrdering>)> {
let mut fetch = None;
let mut spm_ordering: Option<LexOrdering> = None;
while is_repartition(&distribution_context.plan)
|| is_coalesce_partitions(&distribution_context.plan)
|| is_sort_preserving_merge(&distribution_context.plan)
{
// Track whether the stripped operator was a SortPreservingMergeExec,
// independently of whether it carries a fetch. We only need the
// ordering so we can reconstruct a fresh SPM later if needed.
if is_sort_preserving_merge(&distribution_context.plan)
&& spm_ordering.is_none()
&& let Some(spm) = distribution_context
.plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>()
{
spm_ordering = Some(spm.expr().clone());
}
// Preserve any `fetch` (limit) that was pushed into a
// `SortPreservingMergeExec` or `CoalescePartitionsExec` by
// `LimitPushdown`. Without this, the limit would be lost when
// the operator is stripped.
if let Some(child_fetch) = distribution_context.plan.fetch() {
fetch = Some(fetch.map_or(child_fetch, |f: usize| f.min(child_fetch)));
}
// All of above operators have a single child. First child is only child.
// Remove any distribution changing operators at the beginning:
distribution_context = distribution_context.children.swap_remove(0);
// Note that they will be re-inserted later on if necessary or helpful.
}

Ok(distribution_context)
Ok((distribution_context, fetch, spm_ordering))
}

/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
Expand Down Expand Up @@ -1220,11 +1248,15 @@ pub fn ensure_distribution(
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
mut plan,
data,
children,
} = remove_dist_changing_operators(dist_context)?;
let (
DistributionContext {
mut plan,
data,
children,
},
mut fetch,
spm_ordering,
) = remove_dist_changing_operators(dist_context)?;

if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
if let Some(updated_window) = get_best_fitting_window(
Expand Down Expand Up @@ -1354,7 +1386,7 @@ pub fn ensure_distribution(
// Satisfy the distribution requirement if it is unmet.
match &requirement {
Distribution::SinglePartition => {
child = add_merge_on_top(child);
child = add_merge_on_top(child, &mut fetch);
}
Distribution::HashPartitioned(exprs) => {
// See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
Expand Down Expand Up @@ -1483,9 +1515,35 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};

Ok(Transformed::yes(DistributionContext::new(
plan, data, children,
)))
let mut dist_context = DistributionContext::new(Arc::clone(&plan), data, children);

// If `fetch` was not consumed by `add_merge_on_top` (e.g. when the parent
// had `UnspecifiedDistribution` or the child already had a single
// partition), the limit that was originally embedded in a distribution
// changing operator would be silently lost. Re-introduce it so the
// query still returns the correct number of rows.
if let Some(fetch_val) = fetch.take() {
let limit_plan: Arc<dyn ExecutionPlan> = if let Some(ordering) = spm_ordering {
// Reconstruct a fresh SortPreservingMergeExec using the
// captured ordering and the *current* (possibly rewritten)
// child plan, rather than reusing the stale pre-optimization
// SPM which may reference an outdated subtree.
Arc::new(
SortPreservingMergeExec::new(ordering, Arc::clone(&dist_context.plan))
.with_fetch(Some(fetch_val)),
)
} else {
// The fetch came from a CoalescePartitionsExec. Re-introduce
// it as a CoalescePartitionsExec(fetch=N) wrapping the output.
Arc::new(
CoalescePartitionsExec::new(Arc::clone(&dist_context.plan))
.with_fetch(Some(fetch_val)),
)
};
dist_context = DistributionContext::new(limit_plan, data, vec![dist_context]);
}

Ok(Transformed::yes(dist_context))
}

/// Keeps track of distribution changing operators (like `RepartitionExec`,
Expand Down
Loading