Skip to content

Commit dab176d

Browse files
unify parquet read plan
1 parent b61aee7 commit dab176d

1 file changed

Lines changed: 76 additions & 50 deletions

File tree

datafusion/datasource-parquet/src/row_filter.rs

Lines changed: 76 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl DatafusionArrowPredicate {
131131
time: metrics::Time,
132132
) -> Result<Self> {
133133
let physical_expr =
134-
reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;
134+
reassign_expr_columns(candidate.expr, &candidate.read_plan.projected_schema)?;
135135

136136
Ok(Self {
137137
physical_expr,
@@ -140,7 +140,7 @@ impl DatafusionArrowPredicate {
140140
// can properly project and filter nested structures.
141141
projection_mask: ProjectionMask::leaves(
142142
metadata.file_metadata().schema_descr(),
143-
candidate.projection.leaf_indices.iter().copied(),
143+
candidate.read_plan.leaf_indices.iter().copied(),
144144
),
145145
rows_pruned,
146146
rows_matched,
@@ -191,22 +191,22 @@ pub(crate) struct FilterCandidate {
191191
/// the filter and to order the filters when `reorder_predicates` is true.
192192
/// This is generated by summing the compressed size of all columns that the filter references.
193193
required_bytes: usize,
194-
/// Column indices into the parquet file schema required to evaluate this filter.
195-
projection: LeafProjection,
196-
/// The Arrow schema containing only the columns required by this filter,
197-
/// projected from the file's Arrow schema.
198-
filter_schema: SchemaRef,
194+
/// The resolved Parquet read plan (leaf indices + projected schema).
195+
read_plan: ParquetReadPlan,
199196
}
200197

201-
/// Projection specification for nested columns using Parquet leaf column indices.
198+
/// The result of resolving which Parquet leaf columns and Arrow schema fields
199+
/// are needed to evaluate an expression against a Parquet file
202200
///
203-
/// For nested types like List and Struct, Parquet stores data in leaf columns
204-
/// (the primitive fields). This struct tracks which leaf columns are needed
205-
/// to evaluate a filter expression.
201+
/// This is the shared output of the column resolution pipeline used by both
202+
/// the row filter to build `ArrowPredicate`s and the opener to build `ProjectionMask`s
206203
#[derive(Debug, Clone)]
207-
struct LeafProjection {
208-
/// Leaf column indices in the Parquet schema descriptor.
209-
leaf_indices: Vec<usize>,
204+
pub(crate) struct ParquetReadPlan {
205+
/// Leaf column indices in the Parquet schema descriptor to decode
206+
pub leaf_indices: Vec<usize>,
207+
/// The projected Arrow schema containing only the columns/fields required
208+
/// Struct types are pruned to include only the accessed sub-fields
209+
pub projected_schema: SchemaRef,
210210
}
211211

212212
/// Helper to build a `FilterCandidate`.
@@ -238,38 +238,17 @@ impl FilterCandidateBuilder {
238238
/// * `Ok(None)` if the expression cannot be used as an ArrowFilter
239239
/// * `Err(e)` if an error occurs while building the candidate
240240
pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
241-
let Some(required_columns) = pushdown_columns(&self.expr, &self.file_schema)?
242-
else {
243-
return Ok(None);
244-
};
245-
246241
let schema_descr = metadata.file_metadata().schema_descr();
247-
let root_indices: Vec<_> =
248-
required_columns.required_columns.into_iter().collect();
249-
250-
let mut leaf_indices = leaf_indices_for_roots(&root_indices, schema_descr);
251-
252-
let struct_leaf_indices = resolve_struct_field_leaves(
253-
&required_columns.struct_field_accesses,
254-
&self.file_schema,
255-
schema_descr,
256-
);
257-
leaf_indices.extend_from_slice(&struct_leaf_indices);
258-
leaf_indices.sort_unstable();
259-
leaf_indices.dedup();
260-
261-
let projected_schema = build_filter_schema(
262-
&self.file_schema,
263-
&root_indices,
264-
&required_columns.struct_field_accesses,
265-
);
242+
let read_plan =
243+
match build_parquet_read_plan(&self.expr, &self.file_schema, schema_descr)? {
244+
Some(plan) => plan,
245+
None => return Ok(None),
246+
};
266247

267-
let required_bytes = size_of_columns(&leaf_indices, metadata)?;
268248
Ok(Some(FilterCandidate {
269249
expr: self.expr,
270-
required_bytes,
271-
projection: LeafProjection { leaf_indices },
272-
filter_schema: projected_schema,
250+
required_bytes: size_of_columns(&read_plan.leaf_indices, metadata)?,
251+
read_plan,
273252
}))
274253
}
275254
}
@@ -551,17 +530,64 @@ fn pushdown_columns(
551530
Ok((!checker.prevents_pushdown()).then(|| checker.into_sorted_columns()))
552531
}
553532

554-
fn leaf_indices_for_roots(
555-
root_indices: &[usize],
533+
/// Resolves which Parquet leaf columns and Arrow schema fields are needed
534+
/// to evaluate `expr` against a Parquet file
535+
///
536+
/// Returns `Ok(Some(plan))` when the expression can be evaluated using only
537+
pushdown-compatible columns. `Ok(None)` when it cannot (it references
538+
/// whole struct columns or columns missing from disk)
539+
///
540+
/// Note: this is a shared entry point used by both row filter construction and
541+
/// the opener's projection logic
542+
pub(crate) fn build_parquet_read_plan(
543+
expr: &Arc<dyn PhysicalExpr>,
544+
file_schema: &Schema,
545+
schema_descr: &SchemaDescriptor,
546+
) -> Result<Option<ParquetReadPlan>> {
547+
let Some(required_columns) = pushdown_columns(expr, file_schema)? else {
548+
return Ok(None);
549+
};
550+
551+
let root_indices = &required_columns.required_columns;
552+
553+
let mut leaf_indices =
554+
leaf_indices_for_roots(root_indices.iter().copied(), schema_descr);
555+
556+
let struct_leaf_indices = resolve_struct_field_leaves(
557+
&required_columns.struct_field_accesses,
558+
file_schema,
559+
schema_descr,
560+
);
561+
leaf_indices.extend_from_slice(&struct_leaf_indices);
562+
leaf_indices.sort_unstable();
563+
leaf_indices.dedup();
564+
565+
let projected_schema = build_filter_schema(
566+
file_schema,
567+
root_indices,
568+
&required_columns.struct_field_accesses,
569+
);
570+
571+
Ok(Some(ParquetReadPlan {
572+
leaf_indices,
573+
projected_schema,
574+
}))
575+
}
576+
577+
fn leaf_indices_for_roots<I>(
578+
root_indices: I,
556579
schema_descr: &SchemaDescriptor,
557-
) -> Vec<usize> {
580+
) -> Vec<usize>
581+
where
582+
I: IntoIterator<Item = usize>,
583+
{
558584
// Always map root (Arrow) indices to Parquet leaf indices via the schema
559585
// descriptor. Arrow root indices only equal Parquet leaf indices when the
560586
// schema has no group columns (Struct, Map, etc.); when group columns
561587
// exist, their children become separate leaves and shift all subsequent
562588
// leaf indices.
563589
// Struct columns are unsupported.
564-
let root_set: BTreeSet<_> = root_indices.iter().copied().collect();
590+
let root_set: BTreeSet<_> = root_indices.into_iter().collect();
565591

566592
(0..schema_descr.num_columns())
567593
.filter(|leaf_idx| {
@@ -957,7 +983,7 @@ mod test {
957983
.expect("building candidate")
958984
.expect("list pushdown should be supported");
959985

960-
assert_eq!(candidate.projection.leaf_indices, vec![list_index]);
986+
assert_eq!(candidate.read_plan.leaf_indices, vec![list_index]);
961987
}
962988

963989
#[test]
@@ -1415,7 +1441,7 @@ mod test {
14151441

14161442
// col_b is Parquet leaf 3 (shifted by struct_col's two children).
14171443
assert_eq!(
1418-
candidate.projection.leaf_indices,
1444+
candidate.read_plan.leaf_indices,
14191445
vec![3],
14201446
"leaf_indices should be [3] for col_b"
14211447
);
@@ -1565,7 +1591,7 @@ mod test {
15651591
// The filter accesses only s.value, so only Parquet leaf 1 is needed.
15661592
// Leaf 2 (s.label) is not read, reducing unnecessary I/O.
15671593
assert_eq!(
1568-
candidate.projection.leaf_indices,
1594+
candidate.read_plan.leaf_indices,
15691595
vec![1],
15701596
"leaf_indices should contain only the accessed struct field leaf"
15711597
);
@@ -1688,7 +1714,7 @@ mod test {
16881714

16891715
// Only s.outer.inner (leaf 2) should be projected,
16901716
assert_eq!(
1691-
candidate.projection.leaf_indices,
1717+
candidate.read_plan.leaf_indices,
16921718
vec![2],
16931719
"leaf_indices should be [2] for s.outer.inner, skipping sibling and cousin leaves"
16941720
);

0 commit comments

Comments (0)