Skip to content

[GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend#12188

Open
baibaichen wants to merge 4 commits into
apache:mainfrom
baibaichen:snt/attach-dist-seq
Open

[GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend#12188
baibaichen wants to merge 4 commits into
apache:mainfrom
baibaichen:snt/attach-dist-seq

Conversation

@baibaichen
Copy link
Copy Markdown
Contributor

@baibaichen baibaichen commented May 29, 2026

What changes were proposed in this pull request?

Closes #12187.

Adds a Velox implementation of Spark's AttachDistributedSequenceExec (prepends a contiguous Long id column to child output). Used by pandas-on-Spark distributed-sequence index and DataFrame.zipWithIndex.

How is this implemented?

Plan-level

  • New abstract base ColumnarAttachDistributedSequenceBaseExec in gluten-substrait/ with factory from(plan) delegating to the backend API.
  • New offload rule case + validator gate in OffloadSingleNodeRules / Validators.
  • New backend hook genColumnarAttachDistributedSequenceExec on SparkPlanExecApi. Velox override returns the columnar impl; the CH override throws GlutenNotSupportException until that backend is ported.
  • Config: spark.gluten.sql.columnar.attachDistributedSequence (default true) lets users disable the offload.

Velox runtime (ColumnarAttachDistributedSequenceExec)

For >1 partition:

  1. Materialize the child output once via Gluten's existing ColumnarCachedBatchSerializer, persisted at MEMORY_AND_DISK_SER. The cache blob is Velox-native serialization (CachedColumnarBatch) — kryo-friendly and typically much more compact than unsafe-row SER.
  2. Count pass: read CachedColumnarBatch.numRows of partitions [0, numPartitions - 1) — no native deserialization.
  3. Assign pass: convertCachedBatchToColumnarBatch → Velox-native batch → ColumnarBatches.load (zero-copy Arrow C-Data ABI handoff) → prepend one ArrowWritableColumnVector with the id column.

Single-partition queries skip caching entirely (startOffset = 0).

Memory hygiene

  • The base class exposes a doColumnarCleanup() hook called from cleanupResources(). The Velox impl uses it to unpersist the cached RDD when the query finishes, so BlockManager does not hold the serialized batches beyond the operator's lifetime.
  • The persisted RDD is cached behind a synchronized accessor so repeated doExecuteColumnar() calls share a single persist() handle.
  • assignIds wraps the per-batch build in a try/catch that closes the freshly-loaded heavy batch on failure, so a mid-build OOM (e.g. while allocating the id vector) cannot leak Arrow buffers.

Known overhead: cache write/read crosses the heap boundary

Per batch, the cache path does two full data copies:

  • Write: JNI serialize copies the off-heap Velox batch into an on-heap Array[Byte] (CachedColumnarBatch.bytes).
  • Read: JNI deserialize copies the bytes back into a fresh off-heap Velox batch.

This is the price for zipWithIndex's two-pass semantics (count + assign) without re-executing the child plan. The Velox↔Arrow hand-offs elsewhere in the operator are zero-copy ABI transfers and not relevant to this cost.

Alternative considered

We considered the row-mode pipeline Velox → C2R → RDD[InternalRow] cached → R2C. For Gluten that costs a full C2R/R2C transition on every row, and the unsafe-row serialized cache is typically 2–5× larger than Velox-native serialization for wide / nested data. The columnar path keeps everything columnar and pays serialization only once.

How was this patch tested?

New VeloxAttachDistributedSequenceExecSuite in backends-velox.

Does this PR introduce any user-facing change?

Yes — a new config:

  • spark.gluten.sql.columnar.attachDistributedSequence (default true).

When enabled, df.zipWithIndex and pandas-on-Spark distributed-sequence index materialize the id column columnarly on Velox instead of falling back.

@github-actions github-actions Bot added CORE works for Gluten Core VELOX CLICKHOUSE labels May 29, 2026
@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@baibaichen baibaichen force-pushed the snt/attach-dist-seq branch from 0d5f602 to a50a94a Compare May 29, 2026 06:16
@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

Adds a Velox implementation of Spark's AttachDistributedSequenceExec that
prepends a contiguous, globally increasing Long id column to its child
output. Used by pandas-on-Spark distributed-sequence index and
DataFrame.zipWithIndex.

Plan-level
- New abstract base ColumnarAttachDistributedSequenceBaseExec in
  gluten-substrait/, with factory from(plan) delegating to the backend
  API and a doColumnarCleanup() hook called from cleanupResources().
- New offload rule case + validator gate in OffloadSingleNodeRules /
  Validators.
- New backend hook genColumnarAttachDistributedSequenceExec on
  SparkPlanExecApi. Velox override returns the columnar impl; CH
  override throws GlutenNotSupportException until that backend is
  ported.
- Config spark.gluten.sql.columnar.attachDistributedSequence
  (default true) lets users disable the offload.

Velox runtime
- For >1 partition, materialize the child output once via Gluten's
  existing ColumnarCachedBatchSerializer, persisted at
  MEMORY_AND_DISK_SER. The cache blob is Velox-native serialization
  (CachedColumnarBatch), much more compact than unsafe-row SER for
  wide / nested data.
- Count pass reads CachedColumnarBatch.numRows for partitions
  [0, numPartitions - 1) -- no native deserialization required.
- Assign pass: convertCachedBatchToColumnarBatch -> ColumnarBatches.load
  (zero-copy Arrow C-Data ABI handoff) -> prepend one
  ArrowWritableColumnVector with the id column.
- Single-partition queries skip caching entirely.

Memory hygiene
- doColumnarCleanup() unpersists the cached RDD when the query
  finishes so BlockManager does not hold the serialized batches
  beyond the operator's lifetime.
- The persisted RDD is cached behind a synchronized accessor so
  repeated doExecuteColumnar() calls share a single persist() handle.
- assignIds wraps the per-batch build in a try/catch that closes the
  freshly-loaded heavy batch on failure, preventing Arrow buffer
  leaks on mid-build OOM.

Tests
- New VeloxAttachDistributedSequenceExecSuite in backends-velox.

Closes apache#12187

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@baibaichen baibaichen force-pushed the snt/attach-dist-seq branch from a50a94a to fe3bf16 Compare May 29, 2026 06:17
@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

@github-actions github-actions Bot added the DOCS label May 29, 2026
The original implementation persisted the child's columnar output via
ColumnarCachedBatchSerializer to avoid re-executing the child plan twice.
That path fails on zero-column batches that can result from column
pruning (e.g. df.select("id") prunes every input column away):
ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector throws because the
batch is neither LIGHT nor HEAVY.

Switch to the simpler vanilla-Spark-style approach (matches the
pandas-on-Spark cache="NONE" option): run the child once to count rows
per partition for [0, numPartitions - 1), then run it again to attach
the id column. One extra child execution; full robustness across
arbitrary projections.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

…t batch

Round-3 (drop-cache) made our output batch flow through OffloadArrowDataExec
when a downstream Velox consumer (e.g. shuffle after repartition) follows
our op. That path calls ColumnarBatches.offload -> getRefCntHeavy, which
asserts every column in the heavy batch shares the same reference count.

Our previous output mixed a freshly allocated id column (refCnt=1) with
retain'd input columns (refCnt=2), tripping the assertion -- which surfaced
as the SPARK-36338 regression in the inherited GlutenDataFrameSuite.

Allocate fresh ArrowWritableColumnVectors for every output column and copy
input values per row via ValueVector.copyFromSafe. All output columns then
have refCnt=1, the input batch is untouched, and the offload path works
regardless of which transition the planner inserts.

Test additions:
- New 'output survives a downstream Velox shuffle (offload path)' test
  reproduces the bug locally (repartition after attach).
- Set spark.sql.ansi.enabled=false in suite sparkConf so the columnar exec
  is actually selected under Spark 4.x where ANSI is default true (Gluten
  falls back the whole plan otherwise).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@github-actions
Copy link
Copy Markdown

Run Gluten Clickhouse CI on x86

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[VL] Port AttachDistributedSequenceExec to Velox backend

1 participant