Skip to content

[SPARK-56482][SQL][FOLLOWUP] Simplify UnionExec codegen and narrow partition-index gate#55719

Open
cloud-fan wants to merge 1 commit intoapache:masterfrom
cloud-fan:SPARK-56482-followup
Open

[SPARK-56482][SQL][FOLLOWUP] Simplify UnionExec codegen and narrow partition-index gate#55719
cloud-fan wants to merge 1 commit intoapache:masterfrom
cloud-fan:SPARK-56482-followup

Conversation

@cloud-fan
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Followup to SPARK-56482 (#55425). Two groups of changes to UnionExec's whole-stage codegen path.

Code cleanness:

  • Hoist metricTerm("numOutputRows") to doProduce and store it on the instance. doConsume runs once per child during emission, so the previous code registered the same metric N times in references[] for an N-child Union; now once.
  • Drop the dead assert in perChildProjections and the duplicate allChildOutputDataTypesMatch lazy val. The dataType comparison now has a single source of truth in the type-mismatch branch of the gate.
  • Inline the one-shot hasAnyPartitionIndexDependentDescendant lazy val.
  • Drop the unreachable case other in the UnionPartition match and replace with asInstanceOf. unionedInputRDD is built as new UnionRDD(...) two lines up, and getPartitions only ever returns UnionPartition[_].
  • Factor isPlainUnion helper used by the gate and doExecute so the invariant "codegen path matches sparkContext.union semantics" lives in one place.
  • Hoist the child-local idx to a childLocalIdx local at helper entry. References emitted by RangeExec/SampleExec now read a plain int instead of re-evaluating ((int[]) refs[K])[partitionIndex] per use.
  • Drop the try/finally around codegen state restoration. Codegen failure aborts the whole stage, so the restoration is unreachable.

Gate narrowing:

  • Narrow hasPartitionIndexDependentCodegen to exclude InputFileName, InputFileBlockStart, and InputFileBlockLength. These are Nondeterministic but read from InputFileBlockHolder (a per-task thread-local) and do not embed partitionIndex, so they are safe under fusion. Queries like SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b now fuse.

Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an N-fold metric reference, two duplicated dataType comparisons, an unreachable defensive guard, a per-iteration array deref, and a try/finally that protects against an unreachable case. The gate narrowing turns a missed optimization (file-scan unions) into a fused plan.

Does this PR introduce any user-facing change?

No. spark.sql.codegen.wholeStage.union.enabled remains off by default; when on, the new behavior fuses additional plans (file-scan unions with input_file_name()) that the previous gate over-rejected.

How was this patch tested?

UnionCodegenSuite, UnionCodegenAnsiSuite, UnionCodegenAqeSuite, and the relevant SQLMetricsSuite test all pass. Two tests added:

  • partitioning-aware union falls back to non-codegen — covers a supportCodegenFailureReason branch that lacked explicit coverage.
  • input_file_name child fuses (Nondeterministic but partition-index-free) — validates the gate narrowing.

The columnar fallback branch is not covered by a new test: reliably constructing a plan where Union.supportsColumnar is true via the user-facing API turned out to be brittle, since ApplyColumnarRulesAndInsertTransitions aggressively rebalances columnar/row transitions.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…rtition-index gate

### What changes were proposed in this pull request?

Followup to SPARK-56482 (apache#55425). Two groups of changes to `UnionExec`'s
whole-stage codegen path.

Code cleanness:

- Hoist `metricTerm("numOutputRows")` to `doProduce` and store it on the
  instance. `doConsume` runs once per child during emission, so the
  previous code registered the same metric N times in `references[]` for
  an N-child Union; now once.
- Drop the dead `assert` in `perChildProjections` and the duplicate
  `allChildOutputDataTypesMatch` lazy val. The dataType comparison now
  has a single source of truth in the `type-mismatch` branch of the gate.
- Inline the one-shot `hasAnyPartitionIndexDependentDescendant` lazy val.
- Drop the unreachable `case other` in the `UnionPartition` match and
  replace with `asInstanceOf`. `unionedInputRDD` is built as
  `new UnionRDD(...)` two lines up, and `getPartitions` only ever returns
  `UnionPartition[_]`.
- Factor `isPlainUnion` helper used by the gate and `doExecute` so the
  invariant "codegen path matches `sparkContext.union` semantics" lives
  in one place.
- Hoist child-local idx to a `childLocalIdx` local at helper entry.
  References emitted by `RangeExec`/`SampleExec` now read a plain int
  instead of re-evaluating `((int[]) refs[K])[partitionIndex]` per use.
- Drop the `try/finally` around codegen state restoration. Codegen
  failure aborts the whole stage, so the restoration is unreachable.

Initial review findings:

- Narrow `hasPartitionIndexDependentCodegen` to exclude `InputFileName`,
  `InputFileBlockStart`, and `InputFileBlockLength`. These are
  `Nondeterministic` but read from `InputFileBlockHolder` (a per-task
  thread-local) and do not embed `partitionIndex`, so they are safe
  under fusion. Queries like
  `SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b`
  now fuse.
- Add tests for the `partitioning-aware` fallback branch and a positive
  test for `input_file_name` fusion.

### Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an
N-fold metric reference, two duplicated dataType comparisons, an
unreachable defensive guard, a per-iteration array deref, and a
try/finally that protects against an unreachable case. The gate
narrowing turns a missed optimization (file-scan unions) into a fused
plan.

### Does this PR introduce _any_ user-facing change?

No. `spark.sql.codegen.wholeStage.union.enabled` remains off by default;
when on, the new behavior fuses additional plans (file-scan unions with
`input_file_name()`) that the previous gate over-rejected.

### How was this patch tested?

`UnionCodegenSuite`, `UnionCodegenAnsiSuite`, `UnionCodegenAqeSuite`, and
the relevant `SQLMetricsSuite` test all pass. Two tests added:
`partitioning-aware union falls back to non-codegen` and
`input_file_name child fuses (Nondeterministic but partition-index-free)`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Co-authored-by: Isaac
@cloud-fan
Copy link
Copy Markdown
Contributor Author

cc @LuciferYang

"numOutputRows should be 0 for all-empty union")
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 test("SPARK-56482: union with sample children fuses (or falls back) without crashing") {
      val a = rangeDF(20).sample(false, 0.5, 1L)
      val b = rangeDF(20).sample(false, 0.5, 1L)
      val df = a.union(b).filter(col("id") > 0)
      df.collect()
      assertFlagParity(() => a.union(b).orderBy("id"))
    }

After this PR, the test above appears to fail

13:47:26.119 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13:47:27.985 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13014)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7199)
        at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:236)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6684)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6681)
        at org.codehaus.janino.Java$Package.accept(Java.java:4627)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6681)
13:47:27.992 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskCont...
[info] - SPARK-56482: union with sample children fuses (or falls back) without crashing *** FAILED *** (1 second, 214 milliseconds)
[info]   java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants