14 changes: 12 additions & 2 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -18,6 +18,7 @@
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::ArrowReaderOptions;

use crate::arrow::ArrowReader;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
@@ -60,13 +61,22 @@ impl BasicDeleteFileLoader {
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
as that introduces a circular dependency.
*/
let parquet_metadata = ArrowReader::load_parquet_metadata(
data_file_path,
&self.file_io,
false,
file_size_in_bytes,
None,
)
.await?;

let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
data_file_path,
self.file_io.clone(),
false,
None,
None,
ArrowReaderOptions::default(),
file_size_in_bytes,
parquet_metadata,
)
.await?
.build()?
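For readers following the change in `BasicDeleteFileLoader`: the single builder call becomes a two-step sequence. A minimal sketch of the resulting call pattern, assembled from the hunk above (surrounding impl and error handling elided):

```rust
// Sketch of the new two-step pattern: the Parquet footer is fetched
// once, then handed to the builder so it is not fetched a second time.
let parquet_metadata = ArrowReader::load_parquet_metadata(
    data_file_path,
    &self.file_io,
    false, // should_load_page_index
    file_size_in_bytes,
    None, // metadata_size_hint
)
.await?;

let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
    data_file_path,
    self.file_io.clone(),
    false, // should_load_page_index
    ArrowReaderOptions::default(),
    file_size_in_bytes,
    parquet_metadata,
)
.await?
.build()?;
```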
120 changes: 83 additions & 37 deletions crates/iceberg/src/arrow/reader.rs
@@ -35,7 +35,7 @@ use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder, ProjectionMask};
@@ -229,22 +229,32 @@ impl ArrowReader {
let delete_filter_rx =
delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

// Migrated tables lack field IDs, requiring us to inspect the schema to choose
// between field-ID-based or position-based projection
let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
// Load Parquet metadata once for this file
let parquet_metadata = Self::load_parquet_metadata(
&task.data_file_path,
file_io.clone(),
&file_io,
should_load_page_index,
None,
metadata_size_hint,
task.file_size_in_bytes,
metadata_size_hint,
)
.await?;

// Migrated tables lack field IDs, requiring us to inspect the schema to choose
// between field-ID-based or position-based projection
let default_arrow_metadata =
ArrowReaderMetadata::try_new(Arc::clone(&parquet_metadata), Default::default())
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata from ParquetMetaData",
)
.with_source(e)
})?;

// Check if Parquet file has embedded field IDs
// Corresponds to Java's ParquetSchemaUtil.hasIds()
// Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
let missing_field_ids = initial_stream_builder
let missing_field_ids = default_arrow_metadata
.schema()
.fields()
.iter()
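The body of the `missing_field_ids` predicate is collapsed in this view. Given the `PARQUET_FIELD_ID_META_KEY` import and the `hasIds()` reference, it presumably tests each top-level Arrow field for that metadata entry; a standalone sketch of such a check follows (the exact condition in the elided lines may differ, e.g. for nested fields):

```rust
use arrow_schema::SchemaRef;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

/// Sketch: true when no top-level field carries an embedded Parquet
/// field ID, i.e. the inverse of Java's ParquetSchemaUtil.hasIds().
fn schema_missing_field_ids(schema: &SchemaRef) -> bool {
    schema
        .fields()
        .iter()
        .all(|field| !field.metadata().contains_key(PARQUET_FIELD_ID_META_KEY))
}
```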
@@ -266,39 +276,40 @@
// - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
// - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
// - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
let mut record_batch_stream_builder = if missing_field_ids {
let options = if missing_field_ids {
// Parquet file lacks field IDs - must assign them before reading
let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
// Branch 2: Apply name mapping to assign correct Iceberg field IDs
// Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
// to columns without field id"
// Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
apply_name_mapping_to_arrow_schema(
Arc::clone(initial_stream_builder.schema()),
Arc::clone(default_arrow_metadata.schema()),
name_mapping,
)?
} else {
// Branch 3: No name mapping - use position-based fallback IDs
// Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
add_fallback_field_ids_to_arrow_schema(default_arrow_metadata.schema())
};

let options = ArrowReaderOptions::new().with_schema(arrow_schema);

Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
Some(options),
metadata_size_hint,
task.file_size_in_bytes,
)
.await?
ArrowReaderOptions::new().with_schema(arrow_schema)
} else {
// Branch 1: File has embedded field IDs - trust them
initial_stream_builder
ArrowReaderOptions::default()
};

// Create a single builder with the correct options and pre-loaded metadata
let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
&task.data_file_path,
file_io.clone(),
should_load_page_index,
options,
task.file_size_in_bytes,
parquet_metadata,
)
.await?;

// Filter out metadata fields for Parquet projection (they don't exist in files)
let project_field_ids_without_metadata: Vec<i32> = task
.project_field_ids
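`add_fallback_field_ids_to_arrow_schema` (Branch 3) is defined elsewhere in this crate and not shown in this diff. A flat, illustrative sketch of position-based fallback assignment in the spirit of Java's `ParquetSchemaUtil.addFallbackIds()`; the starting ordinal and the lack of nested-type handling here are assumptions of the sketch, not the crate's actual implementation:

```rust
use std::sync::Arc;

use arrow_schema::{Field, Schema, SchemaRef};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

/// Flat sketch: assign each top-level column its ordinal position as a
/// field ID. The real helper also walks nested types; this one does not.
fn add_flat_fallback_ids(schema: &SchemaRef) -> SchemaRef {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let mut metadata = field.metadata().clone();
            // Counter starting at 1 is an assumption for illustration.
            metadata.insert(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                (pos + 1).to_string(),
            );
            field.as_ref().clone().with_metadata(metadata)
        })
        .collect();
    Arc::new(Schema::new(fields))
}
```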
@@ -490,19 +501,17 @@ impl ArrowReader {
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

pub(crate) async fn create_parquet_record_batch_stream_builder(
/// Loads Parquet metadata from storage, using the known file size and an
/// optional size hint to minimize the number of reads.
pub(crate) async fn load_parquet_metadata(
data_file_path: &str,
file_io: FileIO,
file_io: &FileIO,
should_load_page_index: bool,
arrow_reader_options: Option<ArrowReaderOptions>,
metadata_size_hint: Option<usize>,
file_size_in_bytes: u64,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
metadata_size_hint: Option<usize>,
) -> Result<Arc<ParquetMetaData>> {
let parquet_file = file_io.new_input(data_file_path)?;
let parquet_reader = parquet_file.reader().await?;
let mut parquet_file_reader = ArrowFileReader::new(
let mut reader = ArrowFileReader::new(
FileMetadata {
size: file_size_in_bytes,
},
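To illustrate the `metadata_size_hint` parameter: a hint near the actual footer size typically lets the reader fetch the file tail in a single ranged request, instead of first reading the 8-byte footer length and then the footer itself. A hypothetical call (path, size, and hint values are made up for illustration):

```rust
// Hypothetical: fetch metadata with a 64 KiB prefetch hint so the
// footer can usually be retrieved in one ranged read.
let metadata = ArrowReader::load_parquet_metadata(
    "s3://bucket/warehouse/data/part-00000.parquet",
    &file_io,
    true, // should_load_page_index
    file_size_in_bytes,
    Some(64 * 1024),
)
.await?;
assert!(metadata.num_row_groups() > 0);
```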
@@ -513,14 +522,51 @@
.with_preload_page_index(should_load_page_index);

if let Some(hint) = metadata_size_hint {
parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
reader = reader.with_metadata_size_hint(hint);
}

// Create the record batch stream builder, which wraps the parquet file reader
let options = arrow_reader_options.unwrap_or_default();
let record_batch_stream_builder =
ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?;
Ok(record_batch_stream_builder)
let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await
.map_err(|e| {
Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e)
})?;

Ok(Arc::clone(arrow_metadata.metadata()))
}

pub(crate) async fn create_parquet_record_batch_stream_builder(
data_file_path: &str,
file_io: FileIO,
should_load_page_index: bool,
arrow_reader_options: ArrowReaderOptions,
file_size_in_bytes: u64,
parquet_metadata: Arc<ParquetMetaData>,
) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
let arrow_reader_metadata =
ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata from ParquetMetaData",
)
.with_source(e)
})?;

let parquet_file = file_io.new_input(data_file_path)?;
let parquet_reader = parquet_file.reader().await?;
let parquet_file_reader = ArrowFileReader::new(
FileMetadata {
size: file_size_in_bytes,
},
parquet_reader,
)
.with_preload_column_index(true)
.with_preload_offset_index(true)
.with_preload_page_index(should_load_page_index);

Ok(ParquetRecordBatchStreamBuilder::new_with_metadata(
parquet_file_reader,
arrow_reader_metadata,
))
}

/// Computes a `RowSelection` from positional delete indices.
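For reference, the parquet-crate distinction this refactor leans on: `ParquetRecordBatchStreamBuilder::new_with_options` fetches and parses the footer from the reader, while `new_with_metadata` accepts an already-built `ArrowReaderMetadata` and performs no I/O, so one `Arc<ParquetMetaData>` can back multiple builders. A minimal contrast, assuming `reader` and `reader2` are `ArrowFileReader`s and `meta` is an `ArrowReaderMetadata` already in scope:

```rust
// Old path: the builder reads the footer itself (async I/O inside).
let builder_before = ParquetRecordBatchStreamBuilder::new_with_options(
    reader,
    ArrowReaderOptions::default(),
)
.await?;

// New path: metadata is supplied up front, so construction does no I/O.
let builder_after = ParquetRecordBatchStreamBuilder::new_with_metadata(reader2, meta);
```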