@@ -20,12 +20,12 @@
use std::sync::Arc;

use arrow::array::{record_batch, RecordBatch};
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use datafusion::assert_batches_eq;
use datafusion::common::not_impl_err;
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::{Result, ScalarValue};
use datafusion::common::Result;
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableConfigExt, ListingTableUrl,
};
@@ -209,14 +209,4 @@ impl PhysicalExprAdapter for CustomCastsPhysicalExprAdapter {
})
.data()
}

fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(Self {
inner: self.inner.with_partition_values(partition_values),
..self.clone()
})
}
}
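
The hunk above removes `with_partition_values` from a custom adapter, so a `PhysicalExprAdapter` implementation now only needs `rewrite`. As a point of reference, here is a minimal sketch of a custom adapter and factory after this change. It is assembled only from the trait shapes visible in this diff; the `PassThroughAdapter` names are invented for illustration, and a real adapter would do its own rewriting before delegating.

use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
};

// Hypothetical adapter that simply delegates to the built-in default adapter.
#[derive(Debug)]
struct PassThroughAdapter {
    inner: Arc<dyn PhysicalExprAdapter>,
}

impl PhysicalExprAdapter for PassThroughAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        // A real adapter would transform `expr` here before handing it on.
        self.inner.rewrite(expr)
    }
}

#[derive(Debug)]
struct PassThroughAdapterFactory;

impl PhysicalExprAdapterFactory for PassThroughAdapterFactory {
    fn create(
        &self,
        logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        // Reuse the built-in adapter for casts and missing-column handling.
        let default_factory = DefaultPhysicalExprAdapterFactory;
        let inner = default_factory.create(logical_file_schema, physical_file_schema);
        Arc::new(PassThroughAdapter { inner })
    }
}

Anything that previously arrived through `with_partition_values` appears to be handled by substituting literals into the expression up front; see the `replace_columns_with_literals` usage later in this diff.
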
@@ -22,13 +22,12 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;

use datafusion::assert_batches_eq;
use datafusion::catalog::memory::DataSourceExec;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::DFSchema;
use datafusion::common::{Result, ScalarValue};
use datafusion::datasource::listing::PartitionedFile;
@@ -39,12 +38,12 @@ use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{lit, SessionConfig};
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
replace_columns_with_literals, DefaultPhysicalExprAdapterFactory,
PhysicalExprAdapter, PhysicalExprAdapterFactory,
};
use futures::StreamExt;
use object_store::memory::InMemory;
@@ -60,16 +59,16 @@ const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
/// This example demonstrates how to:
/// 1. Store default values in field metadata using a constant key
/// 2. Create a custom PhysicalExprAdapter that reads these defaults
/// 3. Inject default values for missing columns in filter predicates
/// 3. Inject default values for missing columns in filter predicates using `replace_columns_with_literals`
/// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
/// 5. Wrap string default values in cast expressions for proper type conversion
/// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time
///
/// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
/// that get pushed down to file scans. For handling missing columns in projections,
/// other mechanisms in DataFusion are used (like SchemaAdapter).
///
/// The metadata-based approach provides a flexible way to store default values as strings
/// and cast them to the appropriate types at query time.
/// and cast them to the appropriate types at planning time, avoiding runtime overhead.
pub async fn default_column_values() -> Result<()> {
println!("=== Creating example data with missing columns and default values ===");

@@ -138,8 +137,8 @@ pub async fn default_column_values() -> Result<()> {
println!("This example demonstrates how PhysicalExprAdapter works:");
println!("1. Physical schema only has 'id' and 'name' columns");
println!("2. Logical schema has 'id', 'name', 'status', and 'priority' columns with defaults");
println!("3. Our custom adapter intercepts filter expressions on missing columns");
println!("4. Default values from metadata are injected as cast expressions");
println!("3. Our custom adapter uses replace_columns_with_literals to inject default values");
println!("4. Default values from metadata are cast to proper types at planning time");
println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations");
println!("\nNote: PhysicalExprAdapter is specifically for filter predicates.");
println!("For projection columns, different mechanisms handle missing columns.");
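
The second and fourth points above come down to a single planning-time conversion: read the string default out of the field's metadata and cast it to the field's declared type with `ScalarValue::cast_to`. The following stand-alone sketch isolates that conversion; the `priority` field and the value `3` are made up, while the metadata key matches the example's `DEFAULT_VALUE_METADATA_KEY`.

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field};
use datafusion::common::{Result, ScalarValue};

// Parse a field's string default (if any) into a literal of the field's declared type.
fn default_literal_for(field: &Field, metadata_key: &str) -> Result<Option<ScalarValue>> {
    match field.metadata().get(metadata_key) {
        // e.g. "3" becomes ScalarValue::Int32(Some(3)) for an Int32 field
        Some(raw) => ScalarValue::Utf8(Some(raw.clone()))
            .cast_to(field.data_type())
            .map(Some),
        None => Ok(None),
    }
}

fn main() -> Result<()> {
    // Hypothetical logical field carrying its default under the example's metadata key.
    let field = Field::new("priority", DataType::Int32, true).with_metadata(HashMap::from([
        ("example.default_value".to_string(), "3".to_string()),
    ]));
    // Prints the typed literal derived from the metadata string (an Int32 with value 3).
    println!("{:?}", default_literal_for(&field, "example.default_value")?);
    Ok(())
}
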
@@ -206,7 +205,7 @@ impl TableProvider for DefaultValueTableProvider {
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
Arc::clone(&self.schema)
}

fn table_type(&self) -> TableType {
@@ -227,7 +226,7 @@ impl TableProvider for DefaultValueTableProvider {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = self.schema.clone();
let schema = Arc::clone(&self.schema);
let df_schema = DFSchema::try_from(schema.clone())?;
let filter = state.create_physical_expr(
conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
@@ -280,14 +279,15 @@ impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
let default_factory = DefaultPhysicalExprAdapterFactory;
let default_adapter = default_factory
.create(logical_file_schema.clone(), physical_file_schema.clone());
let default_adapter = default_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&physical_file_schema),
);

Arc::new(DefaultValuePhysicalExprAdapter {
logical_file_schema,
physical_file_schema,
default_adapter,
partition_values: Vec::new(),
})
}
}
@@ -299,98 +299,36 @@ struct DefaultValuePhysicalExprAdapter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
default_adapter: Arc<dyn PhysicalExprAdapter>,
partition_values: Vec<(FieldRef, ScalarValue)>,
}

impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
// First try our custom default value injection for missing columns
let rewritten = expr
.transform(|expr| {
self.inject_default_values(
expr,
&self.logical_file_schema,
&self.physical_file_schema,
)
})
.data()?;

// Then apply the default adapter as a fallback to handle standard schema differences
// like type casting, partition column handling, etc.
let default_adapter = if !self.partition_values.is_empty() {
self.default_adapter
.with_partition_values(self.partition_values.clone())
} else {
self.default_adapter.clone()
};

default_adapter.rewrite(rewritten)
}

fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(DefaultValuePhysicalExprAdapter {
logical_file_schema: self.logical_file_schema.clone(),
physical_file_schema: self.physical_file_schema.clone(),
default_adapter: self.default_adapter.clone(),
partition_values,
})
}
}

impl DefaultValuePhysicalExprAdapter {
fn inject_default_values(
&self,
expr: Arc<dyn PhysicalExpr>,
logical_file_schema: &Schema,
physical_file_schema: &Schema,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
let column_name = column.name();

// Check if this column exists in the physical schema
if physical_file_schema.index_of(column_name).is_err() {
// Column is missing from physical schema, check if logical schema has a default
if let Ok(logical_field) =
logical_file_schema.field_with_name(column_name)
{
if let Some(default_value_str) =
logical_field.metadata().get(DEFAULT_VALUE_METADATA_KEY)
{
// Create a string literal and wrap it in a cast expression
let default_literal = self.create_default_value_expr(
default_value_str,
logical_field.data_type(),
)?;
return Ok(Transformed::yes(default_literal));
}
}
// Pre-compute replacements for missing columns with default values
Review comment (Contributor): this is great
let mut replacements = HashMap::new();
for field in self.logical_file_schema.fields() {
// Skip columns that exist in physical schema
if self.physical_file_schema.index_of(field.name()).is_ok() {
continue;
}
}

// No transformation needed
Ok(Transformed::no(expr))
}

fn create_default_value_expr(
&self,
value_str: &str,
data_type: &DataType,
) -> Result<Arc<dyn PhysicalExpr>> {
// Create a string literal with the default value
let string_literal =
Arc::new(Literal::new(ScalarValue::Utf8(Some(value_str.to_string()))));

// If the target type is already Utf8, return the string literal directly
if matches!(data_type, DataType::Utf8) {
return Ok(string_literal);
// Check if this missing column has a default value in metadata
if let Some(default_str) = field.metadata().get(DEFAULT_VALUE_METADATA_KEY) {
// Create a Utf8 ScalarValue from the string and cast it to the target type
let string_value = ScalarValue::Utf8(Some(default_str.to_string()));
let typed_value = string_value.cast_to(field.data_type())?;
replacements.insert(field.name().as_str(), typed_value);
}
}

// Otherwise, wrap the string literal in a cast expression
let cast_expr = Arc::new(CastExpr::new(string_literal, data_type.clone(), None));
// Replace columns with their default literals if any
let rewritten = if !replacements.is_empty() {
let refs: HashMap<_, _> = replacements.iter().map(|(k, v)| (*k, v)).collect();
replace_columns_with_literals(expr, &refs)?
} else {
expr
};

Ok(cast_expr)
// Apply the default adapter as a fallback for other schema adaptations
self.default_adapter.rewrite(rewritten)
}
}
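
To make the rewrite above concrete, here is a hedged driver: it builds the predicate `priority > 2` against a file that has no `priority` column, substitutes the typed default `3`, and prints the rewritten expression. The call shape of `replace_columns_with_literals` mirrors the adapter code above; its exact signature is assumed from that usage.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_physical_expr_adapter::replace_columns_with_literals;

fn main() -> Result<()> {
    // priority > 2, where `priority` is missing from the physical file schema.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("priority", 3)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int32(Some(2)))),
    ));

    // The typed default the adapter derived from the field metadata.
    let mut replacements: HashMap<&str, ScalarValue> = HashMap::new();
    replacements.insert("priority", ScalarValue::Int32(Some(3)));

    // Same two-step call shape as in the adapter above.
    let refs: HashMap<_, _> = replacements.iter().map(|(k, v)| (*k, v)).collect();
    let rewritten = replace_columns_with_literals(predicate, &refs)?;

    // Prints something like `3 > 2`: the filter can now be evaluated (and used for
    // pruning) without the missing column.
    println!("{rewritten}");
    Ok(())
}
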
35 changes: 7 additions & 28 deletions datafusion-examples/examples/data_io/json_shredding.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use datafusion::assert_batches_eq;
use datafusion::common::tree_node::{
@@ -277,14 +277,14 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
let default_factory = DefaultPhysicalExprAdapterFactory;
let default_adapter = default_factory
.create(logical_file_schema.clone(), physical_file_schema.clone());
let default_adapter = default_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&physical_file_schema),
);

Arc::new(ShreddedJsonRewriter {
logical_file_schema,
physical_file_schema,
default_adapter,
partition_values: Vec::new(),
})
}
}
@@ -293,10 +293,8 @@ impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory {
/// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
#[derive(Debug)]
struct ShreddedJsonRewriter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
default_adapter: Arc<dyn PhysicalExprAdapter>,
partition_values: Vec<(FieldRef, ScalarValue)>,
}

impl PhysicalExprAdapter for ShreddedJsonRewriter {
@@ -307,27 +305,8 @@ impl PhysicalExprAdapter for ShreddedJsonRewriter {
.data()?;

// Then apply the default adapter as a fallback to handle standard schema differences
// like type casting, missing columns, and partition column handling
let default_adapter = if !self.partition_values.is_empty() {
self.default_adapter
.with_partition_values(self.partition_values.clone())
} else {
self.default_adapter.clone()
};

default_adapter.rewrite(rewritten)
}

fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(ShreddedJsonRewriter {
logical_file_schema: self.logical_file_schema.clone(),
physical_file_schema: self.physical_file_schema.clone(),
default_adapter: self.default_adapter.clone(),
partition_values,
})
// like type casting and missing columns
self.default_adapter.rewrite(rewritten)
}
}
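
The `rewrite` above leans on DataFusion's `TreeNode` API: `transform` visits each sub-expression with a closure that returns `Transformed::yes` or `Transformed::no`, and `.data()` unwraps the final result. A stand-alone sketch of that pattern follows, with an invented rule that renames column `old` to `new`; only the `transform`/`.data()` flow is taken from the code above.

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::Result;
use datafusion::physical_expr::expressions::Column;
use datafusion::physical_expr::PhysicalExpr;

// Rewrite every reference to column "old" into a reference to column "new".
fn rename_column(expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|e| {
        if let Some(col) = e.as_any().downcast_ref::<Column>() {
            if col.name() == "old" {
                let renamed: Arc<dyn PhysicalExpr> =
                    Arc::new(Column::new("new", col.index()));
                return Ok(Transformed::yes(renamed));
            }
        }
        // Leave every other node untouched.
        Ok(Transformed::no(e))
    })
    .data()
}
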

13 changes: 13 additions & 0 deletions datafusion/common/src/pruning.rs
@@ -135,6 +135,10 @@ pub trait PruningStatistics {
/// This feeds into [`CompositePruningStatistics`] to allow pruning
/// with filters that depend both on partition columns and data columns
/// (e.g. `WHERE partition_col = data_col`).
#[deprecated(
since = "52.0.0",
note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
#[derive(Clone)]
pub struct PartitionPruningStatistics {
/// Values for each column for each container.
@@ -156,6 +160,7 @@
partition_schema: SchemaRef,
}

#[expect(deprecated)]
impl PartitionPruningStatistics {
/// Create a new instance of [`PartitionPruningStatistics`].
///
@@ -232,6 +237,7 @@
}
}

#[expect(deprecated)]
impl PruningStatistics for PartitionPruningStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let index = self.partition_schema.index_of(column.name()).ok()?;
@@ -439,10 +445,15 @@ impl PruningStatistics for PrunableStatistics {
/// the first one is returned without any regard for completeness or accuracy.
/// That is: if the first statistics has information for a column, even if it is incomplete,
/// that is returned even if a later statistics has more complete information.
#[deprecated(
since = "52.0.0",
note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
)]
pub struct CompositePruningStatistics {
pub statistics: Vec<Box<dyn PruningStatistics>>,
}

#[expect(deprecated)]
impl CompositePruningStatistics {
/// Create a new instance of [`CompositePruningStatistics`] from
/// a vector of [`PruningStatistics`].
@@ -457,6 +468,7 @@ impl CompositePruningStatistics {
}
}

#[expect(deprecated)]
impl PruningStatistics for CompositePruningStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
@@ -513,6 +525,7 @@ impl PruningStatistics for CompositePruningStatistics {
}

#[cfg(test)]
#[expect(deprecated)]
mod tests {
use crate::{
ColumnStatistics,
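
The deprecation notes above point callers at `replace_columns_with_literals`: substitute each partition column's value into the pruning predicate up front, then prune with ordinary file statistics. Below is a hedged sketch of that flow for the `WHERE partition_col = data_col` case mentioned in the doc comment; the partition name and value are invented, and the function's signature is assumed from its usage earlier in this diff.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{BinaryExpr, Column};
use datafusion::physical_expr::PhysicalExpr;
use datafusion_physical_expr_adapter::replace_columns_with_literals;

fn main() -> Result<()> {
    // A pruning predicate that mixes a partition column and a data column:
    // part = data_col
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("part", 0)),
        Operator::Eq,
        Arc::new(Column::new("data_col", 1)),
    ));

    // This particular file lives in the partition part = "2024-01-01".
    let partition_values: HashMap<&str, ScalarValue> = HashMap::from([(
        "part",
        ScalarValue::Utf8(Some("2024-01-01".to_string())),
    )]);

    let refs: HashMap<_, _> = partition_values.iter().map(|(k, v)| (*k, v)).collect();
    // After substitution the predicate only references data_col, so it can be checked
    // against the file's min/max statistics without PartitionPruningStatistics.
    let pruning_predicate = replace_columns_with_literals(predicate, &refs)?;
    println!("{pruning_predicate}");
    Ok(())
}
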