
Dev 0319 3 #61495

Open

BiteTheDDDDt wants to merge 2 commits into apache:master from BiteTheDDDDt:dev_0319_3

Conversation

@BiteTheDDDDt
Contributor

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

update plan

update simple count and ph string map

update fix

update refact

update

adjust default value

fmt
Copilot AI review requested due to automatic review settings March 18, 2026 16:06
@Thearas
Contributor

Thearas commented Mar 18, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (ideally include the specific error message) and how it was fixed.
  2. Which behaviors were modified: what the previous behavior was, what it is now, why it was modified, and what impacts it may have.
  3. What features were added and why.
  4. Which code was refactored and why that part of the code was refactored.
  5. Which functions were optimized and what the difference is before and after the optimization.

@BiteTheDDDDt
Contributor Author

run buildall


Copilot AI left a comment


Pull request overview

This PR introduces a new “bucketed hash aggregation” optimization path that fuses local+global aggregation into a single operator for single-BE deployments, along with the required FE/BE plan and pipeline support.

Changes:

  • Add a new Thrift plan node type + payload (BUCKETED_AGGREGATION_NODE / TBucketedAggregationNode) and wire it into TPlanNode.
  • Add Nereids physical plan + translation + costing to generate and pick PhysicalBucketedHashAggregate.
  • Implement BE pipeline sink/source operators and shared-state to build per-instance bucketed hash tables and merge/output them without exchange.
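The core idea behind the fused path can be illustrated with a small, hypothetical sketch (the class and method names here are invented for illustration and are not from the PR): grouping keys are hashed into a fixed set of buckets, so each bucket's hash table already holds final (global) aggregate states, and no exchange or second aggregation phase is needed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of bucketed hash aggregation for a COUNT(*) GROUP BY key.
// Keys are routed to disjoint buckets by hash, so the buckets partition the
// key space and their per-bucket results are already the global results.
class BucketedCountAgg {
    private final int numBuckets;
    private final Map<String, Long>[] buckets;

    @SuppressWarnings("unchecked")
    BucketedCountAgg(int numBuckets) {
        this.numBuckets = numBuckets;
        this.buckets = new HashMap[numBuckets];
        for (int i = 0; i < numBuckets; i++) {
            buckets[i] = new HashMap<>();
        }
    }

    // Single pass: route the row to its bucket and update the aggregate state.
    void consume(String groupKey) {
        int bucket = Math.floorMod(groupKey.hashCode(), numBuckets);
        buckets[bucket].merge(groupKey, 1L, Long::sum);
    }

    // No merge/exchange phase: a key lives in exactly one bucket, so
    // concatenating the buckets yields the final aggregation result.
    Map<String, Long> finalResult() {
        Map<String, Long> out = new HashMap<>();
        for (Map<String, Long> b : buckets) {
            out.putAll(b);
        }
        return out;
    }

    public static void main(String[] args) {
        BucketedCountAgg agg = new BucketedCountAgg(4);
        for (String k : List.of("a", "b", "a", "c", "a", "b")) {
            agg.consume(k);
        }
        System.out.println(agg.finalResult().get("a")); // 3
    }
}
```

This mirrors the PR's stated goal only at a conceptual level: because the bucketed tables are disjoint, the local and global phases collapse into one operator, which is what eliminates the exchange and serialization costs on a single BE.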

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.

File Description
gensrc/thrift/PlanNodes.thrift Adds new plan node type and Thrift struct for bucketed aggregation
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java Adds session variables controlling the optimization and thresholds
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java Adds legacy planner node that serializes bucketed agg into Thrift
fe/fe-core/src/main/java/org/apache/doris/nereids/** Adds new physical node, visitor hooks, properties, stats, cost model, and implementation rule
be/src/exec/pipeline/pipeline_fragment_context.cpp Creates bucketed agg source/sink pipelines and registers shared state
be/src/exec/pipeline/dependency.{h,cpp} Adds BucketedAggSharedState and cleanup/destroy support
be/src/exec/operator/operator.cpp Registers new bucketed agg pipeline local states
be/src/exec/operator/bucketed_aggregation_* Implements bucketed agg sink/source operators
be/src/exec/common/hash_table/hash_map_context.h Adds reusable output buffer to hash method state
be/src/exec/common/agg_utils.h Factors agg hash-table variants and adds BucketedAggDataVariants


Comment on lines +2933 to +2936
+ "eliminating Exchange overhead and serialization/deserialization costs. Disabled by default.",
"Whether to enable bucketed hash aggregation optimization. This optimization fuses two-phase "
+ "aggregation into a single operator on single-BE deployments, eliminating exchange overhead "
+ "and serialization/deserialization costs. Disabled by default."})
TBucketedAggregationNode bucketedAggNode = new TBucketedAggregationNode();
bucketedAggNode.setGroupingExprs(groupingExprs);
bucketedAggNode.setAggregateFunctions(aggregateFunctions);
bucketedAggNode.setIntermediateTupleId(aggInfo.getOutputTupleId().asInt());
Comment on lines +1078 to +1084
struct TBucketedAggregationNode {
1: optional list<Exprs.TExpr> grouping_exprs
2: required list<Exprs.TExpr> aggregate_functions
3: required Types.TTupleId intermediate_tuple_id
4: required Types.TTupleId output_tuple_id
5: required bool need_finalize
}
return ImmutableList.of();
}
// Only for single-BE deployments
int beNumber = Math.max(1, ctx.getEnv().getClusterInfo().getBackendsNumber(true));
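The excerpt above gates the rewrite on cluster size. A minimal sketch of that gate (the helper name is invented; only the clamping expression comes from the excerpt) would be:

```java
// Hypothetical helper illustrating the single-BE gate from the excerpt above:
// the optimization only applies when exactly one backend is alive, since a
// single BE observes every bucket locally and needs no exchange.
class SingleBeGate {
    static boolean applies(int aliveBackends) {
        // Clamp to at least 1, as in the excerpted FE code.
        int beNumber = Math.max(1, aliveBackends);
        return beNumber == 1;
    }
}
```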
Comment on lines +1436 to +1437
op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
Comment on lines +1462 to +1468
for (int i = 0; i < _num_instances; i++) {
auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
"BUCKETED_AGG_SINK_DEPENDENCY");
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_deps.push_back(sink_dep);
}
shared_state->create_source_dependencies(_num_instances, op->operator_id(),
if (expr instanceof SlotReference) {
SlotReference slot = (SlotReference) expr;
if (slot.getOriginalColumn().isPresent()) {
groupByColumnNames.add(slot.getOriginalColumn().get().getName().toLowerCase());
}
// All distribution columns must appear in the GROUP BY keys
for (Column column : distributionColumns) {
if (!groupByColumnNames.contains(column.getName().toLowerCase())) {
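The truncated excerpt above checks that every distribution column appears among the GROUP BY keys. A self-contained sketch of that containment check (a hypothetical helper, not the PR's actual code, which works on SlotReference/Column objects rather than strings) might look like:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: the optimization is only safe when the GROUP BY keys
// cover all distribution columns, so rows sharing a grouping key can never
// land in different buckets. Comparison is case-insensitive, as in the excerpt.
class GroupByCoversDistribution {
    static boolean covers(List<String> groupByColumns, List<String> distributionColumns) {
        Set<String> groupByLower = new HashSet<>();
        for (String c : groupByColumns) {
            groupByLower.add(c.toLowerCase());
        }
        // All distribution columns must appear in the GROUP BY keys.
        for (String c : distributionColumns) {
            if (!groupByLower.contains(c.toLowerCase())) {
                return false;
            }
        }
        return true;
    }
}
```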
@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 100% (0/0) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 73.11% (26915/36814)
Line Coverage 56.66% (288714/509559)
Region Coverage 53.86% (239706/445048)
Branch Coverage 55.60% (103754/186611)

