Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,13 @@ impl ExecutionPlan for RepartitionExec {
if preserve_order {
// Store streams from all the input partitions:
// Each input partition gets its own spill reader to maintain proper FIFO ordering
//
// Use a separate metrics set for the intermediate PerPartitionStream
// instances. These feed into the StreamingMerge which is the actual
// output — only the merge's BaselineMetrics should contribute to the
// operator's reported output_rows. Without this, every row would be
// counted twice (once by PerPartitionStream, once by StreamingMerge).
let intermediate_metrics = ExecutionPlanMetricsSet::new();
let input_streams = rx
.into_iter()
.zip(spill_readers)
Expand All @@ -1049,7 +1056,7 @@ impl ExecutionPlan for RepartitionExec {
Arc::clone(&reservation),
spill_stream,
1, // Each receiver handles one input partition
BaselineMetrics::new(&metrics, partition),
BaselineMetrics::new(&intermediate_metrics, partition),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are just going to ignore the metrics, should we just remove them from PerPartitionStream?

It seems like using a local copy of ExecutionPlanMetrics means the metrics in the PerPartitionStream are no longer accessible. So we can probably just remove the metrics to make it clearer they aren't used

Copy link
Contributor Author

@xanderbailey xanderbailey Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good question, we still use PerPartitionStream here

Ok(Box::pin(PerPartitionStream::new(
. We can make the metrics optional which is maybe more explicit?

None, // subsequent merge sort already does batching https://github.com/apache/datafusion/blob/e4dcf0c85611ad0bd291f03a8e03fe56d773eb16/datafusion/physical-plan/src/sorts/merge.rs#L286
)) as SendableRecordBatchStream
})
Expand Down Expand Up @@ -2953,4 +2960,47 @@ mod test {
let exec = Arc::new(exec);
Arc::new(TestMemoryExec::update_cache(&exec))
}

/// preserve_order repartition should not double-count
/// output rows.
///
/// Rows flow through PerPartitionStream and then StreamingMerge; only the
/// merge should contribute to the operator's `output_rows` metric, so the
/// reported count must equal the rows actually collected.
#[tokio::test]
async fn test_preserve_order_output_rows_not_double_counted() -> Result<()> {
    use datafusion_execution::TaskContext;

    // Two sorted input partitions, 2 rows each (4 total)
    let left = record_batch!(("c0", UInt32, [1, 3])).unwrap();
    let right = record_batch!(("c0", UInt32, [2, 4])).unwrap();
    let schema = left.schema();
    let ordering = sort_exprs(&schema);

    // Build a sorted two-partition memory source.
    let partitions = vec![vec![left], vec![right]];
    let source = TestMemoryExec::try_new(&partitions, Arc::clone(&schema), None)?
        .try_with_sort_information(vec![ordering.clone(), ordering])?;
    let source = Arc::new(source);
    let source = Arc::new(TestMemoryExec::update_cache(&source));

    // Order-preserving round-robin repartition into 3 output partitions.
    let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(3))?
        .with_preserve_order();

    // Drain every output partition, counting the rows we actually receive.
    let ctx = Arc::new(TaskContext::default());
    let mut total_rows = 0;
    let partition_count = exec.partitioning().partition_count();
    for partition in 0..partition_count {
        let mut stream = exec.execute(partition, Arc::clone(&ctx))?;
        while let Some(batch) = stream.next().await {
            total_rows += batch?.num_rows();
        }
    }

    assert_eq!(total_rows, 4, "actual rows collected should be 4");

    // The operator's reported output_rows must match what we drained.
    let metrics = exec.metrics().unwrap();
    let reported_output_rows = metrics.output_rows().unwrap();
    assert_eq!(
        reported_output_rows, total_rows,
        "metrics output_rows ({reported_output_rows}) should match \
        actual rows collected ({total_rows}), not double-count"
    );

    Ok(())
}
}
Loading