Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3594,6 +3594,9 @@ public void testExplainPlanQueryV2()
assertEquals(response1Json.get("rows").get(0).get(2).asText(), "Rule Execution Times\n"
+ "Rule: SortRemove -> Time:*\n"
+ "Rule: AggregateProjectMerge -> Time:*\n"
+ "Rule: AggregateProjectPullUpConstants -> Time:*\n"
+ "Rule: ProjectAggregateMerge -> Time:*\n"
+ "Rule: SortRemoveConstantKeys -> Time:*\n"
+ "Rule: EvaluateProjectLiteral -> Time:*\n"
+ "Rule: AggregateRemove -> Time:*\n");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.calcite.rel.rules.AggregateCaseToFilterRule;
import org.apache.calcite.rel.rules.AggregateJoinTransposeRule;
import org.apache.calcite.rel.rules.AggregateProjectMergeRule;
import org.apache.calcite.rel.rules.AggregateProjectPullUpConstantsRule;
import org.apache.calcite.rel.rules.AggregateRemoveRule;
import org.apache.calcite.rel.rules.AggregateUnionAggregateRule;
import org.apache.calcite.rel.rules.CoreRules;
Expand All @@ -31,6 +32,7 @@
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.rules.FilterSetOpTransposeRule;
import org.apache.calcite.rel.rules.JoinPushExpressionsRule;
import org.apache.calcite.rel.rules.ProjectAggregateMergeRule;
import org.apache.calcite.rel.rules.ProjectFilterTransposeRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
Expand All @@ -41,7 +43,11 @@
import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinCopyRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortMergeRule;
import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.rules.SortRemoveConstantKeysRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.UnionMergeRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule;
import org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotJoinConditionPushRule;
Expand Down Expand Up @@ -113,6 +119,11 @@ private PinotQueryRuleSets() {
SortJoinCopyRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_JOIN_COPY).toRule(),

// Push Sort below Project so LIMIT applies before projection expressions are evaluated.
// Opt-in (disabled by default); sits with other transpose rules.
SortProjectTransposeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_PROJECT_TRANSPOSE).toRule(),

// join rules
JoinPushExpressionsRule.Config.DEFAULT
.withDescription(PlannerRuleNames.JOIN_PUSH_EXPRESSIONS).toRule(),
Expand Down Expand Up @@ -196,8 +207,28 @@ private PinotQueryRuleSets() {
.withDescription(PlannerRuleNames.FILTER_MERGE).toRule(),
AggregateRemoveRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_REMOVE).toRule(),
// Drop constant columns from GROUP BY keys when the Aggregate's input can prove constancy
// (typically from an equality filter on the column). Reduces shuffle key width on
// multi-tenant queries like `WHERE tenant_id = 'X' GROUP BY tenant_id, ...`. Default-on.
// Use Config.ANY (matches any RelNode below the Aggregate). Config.DEFAULT requires a
// LogicalProject directly below the Aggregate, which never appears in Pinot's pipeline
// because filter pushdown consumes the Project before PRUNE_RULES runs.
AggregateProjectPullUpConstantsRule.Config.ANY
.withDescription(PlannerRuleNames.AGGREGATE_PROJECT_PULL_UP_CONSTANTS).toRule(),
SortRemoveRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_REMOVE).toRule(),
// Collapse stacked Sort/LIMIT nodes (e.g. from sub-query flattening) into a single Sort. Default-on.
SortMergeRule.Config.LIMIT_MERGE
.withDescription(PlannerRuleNames.LIMIT_MERGE).toRule(),
// Drop constant columns from ORDER BY (e.g. WHERE x='Y' ORDER BY x, ts → ORDER BY ts). Default-on.
SortRemoveConstantKeysRule.Config.DEFAULT
.withDescription(PlannerRuleNames.SORT_REMOVE_CONSTANT_KEYS).toRule(),
// Flatten nested UNION ALLs into a single n-ary union (eliminates intermediate exchange stages). Default-on.
UnionMergeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.UNION_MERGE).toRule(),
// Drop unused aggregate calls when a Project on top of the Aggregate doesn't reference them. Default-on.
ProjectAggregateMergeRule.Config.DEFAULT
.withDescription(PlannerRuleNames.PROJECT_AGGREGATE_MERGE).toRule(),
PruneEmptyRules.CorrelateLeftEmptyRuleConfig.DEFAULT
.withDescription(PlannerRuleNames.PRUNE_EMPTY_CORRELATE_LEFT).toRule(),
PruneEmptyRules.CorrelateRightEmptyRuleConfig.DEFAULT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,130 @@ public void testEnableSortJoinCopy() {
//@formatter:on
}

// ---------------------------------------------------------------------------
// Tests for Calcite optimization rules added to the planner in this change. Four of the five
// default-on rules have a paired enabled-by-default / disabled assertion to keep the contract
// explicit (ProjectAggregateMerge has no dedicated test; see the NOTE further down). The opt-in
// SortProjectTranspose rule has the same pair so a future flip to default-on is caught.
// ResourceBasedQueryPlansTest covers the broader plan-shape surface.
// ---------------------------------------------------------------------------

@Test
public void testAggregateProjectPullUpConstantsEnabledByDefault() {
  // Rule is on by default. The filter pins col1 to 'US', so the plan is expected to stop grouping
  // on both columns and to surface col1 again as a projected literal.
  final String sql =
      "EXPLAIN PLAN FOR SELECT col1, col2, COUNT(*) FROM a WHERE col1 = 'US' GROUP BY col1, col2";
  final String plan = _queryEnvironment.explainQuery(sql, RANDOM_REQUEST_ID_GEN.nextLong());

  // The two-column group key must be gone from the plan text.
  final boolean groupsOnBothColumns = plan.contains("group=[{0, 1}]");
  assertFalse(groupsOnBothColumns,
      "AggregateProjectPullUpConstants should remove col1 from group keys. Plan:\n" + plan);

  // col1 must be produced by a literal projection instead.
  final boolean projectsLiteral = plan.contains("col1=[_UTF-8'US'");
  assertTrue(projectsLiteral,
      "AggregateProjectPullUpConstants should re-project col1 as the literal 'US'. Plan:\n" + plan);
}

@Test
public void testDisableAggregateProjectPullUpConstants() {
  // With the rule switched off, the aggregate must still group on both col1 and col2.
  final String sql =
      "EXPLAIN PLAN FOR SELECT col1, col2, COUNT(*) FROM a WHERE col1 = 'US' GROUP BY col1, col2";
  final String plan =
      explainQueryWithRuleDisabled(sql, PlannerRuleNames.AGGREGATE_PROJECT_PULL_UP_CONSTANTS);

  final boolean groupsOnBothColumns = plan.contains("group=[{0, 1}]");
  assertTrue(groupsOnBothColumns,
      "Without AggregateProjectPullUpConstants, both columns must remain in GROUP BY. Plan:\n" + plan);
}

@Test
public void testLimitMergeEnabledByDefault() {
  // Rule is on by default. An inner LIMIT 10 wrapped by an outer LIMIT 5 should leave only the
  // fetch of 5 in the plan text.
  final String sql = "EXPLAIN PLAN FOR SELECT col1 FROM (SELECT col1 FROM a LIMIT 10) LIMIT 5";
  final String plan = _queryEnvironment.explainQuery(sql, RANDOM_REQUEST_ID_GEN.nextLong());

  final boolean hasInnerFetch = plan.contains("fetch=[10]");
  assertFalse(hasInnerFetch,
      "LimitMerge should drop the wider inner LIMIT=10. Plan:\n" + plan);

  final boolean hasOuterFetch = plan.contains("fetch=[5]");
  assertTrue(hasOuterFetch,
      "LimitMerge should keep the tighter outer LIMIT=5. Plan:\n" + plan);
}

@Test
public void testDisableLimitMerge() {
  // With the rule switched off the inner LIMIT is not folded away, so fetch=[10] must still show.
  final String sql = "EXPLAIN PLAN FOR SELECT col1 FROM (SELECT col1 FROM a LIMIT 10) LIMIT 5";
  final String plan = explainQueryWithRuleDisabled(sql, PlannerRuleNames.LIMIT_MERGE);

  final boolean hasInnerFetch = plan.contains("fetch=[10]");
  assertTrue(hasInnerFetch,
      "Without LimitMerge, the inner fetch=[10] must remain. Plan:\n" + plan);
}

@Test
public void testUnionMergeEnabledByDefault() {
  // Rule is on by default. A three-branch UNION ALL should show exactly one LogicalUnion node
  // in the plan text rather than a nested pair.
  final String sql = "EXPLAIN PLAN FOR "
      + "SELECT col1, col2 FROM a UNION ALL "
      + "SELECT col1, col2 FROM b UNION ALL "
      + "SELECT col1, col2 FROM c";
  final String plan = _queryEnvironment.explainQuery(sql, RANDOM_REQUEST_ID_GEN.nextLong());

  // Splitting with limit -1 keeps trailing empty pieces, so (pieces - 1) is the occurrence count.
  final String[] pieces = plan.split("LogicalUnion\\(all=\\[true]\\)", -1);
  final int unionNodes = pieces.length - 1;
  assertEquals(unionNodes, 1,
      "UnionMerge should collapse nested LogicalUnion to a single n-ary Union. Plan:\n" + plan);
}

@Test
public void testDisableUnionMerge() {
  // With the rule switched off the three-branch UNION ALL stays nested, so two LogicalUnion
  // nodes are expected in the plan text.
  final String sql = "EXPLAIN PLAN FOR "
      + "SELECT col1, col2 FROM a UNION ALL "
      + "SELECT col1, col2 FROM b UNION ALL "
      + "SELECT col1, col2 FROM c";
  final String plan = explainQueryWithRuleDisabled(sql, PlannerRuleNames.UNION_MERGE);

  // Splitting with limit -1 keeps trailing empty pieces, so (pieces - 1) is the occurrence count.
  final String[] pieces = plan.split("LogicalUnion\\(all=\\[true]\\)", -1);
  final int unionNodes = pieces.length - 1;
  assertEquals(unionNodes, 2,
      "Without UnionMerge, two LogicalUnion nodes must remain in a nested 3-way UNION ALL. Plan:\n" + plan);
}

@Test
public void testSortRemoveConstantKeysEnabledByDefault() {
  // Rule is on by default. col1 is pinned to a constant by the filter, so the two-key sort
  // (col1, col2) must not appear in the plan text.
  final String sql = "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col1 = 'US' ORDER BY col1, col2";
  final String plan = _queryEnvironment.explainQuery(sql, RANDOM_REQUEST_ID_GEN.nextLong());

  final boolean sortsOnBothKeys = plan.contains("sort0=[$0], sort1=[$1]");
  assertFalse(sortsOnBothKeys,
      "SortRemoveConstantKeys should drop col1 from the multi-key ORDER BY. Plan:\n" + plan);
}

@Test
public void testDisableSortRemoveConstantKeys() {
  // With the rule switched off the ORDER BY keeps both keys, col1 and col2.
  final String sql = "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col1 = 'US' ORDER BY col1, col2";
  final String plan = explainQueryWithRuleDisabled(sql, PlannerRuleNames.SORT_REMOVE_CONSTANT_KEYS);

  final boolean sortsOnBothKeys = plan.contains("sort0=[$0], sort1=[$1]");
  assertTrue(sortsOnBothKeys,
      "Without SortRemoveConstantKeys, the multi-key ORDER BY must remain. Plan:\n" + plan);
}

// NOTE: ProjectAggregateMergeRule has no dedicated unit test. On the query shapes we tested
// (e.g. SELECT col1, total FROM (SELECT col1, SUM(col2) AS total, COUNT(*) AS unused FROM a
// GROUP BY col1)), other Pinot rules already prune the unused aggregate call before
// ProjectAggregateMergeRule gets a chance to fire. The rule is registered defensively in case
// a future query shape evades the existing pruning, but its standalone behavior is not
// observable in current test queries.

@Test
public void testSortProjectTransposeDisabledByDefault() {
  // Opt-in rule, so it is off in the default configuration. The check requires LogicalSort to
  // occur in the explain text before the first LogicalProject, i.e. Sort sits above Project.
  final String sql = "EXPLAIN PLAN FOR SELECT col1 FROM a ORDER BY col1";
  final String plan = _queryEnvironment.explainQuery(sql, RANDOM_REQUEST_ID_GEN.nextLong());

  final int sortPos = plan.indexOf("LogicalSort");
  final int projectPos = plan.indexOf("LogicalProject");
  final boolean sortAboveProject = sortPos >= 0 && projectPos > sortPos;
  assertTrue(sortAboveProject,
      "Default plan must place Sort above Project. Plan:\n" + plan);
}

@Test
public void testEnableSortProjectTranspose() {
  // Opt-in path. Once the rule is enabled, LogicalProject must occur in the explain text before
  // the first LogicalSort, i.e. the Sort has been pushed below the Project.
  final String sql = "EXPLAIN PLAN FOR SELECT col1 FROM a ORDER BY col1";
  final String plan = explainQueryWithRuleEnabled(sql, PlannerRuleNames.SORT_PROJECT_TRANSPOSE);

  final int sortPos = plan.indexOf("LogicalSort");
  final int projectPos = plan.indexOf("LogicalProject");
  final boolean projectAboveSort = projectPos >= 0 && sortPos > projectPos;
  assertTrue(projectAboveSort,
      "With SortProjectTranspose enabled, Project must be above Sort. Plan:\n" + plan);
}

@Test
public void testAggregateUnionAggregateDisabledByDefault() {
// Verify that the AggregateUnionAggregateRule is disabled by default
Expand Down
16 changes: 8 additions & 8 deletions pinot-query-planner/src/test/resources/queries/GroupByPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
"sql": "EXPLAIN PLAN FOR SELECT a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], aggType=[LEAF])",
"\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET \"UTF-8\"], EXPR$2=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{1}], agg#0=[$SUM0($2)], aggType=[LEAF])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, _UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
Expand Down Expand Up @@ -180,10 +180,10 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET \"UTF-8\"], EXPR$2=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, _UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},{
},
{
"description": "Verify that override for lite mode leaf stage fan-out adjusted limit works.",
"sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET liteModeLeafStageFanOutAdjustedLimit=1000; EXPLAIN PLAN FOR SELECT COUNT(*) FROM a WHERE col1 = 'foo'",
"output": [
Expand Down Expand Up @@ -795,11 +796,12 @@
"\n PhysicalFilter(condition=[=($3, 1)])",
"\n PhysicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PhysicalSort(sort0=[$2], dir0=[ASC])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"], col2=[$0], col3=[$1])",
"\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{1, 2}], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
Expand All @@ -809,11 +811,12 @@
"output": [
"Execution Plan",
"\nPhysicalSort(offset=[100], fetch=[400])",
"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"], col2=[$0], col3=[$1], EXPR$3=[$2])",
"\n PhysicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
Expand Down Expand Up @@ -902,11 +905,12 @@
"Execution Plan",
"\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(offset=[100], fetch=[400])",
"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n PhysicalProject(col1=[_UTF-8'foo':VARCHAR CHARACTER SET \"UTF-8\"], col2=[$0], col3=[$1], EXPR$3=[$2])",
"\n PhysicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalAggregate(group=[{1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])",
"\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, a.col1, SUM(a.col3) FROM a WHERE a.col3 >= 0 AND a.col1 = 'a' GROUP BY a.col1, a.col2",
"output": [
"Execution Plan",
"\nLogicalProject(col2=[$1], col1=[$0], EXPR$2=[$2])",
"\n PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\nLogicalProject(col2=[$0], col1=[_UTF-8'a':VARCHAR CHARACTER SET \"UTF-8\"], EXPR$2=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], aggType=[DIRECT])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(>=($2, 0), =($0, _UTF-8'a'))])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
Expand Down
Loading
Loading