Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 104 additions & 71 deletions datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,16 @@ impl DatafusionArrowPredicate {
/// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
pub fn try_new(
candidate: FilterCandidate,
metadata: &ParquetMetaData,
rows_pruned: metrics::Count,
rows_matched: metrics::Count,
time: metrics::Time,
) -> Result<Self> {
let physical_expr =
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?;

Ok(Self {
physical_expr,
// Use leaf indices: when nested columns are involved, we must specify
// leaf (primitive) column indices in the Parquet schema so the decoder
// can properly project and filter nested structures.
projection_mask: ProjectionMask::leaves(
metadata.file_metadata().schema_descr(),
candidate.projection.leaf_indices.iter().copied(),
),
projection_mask: candidate.read_plan.projection_mask,
rows_pruned,
rows_matched,
time,
Expand Down Expand Up @@ -191,22 +184,24 @@ pub(crate) struct FilterCandidate {
/// the filter and to order the filters when `reorder_predicates` is true.
/// This is generated by summing the compressed size of all columns that the filter references.
required_bytes: usize,
/// Column indices into the parquet file schema required to evaluate this filter.
projection: LeafProjection,
/// The Arrow schema containing only the columns required by this filter,
/// projected from the file's Arrow schema.
filter_schema: SchemaRef,
/// The resolved Parquet read plan (leaf indices + projected schema).
read_plan: ParquetReadPlan,
}

/// Projection specification for nested columns using Parquet leaf column indices.
/// The result of resolving which Parquet leaf columns and Arrow schema fields
/// are needed to evaluate an expression against a Parquet file
///
/// For nested types like List and Struct, Parquet stores data in leaf columns
/// (the primitive fields). This struct tracks which leaf columns are needed
/// to evaluate a filter expression.
/// This is the shared output of the column resolution pipeline used by both
/// the row filter to build `ArrowPredicate`s and the opener to build `ProjectionMask`s
#[derive(Debug, Clone)]
struct LeafProjection {
/// Leaf column indices in the Parquet schema descriptor.
leaf_indices: Vec<usize>,
pub(crate) struct ParquetReadPlan {
/// Projection mask built from leaf column indices in the Parquet schema.
/// Using a `ProjectionMask` directly (rather than raw indices) prevents
/// bugs from accidentally mixing up root vs leaf indices.
pub projection_mask: ProjectionMask,
/// The projected Arrow schema containing only the columns/fields required
/// Struct types are pruned to include only the accessed sub-fields
pub projected_schema: SchemaRef,
}

/// Helper to build a `FilterCandidate`.
Expand Down Expand Up @@ -238,39 +233,15 @@ impl FilterCandidateBuilder {
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
/// * `Err(e)` if an error occurs while building the candidate
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
else {
return Ok(None);
};

let schema_descr = metadata.file_metadata().schema_descr();
let root_indices: Vec<_> =
required_columns.required_columns.into_iter().collect();

let mut leaf_indices = leaf_indices_for_roots(&root_indices, schema_descr);

let struct_leaf_indices = resolve_struct_field_leaves(
&required_columns.struct_field_accesses,
&self.file_schema,
schema_descr,
);
leaf_indices.extend_from_slice(&struct_leaf_indices);
leaf_indices.sort_unstable();
leaf_indices.dedup();

let projected_schema = build_filter_schema(
&self.file_schema,
&root_indices,
&required_columns.struct_field_accesses,
);

let required_bytes = size_of_columns(&leaf_indices, metadata)?;
Ok(Some(FilterCandidate {
expr: self.expr,
required_bytes,
projection: LeafProjection { leaf_indices },
filter_schema: projected_schema,
}))
Ok(
build_parquet_read_plan(&self.expr, &self.file_schema, metadata)?.map(
|(read_plan, required_bytes)| FilterCandidate {
expr: self.expr,
required_bytes,
read_plan,
},
),
)
}
}

Expand Down Expand Up @@ -551,17 +522,77 @@ fn pushdown_columns(
Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns()))
}

fn leaf_indices_for_roots(
root_indices: &[usize],
/// Resolves which Parquet leaf columns and Arrow schema fields are needed
/// to evaluate `expr` against a Parquet file
///
/// Returns `Ok(Some((plan, required_bytes)))` when the expression can be
/// evaluated using only pushdown-compatible columns. `Ok(None)` when it
/// cannot (it references whole struct columns or columns missing from disk).
///
/// The `required_bytes` is the total compressed size of all referenced columns
/// across all row groups, used to estimate filter evaluation cost.
///
/// Note: this is a shared entry point used by both row filter construction and
/// the opener's projection logic
pub(crate) fn build_parquet_read_plan(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &Schema,
metadata: &ParquetMetaData,
) -> Result<Option<(ParquetReadPlan, usize)>> {
let schema_descr = metadata.file_metadata().schema_descr();

let Some(required_columns) = pushdown_columns(expr, file_schema)? else {
return Ok(None);
};

let root_indices = &required_columns.required_columns;

let mut leaf_indices =
leaf_indices_for_roots(root_indices.iter().copied(), schema_descr);

let struct_leaf_indices = resolve_struct_field_leaves(
&required_columns.struct_field_accesses,
file_schema,
schema_descr,
);
leaf_indices.extend_from_slice(&struct_leaf_indices);
leaf_indices.sort_unstable();
leaf_indices.dedup();

let required_bytes = size_of_columns(&leaf_indices, metadata)?;

let projection_mask =
ProjectionMask::leaves(schema_descr, leaf_indices.iter().copied());

let projected_schema = build_filter_schema(
file_schema,
root_indices,
&required_columns.struct_field_accesses,
);

Ok(Some((
ParquetReadPlan {
projection_mask,
projected_schema,
},
required_bytes,
)))
}

fn leaf_indices_for_roots<I>(
root_indices: I,
schema_descr: &SchemaDescriptor,
) -> Vec<usize> {
) -> Vec<usize>
where
I: IntoIterator<Item = usize>,
{
// Always map root (Arrow) indices to Parquet leaf indices via the schema
// descriptor. Arrow root indices only equal Parquet leaf indices when the
// schema has no group columns (Struct, Map, etc.); when group columns
// exist, their children become separate leaves and shift all subsequent
// leaf indices.
// Struct columns are unsupported.
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
let root_set: BTreeSet<_> = root_indices.into_iter().collect();

(0..schema_descr.num_columns())
.filter(|leaf_idx| {
Expand Down Expand Up @@ -887,7 +918,6 @@ pub fn build_row_filter(

DatafusionArrowPredicate::try_new(
candidate,
metadata,
predicate_rows_pruned,
predicate_rows_matched,
time.clone(),
Expand Down Expand Up @@ -957,7 +987,9 @@ mod test {
.expect("building candidate")
.expect("list pushdown should be supported");

assert_eq!(candidate.projection.leaf_indices, vec![list_index]);
let expected_mask =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(), [list_index]);
assert_eq!(candidate.read_plan.projection_mask, expected_mask);
}

#[test]
Expand Down Expand Up @@ -997,7 +1029,6 @@ mod test {

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
Expand Down Expand Up @@ -1037,7 +1068,6 @@ mod test {

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&metadata,
Count::new(),
Count::new(),
Time::new(),
Expand Down Expand Up @@ -1414,10 +1444,11 @@ mod test {
.expect("filter on primitive col_b should be pushable");

// col_b is Parquet leaf 3 (shifted by struct_col's two children).
let expected_mask =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(), [3]);
assert_eq!(
candidate.projection.leaf_indices,
vec![3],
"leaf_indices should be [3] for col_b"
candidate.read_plan.projection_mask, expected_mask,
"projection_mask should select only leaf 3 for col_b"
);
}

Expand Down Expand Up @@ -1564,10 +1595,11 @@ mod test {

// The filter accesses only s.value, so only Parquet leaf 1 is needed.
// Leaf 2 (s.label) is not read, reducing unnecessary I/O.
let expected_mask =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(), [1]);
assert_eq!(
candidate.projection.leaf_indices,
vec![1],
"leaf_indices should contain only the accessed struct field leaf"
candidate.read_plan.projection_mask, expected_mask,
"projection_mask should select only the accessed struct field leaf"
);
}

Expand Down Expand Up @@ -1687,10 +1719,11 @@ mod test {
.expect("deeply nested get_field filter should be pushable");

// Only s.outer.inner (leaf 2) should be projected,
let expected_mask =
ProjectionMask::leaves(metadata.file_metadata().schema_descr(), [2]);
assert_eq!(
candidate.projection.leaf_indices,
vec![2],
"leaf_indices should be [2] for s.outer.inner, skipping sibling and cousin leaves"
candidate.read_plan.projection_mask, expected_mask,
"projection_mask should select only leaf 2 for s.outer.inner, skipping sibling and cousin leaves"
);
}

Expand Down
Loading