@@ -111,9 +111,9 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_10]
-Select Operator [SEL_9] (rows=30 width=520)
+Select Operator [SEL_9] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_45] (rows=30 width=336)
+Map Join Operator [MAPJOIN_45] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_2._col0, _col1=RS_7._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [CUSTOM_EDGE] llap
MULTICAST [RS_7]
@@ -175,19 +175,19 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_14]
-Select Operator [SEL_13] (rows=10 width=520)
+Select Operator [SEL_13] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_49] (rows=10 width=336)
+Map Join Operator [MAPJOIN_49] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_2._col0, _col1=RS_11._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [CUSTOM_EDGE] llap
MULTICAST [RS_11]
PartitionCols:_col1
-Group By Operator [GBY_8] (rows=1 width=168)
+Group By Operator [GBY_8] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_7]
PartitionCols:_col0, _col1
-Group By Operator [GBY_6] (rows=1 width=168)
+Group By Operator [GBY_6] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_5] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -245,9 +245,9 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_10]
-Select Operator [SEL_9] (rows=30 width=520)
+Select Operator [SEL_9] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_45] (rows=30 width=336)
+Map Join Operator [MAPJOIN_45] (rows=3 width=336)
Conds:SEL_2._col0, _col1=RS_7._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [BROADCAST_EDGE] llap
BROADCAST [RS_7]
@@ -309,19 +309,19 @@ Stage-0
Stage-1
Map 1 llap
File Output Operator [FS_14]
-Select Operator [SEL_13] (rows=10 width=520)
+Select Operator [SEL_13] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_49] (rows=10 width=336)
+Map Join Operator [MAPJOIN_49] (rows=3 width=336)
Conds:SEL_2._col0, _col1=RS_11._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [BROADCAST_EDGE] llap
BROADCAST [RS_11]
PartitionCols:_col0, _col1
-Group By Operator [GBY_8] (rows=1 width=168)
+Group By Operator [GBY_8] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] llap
SHUFFLE [RS_7]
PartitionCols:_col0, _col1
-Group By Operator [GBY_6] (rows=1 width=168)
+Group By Operator [GBY_6] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_5] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -379,9 +379,9 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_54]
-Select Operator [SEL_53] (rows=30 width=520)
+Select Operator [SEL_53] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_52] (rows=30 width=336)
+Map Join Operator [MAPJOIN_52] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_51._col0, _col1=RS_49._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [CUSTOM_EDGE] vectorized, llap
MULTICAST [RS_49]
@@ -443,19 +443,19 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_61]
-Select Operator [SEL_60] (rows=10 width=520)
+Select Operator [SEL_60] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_59] (rows=10 width=336)
+Map Join Operator [MAPJOIN_59] (rows=3 width=336)
BucketMapJoin:true,Conds:SEL_58._col0, _col1=RS_56._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [CUSTOM_EDGE] vectorized, llap
MULTICAST [RS_56]
PartitionCols:_col1
-Group By Operator [GBY_55] (rows=1 width=168)
+Group By Operator [GBY_55] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] vectorized, llap
SHUFFLE [RS_54]
PartitionCols:_col0, _col1
-Group By Operator [GBY_53] (rows=1 width=168)
+Group By Operator [GBY_53] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_52] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -513,9 +513,9 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_54]
-Select Operator [SEL_53] (rows=30 width=520)
+Select Operator [SEL_53] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_52] (rows=30 width=336)
+Map Join Operator [MAPJOIN_52] (rows=3 width=336)
Conds:SEL_51._col0, _col1=RS_49._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Map 2 [BROADCAST_EDGE] vectorized, llap
BROADCAST [RS_49]
@@ -577,19 +577,19 @@ Stage-0
Stage-1
Map 1 vectorized, llap
File Output Operator [FS_61]
-Select Operator [SEL_60] (rows=10 width=520)
+Select Operator [SEL_60] (rows=3 width=520)
Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
-Map Join Operator [MAPJOIN_59] (rows=10 width=336)
+Map Join Operator [MAPJOIN_59] (rows=3 width=336)
Conds:SEL_58._col0, _col1=RS_56._col0, _col1(Inner),Output:["_col0","_col1","_col2","_col3"]
<-Reducer 3 [BROADCAST_EDGE] vectorized, llap
BROADCAST [RS_56]
PartitionCols:_col0, _col1
-Group By Operator [GBY_55] (rows=1 width=168)
+Group By Operator [GBY_55] (rows=3 width=168)
Output:["_col0","_col1"],keys:KEY._col0, KEY._col1
<-Map 2 [SIMPLE_EDGE] vectorized, llap
SHUFFLE [RS_54]
PartitionCols:_col0, _col1
-Group By Operator [GBY_53] (rows=1 width=168)
+Group By Operator [GBY_53] (rows=3 width=168)
Output:["_col0","_col1"],keys:date_col, decimal_col
Select Operator [SEL_52] (rows=3 width=168)
Output:["date_col","decimal_col"]
@@ -2030,6 +2030,18 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
}
}

+       // NDV=0 means join key statistics are unavailable - fall back to joinFactor heuristic
+       if (allSatisfyPreCondition) {
+         for (int pos = 0; pos < parents.size(); pos++) {
+           ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
+           List<String> keyExprs = StatsUtils.getQualifedReducerKeyNames(parent.getConf().getOutputKeyColumnNames());
+           if (!satisfyPrecondition(parent.getStatistics(), keyExprs)) {
+             allSatisfyPreCondition = false;
+             break;
+           }
+         }
+       }
okumin (Contributor):
Note: This is probably ok, but I want to check it again.

konstantinb (Contributor, Author):
@okumin I am unsure I fully understand this comment; could you please provide more info?

okumin (Contributor, Feb 6, 2026):
@konstantinb Sorry for confusing you. This is a comment for myself. I took a glance at this code, and it seems to be OK, but I have not dug into the entire semantics of JoinStatsRule. I can't merge an OSS pull request based on a very optimistic assumption, so I want to take a deep look again later. I'll write this sort of note privately next time. Sorry.


        if (allSatisfyPreCondition) {

          // statistics object that is combination of statistics from all
@@ -3237,6 +3249,21 @@ static boolean satisfyPrecondition(Statistics stats) {
          && !stats.getColumnStatsState().equals(Statistics.State.NONE);
    }

+   static boolean satisfyPrecondition(Statistics stats, List<String> joinKeys) {
+     // Empty tables have numRows bumped from 0 to 1 (see BasicStats.SetMinRowNumber),
+     // so numRows <= 1 may indicate an empty table where NDV=0 is legitimate, not "unknown".
+     if (stats.getNumRows() <= 1) {
+       return true;
+     }
+     for (String col : joinKeys) {
+       ColStatistics cs = stats.getColumnStatisticsFromColName(col);
+       if (cs != null && cs.getCountDistint() == 0L) {
+         return false;
+       }
+     }
+     return true;
+   }
+
    // check if all parent statistics are available
    private static boolean isAllParentsContainStatistics(Operator<? extends OperatorDesc> op) {
      for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
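To make the new precondition concrete, here is a minimal sketch of its behavior (hypothetical values; the Statistics constructor arguments are assumptions, not taken from the PR's tests):

// Illustrative only: a non-empty parent whose join key reports NDV=0 fails the
// precondition, so JoinStatsRule falls back to the joinFactor heuristic.
ColStatistics keyStats = new ColStatistics("date_col", "date");
keyStats.setCountDistint(0);                                  // NDV unavailable
Statistics parentStats = new Statistics(1000, 168000, 0, 0);  // assumed (numRows, dataSize, ...) ctor
parentStats.addToColumnStats(Collections.singletonList(keyStats));
satisfyPrecondition(parentStats, Collections.singletonList("date_col"));  // -> false

// An empty table has numRows bumped to 1, so NDV=0 is tolerated there.
Statistics emptyTable = new Statistics(1, 0, 0, 0);
satisfyPrecondition(emptyTable, Collections.singletonList("date_col"));   // -> true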
26 changes: 12 additions & 14 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/Statistics.java
@@ -240,32 +240,30 @@ public void setColumnStats(List<ColStatistics> colStats) {
  }

  public void addToColumnStats(List<ColStatistics> colStats) {
-
    if (columnStats == null) {
      columnStats = Maps.newHashMap();
    }

    if (colStats != null) {
      for (ColStatistics cs : colStats) {
-       ColStatistics updatedCS = null;
        if (cs != null) {
-
-         String key = cs.getColumnName();
-         // if column statistics for a column is already found then merge the statistics
-         if (columnStats.containsKey(key) && columnStats.get(key) != null) {
-           updatedCS = columnStats.get(key);
-           updatedCS.setAvgColLen(Math.max(updatedCS.getAvgColLen(), cs.getAvgColLen()));
-           updatedCS.setNumNulls(StatsUtils.safeAdd(updatedCS.getNumNulls(), cs.getNumNulls()));
-           updatedCS.setCountDistint(Math.max(updatedCS.getCountDistint(), cs.getCountDistint()));
-           columnStats.put(key, updatedCS);
-         } else {
-           columnStats.put(key, cs);
-         }
+         columnStats.merge(cs.getColumnName(), cs, Statistics::mergeColStats);
        }
      }
    }
  }

+ private static ColStatistics mergeColStats(ColStatistics existing, ColStatistics incoming) {
+   existing.setAvgColLen(Math.max(existing.getAvgColLen(), incoming.getAvgColLen()));
+   existing.setNumNulls(StatsUtils.safeAdd(existing.getNumNulls(), incoming.getNumNulls()));
+   if (existing.getCountDistint() > 0 && incoming.getCountDistint() > 0) {
+     existing.setCountDistint(Math.max(existing.getCountDistint(), incoming.getCountDistint()));
+   } else {
+     existing.setCountDistint(0);
+   }
+   return existing;
+ }

  public void updateColumnStatsState(State newState) {
    this.columnStatsState = inferColumnStatsState(columnStatsState, newState);
  }
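A quick sketch of the new merge semantics (hypothetical values; stats is any Statistics instance): numNulls accumulate, avgColLen takes the maximum, and an unknown NDV (0) on either side makes the merged NDV unknown.

ColStatistics first = new ColStatistics("c1", "int");
first.setAvgColLen(4);
first.setNumNulls(2);
first.setCountDistint(10);
ColStatistics second = new ColStatistics("c1", "int");
second.setCountDistint(0);                        // unknown NDV
stats.addToColumnStats(Collections.singletonList(first));
stats.addToColumnStats(Collections.singletonList(second));
// merged entry for "c1": avgColLen=4, numNulls=2, countDistinct=0 ("unknown" propagates)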
24 changes: 19 additions & 5 deletions ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
@@ -832,6 +832,7 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String col
      cs.setNumNulls(csd.getBinaryStats().getNumNulls());
    } else if (colTypeLowerCase.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) {
      cs.setAvgColLen(JavaDataModel.get().lengthOfTimestamp());
+     cs.setCountDistint(csd.getTimestampStats().getNumDVs());
konstantinb (Contributor, Author):
I am unsure whether this was deliberately left out or is an unintended omission. It does seem to improve the stats calculations in multiple .q test files, especially after the more conservative NDV handling by PessimisticStatCombiner.

      cs.setNumNulls(csd.getTimestampStats().getNumNulls());
      Long lowVal = (csd.getTimestampStats().getLowValue() != null) ? csd.getTimestampStats().getLowValue()
          .getSecondsSinceEpoch() : null;
@@ -862,6 +863,7 @@ public static ColStatistics getColStatistics(ColumnStatisticsObj cso, String col
      cs.setHistogram(csd.getDecimalStats().getHistogram());
    } else if (colTypeLowerCase.equals(serdeConstants.DATE_TYPE_NAME)) {
      cs.setAvgColLen(JavaDataModel.get().lengthOfDate());
+     cs.setCountDistint(csd.getDateStats().getNumDVs());
konstantinb (Contributor, Author):
I am unsure whether this was deliberately left out or is an unintended omission. It does seem to improve the stats calculations in multiple .q test files, especially after the more conservative NDV handling by PessimisticStatCombiner.

      cs.setNumNulls(csd.getDateStats().getNumNulls());
      Long lowVal = (csd.getDateStats().getLowValue() != null) ? csd.getDateStats().getLowValue()
          .getDaysSinceEpoch() : null;
@@ -2086,11 +2088,7 @@ private static List<Long> extractNDVGroupingColumns(List<ColStatistics> colStats
    // compute product of distinct values of grouping columns
    for (ColStatistics cs : colStats) {
      if (cs != null) {
-       long ndv = cs.getCountDistint();
-       if (cs.getNumNulls() > 0) {
-         ndv = StatsUtils.safeAdd(ndv, 1);
-       }
-       ndvValues.add(ndv);
+       ndvValues.add(getGroupingColumnNdv(cs, parentStats));
okumin (Contributor):
Please create a separate pull request next time you update something with global impact. This PR affects approximately 200 test cases, and it would be harder for a reviewer to validate them if it included two or more types of changes.

okumin (Contributor, Feb 9, 2026):
After taking a glance at all the test files, I started feeling that I would like to separate the unrelated changes, like below:

  • HIVE-29368: UDF changes
  • HIVE-XXXXX: cs.setCountDistint(csd.getTimestampStats().getNumDVs()) and similar changes
  • HIVE-XXXXX: getGroupingColumnNdv and related changes

This is because I could review each of them in 30 minutes if they were separated, so I would spend only 90 minutes in total. If all are included, it is not obvious why each test case has changed; I need more focus, and we can't make a checkpoint because we can't merge unless all changes are reasonable and all test cases are green (I know some integration tests are still failing and Sonar Cloud is reporting some remaining issues). This proposal is negotiable because it requires effort on your side. I should have proposed it at the beginning.

      } else {
        if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
          // the column must be an aggregate column inserted by GBY. We
@@ -2109,4 +2107,20 @@

    return ndvValues;
  }

+ private static long getGroupingColumnNdv(ColStatistics cs, Statistics parentStats) {
+   long ndv = cs.getCountDistint();
+
+   if (ndv == 0L) {
+     // Typically, ndv == 0 means "NDV unknown", and no safe GROUP BY adjustments are possible.
+     // However, there is a special exception for "constant NULL" columns: they are intentionally
+     // generated with NDV values of 0 and numNulls == numRows, while their actual NDV is 1.
+     if (cs.getNumNulls() >= parentStats.getNumRows()) {
+       ndv = 1L;
+     }
+   } else if (cs.getNumNulls() > 0L) {
+     ndv = StatsUtils.safeAdd(ndv, 1L);
+   }
+   return ndv;
+ }
}
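A worked illustration of the three cases handled above (invented numbers; parentStats reports 100 rows):

ColStatistics cs = new ColStatistics("date_col", "date");
cs.setCountDistint(5);
cs.setNumNulls(3);
getGroupingColumnNdv(cs, parentStats);   // -> 6: NULLs contribute one extra grouping key

cs.setCountDistint(0);
cs.setNumNulls(100);                     // all 100 rows are NULL: a "constant NULL" column
getGroupingColumnNdv(cs, parentStats);   // -> 1

cs.setCountDistint(0);
cs.setNumNulls(10);
getGroupingColumnNdv(cs, parentStats);   // -> 0: NDV genuinely unknown, no safe adjustment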
@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.stats.estimator;

import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hive.ql.plan.ColStatistics;

/**
 * Base class for StatEstimators that handle branching expressions (CASE/WHEN, IF, COALESCE).
 * Combines branch statistics using PessimisticStatCombiner and accounts for distinct constants.
 */
public abstract class BranchingStatEstimator implements StatEstimator {
  protected final int numberOfDistinctConstants;

  protected BranchingStatEstimator(int numberOfDistinctConstants) {
    this.numberOfDistinctConstants = numberOfDistinctConstants;
  }

  @Override
  public Optional<ColStatistics> estimate(List<ColStatistics> argStats) {
    PessimisticStatCombiner combiner = new PessimisticStatCombiner();
    addBranchStats(combiner, argStats);
    Optional<ColStatistics> result = combiner.getResult();
    if (result.isPresent()) {
      ColStatistics stat = result.get();
      if (numberOfDistinctConstants > stat.getCountDistint() && stat.getCountDistint() > 0) {
        stat.setCountDistint(numberOfDistinctConstants);
      }
    }
    return result;
  }

  protected abstract void addBranchStats(PessimisticStatCombiner combiner, List<ColStatistics> argStats);
}
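For context, a hypothetical subclass (not part of this PR) illustrating the extension point: a COALESCE-style estimator would treat every argument as a possible output branch and feed each one into the pessimistic combiner.

public class CoalesceStatEstimatorSketch extends BranchingStatEstimator {

  public CoalesceStatEstimatorSketch(int numberOfDistinctConstants) {
    super(numberOfDistinctConstants);
  }

  @Override
  protected void addBranchStats(PessimisticStatCombiner combiner, List<ColStatistics> argStats) {
    // Any argument of COALESCE can become the output, so every one is a branch.
    for (ColStatistics branch : argStats) {
      combiner.add(branch);
    }
  }
}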
@@ -37,31 +37,30 @@ public void add(ColStatistics stat) {
      result.setRange(null);
      result.setIsEstimated(true);
      return;
-   } else {
-     if (stat.getAvgColLen() > result.getAvgColLen()) {
-       result.setAvgColLen(stat.getAvgColLen());
-     }
-     if (stat.getCountDistint() > result.getCountDistint()) {
-       result.setCountDistint(stat.getCountDistint());
-     }
-     if (stat.getNumNulls() > result.getNumNulls()) {
-       result.setNumNulls(stat.getNumNulls());
-     }
-     if (stat.getNumTrues() > result.getNumTrues()) {
-       result.setNumTrues(stat.getNumTrues());
-     }
-     if (stat.getNumFalses() > result.getNumFalses()) {
-       result.setNumFalses(stat.getNumFalses());
-     }
-     if (stat.isFilteredColumn()) {
-       result.setFilterColumn();
-     }
-
    }

+   if (stat.getAvgColLen() > result.getAvgColLen()) {
+     result.setAvgColLen(stat.getAvgColLen());
+   }
+   if (stat.getCountDistint() == 0 || result.getCountDistint() == 0) {
+     result.setCountDistint(0L);
konstantinb (Contributor, Author):
This could appear counter-intuitive at first. However, when combining statistics of different logical branches of the same column with no reliable information about their interdependencies (i.e. in a "truly pessimistic" scenario), every other option appears to introduce undesired under-estimations, which often lead to catastrophic query failures.

For example, a simple column generated by a CASE..WHEN clause with three constants produces an NDV of 1 under the original code, while in most cases the "true" NDV is 3. If such a column later participates in a GROUP BY condition, its estimated number of records naturally becomes 1. Even this seemingly small under-estimation can lead to a bad decision about whether to convert to a mapjoin, especially over large data sets.

Alternatively, trying to "total up" the NDV values of the same column could over-estimate its true NDV, which in turn could severely under-estimate the number of records matching an "IN" filter, ultimately producing equally bad results.

konstantinb (Contributor, Author):
Edit: per the PR feedback, this has been refined to only set the NDV to "unknown" if either of the combined values is also "unknown", resulting in much better estimates.

+   } else if (stat.getCountDistint() > result.getCountDistint()) {
+     result.setCountDistint(stat.getCountDistint());
+   }
+   if (stat.getNumNulls() > result.getNumNulls()) {
+     result.setNumNulls(stat.getNumNulls());
+   }
+   if (stat.getNumTrues() > result.getNumTrues()) {
+     result.setNumTrues(stat.getNumTrues());
+   }
+   if (stat.getNumFalses() > result.getNumFalses()) {
+     result.setNumFalses(stat.getNumFalses());
+   }
+   if (stat.isFilteredColumn()) {
+     result.setFilterColumn();
+   }
okumin (Contributor):
Do we need to change this method? I'm expecting stat = result here.

konstantinb (Contributor, Author):
This is not something I actually changed; the diff shows up because I removed some pre-existing empty lines per the Quality Gate feedback.

  }

  public Optional<ColStatistics> getResult() {
    return Optional.of(result);
-
  }
}
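To illustrate the refined NDV rule discussed in the thread above, a minimal sketch (hypothetical numbers) of combining two branches of the same column, such as the THEN/ELSE outputs of a CASE expression:

PessimisticStatCombiner combiner = new PessimisticStatCombiner();
ColStatistics thenBranch = new ColStatistics("_col0", "string");
thenBranch.setCountDistint(2);                    // e.g. two distinct THEN constants
ColStatistics elseBranch = new ColStatistics("_col0", "string");
elseBranch.setCountDistint(0);                    // NDV unknown for this branch
combiner.add(thenBranch);
combiner.add(elseBranch);
combiner.getResult().get().getCountDistint();     // -> 0: one unknown branch makes the result unknown
// Had both branches reported a positive NDV, the combiner would have kept the
// maximum, which BranchingStatEstimator.estimate() may then raise to the
// number of distinct branch constants.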