Estimate aggregate output rows using existing NDV statistics #20926

Open
buraksenn wants to merge 3 commits into apache:main from buraksenn:top-k-aggregate-estimation
Conversation

@buraksenn
Contributor

Which issue does this PR close?

Part of #20766

Rationale for this change

Grouped aggregations currently estimate output rows as input_rows, ignoring available NDV statistics. Spark's AggregateEstimation and Trino's AggregationStatsRule both use NDV products to tighten this estimate; this PR draws heavily on both.

What changes are included in this PR?

  • Estimate aggregate output rows as min(input_rows, product(NDV_i + null_adj_i) * grouping_sets)
  • Cap by the Top-K limit when active, since the output row count cannot exceed K
  • Propagate distinct_count from child stats to group-by output columns
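
The estimate in the first bullet can be sketched roughly as follows; the helper name `estimate_group_rows` and the flat slice inputs are illustrative, not the PR's actual API:

```rust
// Hypothetical sketch of min(input_rows, product(NDV_i + null_adj_i) * grouping_sets).
fn estimate_group_rows(
    input_rows: usize,
    ndvs: &[usize],      // distinct_count per group-by column
    null_adjs: &[usize], // 1 if the column contains nulls, else 0
    grouping_sets: usize,
) -> usize {
    // product(NDV_i + null_adj_i), saturating to avoid overflow
    let product = ndvs
        .iter()
        .zip(null_adjs.iter())
        .fold(1usize, |acc, (ndv, adj)| acc.saturating_mul(ndv + adj));
    // cap by the input row count
    input_rows.min(product.saturating_mul(grouping_sets))
}

fn main() {
    // two group-by columns: NDV 10 (nullable) and NDV 5 (non-null), one grouping set
    assert_eq!(estimate_group_rows(1000, &[10, 5], &[1, 0], 1), 55);
    // the estimate can never exceed the input row count
    assert_eq!(estimate_group_rows(30, &[10, 5], &[1, 0], 1), 30);
    println!("ok");
}
```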

Are these changes tested?

Yes, existing and new tests cover different scenarios and edge cases.

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Mar 13, 2026
@github-actions github-actions bot added the core Core DataFusion crate label Mar 13, 2026
Member

@asolimando asolimando left a comment


Thanks @buraksenn for working on this, I have left a few comments, hoping this helps!

let ndv = *col_stats.distinct_count.get_value()?;
let null_adjustment = match col_stats.null_count.get_value() {
    Some(&n) if n > 0 => 1usize,
    _ => 0,
};
Member


Nit: it's a reasonable default, but I'd call it out explicitly in the function's doc comment.

let ndv_product = self.compute_group_ndv(child_statistics);
if let Some(ndv) = ndv_product {
    let grouping_set_num = self.group_by.groups.len();
    let ndv_estimate = ndv.saturating_mul(grouping_set_num);
Member


This doesn't look correct to me, grouping sets target different columns.

Consider GROUPING SETS ((a), (b), (a, b)), here you would compute NDV(a) * NDV(b) * 3, while the number of distinct values should be NDV(a) + NDV(b) + NDV(a)*NDV(b).

Trino bails out for (multiple) grouping sets; Spark, AFAIU, does not take them into account because it rewrites them as a union of aggregates in earlier phases.

It's fine to bail out like Trino does, but if you want to support this, the code would be something like the following (pseudocode):

res = 0;
for each grouping set gs:
  part_res = 1;
  for each col in gs:
     part_res *= ndv(col) + (null_count(col) > 0 ? 1 : 0);
  res += part_res;
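
A hedged Rust sketch of the pseudocode above (the helper name and the `(distinct_count, null_count)` pair representation are illustrative): sum, over grouping sets, the product of each column's NDV plus its null adjustment.

```rust
// Each grouping set is a list of (distinct_count, null_count) pairs, one per column.
fn grouping_sets_ndv(sets: &[Vec<(usize, usize)>]) -> usize {
    sets.iter()
        .map(|gs| {
            // product over columns of (NDV + 1 if the column has nulls)
            gs.iter().fold(1usize, |acc, &(ndv, nulls)| {
                acc.saturating_mul(ndv + usize::from(nulls > 0))
            })
        })
        // sum the per-grouping-set products
        .fold(0usize, |acc, p| acc.saturating_add(p))
}

fn main() {
    // GROUPING SETS ((a), (b), (a, b)) with NDV(a)=3, NDV(b)=4, no nulls:
    // 3 + 4 + 3*4 = 19, rather than NDV(a)*NDV(b)*3 = 36
    let sets = vec![vec![(3, 0)], vec![(4, 0)], vec![(3, 0), (4, 0)]];
    assert_eq!(grouping_sets_ndv(&sets), 19);
    println!("ok");
}
```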

child_statistics.num_rows.map(|x| x * grouping_set_num)
}
} else if let Some(limit_opts) = &self.limit_options {
Precision::Inexact(limit_opts.limit)
Member


Here we might have an NDV even when num_rows is unset; maybe we can return Inexact(min(ndv, limit))?

for (expr, _) in self.group_by.expr.iter() {
    let col = expr.as_any().downcast_ref::<Column>()?;
    let col_stats = &child_statistics.column_statistics[col.index()];
    let ndv = *col_stats.distinct_count.get_value()?;
Member


Since we multiply by NDV, we might end up with a total of zero if any NDV is zero. If a column has only null values, you might have num_rows >= 1 and ndv = 0, so let's use min(num_rows, ndv) here to be more robust.
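
One way to read this suggestion, combined with the null adjustment from the earlier snippet, is the per-column contribution sketched below; the function name and shape are hypothetical, not code from this PR:

```rust
// Clamp distinct_count by the row count (NDV cannot exceed num_rows), then
// add 1 if the column contains nulls, so an all-null column with a reported
// distinct_count of 0 still contributes 1 group instead of zeroing the product.
fn per_column_ndv(distinct_count: usize, null_count: usize, num_rows: usize) -> usize {
    let null_adj = usize::from(null_count > 0);
    num_rows.min(distinct_count) + null_adj
}

fn main() {
    // all-null column: distinct_count = 0, but the null group still counts as 1
    assert_eq!(per_column_ndv(0, 5, 5), 1);
    // stale stats reporting an NDV larger than the row count get clamped
    assert_eq!(per_column_ndv(10, 0, 7), 7);
    println!("ok");
}
```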

@buraksenn
Contributor Author

buraksenn commented Mar 16, 2026

Thanks @asolimando for the detailed review. I thought this one was fairly polished after studying Trino and Spark in detail, but I missed some important points. I'll carefully apply your feedback and adjust the implementation early tomorrow.

@asolimando
Member

> Thanks @asolimando for the detailed review. I thought this one was fairly polished after studying Trino and Spark in detail, but I missed some important points. I'll carefully apply your feedback and adjust the implementation early tomorrow.

No worries, I think the PR already brings a considerable improvement over the existing behavior, and the final outcome will be a more precise estimation than Trino's and Spark's. I am a bit busy today and tomorrow, so don't rush unless it fits your schedule. I will make sure to review the changes by the end of the week.
