[SPARK-56482][SQL][4.3] Enable whole-stage codegen fusion for UnionExec #55724
LuciferYang wants to merge 1 commit into apache:branch-4.x
### What changes were proposed in this pull request?
`UnionExec` is made to participate in whole-stage codegen on its standard (non-partitioning-aware) path, so the Union and all its children compile into one `WholeStageCodegenExec` stage instead of each child becoming its own stage.
Main change — `UnionExec` implements `CodegenSupport`:
- `supportCodegen` falls back when any of these hold: flag off, partitioning-aware output, a `UnionExec` anywhere in the subtree, a child subtree that contains a known multi-input-RDD codegen operator (`SortMergeJoinExec`, `ShuffledHashJoinExec`), a child subtree that contains any `Nondeterministic` expression, `children.size` above the configured cap, columnar output, or a projection that cannot codegen.
- `doProduce` emits a `switch` on a per-partition child-index array and dispatches to a helper method per child. Each helper wraps the child's produced code and takes `int partitionIndex` as a parameter, which keeps code local to each helper (important because `ctx.addNewFunction` may spill helpers into a nested class).
- `doConsume` projects each child's output to the Union's output schema (casts inserted where necessary) and forwards to the parent's `consume`.
- `inputRDDs` returns a single `UnionRDD` of the children's input RDDs; the switch uses `UnionPartition.parentRddIndex` to decide which child's helper to call per task.
- `needCopyResult` propagates up if any child needs it; `usedInputs = AttributeSet.empty` (projection happens in `doConsume`).
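The dispatch shape described above can be sketched in plain Java. This is an illustrative mock, not the actual generated code: the helper names (`unionChild0`, `unionChild1`), the class name, and the string "rows" are all hypothetical, and real generated helpers produce rows rather than strings. What it shows is the structure: a `switch` on the parent-RDD index selects one helper per child, and each helper receives its child-local partition index as a parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the dispatch shape doProduce emits; all names here
// are hypothetical stand-ins, not Spark's actual generated identifiers.
public class UnionDispatchSketch {
    static final List<String> out = new ArrayList<>();

    // Stand-ins for the per-child helper methods that ctx.addNewFunction would
    // register; each takes the child-local partition index as a parameter, so
    // the code stays local even if the helper spills into a nested class.
    static void unionChild0(int partitionIndex) {
        out.add("child0:part" + partitionIndex);
    }

    static void unionChild1(int partitionIndex) {
        out.add("child1:part" + partitionIndex);
    }

    // Analogue of the emitted switch: parentRddIndex plays the role of
    // UnionPartition.parentRddIndex, childPartitionIndex the child-local index.
    static void processNext(int parentRddIndex, int childPartitionIndex) {
        switch (parentRddIndex) {
            case 0: unionChild0(childPartitionIndex); break;
            case 1: unionChild1(childPartitionIndex); break;
            default: throw new IllegalStateException("unknown child " + parentRddIndex);
        }
    }

    public static void main(String[] args) {
        // A UnionRDD over a 2-partition child and a 1-partition child yields
        // three tasks; each task resolves to exactly one child's helper.
        processNext(0, 0);
        processNext(0, 1);
        processNext(1, 0);
        System.out.println(out);
    }
}
```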
Support infrastructure:
- `CodegenContext.currentPartitionIndexVar` — indirection so that leaf operators that embed `partitionIndex` (e.g. `RangeExec.initRange`, `SampleExec`'s random-seed and skip-count) read a Union-provided child-local index when fused, and the literal `partitionIndex` otherwise.
- Two new internal SQLConf entries:
- `spark.sql.codegen.union.enabled` — default `false`. This is the initial implementation and we prefer to opt users in rather than flip a default they didn't ask for.
- `spark.sql.codegen.union.maxChildren` — default `64`, enforced `>= 2` (since `EliminateUnions` removes single-child unions during analysis). Only effective when `spark.sql.codegen.union.enabled` is `true`.
- `UnionExec.metrics` exposes `numOutputRows` only when fusion is active; when the flag is off, `UnionExec` has no metrics (matches pre-patch behavior).
- `NUM_CHILDREN` `LogKey` for the fallback `logDebug`.
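The fallback conditions listed under `supportCodegen` can be summarized as a first-match guard. The sketch below is a simplified illustration only: the class, field names, and reason strings are hypothetical and do not mirror Spark's actual plan API, but the ordering and the shape (return the first reason that blocks fusion, otherwise proceed) match the description above.

```java
import java.util.Optional;

// Simplified illustration of the supportCodegen fallback guard; the plan
// representation and names here are hypothetical, not Spark's actual API.
public class UnionCodegenGuard {
    boolean flagEnabled;               // spark.sql.codegen.union.enabled
    int maxChildren;                   // spark.sql.codegen.union.maxChildren (>= 2)
    boolean partitioningAware;         // partitioning-aware output path
    boolean nestedUnionInSubtree;      // UnionExec anywhere below
    boolean multiInputJoinInSubtree;   // SortMergeJoinExec / ShuffledHashJoinExec
    boolean nondeterministicInSubtree; // any Nondeterministic expression
    boolean columnarOutput;
    boolean projectionCanCodegen;
    int numChildren;

    // Returns the first fallback reason, or empty when fusion can proceed.
    Optional<String> failureReason() {
        if (!flagEnabled) return Optional.of("flag off");
        if (partitioningAware) return Optional.of("partitioning-aware output");
        if (nestedUnionInSubtree) return Optional.of("nested UnionExec in subtree");
        if (multiInputJoinInSubtree) return Optional.of("multi-input-RDD join in child subtree");
        if (nondeterministicInSubtree) return Optional.of("nondeterministic expression in child subtree");
        if (numChildren > maxChildren) return Optional.of("too many children: " + numChildren);
        if (columnarOutput) return Optional.of("columnar output");
        if (!projectionCanCodegen) return Optional.of("projection cannot codegen");
        return Optional.empty();
    }
}
```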
### Why are the changes needed?
Before this change, a plan like `Union(t1, t2, t3)` always breaks the WSCG boundary: each child compiles into its own generated class and `UnionExec` executes interpreted. That produces N+1 generated classes per Union plus the overhead of stitching interpreted output back into WSCG for the parent. Query shapes that `UNION ALL` several codegen-friendly branches — partitioned fact-table unions in ETL, per-channel rollups, decorrelated sub-plans — pay this cost per invocation. Fusing into one stage compiles once and streams through a single `processNext`.
### Does this PR introduce _any_ user-facing change?
No. The feature is off by default, and both new configs are `.internal()`.
### How was this patch tested?
- `UnionCodegenSuite` covers correctness under codegen, parity against the interpreted path via `assertFlagParity`, every fallback branch in `supportCodegenFailureReason`, projection pushdown, column pruning, metrics, large-N stress, and a regression guard against indirect nested unions (e.g. `Union(Project(Union,...), ...)`). `UnionCodegenAnsiSuite` and `UnionCodegenAqeSuite` (same file) re-run the full suite with ANSI and AQE enabled.
- `SQLMetricsSuite` — the existing `SPARK-25278` regression test is unchanged (passes because `UnionExec.metrics` is empty when fusion is off). One new test verifies `UnionExec.numOutputRows` reports the total row count under fusion.
- Existing Spark codegen and TPC-DS plan-stability suites run unchanged (this PR's `doExecute` behavior is identical to master when the flag is off; with the flag off, TPC-DS goldens are unchanged).
- `UnionBenchmark` — added; 4 scenarios (plain, type widening, per-child ops, Union + downstream aggregate) covering N ∈ {2 .. 1024}. Results will be regenerated via the GHA benchmark workflow.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes apache#55425 from LuciferYang/SPARK-56482.
Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>