Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,68 @@ private static void assertFullPageOfGroups(JsonNode mainNode, int expectedCount)
}
}

@Test
public void testDistinctWithLimitAndOffsetReturnsFullCardinality()
    throws Exception {
  // The default-on leaf-limit pushdown for no-aggregate DISTINCT has to keep honoring OFFSET: the planner pushes
  // offset + fetch down (the sort-exchange-copy folds the offset into the inner sort's fetch), so a paginated
  // DISTINCT yields the full requested page rather than fetch - offset rows. 'j' has 10 distinct values (0..9),
  // comfortably more than n + m.
  setUseMultiStageQueryEngine(true);
  String table = getTableName();
  String distinctJ = "select distinct j from " + table;

  // Ordered page: rows are the global ranks (m+1)..(m+n), i.e. the 3rd, 4th and 5th smallest distinct values,
  // which are 2, 3, 4.
  String orderedWithOffset = toResultStr(postV2Query(distinctJ + " order by j limit 3 offset 2"));
  Assert.assertEquals(orderedWithOffset, "\"j\"[\"LONG\"]\n2\n3\n4");

  // Control query without an offset: 0, 1, 2.
  String orderedNoOffset = toResultStr(postV2Query(distinctJ + " order by j limit 3"));
  Assert.assertEquals(orderedNoOffset, "\"j\"[\"LONG\"]\n0\n1\n2");

  // Unordered page: which rows come back is arbitrary, but there must be exactly as many as the page size (3)
  // and each one must be a valid distinct 'j'. Ignoring the offset would make this undercount.
  JsonNode unorderedResponse = postV2Query(distinctJ + " limit 3 offset 2");
  JsonNode rows = unorderedResponse.get(RESULT_TABLE).get("rows");
  Assert.assertEquals(rows.size(), 3, "DISTINCT with LIMIT 3 OFFSET 2 must return a full page of 3 rows");
  for (JsonNode row : rows) {
    long value = row.get(0).asLong();
    boolean outOfRange = value < 0 || value > 9;
    Assert.assertTrue(!outOfRange, "unexpected distinct value: " + value);
  }
}

@Test
public void testGroupByNoAggregateWithLimitOffsetAndTrimEquivalence()
    throws Exception {
  // Exercises the no-aggregate GROUP BY (non-DISTINCT) path under the default-on leaf-limit pushdown, and checks
  // that default-on group trim and the explicit opt-out produce the same result when trim has nothing to do.
  setUseMultiStageQueryEngine(true);
  String table = getTableName();
  String groupByJ = "select j from " + table + " group by j";

  // A no-aggregate GROUP BY col with LIMIT/OFFSET must return the full requested page (it shares the trim
  // machinery with DISTINCT). 'j' has 10 distinct values, so the ordered page (m+1)..(m+n) is 2, 3, 4.
  String orderedPage = toResultStr(postV2Query(groupByJ + " order by j limit 3 offset 2"));
  Assert.assertEquals(orderedPage, "\"j\"[\"LONG\"]\n2\n3\n4");

  // Unordered no-aggregate GROUP BY: exactly a page worth of rows (3), each of them a valid 'j'.
  JsonNode unorderedResponse = postV2Query(groupByJ + " limit 3 offset 2");
  JsonNode rows = unorderedResponse.get(RESULT_TABLE).get("rows");
  Assert.assertEquals(rows.size(), 3, "GROUP BY without aggregate with LIMIT 3 OFFSET 2 must return a full page");
  for (JsonNode row : rows) {
    long value = row.get(0).asLong();
    boolean outOfRange = value < 0 || value > 9;
    Assert.assertTrue(!outOfRange, "unexpected group key: " + value);
  }

  // 'i' has only 4 distinct values, fewer than the limit, so leaf/final trim is a no-op and the default-on
  // behavior must yield exactly the rows the explicit opt-out yields. Ordering by the key keeps the comparison
  // deterministic.
  String orderedTail = " order by i limit 100";
  String defaultOnSql = "select distinct i from " + table + orderedTail;
  String optedOutSql =
      "select /*+ aggOptions(is_enable_group_trim='false') */ distinct i from " + table + orderedTail;
  String defaultOn = toResultStr(postV2Query(defaultOnSql));
  String optedOut = toResultStr(postV2Query(optedOutSql));
  Assert.assertEquals(defaultOn, optedOut, "default-on trim must match the opt-out when total distinct < limit");
  Assert.assertEquals(defaultOn, "\"i\"[\"INT\"]\n0\n1\n2\n3");
}

@Test
public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -929,11 +929,7 @@ public void testJsonIndexDistinctSamePathWithLimit(boolean useMultiStageQueryEng

assertEquals(extractOrderedDistinctValues(baselineResponse).size(), 5);
assertEquals(extractOrderedDistinctValues(optimizedResponse).size(), 5);

// TODO: Fix LIMIT push down for MSE
if (!useMultiStageQueryEngine) {
assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(), 5 * getNumAvroFiles());
}
assertEquals(optimizedResponse.get("numEntriesScannedPostFilter").asInt(), 5 * getNumAvroFiles());
}

/// Cross-path 5-arg form: filter on `$.k2`, extract `$.k1`. `getMatchingFlattenedDocsMap` applies the filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void onMatch(RelOptRuleCall call) {
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS);

if (!isGroupTrimmingEnabled(call, hintOptions)) {
if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
} else if (hintOptions == null) {
hintOptions = Collections.emptyMap();
Expand Down Expand Up @@ -186,7 +186,7 @@ public void onMatch(RelOptRuleCall call) {
Map<String, String> hintOptions =
PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS);

if (!isGroupTrimmingEnabled(call, hintOptions)) {
if (!isGroupTrimmingEnabled(call, hintOptions, aggRel)) {
return;
} else if (hintOptions == null) {
hintOptions = Collections.emptyMap();
Expand Down Expand Up @@ -479,14 +479,30 @@ private static List<RexNode> findImmediateProjects(RelNode relNode) {
return null;
}

private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, Map<String, String> hintOptions) {
private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, Map<String, String> hintOptions,
Aggregate aggRel) {
if (hintOptions != null) {
String option = hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM);
if (option != null) {
// Explicit hint always wins (true or false), for aggregates with AND without aggregate functions.
return Boolean.parseBoolean(option);
}
}

// Group-by WITHOUT aggregate functions (DISTINCT or `GROUP BY col` with no agg calls) can always push the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can push down the limit for the following scenarios:

  • Distinct (no aggregates)
  • Aggregates with order-by and order-by doesn't include aggregates

I don't follow why we cannot push down limit when there are multiple group keys

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current PR we push the limit down for DISTINCT and for GROUP BY without aggregates (with or without ORDER BY). The limit is pushed down in the multiple-group-key case too. The condition here only excludes queries with multiple grouping sets (ROLLUP / CUBE / GROUPING SETS). Those are probably not supported today, but if they are in the future, this pushdown would not be logically valid for them.

// limit/collations down to the leaf stage by default: ORDER BY can only reference group keys, which are fully
// materialized at the leaf, so leaf-level trim is exact (and a plain LIMIT without ORDER BY returns a valid
// subset). This mirrors PinotLogicalAggregateRule (the physical-optimizer path).
// TODO: Consider also enabling this by default for aggregation queries whose ORDER BY references only group keys
// (not aggregate results). The same argument holds there - a group's rank by its key is identical at every
// leaf, so keeping the per-leaf top-K never drops a group that belongs in the global top-N, and the kept
// groups still get their aggregates fully merged at the final stage. It is NOT safe when ORDER BY is over an
// aggregate (partial values rank differently per leaf) or for an unordered limit with aggregates (an
// arbitrarily dropped group would be under-counted).
if (aggRel.getAggCallList().isEmpty()) {
return true;
}

Context genericContext = call.getPlanner().getContext();
if (genericContext != null) {
QueryEnvironment.Config context = genericContext.unwrap(QueryEnvironment.Config.class);
Expand Down
94 changes: 94 additions & 0 deletions pinot-query-planner/src/test/resources/queries/GroupByPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,100 @@
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Distinct with limit pushes limit to leaf and final aggregate by default (no hint)",
"sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL], collations=[[]], limit=[10])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF], collations=[[]], limit=[10])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Group by without aggregate functions with limit pushes limit to leaf and final by default",
"sql": "EXPLAIN PLAN FOR SELECT col1 FROM a GROUP BY col1 LIMIT 10",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL], collations=[[]], limit=[10])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF], collations=[[]], limit=[10])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Distinct with order by on group key and limit pushes collations and limit to leaf and final by default",
"sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1 FROM a ORDER BY col1 LIMIT 10",
"output": [
"Execution Plan",
"\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL], collations=[[0]], limit=[10])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[LEAF], collations=[[0]], limit=[10])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Group by without aggregate functions with HAVING on the group key still pushes limit (filter applied at leaf before trim)",
"sql": "EXPLAIN PLAN FOR SELECT col3 FROM a GROUP BY col3 HAVING col3 > 5 LIMIT 10",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n PinotLogicalAggregate(group=[{0}], aggType=[FINAL], collations=[[]], limit=[10])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{2}], aggType=[LEAF], collations=[[]], limit=[10])",
"\n LogicalFilter(condition=[>($2, 5)])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Distinct with limit and offset pushes offset+fetch to leaf and final aggregate by default",
"sql": "EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 11 OFFSET 10",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[10], fetch=[11])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[21])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL], collations=[[]], limit=[21])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF], collations=[[]], limit=[21])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Distinct with limit does not push limit to aggregate when group trim is explicitly disabled via hint",
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='false') */ DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
"output": [
"Execution Plan",
"\nLogicalSort(offset=[0], fetch=[10])",
"\n PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])",
"\n LogicalSort(fetch=[10])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
"\n PinotLogicalAggregate(group=[{0, 1}], aggType=[LEAF])",
"\n LogicalFilter(condition=[>=($2, 0)])",
"\n PinotLogicalTableScan(table=[[default, a]])",
"\n"
]
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,23 @@
"\n"
]
},
{
"description": "Distinct with limit pushes limit to leaf and final aggregate by default (no hint)",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
"output": [
"Execution Plan",
"\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[10])",
"\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
"\n PhysicalSort(fetch=[10])",
"\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], limit=[10])",
"\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
"\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], limit=[10])",
"\n PhysicalFilter(condition=[>=($2, 0)])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "SQL hint based group by optimization with group trim enabled and offset pushes down offset + fetch",
"sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10 OFFSET 5",
Expand Down
Loading