Open
Conversation
update plan update simple count and ph string map update fix update refact update adjust default value fmt
Contributor
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
Contributor
Author
|
run buildall |
There was a problem hiding this comment.
Pull request overview
This PR introduces a new “bucketed hash aggregation” optimization path that fuses local+global aggregation into a single operator for single-BE deployments, along with the required FE/BE plan and pipeline support.
Changes:
- Add a new Thrift plan node type + payload (
BUCKETED_AGGREGATION_NODE/TBucketedAggregationNode) and wire it intoTPlanNode. - Add Nereids physical plan + translation + costing to generate and pick
PhysicalBucketedHashAggregate. - Implement BE pipeline sink/source operators and shared-state to build per-instance bucketed hash tables and merge/output them without exchange.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| gensrc/thrift/PlanNodes.thrift | Adds new plan node type and Thrift struct for bucketed aggregation |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds session variables controlling the optimization and thresholds |
| fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java | Adds legacy planner node that serializes bucketed agg into Thrift |
| fe/fe-core/src/main/java/org/apache/doris/nereids/** | Adds new physical node, visitor hooks, properties, stats, cost model, and implementation rule |
| be/src/exec/pipeline/pipeline_fragment_context.cpp | Creates bucketed agg source/sink pipelines and registers shared state |
| be/src/exec/pipeline/dependency.{h,cpp} | Adds BucketedAggSharedState and cleanup/destroy support |
| be/src/exec/operator/operator.cpp | Registers new bucketed agg pipeline local states |
| be/src/exec/operator/bucketed_aggregation_* | Implements bucketed agg sink/source operators |
| be/src/exec/common/hash_table/hash_map_context.h | Adds reusable output buffer to hash method state |
| be/src/exec/common/agg_utils.h | Factors agg hash-table variants and adds BucketedAggDataVariants |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+2933
to
+2936
| + "消除 Exchange 开销和序列化/反序列化成本。默认关闭。", | ||
| "Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase " | ||
| + "aggregation into a single operator on single-BE deployments, eliminating exchange overhead " | ||
| + "and serialization/deserialization costs. Disabled by default."}) |
| TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode(); | ||
| bucketedAggNode.setGroupingExprs(groupingExprs); | ||
| bucketedAggNode.setAggregateFunctions(aggregateFunctions); | ||
| bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt()); |
Comment on lines
+1078
to
+1084
| struct TBucketedAggregationNode { | ||
| 1: optional list<Exprs.TExpr> grouping_exprs | ||
| 2: required list<Exprs.TExpr> aggregate_functions | ||
| 3: required Types.TTupleId intermediate_tuple_id | ||
| 4: required Types.TTupleId output_tuple_id | ||
| 5: required bool need_finalize | ||
| } |
| return ImmutableList.of(); | ||
| } | ||
| // Only for single-BE deployments | ||
| int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true)); |
Comment on lines
+1436
to
+1437
| op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs); | ||
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
Comment on lines
+1462
to
+1468
| for (int i = 0; i < _num_instances; i++) { | ||
| auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(), | ||
| "BUCKETED_AGG_SINK_DEPENDENCY"); | ||
| sink_dep->set_shared_state(shared_state.get()); | ||
| shared_state->sink_deps.push_back(sink_dep); | ||
| } | ||
| shared_state->create_source_dependencies(_num_instances, op->operator_id(), |
| if (expr instanceof SlotReference) { | ||
| SlotReference slot = (SlotReference) expr; | ||
| if (slot.getOriginalColumn().isPresent()) { | ||
| groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase()); |
| } | ||
| // All distribution columns must appear in the GROUP BY keys | ||
| for (Column column : distributionColumns) { | ||
| if (!groupByColumnNames.contains(column.getName().toLowerCase())) { |
Contributor
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)