Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
aa1abae
Avoid concat_batches
Dandandan Mar 14, 2026
622d251
Fmt
Dandandan Mar 14, 2026
75ea2a4
Avoid interleave for single batch
Dandandan Mar 14, 2026
d99cf53
Avoid interleave for single batch
Dandandan Mar 14, 2026
21eb596
WIP
Dandandan Mar 14, 2026
c4be04d
Merge branch 'main' into avoid_concat
Dandandan Mar 14, 2026
b808be6
WIP
Dandandan Mar 14, 2026
6275223
WIP
Dandandan Mar 14, 2026
3edef0a
WIP
Dandandan Mar 14, 2026
f715910
WIP
Dandandan Mar 14, 2026
22b11e9
WIP
Dandandan Mar 14, 2026
78196a7
WIP
Dandandan Mar 14, 2026
6f98d25
WIP
Dandandan Mar 14, 2026
c5ed684
WIP
Dandandan Mar 14, 2026
64c0e47
WIP
Dandandan Mar 14, 2026
b5380c9
WIP
Dandandan Mar 15, 2026
46c7771
Clippy
Dandandan Mar 15, 2026
56c04d5
WIP
Dandandan Mar 15, 2026
114e182
WIP
Dandandan Mar 15, 2026
aefebec
Use take instead of interleave for single-batch build side in hash join
Dandandan Mar 17, 2026
acde8fe
Simplify apply_null_mask and restore equal_rows_arr empty behavior
Dandandan Mar 17, 2026
e2043ad
Optimize equal_rows_arr with element-wise comparison
Dandandan Mar 17, 2026
97b0203
Simplify equal_rows_arr: remove dyn Fn, hoist indices, use bitwise AND
Dandandan Mar 17, 2026
9986fb0
Fix
Dandandan Mar 17, 2026
42aa7b2
Extend element-wise comparison to multi-batch builds in equal_rows_arr
Dandandan Mar 17, 2026
7f5db8a
Simplify: extract shared dispatch_elementwise macro, clean up multi-b…
Dandandan Mar 17, 2026
37531f6
Coalesce batches in CoalescePartitionsExec for downstream performance
Dandandan Mar 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 78 additions & 9 deletions datafusion/physical-plan/src/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,33 @@
//! into a single partition

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::stream::{ObservedStream, RecordBatchReceiverStream};
use super::{
DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
Statistics,
DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use crate::projection::{ProjectionExec, make_with_child};
use crate::sort_pushdown::SortOrderPushdownResult;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{Result, assert_eq_or_internal_err, internal_err};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::ready;
use futures::stream::{Stream, StreamExt};

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
Expand Down Expand Up @@ -214,6 +221,8 @@ impl ExecutionPlan for CoalescePartitionsExec {
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();

let batch_size = context.session_config().batch_size();

// use a stream that allows each sender to put in at
// least one result in an attempt to maximize
// parallelism.
Expand All @@ -231,11 +240,23 @@ impl ExecutionPlan for CoalescePartitionsExec {
}

let stream = builder.build();
Ok(Box::pin(ObservedStream::new(
stream,
baseline_metrics,
self.fetch,
)))
// Coalesce small batches from multiple partitions into
// larger batches of target_batch_size. This improves
// downstream performance (e.g. hash join build side
// benefits from fewer, larger batches).
Ok(Box::pin(CoalescedStream {
input: Box::pin(ObservedStream::new(
stream,
baseline_metrics,
self.fetch,
)),
coalescer: LimitedBatchCoalescer::new(
self.schema(),
batch_size,
None, // fetch is already handled by ObservedStream
),
completed: false,
}))
}
}
}
Expand Down Expand Up @@ -352,6 +373,55 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
}

/// Stream adapter that merges small input batches into larger ones using
/// [`LimitedBatchCoalescer`] before yielding them downstream.
struct CoalescedStream {
    /// The upstream record-batch stream whose batches are being coalesced.
    input: SendableRecordBatchStream,
    /// Accumulates pushed batches and produces completed output batches
    /// once the configured target size is reached.
    coalescer: LimitedBatchCoalescer,
    /// Set once `input` is exhausted (or the coalescer reported its limit
    /// was reached) and `finish()` has been called; after that the stream
    /// only drains remaining completed batches and never polls `input` again.
    completed: bool,
}

impl Stream for CoalescedStream {
    type Item = Result<RecordBatch>;

    /// Polls the inner stream, pushing each incoming batch into the
    /// coalescer and yielding batches only once the coalescer marks them
    /// complete. Errors from the input are forwarded unchanged.
    ///
    /// Note: `?` is usable here because `Poll<Option<Result<_, _>>>`
    /// implements the `Try` trait, turning an `Err` into
    /// `Poll::Ready(Some(Err(_)))`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            // First drain any batch the coalescer has already completed
            // (including ones flushed by `finish()` on a previous iteration)
            // before pulling more input.
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Poll::Ready(Some(Ok(batch)));
            }
            // Input exhausted / limit reached and coalescer fully drained:
            // end of stream.
            if self.completed {
                return Poll::Ready(None);
            }
            let input_batch = ready!(self.input.poll_next_unpin(cx));
            match input_batch {
                None => {
                    // Input is done: flush any partially filled batch so the
                    // next loop iteration can emit it via
                    // `next_completed_batch()`.
                    self.completed = true;
                    self.coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    match self.coalescer.push_batch(batch)? {
                        PushBatchStatus::Continue => {}
                        PushBatchStatus::LimitReached => {
                            // Coalescer-side limit hit; stop consuming input
                            // and flush what has been buffered. (The plan's
                            // `fetch` is handled upstream by `ObservedStream`,
                            // so this path depends on the coalescer's own
                            // limit configuration.)
                            self.completed = true;
                            self.coalescer.finish()?;
                        }
                    }
                }
                // `Some(Err(_))`: propagate input errors unchanged.
                other => return Poll::Ready(other),
            }
        }
    }
}

impl RecordBatchStream for CoalescedStream {
    /// Returns the schema of the emitted batches, delegating to the
    /// coalescer (which was constructed with the plan's output schema).
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -383,10 +453,9 @@ mod tests {
1
);

// the result should contain 4 batches (one per input partition)
// the result should contain all rows (coalesced into fewer batches)
let iter = merge.execute(0, task_ctx)?;
let batches = common::collect(iter).await?;
assert_eq!(batches.len(), num_partitions);

// there should be a total of 400 rows (100 per each partition)
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
Expand Down
80 changes: 52 additions & 28 deletions datafusion/physical-plan/src/joins/array_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,17 @@ impl ArrayMap {
max_val.wrapping_sub(min_val)
}

/// Creates a new [`ArrayMap`] from the given array of join keys.
/// Creates a new [`ArrayMap`] from per-batch arrays of join keys.
///
/// Note: This function processes only the non-null values in the input `array`,
/// Note: This function processes only the non-null values in the input arrays,
/// ignoring any rows where the key is `NULL`.
///
pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> {
pub(crate) fn try_new(
arrays: &[&ArrayRef],
total_num_rows: usize,
min_val: u64,
max_val: u64,
) -> Result<Self> {
let range = max_val.wrapping_sub(min_val);
if range >= usize::MAX as u64 {
return internal_err!("ArrayMap key range is too large to be allocated.");
Expand All @@ -173,10 +178,16 @@ impl ArrayMap {
let mut next: Vec<u32> = vec![];
let mut num_of_distinct_key = 0;

let data_type = arrays
.first()
.map(|a| a.data_type().clone())
.unwrap_or(DataType::Int32);

downcast_supported_integer!(
array.data_type() => (
fill_data,
array,
&data_type => (
fill_data_batched,
arrays,
total_num_rows,
min_val,
&mut data,
&mut next,
Expand All @@ -192,8 +203,9 @@ impl ArrayMap {
})
}

fn fill_data<T: ArrowNumericType>(
array: &ArrayRef,
fn fill_data_batched<T: ArrowNumericType>(
arrays: &[&ArrayRef],
total_num_rows: usize,
offset_val: u64,
data: &mut [u32],
next: &mut Vec<u32>,
Expand All @@ -202,25 +214,32 @@ impl ArrayMap {
where
T::Native: AsPrimitive<u64>,
{
let arr = array.as_primitive::<T>();
// Iterate in reverse to maintain FIFO order when there are duplicate keys.
for (i, val) in arr.iter().enumerate().rev() {
if let Some(val) = val {
let key: u64 = val.as_();
let idx = key.wrapping_sub(offset_val) as usize;
if idx >= data.len() {
return internal_err!("failed build Array idx >= data.len()");
}

if data[idx] != 0 {
if next.is_empty() {
*next = vec![0; array.len()]
// We iterate batches in reverse, and within each batch iterate rows in reverse,
// using a flat index that spans all batches.
let mut flat_offset = total_num_rows;
for array in arrays.iter().rev() {
let arr = array.as_primitive::<T>();
flat_offset -= arr.len();
for (row_idx, val) in arr.iter().enumerate().rev() {
if let Some(val) = val {
let key: u64 = val.as_();
let idx = key.wrapping_sub(offset_val) as usize;
if idx >= data.len() {
return internal_err!("failed build Array idx >= data.len()");
}
next[i] = data[idx]
} else {
*num_of_distinct_key += 1;
let flat_idx = flat_offset + row_idx;

if data[idx] != 0 {
if next.is_empty() {
*next = vec![0; total_num_rows]
}
next[flat_idx] = data[idx]
} else {
*num_of_distinct_key += 1;
}
data[idx] = flat_idx as u32 + 1;
}
data[idx] = (i) as u32 + 1;
}
}
Ok(())
Expand Down Expand Up @@ -419,7 +438,7 @@ mod tests {
#[test]
fn test_array_map_limit_offset_duplicate_elements() -> Result<()> {
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2]));
let map = ArrayMap::try_new(&build, 1, 2)?;
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
let probe = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef];

let mut prob_idx = Vec::new();
Expand Down Expand Up @@ -450,7 +469,7 @@ mod tests {
#[test]
fn test_array_map_with_limit_and_misses() -> Result<()> {
let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let map = ArrayMap::try_new(&build, 1, 2)?;
let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?;
let probe = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef];

let (mut p_idx, mut b_idx) = (vec![], vec![]);
Expand Down Expand Up @@ -483,7 +502,7 @@ mod tests {
#[test]
fn test_array_map_with_build_duplicates_and_misses() -> Result<()> {
let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));
let array_map = ArrayMap::try_new(&build_array, 1, 1)?;
let array_map = ArrayMap::try_new(&[&build_array], build_array.len(), 1, 1)?;
// prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2)
let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1]));
let prob_side_keys = [probe_array];
Expand Down Expand Up @@ -513,7 +532,12 @@ mod tests {
let min_val = -5_i128;
let max_val = 10_i128;

let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?;
let array_map = ArrayMap::try_new(
&[&build_array],
build_array.len(),
min_val as u64,
max_val as u64,
)?;

// Probe array
let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1]));
Expand Down
Loading
Loading