74 changes: 44 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 13 additions & 0 deletions Cargo.toml
@@ -271,3 +271,16 @@ incremental = false
inherits = "release"
debug = true
strip = false

[patch.crates-io]
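# Pin parquet and every arrow crate to the same fork branch so all patched
# crates resolve to matching versions. (Reading of the branch name as
# arrow-rs 57.1.0 plus a cherry-picked upstream change is an assumption.)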
parquet = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-array = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-buffer = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-cast = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-data = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-ipc = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-ord = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-schema = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-select = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
arrow-string = { git = "https://github.com/pydantic/arrow-rs.git", branch = "57-1-0-9066" }
103 changes: 103 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -26,7 +26,9 @@
//! select * from data limit 10;
//! ```

use arrow::array::{ArrayRef, Int32Array, Int64Array, StructArray};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
@@ -35,7 +37,9 @@ use datafusion::prelude::{
};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_expr::utils::{conjunction, disjunction, split_conjunction};
use datafusion_functions::expr_fn::get_field;
use std::path::Path;
use std::sync::Arc;

use datafusion_common::test_util::parquet_test_data;
use datafusion_execution::config::SessionConfig;
@@ -731,3 +735,102 @@ impl PredicateCacheTest {
Ok(())
}
}

/// Number of rows for struct filter pushdown tests
const STRUCT_NUM_ROWS: usize = 4096;

/// Creates test batches with a struct column containing an `id` field.
///
/// Schema: struct_col { id: Int32 }, value: Int64
/// Data: `id` runs from 0 to STRUCT_NUM_ROWS - 1; `value` mirrors `id`
fn create_struct_test_batches() -> Vec<RecordBatch> {
let struct_fields = Fields::from(vec![Field::new("id", DataType::Int32, false)]);
let schema = Arc::new(Schema::new(vec![
Field::new("struct_col", DataType::Struct(struct_fields.clone()), false),
Field::new("value", DataType::Int64, false),
]));

// Create the data: struct_col.id = [0, 1, 2, ...], value = [0, 1, 2, ...]
let ids: Vec<i32> = (0..STRUCT_NUM_ROWS as i32).collect();
let id_array = Int32Array::from(ids);
let struct_array = StructArray::from(vec![(
Arc::new(Field::new("id", DataType::Int32, false)),
Arc::new(id_array) as ArrayRef,
)]);
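// RecordBatch::try_new validates each column's data type against the schema,
// so the struct array's fields must line up exactly with `struct_fields`.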

let values: Vec<i64> = (0..STRUCT_NUM_ROWS as i64).collect();
let value_array = Int64Array::from(values);

let batch =
RecordBatch::try_new(schema, vec![Arc::new(struct_array), Arc::new(value_array)])
.unwrap();

vec![batch]
}

/// Tests that filter pushdown works for struct column field access.
///
/// Verifies that:
/// 1. Filters on struct fields (e.g., `struct_col['id'] = 500`) are pushed down
/// 2. The `pushdown_rows_pruned` metric shows rows were actually pruned
/// 3. The correct number of rows are returned
#[tokio::test]
async fn struct_filter_pushdown() {
let batches = create_struct_test_batches();

// Set the row group size smaller so we can test row group pruning
let props = WriterProperties::builder()
.set_max_row_group_size(1024)
.build();
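// STRUCT_NUM_ROWS (4096) at 1024 rows per group yields 4 row groups, so a
// selective filter can prune whole groups as well as individual rows.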

let tempdir = TempDir::new_in(Path::new(".")).unwrap();

let test_parquet_file = TestParquetFile::try_new(
tempdir.path().join("struct_data.parquet"),
props,
batches,
)
.unwrap();

// Test 1: Selective equality filter on struct field
// struct_col['id'] = 500 should match exactly 1 row
let case = TestCase::new(&test_parquet_file)
.with_name("struct_field_equality")
.with_filter(get_field(col("struct_col"), "id").eq(lit(500)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(1);
case.run().await;

// Test 2: Comparison filter on struct field
// struct_col['id'] > 3000 should match 1095 rows (4095 - 3000)
let case = TestCase::new(&test_parquet_file)
.with_name("struct_field_comparison")
.with_filter(get_field(col("struct_col"), "id").gt(lit(3000)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(1095);
case.run().await;

// Test 3: Conjunction filter with struct field
// struct_col['id'] >= 1000 AND struct_col['id'] < 2000 should match 1000 rows
let case = TestCase::new(&test_parquet_file)
.with_name("struct_field_conjunction")
.with_filter(
conjunction([
get_field(col("struct_col"), "id").gt_eq(lit(1000)),
get_field(col("struct_col"), "id").lt(lit(2000)),
])
.unwrap(),
)
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(1000);
case.run().await;

// Test 4: Filter that matches nothing
// struct_col['id'] = 9999 should match 0 rows (max id is 4095)
let case = TestCase::new(&test_parquet_file)
.with_name("struct_field_no_match")
.with_filter(get_field(col("struct_col"), "id").eq(lit(9999)))
.with_pushdown_expected(PushdownExpected::Some)
.with_expected_rows(0);
case.run().await;
}
7 changes: 7 additions & 0 deletions datafusion/datasource-parquet/Cargo.toml
@@ -57,8 +57,11 @@ tokio = { workspace = true }
[dev-dependencies]
chrono = { workspace = true }
criterion = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-nested = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }

# Note: add additional linter rules in lib.rs.
# Rust does not support workspace + new linter rules in subcrates yet
@@ -80,3 +83,7 @@ parquet_encryption = [
[[bench]]
name = "parquet_nested_filter_pushdown"
harness = false

[[bench]]
name = "parquet_struct_filter_pushdown"
harness = false
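# Assumed invocation (from this crate's directory):
#   cargo bench --bench parquet_struct_filter_pushdown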