[SPARK-56482][SQL][FOLLOWUP] Simplify UnionExec codegen and narrow partition-index gate #55719
Open
cloud-fan wants to merge 1 commit into apache:master
### What changes were proposed in this pull request?

Followup to SPARK-56482 (apache#55425). Two groups of changes to `UnionExec`'s whole-stage codegen path.

Code cleanness:

- Hoist `metricTerm("numOutputRows")` to `doProduce` and store it on the instance. `doConsume` runs once per child during emission, so the previous code registered the same metric N times in `references[]` for an N-child Union; now once.
- Drop the dead `assert` in `perChildProjections` and the duplicate `allChildOutputDataTypesMatch` lazy val. The dataType comparison now has a single source of truth in the `type-mismatch` branch of the gate.
- Inline the one-shot `hasAnyPartitionIndexDependentDescendant` lazy val.
- Drop the unreachable `case other` in the `UnionPartition` match and replace with `asInstanceOf`. `unionedInputRDD` is built as `new UnionRDD(...)` two lines up, and `getPartitions` only ever returns `UnionPartition[_]`.
- Factor `isPlainUnion` helper used by the gate and `doExecute` so the invariant "codegen path matches `sparkContext.union` semantics" lives in one place.
- Hoist child-local idx to a `childLocalIdx` local at helper entry. References emitted by `RangeExec`/`SampleExec` now read a plain int instead of re-evaluating `((int[]) refs[K])[partitionIndex]` per use.
- Drop the `try/finally` around codegen state restoration. Codegen failure aborts the whole stage, so the restoration is unreachable.

Initial review findings:

- Narrow `hasPartitionIndexDependentCodegen` to exclude `InputFileName`, `InputFileBlockStart`, and `InputFileBlockLength`. These are `Nondeterministic` but read from `InputFileBlockHolder` (a per-task thread-local) and do not embed `partitionIndex`, so they are safe under fusion. Queries like `SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b` now fuse.
- Add tests for the `partitioning-aware` fallback branch and a positive test for `input_file_name` fusion.

### Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an N-fold metric reference, two duplicated dataType comparisons, an unreachable defensive guard, a per-iteration array deref, and a try/finally that protects against an unreachable case. The gate narrowing turns a missed optimization (file-scan unions) into a fused plan.

### Does this PR introduce _any_ user-facing change?

No. `spark.sql.codegen.wholeStage.union.enabled` remains off by default; when on, the new behavior fuses additional plans (file-scan unions with `input_file_name()`) that the previous gate over-rejected.

### How was this patch tested?

`UnionCodegenSuite`, `UnionCodegenAnsiSuite`, `UnionCodegenAqeSuite`, and the relevant `SQLMetricsSuite` test all pass. Two tests added: `partitioning-aware union falls back to non-codegen` and `input_file_name child fuses (Nondeterministic but partition-index-free)`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code
Co-authored-by: Isaac
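
To illustrate the first cleanup bullet (registering the `numOutputRows` metric once instead of once per child), here is a minimal sketch. It does not use Spark's classes: `ToyCodegenContext`, its `addReferenceObj` signature, and `Metric` are hypothetical stand-ins that only model a growing `references` array.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a codegen context's references array (NOT Spark's CodegenContext).
final class ToyCodegenContext {
  val references: ArrayBuffer[AnyRef] = ArrayBuffer.empty

  // Appends obj and returns the Java expression that would read it back.
  def addReferenceObj(name: String, obj: AnyRef): String = {
    references += obj
    s"(($name) references[${references.length - 1}])"
  }
}

object MetricHoistSketch {
  // Hypothetical metric type used only for this illustration.
  final class Metric(val name: String)

  // "Before" shape: every per-child consume call registers the metric again,
  // so an N-child union ends up with N slots holding the same object.
  def registerPerChild(numChildren: Int): Int = {
    val ctx = new ToyCodegenContext
    val metric = new Metric("numOutputRows")
    (0 until numChildren).foreach { _ => ctx.addReferenceObj("metric", metric) }
    ctx.references.length
  }

  // "After" shape: register once in the produce step, then reuse the same
  // term when emitting the per-child code.
  def registerOnce(numChildren: Int): Int = {
    val ctx = new ToyCodegenContext
    val metric = new Metric("numOutputRows")
    val term = ctx.addReferenceObj("metric", metric)
    val perChildCode = (0 until numChildren).map(_ => s"$term.add(1);")
    assert(perChildCode.length == numChildren)
    ctx.references.length
  }
}
```

In this toy model `MetricHoistSketch.registerPerChild(n)` grows the array by `n` entries while `MetricHoistSketch.registerOnce(n)` grows it by one, which is the difference the bullet describes.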
cc @LuciferYang
LuciferYang reviewed on May 7, 2026
      "numOutputRows should be 0 for all-empty union")
  }
}
    test("SPARK-56482: union with sample children fuses (or falls back) without crashing") {
      val a = rangeDF(20).sample(false, 0.5, 1L)
      val b = rangeDF(20).sample(false, 0.5, 1L)
      val df = a.union(b).filter(col("id") > 0)
      df.collect()
      assertFlagParity(() => a.union(b).orderBy("id"))
    }

After this PR, the test above appears to fail:
13:47:26.119 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13:47:27.985 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13014)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7199)
at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:236)
at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6684)
at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6681)
at org.codehaus.janino.Java$Package.accept(Java.java:4627)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6681)
13:47:27.992 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
/* 011 */ private TaskCont...
[info] - SPARK-56482: union with sample children fuses (or falls back) without crashing *** FAILED *** (1 second, 214 milliseconds)
[info] java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"
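
The log does not state a root cause. One reading, offered only as a hypothesis: a method-local `childLocalIdx` is visible only inside the helper that declares it, whereas the earlier `((int[]) refs[K])[partitionIndex]` form reads through instance state and so resolves wherever a child's code lands. The sketch below models just that Java scoping rule. `Placement`, `IdxTerm`, and `inScope` are hypothetical names for illustration, and treating `partitionIndex` as instance-level state is an assumption.

```scala
object ChildLocalIdxScopeSketch {
  // Hypothetical model of where a piece of generated Java ends up.
  sealed trait Placement
  case object InsideHelper extends Placement  // body of the per-child helper method
  case object OutsideHelper extends Placement // any other generated method or initializer

  // The two ways of referring to the child-local partition index.
  sealed trait IdxTerm { def java: String }

  // Reads through instance state, so it is assumed to be in scope
  // anywhere in the generated class.
  final case class ArrayLookup(refSlot: Int) extends IdxTerm {
    def java: String = s"((int[]) references[$refSlot])[partitionIndex]"
  }

  // A method-local variable: in scope only inside the method that declares it.
  case object HoistedLocal extends IdxTerm {
    def java: String = "childLocalIdx"
  }

  // Would the Java compiler be able to resolve `term` at `where`?
  def inScope(term: IdxTerm, where: Placement): Boolean = (term, where) match {
    case (ArrayLookup(_), _)           => true
    case (HoistedLocal, InsideHelper)  => true
    case (HoistedLocal, OutsideHelper) => false
  }
}
```

If this reading is right, the failing case corresponds to `inScope(HoistedLocal, OutsideHelper)`: some code emitted for a `sample` child would reference `childLocalIdx` from outside the helper. Checking what sits at line 168 of the generated source would confirm or rule this out.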
### What changes were proposed in this pull request?

Followup to SPARK-56482 (#55425). Two groups of changes to `UnionExec`'s whole-stage codegen path.

Code cleanness:

- Hoist `metricTerm("numOutputRows")` to `doProduce` and store it on the instance. `doConsume` runs once per child during emission, so the previous code registered the same metric N times in `references[]` for an N-child Union; now once.
- Drop the dead `assert` in `perChildProjections` and the duplicate `allChildOutputDataTypesMatch` lazy val. The dataType comparison now has a single source of truth in the `type-mismatch` branch of the gate.
- Inline the one-shot `hasAnyPartitionIndexDependentDescendant` lazy val.
- Drop the unreachable `case other` in the `UnionPartition` match and replace with `asInstanceOf`. `unionedInputRDD` is built as `new UnionRDD(...)` two lines up, and `getPartitions` only ever returns `UnionPartition[_]`.
- Factor `isPlainUnion` helper used by the gate and `doExecute` so the invariant "codegen path matches `sparkContext.union` semantics" lives in one place.
- Hoist child-local idx to a `childLocalIdx` local at helper entry. References emitted by `RangeExec`/`SampleExec` now read a plain int instead of re-evaluating `((int[]) refs[K])[partitionIndex]` per use.
- Drop the `try/finally` around codegen state restoration. Codegen failure aborts the whole stage, so the restoration is unreachable.

Gate narrowing:

- Narrow `hasPartitionIndexDependentCodegen` to exclude `InputFileName`, `InputFileBlockStart`, and `InputFileBlockLength`. These are `Nondeterministic` but read from `InputFileBlockHolder` (a per-task thread-local) and do not embed `partitionIndex`, so they are safe under fusion. Queries like `SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b` now fuse.

### Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an N-fold metric reference, two duplicated dataType comparisons, an unreachable defensive guard, a per-iteration array deref, and a `try/finally` that protects against an unreachable case. The gate narrowing turns a missed optimization (file-scan unions) into a fused plan.

### Does this PR introduce any user-facing change?

No. `spark.sql.codegen.wholeStage.union.enabled` remains off by default; when on, the new behavior fuses additional plans (file-scan unions with `input_file_name()`) that the previous gate over-rejected.

### How was this patch tested?

`UnionCodegenSuite`, `UnionCodegenAnsiSuite`, `UnionCodegenAqeSuite`, and the relevant `SQLMetricsSuite` test all pass. Two tests added:

- `partitioning-aware union falls back to non-codegen`: covers a `supportCodegenFailureReason` branch that lacked explicit coverage.
- `input_file_name child fuses (Nondeterministic but partition-index-free)`: validates the gate narrowing.

The `columnar` fallback branch is not covered by a new test: reliably constructing a plan where `Union.supportsColumnar` is true via the user-facing API turned out to be brittle, since `ApplyColumnarRulesAndInsertTransitions` aggressively rebalances columnar/row transitions.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code
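
As a reading aid for the gate narrowing described earlier, the sketch below shows the shape such a check could take. It is a toy model, not the code in this PR: the `Expr` hierarchy only borrows names from Catalyst, `readsOnlyTaskLocalFileInfo` is a hypothetical helper, and `Rand` stands in for any nondeterministic expression assumed to depend on the partition index.

```scala
object GateNarrowingSketch {
  // Toy expression tree. These are NOT Spark's Catalyst classes, only
  // same-named stand-ins for illustration.
  sealed trait Expr {
    def children: Seq[Expr] = Nil
    def nondeterministic: Boolean = false
  }
  case object InputFileName extends Expr { override def nondeterministic: Boolean = true }
  case object InputFileBlockStart extends Expr { override def nondeterministic: Boolean = true }
  case object InputFileBlockLength extends Expr { override def nondeterministic: Boolean = true }
  // Assumed here to embed the partition index in its generated code.
  final case class Rand(seed: Long) extends Expr { override def nondeterministic: Boolean = true }
  final case class Column(name: String) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr {
    override def children: Seq[Expr] = Seq(child)
  }

  // Hypothetical helper: nondeterministic expressions that read per-task
  // file information rather than the partition index.
  private def readsOnlyTaskLocalFileInfo(e: Expr): Boolean = e match {
    case InputFileName | InputFileBlockStart | InputFileBlockLength => true
    case _ => false
  }

  // Narrowed gate: a nondeterministic node blocks fusion unless it is one
  // of the excluded file-info expressions; the check recurses into children.
  def hasPartitionIndexDependentCodegen(e: Expr): Boolean =
    (e.nondeterministic && !readsOnlyTaskLocalFileInfo(e)) ||
      e.children.exists(hasPartitionIndexDependentCodegen)
}
```

Under this model an aliased `InputFileName` passes the gate while an aliased `Rand` is still rejected, matching the behavior the description claims for `input_file_name()` unions.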