Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,34 @@ private boolean isCrossProduct(JoinOperator joinOp) {
return true;
}

/**
* When estimated rows already exceed {@link ConfVars#XPROD_SMALL_TABLE_ROWS_THRESHOLD}, still allow
* cross-product map join if {@link #computeOnlineDataSize} fits within the unconditional map-join
* byte budget ({@link ConfVars#HIVE_CONVERT_JOIN_NOCONDITIONAL_TASK_THRESHOLD}). NDV-based cardinality
* can overshoot row counts on tiny tables while bytes remain broadcast-safe.
*/
@VisibleForTesting
boolean crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(Statistics parentStats,
long xprodRowThreshold, long noconditionalMaxBytes) {
long onlineBytes = computeOnlineDataSize(parentStats);
if (onlineBytes <= 0) {
LOG.debug(
"Cross-product map join: row estimate {} exceeds {} but online size unavailable; not applying byte fallback",
parentStats.getNumRows(), xprodRowThreshold);
return false;
}
if (onlineBytes <= noconditionalMaxBytes) {
LOG.info(
"Cross-product map join: row estimate {} exceeds {} but online size {} within broadcast budget {}; allowing map join",
parentStats.getNumRows(), xprodRowThreshold, onlineBytes, noconditionalMaxBytes);
return true;
}
LOG.debug(
"Cross-product map join: row estimate {} exceeds {} and online size {} exceeds budget {}",
parentStats.getNumRows(), xprodRowThreshold, onlineBytes, noconditionalMaxBytes);
return false;
}

/**
* Return result for getMapJoinConversion method.
*/
Expand Down Expand Up @@ -1299,14 +1327,21 @@ && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
boolean cartesianProductEdgeEnabled =
HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
if (cartesianProductEdgeEnabled && !hasOuterJoin(joinOp) && isCrossProduct(joinOp)) {
final long xprodRowThreshold =
HiveConf.getIntVar(context.conf, HiveConf.ConfVars.XPROD_SMALL_TABLE_ROWS_THRESHOLD);
final long noconditionalBroadcastBudget =
HiveConf.getLongVar(context.conf, HiveConf.ConfVars.HIVE_CONVERT_JOIN_NOCONDITIONAL_TASK_THRESHOLD);
for (int i = 0 ; i < joinOp.getParentOperators().size(); i ++) {
if (i != bigTablePosition) {
Statistics parentStats = joinOp.getParentOperators().get(i).getStatistics();
if (parentStats.getNumRows() >
HiveConf.getIntVar(context.conf, HiveConf.ConfVars.XPROD_SMALL_TABLE_ROWS_THRESHOLD)) {
// if any of smaller side is estimated to generate more than
// threshold rows we would disable mapjoin
return null;
if (parentStats.getNumRows() > xprodRowThreshold) {
// NDV-based filters often estimate a few rows on tiny lookups (e.g. 2 vs 1); row count
// alone can reject a safe broadcast. If estimated online size still fits the same
// unconditional map-join byte budget, allow conversion (same knob as noconditionaltask).
if (!crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(parentStats, xprodRowThreshold,
noconditionalBroadcastBudget)) {
return null;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -37,6 +38,65 @@

class TestConvertJoinMapJoin {

@Test
void crossProductByteFallback_allowsWhenOnlineSizeWithinBudget() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
converter.hashTableLoadFactor = 0.75f;
Statistics stats = new Statistics(2L, 500L, 0L, 0L);
assertTrue(
converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 10_000_000L));
}

@Test
void crossProductByteFallback_rejectsWhenOnlineSizeExceedsBudget() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
converter.hashTableLoadFactor = 0.75f;
Statistics stats = new Statistics(50_000L, 50_000_000L, 0L, 0L);
assertFalse(
converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 10_000_000L));
}

@Test
void crossProductByteFallback_rejectsWhenBudgetTooSmallForEstimatedSize() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
converter.hashTableLoadFactor = 0.75f;
Statistics stats = new Statistics(2L, 500L, 0L, 0L);
assertFalse(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 1L));
}

/**
* NDV-driven filter selectivity can estimate a tiny lookup at ~2 rows / a few hundred bytes
* onlineDataSize when {@code hive.xprod.mapjoin.small.table.rows=1}. The row-only gate would
* reject the broadcast even though the build side is well below the noconditionaltask byte
* budget. The byte fallback must still admit map-join in that shape.
*/
@Test
void crossProductByteFallback_twoRowsTinyOnlineSize() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
converter.hashTableLoadFactor = 0.75f;
final long ndvDrivenRowEstimate = 2L;
final long tinyDataSizeBytes = 296L;
Statistics stats = new Statistics(ndvDrivenRowEstimate, tinyDataSizeBytes, 0L, 0L);
final long xprodRowThreshold = 1L;
final long noconditionalBudgetBytes = 10_000_000L;
assertTrue(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(
stats, xprodRowThreshold, noconditionalBudgetBytes));
}

/**
* Same small row estimate (2) as the previous case, but with bytes large enough that the build
* side exceeds the broadcast budget — the byte fallback must reject so the row-only cap still
* bites.
*/
@Test
void crossProductByteFallback_rejectsTwoRowsWhenEstimatedPayloadExceedsBudget() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
converter.hashTableLoadFactor = 0.75f;
Statistics stats = new Statistics(2L, 50_000_000L, 0L, 0L);
assertFalse(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(
stats, 1L, 10_000_000L));
}

@Test
void testComputeOnlineDataSizeGenericLargeDataSize() {
ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
Expand Down
Loading