feat: Extract NDV (distinct_count) statistics from Parquet metadata #19957
base: main
This change adds support for reading Number of Distinct Values (NDV) statistics from Parquet file metadata when available.
Previously, `distinct_count` in `ColumnStatistics` was always set to `Precision::Absent`. Now it is populated from Parquet row group column statistics when present:
- Single row group with NDV: `Precision::Exact(ndv)`
- Multiple row groups with NDV: `Precision::Inexact(max)` as lower bound (we can't accurately merge NDV since duplicates may exist across row groups; max is more conservative than sum for join cardinality estimation)
- No NDV available: `Precision::Absent`
This provides a foundation for improved join cardinality estimation and other statistics-based optimizations.
Relates to apache#15265
- Statistics merge: use max as conservative lower bound instead of discarding NDV (duplicates may exist across partitions)
- Projection: preserve NDV for single-column expressions as upper bound
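The row-group rules from the first commit message can be sketched as follows. This is only an illustration: `Precision` here is a local stand-in for DataFusion's enum, `ndv_from_row_groups` is a hypothetical helper rather than the PR's function, and taking the max over only the row groups that report an NDV is an assumption the commit message does not spell out.

```rust
/// Local stand-in for DataFusion's `Precision`, reduced to what this sketch needs.
#[derive(Debug, Clone, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Combine per-row-group NDV values (`None` means the row group reports no NDV).
/// Hypothetical helper for illustration only.
fn ndv_from_row_groups(ndvs: &[Option<u64>]) -> Precision {
    match ndvs {
        // Exactly one row group that carries an NDV: the value is exact.
        [Some(n)] => Precision::Exact(*n as usize),
        // Otherwise take the max of the reported values as a lower bound.
        // Assumption: row groups without an NDV are simply skipped.
        _ => match ndvs.iter().flatten().max() {
            Some(max) => Precision::Inexact(*max as usize),
            None => Precision::Absent,
        },
    }
}
```

The max is a lower bound because the file must contain at least as many distinct values as its most diverse row group, while a sum could overcount values shared between row groups.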
gene-bordegaray
left a comment
Have a few minor comments but this looks good 💯
datafusion/common/src/stats.rs
Outdated
Precision::Inexact(*v)
}
(Precision::Absent, Precision::Absent) => Precision::Absent,
};
I think this verbosity could be reduced to something like:
col_stats.distinct_count = col_stats.distinct_count.get_value()
    .max(item_col_stats.distinct_count.get_value())
    .map(|&v| Precision::Inexact(v))
    .unwrap_or(Precision::Absent);
or we could introduce some method like `max_inexact()` on `Precision`.
Thanks a lot, this is very neat, addressed in db182e5!
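For readers following the thread, here is a minimal sketch of what a `max_inexact()` helper along the lines of the suggestion could look like. The enum below is a simplified local copy, not DataFusion's `Precision`, and `max_inexact` is a hypothetical name taken from the review comment; it is not confirmed to exist in the codebase.

```rust
/// Simplified local copy of a `Precision`-like enum for illustration.
#[derive(Debug, Clone, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

impl<T: Ord + Clone> Precision<T> {
    /// Returns the wrapped value, if any, regardless of exactness.
    fn get_value(&self) -> Option<&T> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(v),
            Precision::Absent => None,
        }
    }

    /// Hypothetical helper: the larger of the two values, always downgraded
    /// to `Inexact`; `Absent` only when both sides are `Absent`.
    fn max_inexact(&self, other: &Self) -> Self {
        // `Option` orders `None` below any `Some`, so `max` keeps whichever
        // side has a value when the other is absent.
        self.get_value()
            .max(other.get_value())
            .map(|v| Precision::Inexact(v.clone()))
            .unwrap_or(Precision::Absent)
    }
}
```

The chain relies on `Option`'s ordering, which is why the four-arm `match` in the outdated diff can collapse into a single expression.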
is_max_value_exact: &mut [Option<bool>],
is_min_value_exact: &mut [Option<bool>],
column_byte_sizes: &[Precision<usize>],
distinct_counts: &[Precision<usize>],
A nit but maybe these could be extracted into a struct that encapsulates these parameters as fields - say extend StatisticsAccumulators and use this or create a new struct
Adopted StatisticsAccumulators as suggested, it feels better and I got rid of the "too many arguments" warning suppression, addressed in 4833ef5
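The refactor discussed here, replacing a long parameter list with a struct of borrowed slices, can be sketched roughly as below. All names and field types are hypothetical and trimmed down (`StatsAccumulators`, `ColumnSummary`); the real `StatisticsAccumulators` in the PR has more fields and uses `Precision` rather than `Option`.

```rust
/// Hypothetical, trimmed-down accumulator that borrows per-column slices.
struct StatsAccumulators<'a> {
    null_counts: &'a [Option<usize>],
    distinct_counts: &'a [Option<usize>],
}

/// Illustrative per-column result type.
#[derive(Debug, PartialEq)]
struct ColumnSummary {
    null_count: Option<usize>,
    distinct_count: Option<usize>,
}

// `'_` is the anonymous lifetime: the impl applies to any borrow lifetime
// without having to name it, equivalent to `impl<'a> StatsAccumulators<'a>`.
impl StatsAccumulators<'_> {
    /// Build one summary per column from the accumulated slices.
    fn build_column_statistics(&self) -> Vec<ColumnSummary> {
        self.null_counts
            .iter()
            .zip(self.distinct_counts)
            .map(|(&null_count, &distinct_count)| ColumnSummary {
                null_count,
                distinct_count,
            })
            .collect()
    }
}
```

Moving the logic into a method means callers pass one value instead of many slices, which is what makes a "too many arguments" lint suppression unnecessary.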
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;
use std::path::PathBuf;
nit: since these tests are in their own module, I think moving these to the ndv_test module level would be ok
Thanks for the suggestion, adopted in e36c46a
// TODO stats: estimate more statistics from expressions
// (expressions should compute their statistics themselves)
ColumnStatistics::new_unknown()
// TODO: expressions should compute their own statistics
noice, this is useful thanks for understanding implications with using and propagating distincts. thank you 😄
Partition columns now preserve distinct_count as Inexact(1) when merging statistics, reflecting that each partition file has a single distinct partition value.
Use get_value().max() chain instead of verbose match statement for merging NDV in Statistics::try_merge()
Encapsulate get_col_stats parameters by adding build_column_statistics() method to StatisticsAccumulators, removing the standalone function.
Move imports to module level in ndv_tests since they're in their own module anyway.
gene-bordegaray
left a comment
great refactor, very clean
}
fn summarize_min_max_null_counts(
impl StatisticsAccumulators<'_> {
didn't know about this notation, an anonymous lifetime. cool 😄
Which issue does this PR close?
Related: #18628, #8227
(I am not sure whether a new issue specifically for the scope of this PR is needed; happy to create one if so.)
Rationale for this change
This work originates from a discussion in datafusion-distributed about improving the `TaskEstimator` API: datafusion-contrib/datafusion-distributed#296 (comment)
We agreed that improved statistics support in DataFusion would benefit both projects. For datafusion-distributed, better cardinality estimation helps decide how to split computation across network boundaries.
This also benefits DataFusion directly, since CBO is already in place: for example, join cardinality estimation (`joins/utils.rs:586-646`) uses `distinct_count` via `max_distinct_count` to compute join selectivity.
Currently this field is always `Absent` when reading from Parquet, so this PR fills that gap.
What changes are included in this PR?
Commit 1 - Reading NDV from Parquet files:
- Extract `distinct_count` from Parquet row group column statistics
- Single row group with NDV: `Precision::Exact(ndv)`
- Multiple row groups with NDV: `Precision::Inexact(max)` as conservative lower bound
- No NDV available: `Precision::Absent`
Commit 2 - Statistics propagation (can be split to a separate PR, if preferred):
- `Statistics::try_merge()`: use max as conservative lower bound instead of discarding NDV
- Projection: preserve NDV for single-column expressions as upper bound
I'm including the second commit to showcase how I intend to use the statistics, but these changes can be split to a follow-up PR to keep review scope limited.
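To make the intended use more concrete, here is a rough sketch of how a consumer such as join cardinality estimation can use NDV. It implements the textbook inner equi-join estimate, rows(L) * rows(R) / max(ndv(L), ndv(R)); it is not a copy of the code in `joins/utils.rs`, and `estimate_join_rows` is a hypothetical name.

```rust
/// Textbook inner equi-join estimate: |L| * |R| / max(ndv_L, ndv_R).
/// Returns `None` when either NDV is unknown or the larger NDV is zero,
/// mirroring the situation where `distinct_count` is absent.
fn estimate_join_rows(
    left_rows: usize,
    right_rows: usize,
    left_ndv: Option<usize>,
    right_ndv: Option<usize>,
) -> Option<usize> {
    // Both NDVs are required; `?` bails out early if either is missing.
    let max_ndv = left_ndv?.max(right_ndv?);
    if max_ndv == 0 {
        return None;
    }
    // Saturating multiply avoids overflow on very large inputs.
    Some(left_rows.saturating_mul(right_rows) / max_ndv)
}
```

With 1000 and 200 input rows and NDVs of 100 and 50, the estimate is 2000 rows; with a missing NDV the function yields no estimate, which is the situation for Parquet sources when `distinct_count` is not populated.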
Are these changes tested?
Yes, 7 unit tests are added for NDV extraction:
row_filter.rs:685-696, using `parquet_to_arrow_schema` to derive the schema from the file)
Are there any user-facing changes?
No breaking changes. Statistics consumers will now see populated `distinct_count` values when available in Parquet metadata.
Disclaimer: I used AI (Claude Code) to assist translating my ideas into code as I am still ramping up with the codebase and especially with Rust (guidance on both aspects is highly appreciated). I have a good understanding of the core concepts (statistics, CBO etc.) and have carefully double-checked that the PR matches my intentions and understanding.
cc: @gabotechs @jayshrivastava @NGA-TRAN @gene-bordegaray