Skip to content

Commit a5ccd80

Browse files
committed
Fix metrics for repartition
1 parent d2278a9 commit a5ccd80

1 file changed

Lines changed: 53 additions & 1 deletion

File tree

  • datafusion/physical-plan/src/repartition

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,13 @@ impl ExecutionPlan for RepartitionExec {
10371037
if preserve_order {
10381038
// Store streams from all the input partitions:
10391039
// Each input partition gets its own spill reader to maintain proper FIFO ordering
1040+
//
1041+
// Use a separate metrics set for the intermediate PerPartitionStream
1042+
// instances. These feed into the StreamingMerge which is the actual
1043+
// output — only the merge's BaselineMetrics should contribute to the
1044+
// operator's reported output_rows. Without this, every row would be
1045+
// counted twice (once by PerPartitionStream, once by StreamingMerge).
1046+
let intermediate_metrics = ExecutionPlanMetricsSet::new();
10401047
let input_streams = rx
10411048
.into_iter()
10421049
.zip(spill_readers)
@@ -1049,7 +1056,7 @@ impl ExecutionPlan for RepartitionExec {
10491056
Arc::clone(&reservation),
10501057
spill_stream,
10511058
1, // Each receiver handles one input partition
1052-
BaselineMetrics::new(&metrics, partition),
1059+
BaselineMetrics::new(&intermediate_metrics, partition),
10531060
None, // subsequent merge sort already does batching https://github.com/apache/datafusion/blob/e4dcf0c85611ad0bd291f03a8e03fe56d773eb16/datafusion/physical-plan/src/sorts/merge.rs#L286
10541061
)) as SendableRecordBatchStream
10551062
})
@@ -2953,4 +2960,49 @@ mod test {
29532960
let exec = Arc::new(exec);
29542961
Arc::new(TestMemoryExec::update_cache(&exec))
29552962
}
2963+
2964+
/// Regression test: preserve_order repartition should not double-count
2965+
/// output rows. Before the fix, PerPartitionStream and StreamingMerge
2966+
/// both registered output_rows on the same MetricsSet, causing the
2967+
/// reported output_rows to be 2x the actual count.
2968+
#[tokio::test]
2969+
async fn test_preserve_order_output_rows_not_double_counted() -> Result<()> {
2970+
use datafusion_execution::TaskContext;
2971+
2972+
// Two sorted input partitions, 2 rows each (4 total)
2973+
let batch1 = record_batch!(("c0", UInt32, [1, 3])).unwrap();
2974+
let batch2 = record_batch!(("c0", UInt32, [2, 4])).unwrap();
2975+
let schema = batch1.schema();
2976+
let sort_exprs = sort_exprs(&schema);
2977+
2978+
let input_partitions = vec![vec![batch1], vec![batch2]];
2979+
let exec = TestMemoryExec::try_new(&input_partitions, Arc::clone(&schema), None)?
2980+
.try_with_sort_information(vec![sort_exprs.clone(), sort_exprs])?;
2981+
let exec = Arc::new(exec);
2982+
let exec = Arc::new(TestMemoryExec::update_cache(&exec));
2983+
2984+
let exec = RepartitionExec::try_new(exec, Partitioning::RoundRobinBatch(3))?
2985+
.with_preserve_order();
2986+
2987+
let task_ctx = Arc::new(TaskContext::default());
2988+
let mut total_rows = 0;
2989+
for i in 0..exec.partitioning().partition_count() {
2990+
let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
2991+
while let Some(result) = stream.next().await {
2992+
total_rows += result?.num_rows();
2993+
}
2994+
}
2995+
2996+
assert_eq!(total_rows, 4, "actual rows collected should be 4");
2997+
2998+
let metrics = exec.metrics().unwrap();
2999+
let reported_output_rows = metrics.output_rows().unwrap();
3000+
assert_eq!(
3001+
reported_output_rows, total_rows,
3002+
"metrics output_rows ({reported_output_rows}) should match \
3003+
actual rows collected ({total_rows}), not double-count"
3004+
);
3005+
3006+
Ok(())
3007+
}
29563008
}

0 commit comments

Comments (0)