[SPARK-56482][SQL][4.2] Enable whole-stage codegen fusion for UnionExec#55725

Open
LuciferYang wants to merge 1 commit into apache:branch-4.2 from LuciferYang:SPARK-56482-4.2

Conversation

@LuciferYang
Contributor


### What changes were proposed in this pull request?

`UnionExec` is made to participate in whole-stage codegen on its standard (non-partitioning-aware) path, so the Union and all its children compile into one `WholeStageCodegenExec` stage instead of each child becoming its own stage.

Main change — `UnionExec` implements `CodegenSupport`:

- `supportCodegen` falls back when any of these hold: flag off, partitioning-aware output, a `UnionExec` anywhere in the subtree, a child subtree that contains a known multi-input-RDD codegen operator (`SortMergeJoinExec`, `ShuffledHashJoinExec`), a child subtree that contains any `Nondeterministic` expression, `children.size` above the configured cap, columnar output, or a projection that cannot codegen.
- `doProduce` emits a `switch` on a per-partition child-index array and dispatches to a helper method per child. Each helper wraps the child's produced code and takes `int partitionIndex` as a parameter, which keeps code local to each helper (important because `ctx.addNewFunction` may spill helpers into a nested class).
- `doConsume` projects each child's output to the Union's output schema (casts inserted where necessary) and forwards to the parent's `consume`.
- `inputRDDs` returns a single `UnionRDD` of the children's input RDDs; the switch uses `UnionPartition.parentRddIndex` to decide which child's helper to call per task.
- `needCopyResult` propagates up if any child needs it; `usedInputs = AttributeSet.empty` (projection happens in `doConsume`).
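To make the dispatch shape concrete, here is a hand-written Java sketch of the kind of code the description above attributes to `doProduce` — a per-partition child-index lookup feeding a `switch` that calls one helper per child, with each helper receiving a child-local partition index. This is an illustration, not the actual generated code; the class, the helper names `unionChild0`/`unionChild1`, and the explicit offset table are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class UnionDispatchSketch {
    // Hypothetical stand-ins for what UnionPartition.parentRddIndex provides:
    // which child each global partition belongs to, and where that child's
    // partition range starts.
    private final int[] childIndexForPartition;
    private final int[] childFirstPartition;
    private final List<String> out = new ArrayList<>();

    public UnionDispatchSketch(int[] childIndexForPartition, int[] childFirstPartition) {
        this.childIndexForPartition = childIndexForPartition;
        this.childFirstPartition = childFirstPartition;
    }

    // One helper per child; each takes the child-local partition index as a
    // parameter so the child's produced code stays local to the helper.
    private void unionChild0(int partitionIndex) {
        out.add("child0:part" + partitionIndex);
    }

    private void unionChild1(int partitionIndex) {
        out.add("child1:part" + partitionIndex);
    }

    // Rough analogue of processNext for one task.
    public List<String> processPartition(int partitionIndex) {
        int child = childIndexForPartition[partitionIndex];
        int localIndex = partitionIndex - childFirstPartition[child];
        switch (child) {
            case 0: unionChild0(localIndex); break;
            case 1: unionChild1(localIndex); break;
            default: throw new IllegalStateException("unexpected child " + child);
        }
        return out;
    }

    public static void main(String[] args) {
        // Global partitions 0-1 come from child 0, partitions 2-3 from child 1.
        UnionDispatchSketch s =
            new UnionDispatchSketch(new int[]{0, 0, 1, 1}, new int[]{0, 2});
        // Global partition 2 is child 1's local partition 0.
        System.out.println(s.processPartition(2)); // [child1:part0]
    }
}
```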

Support infrastructure:

- `CodegenContext.currentPartitionIndexVar` — indirection so that leaf operators that embed `partitionIndex` (e.g. `RangeExec.initRange`, `SampleExec`'s random-seed and skip-count) read a Union-provided child-local index when fused, and the literal `partitionIndex` otherwise.
- Two new internal SQLConf entries:
  - `spark.sql.codegen.union.enabled` — default `false`. This is the initial implementation and we prefer to opt users in rather than flip a default they didn't ask for.
  - `spark.sql.codegen.union.maxChildren` — default `64`, enforced `>= 2` (since `EliminateUnions` removes single-child unions during analysis). Only effective when `spark.sql.codegen.union.enabled` is `true`.
- `UnionExec.metrics` exposes `numOutputRows` only when fusion is active; when the flag is off, `UnionExec` has no metrics (matches pre-patch behavior).
- `NUM_CHILDREN` `LogKey` for the fallback `logDebug`.
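The `currentPartitionIndexVar` indirection can be sketched as follows — an assumed API shape, not Spark's actual `CodegenContext`: a leaf operator asks the context for the partition-index *expression* instead of hard-coding the literal `partitionIndex`, so a fusing Union can redirect it to a child-local variable (the name `unionLocalIdx` here is hypothetical).

```java
import java.util.Optional;

public class PartitionIndexVarSketch {
    // Empty => leaf operators splice in the literal plan-level partitionIndex.
    private Optional<String> currentPartitionIndexVar = Optional.empty();

    // Called by a fusing Union to point leaves at its child-local index.
    public void setCurrentPartitionIndexVar(String varName) {
        currentPartitionIndexVar = Optional.of(varName);
    }

    // What a leaf operator (e.g. a Range-like initRange) embeds in its
    // generated code.
    public String partitionIndexExpr() {
        return currentPartitionIndexVar.orElse("partitionIndex");
    }

    public static void main(String[] args) {
        PartitionIndexVarSketch ctx = new PartitionIndexVarSketch();
        System.out.println(ctx.partitionIndexExpr()); // partitionIndex
        ctx.setCurrentPartitionIndexVar("unionLocalIdx");
        System.out.println(ctx.partitionIndexExpr()); // unionLocalIdx
    }
}
```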

### Why are the changes needed?
Before this change, a plan like `Union(t1, t2, t3)` always breaks the WSCG boundary: each child compiles into its own generated class and `UnionExec` executes interpreted. That produces N+1 generated classes per Union plus the overhead of stitching interpreted output back into WSCG for the parent. Query shapes that `UNION ALL` several codegen-friendly branches — partitioned fact-table unions in ETL, per-channel rollups, decorrelated sub-plans — pay this cost per invocation. Fusing into one stage compiles once and streams through a single `processNext`.

### Does this PR introduce _any_ user-facing change?
No. The feature is off by default, and both new configs are `.internal()`.

### How was this patch tested?
- `UnionCodegenSuite` covers correctness under codegen, parity against the interpreted path via `assertFlagParity`, every fallback branch in `supportCodegenFailureReason`, projection pushdown, column pruning, metrics, large-N stress, and a regression guard against indirect nested unions (e.g. `Union(Project(Union,...), ...)`). `UnionCodegenAnsiSuite` and `UnionCodegenAqeSuite` (same file) re-run the full suite with ANSI and AQE enabled.
- `SQLMetricsSuite` — the existing `SPARK-25278` regression test is unchanged (passes because `UnionExec.metrics` is empty when fusion is off). One new test verifies `UnionExec.numOutputRows` reports the total row count under fusion.
- Existing Spark codegen and TPC-DS plan-stability suites run unchanged (this PR's `doExecute` behavior is identical to master when the flag is off; with the flag off, TPC-DS goldens are unchanged).
- `UnionBenchmark` — added; 4 scenarios (plain, type widening, per-child ops, Union + downstream aggregate) covering N ∈ {2 .. 1024}. Results will be regenerated via the GHA benchmark workflow.
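The flag-parity idea attributed to `assertFlagParity` above reduces to a simple pattern — run the same query once with fusion enabled and once disabled, and require identical results. This generic Java sketch illustrates only that pattern; `runQuery` is a stand-in, not a Spark API.

```java
import java.util.List;
import java.util.function.Function;

public class FlagParitySketch {
    // Evaluates the query under both flag states and asserts equal results.
    public static <T> void assertFlagParity(Function<Boolean, List<T>> runQuery) {
        List<T> fused = runQuery.apply(true);        // codegen-fused path
        List<T> interpreted = runQuery.apply(false); // interpreted path
        if (!fused.equals(interpreted)) {
            throw new AssertionError(
                "codegen/interpreted mismatch: " + fused + " vs " + interpreted);
        }
    }

    public static void main(String[] args) {
        // Toy "query" whose result is flag-independent, as a correct
        // plan's result must be.
        assertFlagParity(flag -> List.of(1, 2, 3));
        System.out.println("parity ok");
    }
}
```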

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code

Closes apache#55425 from LuciferYang/SPARK-56482.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>