Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/iceberg/src/arrow/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod positional_deletes;
mod predicate_visitor;
mod projection;
mod row_filter;
mod virtual_columns;
pub use file_reader::ArrowFileReader;
pub(crate) use options::ParquetReadOptions;
use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
Expand Down
80 changes: 50 additions & 30 deletions crates/iceberg/src/arrow/reader/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use futures::{StreamExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder};

use super::virtual_columns::collect_arrow_virtual_columns;
use super::{
ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema,
apply_name_mapping_to_arrow_schema,
Expand Down Expand Up @@ -133,6 +134,9 @@ impl FileScanTaskReader {
.next()
.is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

// Reader-supplied metadata columns (e.g. `_pos`) requested via projection.
let virtual_columns = collect_arrow_virtual_columns(task.project_field_ids())?;

// Three-branch schema resolution strategy matching Java's ReadConf constructor
//
// Per Iceberg spec Column Projection rules:
Expand All @@ -148,44 +152,60 @@ impl FileScanTaskReader {
// - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
// - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
// - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
let arrow_metadata = if missing_field_ids {
// Parquet file lacks field IDs - must assign them before reading
let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
// Branch 2: Apply name mapping to assign correct Iceberg field IDs
// Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
// to columns without field id"
// Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
apply_name_mapping_to_arrow_schema(
Arc::clone(arrow_metadata.schema()),
name_mapping,
)?
let arrow_metadata =
if missing_field_ids {
// Parquet file lacks field IDs - must assign them before reading
let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
// Branch 2: Apply name mapping to assign correct Iceberg field IDs
// Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
// to columns without field id"
// Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
apply_name_mapping_to_arrow_schema(
Arc::clone(arrow_metadata.schema()),
name_mapping,
)?
} else {
// Branch 3: No name mapping - use position-based fallback IDs
// Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
};

let options = ArrowReaderOptions::new()
.with_schema(arrow_schema)
.with_virtual_columns(virtual_columns.clone())?;
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata with field ID schema",
)
.with_source(e)
})?
} else if !virtual_columns.is_empty() {
// Branch 1 with virtual columns: rebuild metadata so arrow-rs emits them.
let options =
ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?;
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata with virtual columns",
)
.with_source(e)
})?
} else {
// Branch 3: No name mapping - use position-based fallback IDs
// Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema())
// Branch 1: File has embedded field IDs - trust them
arrow_metadata
};

let options = ArrowReaderOptions::new().with_schema(arrow_schema);
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create ArrowReaderMetadata with field ID schema",
)
.with_source(e)
},
)?
} else {
// Branch 1: File has embedded field IDs - trust them
arrow_metadata
};

// Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema.
// This must happen before building the stream reader to avoid i64 overflow in arrow-rs.
let arrow_metadata = if let Some(coerced_schema) =
coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
{
let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
let options = ArrowReaderOptions::new()
.with_schema(Arc::clone(&coerced_schema))
.with_virtual_columns(virtual_columns.clone())?;
ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
|e| {
Error::new(
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/arrow/reader/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ impl ArrowReader {
if parquet_pos < parquet_root_fields.len() {
root_indices.push(parquet_pos);
}
// RecordBatchTransformer adds missing columns with NULL values
// Missing columns are filled post-read by RecordBatchTransformer
// (with NULLs) or by the parquet reader's virtual-columns mechanism.
}

if root_indices.is_empty() {
Expand Down
110 changes: 110 additions & 0 deletions crates/iceberg/src/arrow/reader/virtual_columns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Maps Iceberg reserved metadata field ids to Apache Arrow virtual columns
//! consumed via [`parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`].
//!
//! Today only `_pos` is wired (mapped to the `RowNumber` extension type).

use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, FieldRef};
use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, RowNumber};

use crate::Result;
use crate::error::Error;
use crate::metadata_columns::{
RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_POS, is_reader_supplied_metadata_field,
};

/// Returns the Arrow virtual columns to request for the given projected
/// Iceberg field ids; empty when none are reader-supplied.
pub(crate) fn collect_arrow_virtual_columns(project_field_ids: &[i32]) -> Result<Vec<FieldRef>> {
project_field_ids
.iter()
.copied()
.filter(|id| is_reader_supplied_metadata_field(*id))
.map(iceberg_metadata_field_to_arrow_virtual)
.collect()
}

/// Builds the Arrow `Field` for a single reader-supplied metadata field id.
///
/// The `PARQUET:field_id` metadata key lets `RecordBatchTransformer` route
/// the column by id; the extension type makes arrow-rs accept it as a
/// virtual column.
fn iceberg_metadata_field_to_arrow_virtual(field_id: i32) -> Result<FieldRef> {
let metadata = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string())]);

let mut field = match field_id {
RESERVED_FIELD_ID_POS => {
Field::new(RESERVED_COL_NAME_POS, DataType::Int64, false).with_metadata(metadata)
}
_ => {
return Err(Error::new(
crate::ErrorKind::Unexpected,
format!(
"Iceberg metadata field id {field_id} is not produced by the Parquet reader"
),
));
}
};

if field_id == RESERVED_FIELD_ID_POS {
field.try_with_extension_type(RowNumber)?;
}

Ok(Arc::new(field))
}

#[cfg(test)]
mod tests {
    use parquet::arrow::is_virtual_column;

    use super::*;
    use crate::metadata_columns::RESERVED_FIELD_ID_FILE;

    #[test]
    fn collect_returns_empty_for_no_metadata_fields() {
        // Ordinary data field ids must not produce any virtual columns.
        let fields = collect_arrow_virtual_columns(&[1, 2, 3]).unwrap();
        assert!(fields.is_empty());
    }

    #[test]
    fn collect_returns_pos_field_when_requested() {
        let fields = collect_arrow_virtual_columns(&[1, RESERVED_FIELD_ID_POS, 2]).unwrap();
        let [pos_field] = fields.as_slice() else {
            panic!("expected exactly one virtual column, got {}", fields.len());
        };

        // The `_pos` field: non-nullable Int64, recognized by arrow-rs as a
        // virtual column, and carrying its Iceberg field id in the metadata
        // so downstream code can route it by id.
        assert_eq!(pos_field.name(), RESERVED_COL_NAME_POS);
        assert_eq!(pos_field.data_type(), &DataType::Int64);
        assert!(!pos_field.is_nullable());
        assert!(is_virtual_column(pos_field));
        assert_eq!(
            pos_field.metadata().get(PARQUET_FIELD_ID_META_KEY),
            Some(&RESERVED_FIELD_ID_POS.to_string()),
        );
    }

    #[test]
    fn collect_skips_constant_metadata_fields() {
        // `_file` is a metadata column, but it is injected as a constant
        // value rather than produced by the reader, so it is skipped here.
        let fields = collect_arrow_virtual_columns(&[RESERVED_FIELD_ID_FILE, 1]).unwrap();
        assert!(fields.is_empty());
    }
}
44 changes: 42 additions & 2 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use arrow_schema::{
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element};
use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
use crate::metadata_columns::get_metadata_field;
use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema, type_to_arrow_type};
use crate::metadata_columns::{get_metadata_field, is_reader_supplied_metadata_field};
use crate::spec::{
Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
};
Expand Down Expand Up @@ -394,6 +394,19 @@ impl RecordBatchTransformer {
.with_metadata(field.metadata().clone());
Ok(Arc::new(constant_field))
}
} else if is_reader_supplied_metadata_field(*field_id) {
// Reader-supplied virtual field (e.g. `_pos`) is absent from
// the snapshot schema; build its target field from the
// iceberg metadata-column definition.
let iceberg_field = get_metadata_field(*field_id)?;
let arrow_type = type_to_arrow_type(&iceberg_field.field_type)?;
let arrow_field =
Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
iceberg_field.id.to_string(),
)]));
Ok(Arc::new(arrow_field))
} else {
// Regular field - use schema as-is
Ok(field_id_to_mapped_schema_map
Expand Down Expand Up @@ -456,6 +469,16 @@ impl RecordBatchTransformer {
return SchemaComparison::Different;
}

// Same type at the same position with different field ids means
// the columns are actually reordered; force the field-id-aware path.
if let (Some(source_id), Some(target_id)) = (
source_field.metadata().get(PARQUET_FIELD_ID_META_KEY),
target_field.metadata().get(PARQUET_FIELD_ID_META_KEY),
) && source_id != target_id
{
return SchemaComparison::Different;
}

if source_field.name() != target_field.name() {
names_changed = true;
}
Expand Down Expand Up @@ -493,6 +516,23 @@ impl RecordBatchTransformer {
});
}

if is_reader_supplied_metadata_field(*field_id) {
// Routed by field id from the source batch (parquet virtual column).
let (_, source_index) = field_id_to_source_schema_map
.get(field_id)
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!(
"reader-supplied virtual field id {field_id} not found in source batch"
),
)
})?;
return Ok(ColumnSource::PassThrough {
source_index: *source_index,
});
}

let (target_field, _) =
field_id_to_mapped_schema_map
.get(field_id)
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/metadata_columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
}
}

/// Returns `true` if the metadata field is produced by the file reader
/// (e.g. `_pos` via Parquet virtual columns) rather than added as a constant.
pub fn is_reader_supplied_metadata_field(field_id: i32) -> bool {
matches!(field_id, RESERVED_FIELD_ID_POS)
}

/// Checks if a field ID is a metadata field.
///
/// # Arguments
Expand Down
Loading
Loading