Move partition handling out of PhysicalExprAdapter #19128
Conversation
```diff
-        &mut self,
-        expr: Arc<dyn PhysicalExpr>,
-    ) -> Result<Arc<dyn PhysicalExpr>> {
+    pub fn simplify(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
```
This is backwards compatible (aside from compiler warnings that the variable doesn't need to be mutable) and means we can keep multiple references, arc it, etc.
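A minimal sketch of what the `&self` receiver buys (using a stand-in type, since the real `PhysicalExprSimplifier` lives in DataFusion): the simplifier can be wrapped in an `Arc` and shared across call sites without any exclusive borrow.

```rust
use std::sync::Arc;

// Hypothetical stand-in for PhysicalExprSimplifier; only the `&self`
// receiver is the point here, the body is illustrative.
struct Simplifier;

impl Simplifier {
    // Taking `&self` (not `&mut self`) means shared references suffice.
    fn simplify(&self, expr: String) -> Result<String, String> {
        // Toy "simplification": constant-fold one known pattern.
        Ok(expr.replace("1 + 1", "2"))
    }
}

fn main() {
    // Because `simplify` takes `&self`, the simplifier can be shared
    // through an Arc (or across threads) without a lock or a clone.
    let s = Arc::new(Simplifier);
    let shared = Arc::clone(&s);
    let out = shared.simplify("x > 1 + 1".to_string()).unwrap();
    assert_eq!(out, "x > 2");
}
```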
This is blocked by one of #19129 or #19130; I think either should make it work. Currently some tests fail because we can't prune based on […]. As a follow-up to this we should be able to delete […].

I've merged #19129 into this branch to get CI passing. I'll fix the conflicts once we merge that.
Force-pushed eb5ff93 to 0ec0329
```rust
/// - `Result<Arc<dyn PhysicalExpr>>`: The rewritten physical expression with columns replaced by literals.
pub fn replace_columns_with_literals(
    expr: Arc<dyn PhysicalExpr>,
    replacements: &HashMap<&str, &ScalarValue>,
```
Open to accepting `HashMap<String, ScalarValue>` or something if that makes lifetimes easier.
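For illustration, a sketch of how such a borrowed map might be built from owned partition values; `ScalarValue` here is a hypothetical stand-in for DataFusion's type. The trade-off behind the signature is that the borrowed map avoids clones but cannot outlive the owned pairs it borrows from.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for DataFusion's ScalarValue.
#[derive(Debug, PartialEq)]
pub enum ScalarValue {
    Utf8(String),
    Int64(i64),
}

// Build the borrowed map from owned (name, value) pairs: no clones,
// but the map borrows from `values` and cannot outlive it.
pub fn to_replacements(values: &[(String, ScalarValue)]) -> HashMap<&str, &ScalarValue> {
    values
        .iter()
        .map(|(name, value)| (name.as_str(), value))
        .collect()
}

fn main() {
    // e.g. partition values parsed from a path like `year=2024/region=eu/...`
    let values = vec![
        ("year".to_string(), ScalarValue::Int64(2024)),
        ("region".to_string(), ScalarValue::Utf8("eu".to_string())),
    ];
    let replacements = to_replacements(&values);
    assert_eq!(replacements["year"], &ScalarValue::Int64(2024));
}
```

An owned `HashMap<String, ScalarValue>` would drop the lifetime coupling at the cost of cloning each name and value once per call.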
Force-pushed 569010f to 1949a45
Force-pushed 387fc6d to c981725
Force-pushed 6147632 to 5595f37
Starting to check this one out
alamb left a comment:
I went through this one carefully and it makes a lot of sense to me -- "Partition Columns" is a higher level concept in my mind than expression rewriting. Thank you @adriangb. Replacing column references with expressions is a better match.

While reviewing this PR, I worked some on the documentation and made a PR targeting this one for your contemplation (I can update it to target main if you want to merge this PR as is).

Finally, should we mark this PR as an API change / update the upgrading.md doc? I can't keep track of which versions these APIs were changed in.
```rust
/// Replace column references in the given physical expression with literal values.
///
/// This is used to substitute partition column references with their literal values during expression rewriting.
```
Another potential use case is filling in values for columns with default values (rather than NULL).
Great point! I'm going to update `datafusion-examples/examples/custom_data_source/default_column_values.rs` to use this new method precisely for that.
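A sketch of that use case (all names here are illustrative, not the actual example's API): compute a replacement literal for each column the file lacks, falling back to the table default instead of NULL.

```rust
use std::collections::HashMap;

// Hypothetical: for columns missing from an old file, pick the
// table-defined default literal instead of NULL.
pub fn defaults_for_missing<'a>(
    file_columns: &[&str],
    defaults: &HashMap<&'a str, i64>,
) -> HashMap<&'a str, i64> {
    let mut out = HashMap::new();
    for (col, v) in defaults {
        // Only columns absent from the file need a substitute literal.
        if !file_columns.contains(col) {
            out.insert(*col, *v);
        }
    }
    out
}

fn main() {
    let file_columns = ["id", "name"]; // physically present in the file
    let defaults = HashMap::from([("score", 100i64), ("id", 0)]);
    let missing = defaults_for_missing(&file_columns, &defaults);
    // `score` was added to the table after this file was written.
    assert_eq!(missing.get("score"), Some(&100));
    assert_eq!(missing.get("id"), None); // present in the file: not replaced
}
```

The resulting map can then feed the same column-to-literal substitution used for partition values.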
```rust
    replacements: &HashMap<&str, &ScalarValue>,
) -> Result<Arc<dyn PhysicalExpr>> {
    expr.transform(|expr| {
        if let Some(column) = expr.as_any().downcast_ref::<Column>()
```
the let chains are quite cool
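For readers unfamiliar with the feature, a small self-contained illustration of the pattern (the let-chain form needs the Rust 2024 edition; the nested form below is the older equivalent and compiles everywhere):

```rust
use std::any::Any;

// With let-chains (Rust 2024 edition) the pattern match and the extra
// condition combine into one `if`, much like the downcast above:
//
//     if let Some(n) = value.downcast_ref::<i64>() && *n > 0 { ... }
//
// The nested form below is the pre-2024 equivalent.
fn describe(value: &dyn Any) -> String {
    if let Some(n) = value.downcast_ref::<i64>() {
        if *n > 0 {
            return format!("positive int: {n}");
        }
    }
    "something else".to_string()
}

fn main() {
    assert_eq!(describe(&42i64), "positive int: 42");
    assert_eq!(describe(&-1i64), "something else");
    assert_eq!(describe(&"hi"), "something else");
}
```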
```rust
    .map(|(field, value)| (field.name().as_str(), value))
    .collect();
```

```rust
self.predicate
```
Is it worth skipping this clone/rewrite if there are no partition_values (aka if `partition_values.is_empty()`)?
added
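The guard in question can be sketched like this (string substitution stands in for the real expression-tree rewrite; names are illustrative):

```rust
use std::collections::HashMap;

// Hypothetical sketch of the short-circuit discussed above: skip the
// clone-and-rewrite entirely when there is nothing to substitute.
fn replace_columns(expr: String, replacements: &HashMap<&str, &str>) -> String {
    // Fast path: no partition values, return the expression unchanged.
    if replacements.is_empty() {
        return expr;
    }
    let mut out = expr;
    for (name, value) in replacements {
        // Stand-in for replacing Column nodes with Literal nodes.
        out = out.replace(*name, *value);
    }
    out
}

fn main() {
    let empty: HashMap<&str, &str> = HashMap::new();
    assert_eq!(replace_columns("year > 2020".to_string(), &empty), "year > 2020");

    let mut parts = HashMap::new();
    parts.insert("year", "2024");
    assert_eq!(replace_columns("year > 2020".to_string(), &parts), "2024 > 2020");
}
```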
```rust
// Make a FilePruner only if there is either a dynamic expr in the predicate or the file has file-level statistics.
// File-level statistics may allow us to prune the file without loading any row groups or metadata.
// If there is a dynamic filter we may be able to prune the file later as the dynamic filter is updated.
// This does allow the case where there is a dynamic filter but no statistics, in which case
```
It took me a bit to parse this comment. I left a suggestion for how we might be able to improve it.
```rust
limit: None,
predicate: Some(predicate),
logical_file_schema: schema.clone(),
table_schema: TableSchema::from_file_schema(schema.clone()),
```
Suggested change:

```diff
-        table_schema: TableSchema::from_file_schema(schema.clone()),
+        table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
```
not sure why clippy didn't catch this
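For what it's worth, `Arc::clone(&x)` and `x.clone()` are behaviorally identical (both just bump the reference count); the lint preferring the explicit form (`clippy::clone_on_ref_ptr`) sits in clippy's allow-by-default restriction group, which may be why it wasn't flagged here. A quick demonstration:

```rust
use std::sync::Arc;

fn main() {
    let schema = Arc::new(vec!["a", "b", "c"]);

    // `Arc::clone(&schema)` compiles to the same thing as `schema.clone()`
    // (a cheap reference-count bump), but the explicit form makes it
    // obvious that no deep copy of the underlying data happens.
    let shared = Arc::clone(&schema);

    assert_eq!(Arc::strong_count(&schema), 2);   // two handles...
    assert!(Arc::ptr_eq(&schema, &shared));      // ...to the same allocation
}
```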
```rust
predicate: Some(predicate),
logical_file_schema: file_schema.clone(),
table_schema: TableSchema::new(
    file_schema.clone(),
```
Suggested change:

```diff
-            file_schema.clone(),
+            Arc::clone(file_schema),
```
(and several other examples below)
```rust
        return Ok(Transformed::yes(default_literal));
    }
}
// Pre-compute replacements for missing columns with default values
```
this is great
This PR does some refactoring of `PhysicalExprAdapter` and `PhysicalExprSimplifier` that I found necessary and/or beneficial while working on #19111.

Changes made

Replace `PhysicalExprAdapter::with_partition_values` with `replace_columns_with_literals`. This is a nice improvement because it:

- makes the `PhysicalExprAdapter` trait that users might need to implement simpler (less boilerplate for users).

This will require any users calling `PhysicalExprAdapter` directly to change their code; I can add an entry to the upgrade guide.

Remove partition pruning logic from `FilePruner` and deprecate the now unused `PrunableStatistics` and `CompositePruningStatistics`. Since we replace partition values with literals we no longer need these structures; they get handled like any other literals. This is a good chunk of code / complexity that we can bin off.

Use `TableSchema` instead of `SchemaRef` + `Vec<FieldRef>` in `ParquetOpener`. `TableSchema` is basically `SchemaRef` + `Vec<FieldRef>` already, and since `ParquetSource` has a `TableSchema` it's less code and fewer clones to propagate that into `ParquetOpener`.