deps: DataFusion 52.0.0 migration (SchemaAdapter changes, etc.) [iceberg] #3536
comphead wants to merge 38 commits into apache:main
Conversation
@sqlbenchmark run tpch
The changes in this PR are somehow causing large differences in memory usage compared to a recent build from the main branch.
DF52 shows a significant shift from off-heap execution memory to JVM heap. Peak off-heap execution dropped from 14.20 GB to 5.30 GB (-63%), while JVM heap rose from 6.38 GB to 10.60 GB (+66%). Net combined peak memory actually decreased (~20.6 GB to ~15.9 GB).
The total memory used seems to be less? +4 GB on heap and -8 GB off heap, so -4 GB total. The JVM growth looks odd, since DF52 has very few JVM-side changes. Is the baseline latest main?
Comet TPC-H Benchmark Results
Baseline: Query Times
Spark Configuration
Automated benchmark run by dfbench
@sqlbenchmark run tpch --iterations 3
Not sure how true this is yet, but Claude claims there may be a performance regression related to Iceberg scans:
The baseline was a build from Friday. Claude is suggesting this as the root cause: "The memory shift is primarily driven by DataFusion 52's internal changes to how operators manage buffers (batch coalescing, expression evaluation), combined with these buffers likely not being tracked through Comet's CometUnifiedMemoryPool → JNI → Spark accounting path. The result is that native memory still exists but is "invisible" to Spark's off-heap accounting, while Spark's JVM-side operations expand to fill the perceived available space." |
Comet TPC-H Benchmark Results
Baseline: Query Times
Spark Configuration
Automated benchmark run by dfbench
I ran the queries individually and compared memory usage between main and this PR. Key findings from Claude's analysis of the results:
I also ran a memory profiling comparison against main with replaceSortMergeJoin=false.
// let result_type = match &result {
//     ColumnarValue::Array(array) => array.data_type().clone(),
//     ColumnarValue::Scalar(scalar) => scalar.data_type(),
// };

// println!(
//     "spark_cast: {} -> {} (requested: {})",
//     input_type, result_type, data_type
// );
// println!("cast_array BEFORE postprocess:");
// println!("  from_type: {}", from_type);
// println!("  to_type: {}", to_type);
// println!("  intermediate data_type: {}", x.data_type());
Debug logging can be removed?
native/spark-expr/src/utils.rs

}

fn datetime_cast_err(value: i64) -> ArrowError {
    println!("{}", std::backtrace::Backtrace::force_capture());
Debug logging can be removed?
native/spark-expr/src/utils.rs

}
_ => {
    // Not supported
    panic!(
.with_table_partition_cols(partition_fields)

/// Wraps a `SendableRecordBatchStream` to print each batch as it flows through.
/// Returns a new `SendableRecordBatchStream` that yields the same batches.
pub fn dbg_batch_stream(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
I found it useful for printing a stream while preserving the original batches in the output. It is not used now, but would be handy for debugging.
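For readers unfamiliar with the pattern: a minimal pure-std sketch of the same pass-through idea. The real helper wraps DataFusion's `SendableRecordBatchStream`; here a plain iterator stands in for the stream, so the name `dbg_iter` and the types are illustrative only.

```rust
// Illustrative only: the actual dbg_batch_stream wraps DataFusion's
// SendableRecordBatchStream. This sketch shows the same pattern with a
// std iterator: each item is printed, then yielded unchanged.
fn dbg_iter<I>(iter: I) -> impl Iterator<Item = I::Item>
where
    I: Iterator,
    I::Item: std::fmt::Debug,
{
    iter.inspect(|item| println!("batch: {:?}", item))
}

fn main() {
    // The wrapped iterator yields exactly what the inner one would.
    let passed: Vec<i32> = dbg_iter(vec![1, 2, 3].into_iter()).collect();
    assert_eq!(passed, vec![1, 2, 3]);
}
```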
let millis_values: TimestampMillisecondArray = micros_array
    .iter()
    .map(|opt| opt.map(|v| v / 1000))
    .collect();
Suggested change:

let millis_values: TimestampMillisecondArray =
    arrow::compute::kernels::arity::unary(micros_array, |v| v / 1000);
native/spark-expr/src/utils.rs

let micros_array: TimestampMicrosecondArray = millis_array
    .iter()
    .map(|opt| opt.map(|v| v * 1000))
    .collect();
This could use the unary kernel instead.
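For context on why the `unary` kernel is preferred: it applies the closure to every value slot directly and carries the existing validity (null) mask over, instead of allocating a per-element `Option` on each step. A pure-std sketch of that behavior, with illustrative names that are not the arrow-rs API:

```rust
// Illustrative stand-in for arrow's unary kernel: apply `op` to every
// value slot and reuse the validity mask unchanged. Values under null
// slots may be garbage, which is fine because the mask hides them.
fn unary_like<F: Fn(i64) -> i64>(
    values: &[i64],
    validity: &[bool],
    op: F,
) -> (Vec<i64>, Vec<bool>) {
    let mapped: Vec<i64> = values.iter().map(|&v| op(v)).collect();
    (mapped, validity.to_vec())
}

fn main() {
    // Micros -> millis, as in the review comment above.
    let (millis, validity) = unary_like(&[2_000_000, 5_000], &[true, false], |v| v / 1000);
    assert_eq!(millis, vec![2000, 5]);
    assert_eq!(validity, vec![true, false]);
}
```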
inner: S,
schema: SchemaRef,
/// Cached schema adapter with its source schema. Created when schema changes.
cached_adapter: Option<(SchemaRef, Arc<dyn SchemaMapper>)>,
Is caching no longer possible with DF52?
+ case s: FileSourceScanExec if s.bucketedScan => s
+ case s: CometScanExec if s.bucketedScan => s
+ case s: CometNativeScanExec if s.bucketedScan => s
+ }
I think this file just has a whitespace change, can you get the version of the file from main to reduce the diff?
I still don't see any changes other than whitespace here. Can we remove this from the PR?
@andygrove @mbutrovich |
/// This handles the common case where the field names differ (e.g., Parquet uses "key_value"
/// while Spark uses "entries") but the key/value types are the same.
/// Returns None if the types are not compatible or not Map types.
fn adapt_map_to_schema(column: &ArrayRef, target_type: &DataType) -> Option<ArrayRef> {
does this need to handle nested maps?
I'm actually thinking this should be part of the schema rewrite rather than a standalone cast. Checking it.
Removed it, as it duplicated another cast.
test("cast TimestampType to LongType") {
  castTest(generateTimestampsExtended(), DataTypes.LongType)
  // currently fails on timestamps outside chrono
Is there a follow-on issue for this?
Double-checking whether this is still an issue after the timestamp conversion.
andygrove left a comment
Thanks @comphead. This LGTM other than a couple of minor comments I added.
I am not sure about the Iceberg changes so will hold off on approving until @mbutrovich can review those in more detail.
spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala
spark/src/test/scala/org/apache/comet/CometMathExpressionSuite.scala
* fix: [df52] schema pruning crash on complex nested types
When `data_schema` is provided but `projection_vector` is None (the
NativeBatchReader / native_iceberg_compat path), the base schema was
incorrectly set to the pruned `required_schema`. This caused DataFusion
to think the table had only the pruned columns, leading to column index
misalignment in PhysicalExprAdapter. For example, reading "friends" at
logical index 0 would map to physical index 0 ("id") instead of the
correct index 4.
Fix: when `data_schema` is provided without a `projection_vector`,
compute the projection by mapping required field names to their indices
in the full data schema. Also harden `wrap_all_type_mismatches` to use
name-based lookup for physical fields instead of positional index.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: handle field ID mapping in projection computation
When computing a name-based projection from required_schema to
data_schema, fall back to using required_schema directly when not
all fields can be matched by name. This handles Parquet field ID
mapping where column names differ between the read schema and file
schema.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
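The name-based projection described in the two commit messages above can be sketched as follows. This is a hypothetical, std-only helper: the real code works on Arrow schemas rather than slices of names, and the function name is illustrative.

```rust
use std::collections::HashMap;

// Hypothetical sketch of the fix: map each required field name to its
// index in the full data schema. Returns None when any name cannot be
// matched, in which case the caller falls back to required_schema
// directly (the Parquet field-ID mapping case, where names differ
// between the read schema and the file schema).
fn compute_projection(required: &[&str], data_schema: &[&str]) -> Option<Vec<usize>> {
    let by_name: HashMap<&str, usize> = data_schema
        .iter()
        .enumerate()
        .map(|(i, &name)| (name, i))
        .collect();
    // Option's FromIterator short-circuits: one unmatched name => None.
    required.iter().map(|name| by_name.get(name).copied()).collect()
}

fn main() {
    // "friends" at logical index 0 maps to physical index 4, as in the
    // commit message example.
    let data = ["id", "name", "age", "email", "friends"];
    assert_eq!(compute_projection(&["friends"], &data), Some(vec![4]));
    assert_eq!(compute_projection(&["unknown"], &data), None);
}
```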
---------
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Add IgnoreCometSuite to ParquetVariantShreddingSuite in the 4.0.1 diff. VariantType shredding is a Spark 4.0 feature that Comet does not yet support (apache#2209). VariantShreddingSuite was already skipped, but ParquetVariantShreddingSuite was missed, causing test failures in CI.
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Which issue does this PR close?
Closes #3046.
Closes #3515.
Rationale for this change
What changes are included in this PR?
How are these changes tested?