
deps: DataFusion 52.0.0 migration (SchemaAdapter changes, etc.) [iceberg]#3536

Open
comphead wants to merge 38 commits into apache:main from comphead:df52

Conversation

@comphead
Contributor

@comphead comphead commented Feb 16, 2026

Which issue does this PR close?

Closes #3046.
Closes #3515.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@andygrove
Member

@sqlbenchmark run tpch

1 similar comment

@andygrove
Member

andygrove commented Feb 16, 2026

The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.

| Metric | Baseline | DF52 | Delta |
| --- | --- | --- | --- |
| peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
| peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
| peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
| peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
| peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
| peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |

DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).

@comphead
Contributor Author

> The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.
>
> | Metric | Baseline | DF52 | Delta |
> | --- | --- | --- | --- |
> | peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
> | peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
> | peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
> | peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
> | peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
> | peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
>
> DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).

The total memory used seems to be less? +4 GB for heap and -8 GB for off-heap, so -4 GB total. The JVM growth looks weird, as DF52 has very few JVM changes. Is the baseline from the latest main?

@sqlbenchmark

Comet TPC-H Benchmark Results

Baseline: main (e22f35c)
PR: 749a407 - DataFusion 52 migration
Scale Factor: SF100
Iterations: 1

Query Times

| Query | Baseline Avg (s) | Baseline Best (s) | PR Avg (s) | PR Best (s) | Change (Avg) | Change (Best) |
| --- | --- | --- | --- | --- | --- | --- |
| Q1 | 10.64 | 10.64 | 10.75 | 10.75 | ⚪ +1.1% | ⚪ +1.1% |
| Q2 | 5.90 | 5.90 | 5.76 | 5.76 | ⚪ -2.4% | ⚪ -2.4% |
| Q3 | 9.79 | 9.79 | 9.81 | 9.81 | ⚪ +0.2% | ⚪ +0.2% |
| Q4 | 11.62 | 11.62 | 13.99 | 13.99 | 🔴 +20.4% | 🔴 +20.4% |
| Q5 | 19.16 | 19.16 | 18.81 | 18.81 | ⚪ -1.8% | ⚪ -1.8% |
| Q6 | 2.52 | 2.52 | 2.52 | 2.52 | ⚪ -0.1% | ⚪ -0.1% |
| Q7 | 12.44 | 12.44 | 11.79 | 11.79 | 🟢 -5.2% | 🟢 -5.2% |
| Q8 | 25.43 | 25.43 | 24.39 | 24.39 | ⚪ -4.1% | ⚪ -4.1% |
| Q9 | 39.40 | 39.40 | 37.94 | 37.94 | ⚪ -3.7% | ⚪ -3.7% |
| Q10 | 10.37 | 10.37 | 10.62 | 10.62 | ⚪ +2.4% | ⚪ +2.4% |
| Q11 | 4.50 | 4.50 | 4.80 | 4.80 | 🔴 +6.5% | 🔴 +6.5% |
| Q12 | 7.07 | 7.07 | 6.41 | 6.41 | 🟢 -9.3% | 🟢 -9.3% |
| Q13 | 7.36 | 7.36 | 7.40 | 7.40 | ⚪ +0.6% | ⚪ +0.6% |
| Q14 | 3.59 | 3.59 | 3.27 | 3.27 | 🟢 -8.9% | 🟢 -8.9% |
| Q15 | 7.21 | 7.21 | 7.32 | 7.32 | ⚪ +1.4% | ⚪ +1.4% |
| Q16 | 4.73 | 4.73 | 4.93 | 4.93 | ⚪ +4.0% | ⚪ +4.0% |
| Q17 | 32.63 | 32.63 | 32.55 | 32.55 | ⚪ -0.2% | ⚪ -0.2% |
| Q18 | 33.85 | 33.85 | 34.62 | 34.62 | ⚪ +2.3% | ⚪ +2.3% |
| Q19 | 6.85 | 6.85 | 6.01 | 6.01 | 🟢 -12.2% | 🟢 -12.2% |
| Q20 | 6.75 | 6.75 | 6.12 | 6.12 | 🟢 -9.3% | 🟢 -9.3% |
| Q21 | 46.71 | 46.71 | 51.97 | 51.97 | 🔴 +11.3% | 🔴 +11.3% |
| Q22 | 4.92 | 4.92 | 5.41 | 5.41 | 🔴 +9.9% | 🔴 +9.9% |
| Total | 313.45 | 313.45 | 317.18 | 317.18 | ⚪ +1.2% | ⚪ +1.2% |
Spark Configuration

| Setting | Value |
| --- | --- |
| Spark Master | local[*] |
| Driver Memory | 32G |
| Driver Cores | 8 |
| Executor Memory | 32G |
| Executor Cores | 8 |
| Off-Heap Enabled | true |
| Off-Heap Size | 24g |
| Shuffle Manager | CometShuffleManager |
| Comet Replace SMJ | true |

Automated benchmark run by dfbench

@andygrove
Member

@sqlbenchmark run tpch --iterations 3

@andygrove
Member

andygrove commented Feb 16, 2026

Not sure how true this is yet, but Claude claims there may be a performance regression related to Iceberg scans:

In iceberg_scan.rs, the old code cached (SchemaRef, Arc<dyn SchemaMapper>) and reused it across batches with the same schema. The new adapt_batch_with_expressions() creates a new SparkPhysicalExprAdapterFactory + adapter + expression trees for every batch that needs adaptation. This adds per-batch allocation churn, though the actual data buffers are shared via Arc.
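The caching the old code used can be sketched as follows. This is a minimal pure-Rust sketch with stand-in types, not Comet's actual `SchemaRef`/`SchemaMapper` API:

```rust
use std::sync::Arc;

// Stand-ins for the real Arrow/DataFusion types.
#[derive(PartialEq, Clone, Debug)]
struct Schema(Vec<String>);
type SchemaRef = Arc<Schema>;
struct SchemaMapper; // would wrap the adapter's expression trees

/// Reuse the adapter for consecutive batches that share a schema, so the
/// factory + expression-tree cost is paid only on a schema change.
struct AdapterCache {
    cached: Option<(SchemaRef, Arc<SchemaMapper>)>,
}

impl AdapterCache {
    fn get_or_create(&mut self, schema: &SchemaRef) -> Arc<SchemaMapper> {
        match &self.cached {
            // Cheap pointer check first, then structural equality.
            Some((s, m)) if Arc::ptr_eq(s, schema) || **s == **schema => Arc::clone(m),
            _ => {
                // Expensive step in the real code: build factory + adapter.
                let mapper = Arc::new(SchemaMapper);
                self.cached = Some((Arc::clone(schema), Arc::clone(&mapper)));
                mapper
            }
        }
    }
}
```

Since the actual data buffers are already shared via `Arc`, the remaining churn is exactly this per-batch adapter construction, which the cache removes.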

@andygrove
Member

> The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.
>
> | Metric | Baseline | DF52 | Delta |
> | --- | --- | --- | --- |
> | peak_JVMHeapMemory | 6.38 GB | 10.60 GB | +4.22 GB (+66%) |
> | peak_JVMOffHeapMemory | 148.01 MB | 147.28 MB | -0.73 MB (~0%) |
> | peak_OnHeapExecutionMemory | 0 B | 0 B | -- |
> | peak_OffHeapExecutionMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
> | peak_OnHeapUnifiedMemory | 74.86 MB | 63.63 MB | -11.24 MB (-15%) |
> | peak_OffHeapUnifiedMemory | 14.20 GB | 5.30 GB | -8.90 GB (-63%) |
>
> DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).
>
> The total mem used seems to be less? +4 for Heap and -8 for on Heap, so -4 total. The JVM growth looks weird as DF52 has very few JVM changes. Is the baseline for latest main?

The baseline was a build from Friday.

Claude is suggesting this as the root cause:

"The memory shift is primarily driven by DataFusion 52's internal changes to how operators manage buffers (batch coalescing, expression evaluation), combined with these buffers likely not being tracked through Comet's CometUnifiedMemoryPool → JNI → Spark accounting path. The result is that native memory still exists but is "invisible" to Spark's off-heap accounting, while Spark's JVM-side operations expand to fill the perceived available space."

@sqlbenchmark

Comet TPC-H Benchmark Results

Baseline: main (e22f35c)
PR: 749a407 - DataFusion 52 migration
Scale Factor: SF100
Iterations: 3

Query Times

| Query | Baseline Avg (s) | Baseline Best (s) | PR Avg (s) | PR Best (s) | Change (Avg) | Change (Best) |
| --- | --- | --- | --- | --- | --- | --- |
| Q1 | 10.03 | 9.46 | 10.02 | 9.43 | ⚪ -0.2% | ⚪ -0.3% |
| Q2 | 5.43 | 5.11 | 5.34 | 4.98 | ⚪ -1.7% | ⚪ -2.7% |
| Q3 | 9.96 | 9.79 | 10.01 | 9.82 | ⚪ +0.5% | ⚪ +0.3% |
| Q4 | 10.95 | 10.72 | 13.36 | 12.60 | 🔴 +21.9% | 🔴 +17.6% |
| Q5 | 19.33 | 19.03 | 19.36 | 18.80 | ⚪ +0.2% | ⚪ -1.2% |
| Q6 | 2.46 | 2.32 | 2.51 | 2.32 | ⚪ +1.8% | ⚪ -0.1% |
| Q7 | 12.46 | 12.06 | 12.24 | 11.74 | ⚪ -1.7% | ⚪ -2.7% |
| Q8 | 25.65 | 25.10 | 25.47 | 24.41 | ⚪ -0.7% | ⚪ -2.7% |
| Q9 | 41.14 | 39.22 | 40.66 | 38.29 | ⚪ -1.2% | ⚪ -2.3% |
| Q10 | 10.63 | 10.18 | 11.11 | 10.84 | ⚪ +4.5% | 🔴 +6.5% |
| Q11 | 4.41 | 4.38 | 4.75 | 4.52 | 🔴 +7.7% | ⚪ +3.2% |
| Q12 | 7.37 | 7.25 | 6.60 | 6.44 | 🟢 -10.5% | 🟢 -11.1% |
| Q13 | 7.43 | 7.29 | 7.51 | 7.23 | ⚪ +1.2% | ⚪ -0.8% |
| Q14 | 3.56 | 3.51 | 3.39 | 3.27 | ⚪ -4.9% | 🟢 -7.0% |
| Q15 | 7.23 | 7.07 | 7.35 | 7.13 | ⚪ +1.7% | ⚪ +0.9% |
| Q16 | 4.39 | 4.25 | 4.36 | 4.27 | ⚪ -0.7% | ⚪ +0.4% |
| Q17 | 33.93 | 32.87 | 34.23 | 32.74 | ⚪ +0.9% | ⚪ -0.4% |
| Q18 | 34.33 | 33.82 | 35.75 | 35.14 | ⚪ +4.2% | ⚪ +3.9% |
| Q19 | 7.00 | 6.73 | 6.48 | 6.25 | 🟢 -7.4% | 🟢 -7.1% |
| Q20 | 6.68 | 6.52 | 6.47 | 6.33 | ⚪ -3.1% | ⚪ -3.0% |
| Q21 | 46.87 | 46.40 | 52.85 | 51.53 | 🔴 +12.8% | 🔴 +11.1% |
| Q22 | 5.02 | 5.00 | 5.52 | 5.42 | 🔴 +10.0% | 🔴 +8.4% |
| Total | 316.29 | 308.08 | 325.36 | 313.50 | ⚪ +2.9% | ⚪ +1.8% |
Spark Configuration

| Setting | Value |
| --- | --- |
| Spark Master | local[*] |
| Driver Memory | 32G |
| Driver Cores | 8 |
| Executor Memory | 32G |
| Executor Cores | 8 |
| Off-Heap Enabled | true |
| Off-Heap Size | 24g |
| Shuffle Manager | CometShuffleManager |
| Comet Replace SMJ | true |

Automated benchmark run by dfbench

@andygrove
Member

I ran the queries individually and compared memory usage between main and this PR.

Key findings from Claude analysis of the results:

  1. The memory shift is NOT consistent — it's highly query-dependent. Some queries see off-heap decrease (Q4, Q10, Q11), others see large increases (Q7, Q12, Q13). There is no single directional trend.
  2. Off-heap and JVM heap sometimes move inversely. Q11 is the clearest example: off-heap dropped 56.4% while JVM heap increased 127%. Q10 shows the same pattern (off-heap -72.7%, heap +36.5%). DF52 appears to shift work between native and JVM memory for certain query shapes.
  3. Join-heavy queries are most affected. The queries with the largest memory changes (Q4, Q7, Q10, Q11, Q12, Q13, Q21) all involve complex joins, correlated subqueries, or GROUP BY with HAVING. Simpler scan-and-aggregate queries (Q1, Q6) are stable. This points to changes in DataFusion 52's hash join/aggregate memory management.

@andygrove
Member

I ran a memory profiling comparison to main with replaceSortMergeJoin=false as well.

| Query | BL Time (s) | DF52 Time (s) | Time Delta | BL OffHeap Exec | DF52 OffHeap Exec | OffHeap Delta | BL JVM Heap | DF52 JVM Heap | Heap Delta |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Q1 | 82.6 | 82.5 | -0.2% | 514 MB | 514 MB | 0.0% | 5.53 GB | 4.15 GB | -25.0% |
| Q2 | 15.9 | 15.6 | -1.8% | n/a | n/a | n/a | n/a | 2.84 GB | n/a |
| Q3 | 30.0 | 30.1 | +0.3% | 3.75 GB | 1.69 GB | -54.9% | 4.37 GB | 3.55 GB | -18.8% |
| Q4 | 24.9 | 25.1 | +0.9% | 16.00 GB | 16.00 GB | 0.0% | 2.52 GB | 2.15 GB | -14.8% |
| Q5 | 58.2 | 57.3 | -1.4% | 3.51 GB | 3.00 GB | -14.5% | 3.72 GB | 5.55 GB | +49.3% |
| Q6 | 4.8 | 5.0 | +5.3% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q7 | 28.5 | 27.7 | -2.5% | 2.62 GB | n/a | n/a | 6.55 GB | 6.76 GB | +3.3% |
| Q8 | 42.8 | 41.9 | -2.1% | 657 MB | n/a | n/a | 7.01 GB | 6.12 GB | -12.6% |
| Q9 | 86.1 | 86.6 | +0.6% | 3.00 GB | 3.00 GB | 0.0% | 5.39 GB | 6.56 GB | +21.7% |
| Q10 | 24.1 | 25.0 | +3.7% | 3.04 GB | 1.43 GB | -53.0% | 2.29 GB | 1.25 GB | -45.5% |
| Q11 | 17.2 | 17.4 | +1.2% | 1.00 GB | 2.00 GB | +100.0% | 2.82 GB | 3.79 GB | +34.5% |
| Q12 | 16.7 | 16.3 | -2.7% | 644 MB | n/a | n/a | 4.37 GB | n/a | n/a |
| Q13 | 25.2 | 24.8 | -1.7% | 5.16 GB | 5.13 GB | -0.5% | 3.05 GB | 4.06 GB | +33.1% |
| Q14 | 8.6 | 8.7 | +0.9% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q15 | 19.6 | 19.5 | -0.4% | 460 MB | 520 MB | +12.9% | 1.81 GB | 2.13 GB | +17.2% |
| Q16 | 11.4 | 10.6 | -6.7% | n/a | 2.00 GB | n/a | n/a | 3.69 GB | n/a |
| Q17 | 77.7 | 78.0 | +0.4% | 2.03 GB | 2.02 GB | -0.8% | 4.44 GB | 5.24 GB | +17.9% |
| Q18 | 82.7 | 82.1 | -0.8% | 3.19 GB | 4.75 GB | +49.0% | 5.96 GB | 5.91 GB | -0.9% |
| Q19 | 9.6 | 9.6 | -0.5% | n/a | n/a | n/a | n/a | n/a | n/a |
| Q20 | 14.4 | 14.5 | +0.8% | n/a | n/a | n/a | 3.44 GB | n/a | n/a |
| Q21 | 79.1 | 78.6 | -0.7% | 5.13 GB | 5.25 GB | +2.4% | 4.58 GB | 4.93 GB | +7.7% |
| Q22 | 12.6 | 12.6 | -0.4% | n/a | n/a | n/a | 1.95 GB | n/a | n/a |
| Total | 772.6 | 769.4 | -0.4% | | | | | | |

Comment on lines 731 to 739
// let result_type = match &result {
// ColumnarValue::Array(array) => array.data_type().clone(),
// ColumnarValue::Scalar(scalar) => scalar.data_type(),
// };

// println!(
// "spark_cast: {} -> {} (requested: {})",
// input_type, result_type, data_type
// );
Member

can this be removed now?

Comment on lines 955 to 958
// println!("cast_array BEFORE postprocess:");
// println!(" from_type: {}", from_type);
// println!(" to_type: {}", to_type);
// println!(" intermediate data_type: {}", x.data_type());
Member

debug logging can be removed?

}

fn datetime_cast_err(value: i64) -> ArrowError {
println!("{}", std::backtrace::Backtrace::force_capture());
Member

debug logging can be removed?

}
_ => {
// Not supported
panic!(
Member

We should return an Err here
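A minimal sketch of the suggested change, using a hypothetical local error type (the real code would return Comet's or DataFusion's error type):

```rust
#[derive(Debug)]
struct UnsupportedCast(String); // hypothetical stand-in error type

/// Instead of panicking on an unsupported cast, surface it as an Err so
/// the caller can report the failure without aborting the process.
fn check_cast_supported(supported: bool, from: &str, to: &str) -> Result<(), UnsupportedCast> {
    if supported {
        Ok(())
    } else {
        // Previously this arm was a `panic!`.
        Err(UnsupportedCast(format!("unsupported cast from {from} to {to}")))
    }
}
```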

}
_ => {
// Not supported
panic!(
Member

We should return an Err here

.with_table_partition_cols(partition_fields)
/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
/// Returns a new `SendableRecordBatchStream` that yields the same batches.
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
Member

this function is unused now?

Contributor Author

I found it useful for inspecting a stream while passing the original batches through unchanged. It is not used now, but would be handy for debugging purposes.
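A pure-std analog of what such a helper does (the real `dbg_batch_stream` wraps a `SendableRecordBatchStream` of Arrow `RecordBatch`es):

```rust
/// Wrap an iterator, log each item as it flows through, and yield it
/// unchanged -- the same pass-through-with-logging shape as dbg_batch_stream.
fn dbg_iter<T: std::fmt::Debug, I: Iterator<Item = T>>(iter: I) -> impl Iterator<Item = T> {
    iter.inspect(|item| eprintln!("batch: {item:?}"))
}
```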

@mbutrovich mbutrovich changed the title chore: [df52] migration deps: DataFusion 52.0.0 migration (SchemaAdapter changes, etc.) Feb 18, 2026
Comment on lines 157 to 160
let millis_values: TimestampMillisecondArray = micros_array
.iter()
.map(|opt| opt.map(|v| v / 1000))
.collect();
Member

Suggested change:

```diff
-let millis_values: TimestampMillisecondArray = micros_array
-    .iter()
-    .map(|opt| opt.map(|v| v / 1000))
-    .collect();
+let millis_values: TimestampMillisecondArray =
+    arrow::compute::kernels::arity::unary(micros_array, |v| v / 1000);
```

Comment on lines 85 to 88
let micros_array: TimestampMicrosecondArray = millis_array
.iter()
.map(|opt| opt.map(|v| v * 1000))
.collect();
Member

This could use unary kernel instead

inner: S,
schema: SchemaRef,
/// Cached schema adapter with its source schema. Created when schema changes.
cached_adapter: Option<(SchemaRef, Arc<dyn SchemaMapper>)>,
Member

Is caching no longer possible with DF52?

Contributor Author

checking this

Contributor Author

added back

+ case s: FileSourceScanExec if s.bucketedScan => s
+ case s: CometScanExec if s.bucketedScan => s
+ case s: CometNativeScanExec if s.bucketedScan => s
+ }
Contributor

I think this file just has a whitespace change, can you get the version of the file from main to reduce the diff?

Contributor

I still don't see any changes other than whitespace here. Can we remove this from the PR?

@comphead
Contributor Author

@andygrove @mbutrovich
May I ask you for the second round of review?

/// This handles the common case where the field names differ (e.g., Parquet uses "key_value"
/// while Spark uses "entries") but the key/value types are the same.
/// Returns None if the types are not compatible or not Map types.
fn adapt_map_to_schema(column: &ArrayRef, target_type: &DataType) -> Option<ArrayRef> {
Member

does this need to handle nested maps?

Contributor Author

I'm actually thinking this should be part of the schema rewrite rather than a standalone cast. Checking it.

Contributor Author

Removed it, as it duplicated another cast.
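For illustration, the name-insensitive Map compatibility check discussed above might look like this. `DataType` here is a simplified stand-in for Arrow's, and recursing on the key/value types also covers the nested-map question:

```rust
// Simplified stand-in for Arrow's DataType.
#[derive(PartialEq, Clone, Debug)]
enum DataType {
    Int32,
    Utf8,
    // (entries field name, key type, value type)
    Map(String, Box<DataType>, Box<DataType>),
}

/// Two types are adaptable when they differ only in Map entry-field names
/// (e.g. Parquet's "key_value" vs Spark's "entries").
fn compatible(a: &DataType, b: &DataType) -> bool {
    match (a, b) {
        (DataType::Map(_, ak, av), DataType::Map(_, bk, bv)) => {
            // Ignore the field name; recurse so nested maps are handled too.
            compatible(ak, bk) && compatible(av, bv)
        }
        _ => a == b,
    }
}
```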


test("cast TimestampType to LongType") {
castTest(generateTimestampsExtended(), DataTypes.LongType)
// currently fails on timestamps outside chrono
Member

is there a follow on issue for this?

Contributor Author

Double-checking whether it is still an issue after the timestamp conversion.

Contributor Author

passed

Member

@andygrove andygrove left a comment

Thanks @comphead. This LGTM other than a couple of minor comments I added.

I am not sure about the Iceberg changes so will hold off on approving until @mbutrovich can review those in more detail.

@mbutrovich mbutrovich self-requested a review February 20, 2026 19:22
comphead and others added 24 commits February 24, 2026 13:24
* fix: [df52] schema pruning crash on complex nested types

When `data_schema` is provided but `projection_vector` is None (the
NativeBatchReader / native_iceberg_compat path), the base schema was
incorrectly set to the pruned `required_schema`. This caused DataFusion
to think the table had only the pruned columns, leading to column index
misalignment in PhysicalExprAdapter. For example, reading "friends" at
logical index 0 would map to physical index 0 ("id") instead of the
correct index 4.

Fix: when `data_schema` is provided without a `projection_vector`,
compute the projection by mapping required field names to their indices
in the full data schema. Also harden `wrap_all_type_mismatches` to use
name-based lookup for physical fields instead of positional index.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
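The name-based projection this commit describes can be sketched as follows, with schemas reduced to plain field-name lists for illustration:

```rust
/// Map each required field name to its index in the full data schema.
/// Returns None when any field cannot be matched by name, letting the
/// caller fall back (the Parquet field-ID mapping case).
fn compute_projection(required: &[&str], data_schema: &[&str]) -> Option<Vec<usize>> {
    required
        .iter()
        .map(|name| data_schema.iter().position(|f| f == name))
        .collect()
}
```

With the example from the commit message, "friends" resolves to its physical index in the full data schema instead of being misread as index 0.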

* fix: handle field ID mapping in projection computation

When computing a name-based projection from required_schema to
data_schema, fall back to using required_schema directly when not
all fields can be matched by name. This handles Parquet field ID
mapping where column names differ between the read schema and file
schema.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

Add IgnoreCometSuite to ParquetVariantShreddingSuite in the 4.0.1 diff.
VariantType shredding is a Spark 4.0 feature that Comet does not yet
support (apache#2209). VariantShreddingSuite was already skipped but
ParquetVariantShreddingSuite was missed, causing test failures in CI.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

Development

Successfully merging this pull request may close these issues.

Default values doesn't work with native_datafusion
DataFusion 52 migration

4 participants