[GLUTEN-12187][VL] Port AttachDistributedSequenceExec to Velox backend#12188
Open
baibaichen wants to merge 4 commits into
Run Gluten Clickhouse CI on x86
0d5f602 to a50a94a
Adds a Velox implementation of Spark's AttachDistributedSequenceExec that
prepends a contiguous, globally increasing Long id column to its child
output. Used by pandas-on-Spark distributed-sequence index and
DataFrame.zipWithIndex.

Plan-level
- New abstract base ColumnarAttachDistributedSequenceBaseExec in
  gluten-substrait/, with factory from(plan) delegating to the backend API
  and a doColumnarCleanup() hook called from cleanupResources().
- New offload rule case + validator gate in OffloadSingleNodeRules /
  Validators.
- New backend hook genColumnarAttachDistributedSequenceExec on
  SparkPlanExecApi. Velox override returns the columnar impl; CH override
  throws GlutenNotSupportException until that backend is ported.
- Config spark.gluten.sql.columnar.attachDistributedSequence (default true)
  lets users disable the offload.

Velox runtime
- For >1 partition, materialize the child output once via Gluten's existing
  ColumnarCachedBatchSerializer, persisted at MEMORY_AND_DISK_SER. The cache
  blob is Velox-native serialization (CachedColumnarBatch), much more
  compact than unsafe-row SER for wide / nested data.
- Count pass reads CachedColumnarBatch.numRows for partitions
  [0, numPartitions - 1) -- no native deserialization required.
- Assign pass: convertCachedBatchToColumnarBatch -> ColumnarBatches.load
  (zero-copy Arrow C-Data ABI handoff) -> prepend one
  ArrowWritableColumnVector with the id column.
- Single-partition queries skip caching entirely.

Memory hygiene
- doColumnarCleanup() unpersists the cached RDD when the query finishes so
  BlockManager does not hold the serialized batches beyond the operator's
  lifetime.
- The persisted RDD is cached behind a synchronized accessor so repeated
  doExecuteColumnar() calls share a single persist() handle.
- assignIds wraps the per-batch build in a try/catch that closes the
  freshly-loaded heavy batch on failure, preventing Arrow buffer leaks on
  mid-build OOM.

Tests
- New VeloxAttachDistributedSequenceExecSuite in backends-velox.

Closes apache#12187

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
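The "single persist() handle plus cleanup" idea from the memory-hygiene notes can be modelled in a few lines of plain Python. This is only a sketch under stated assumptions: FakeRdd and CachedChildOutput are hypothetical names invented for illustration, a threading.Lock stands in for the JVM synchronized accessor, and none of this is the actual Gluten code.

```python
import threading


class FakeRdd:
    """Hypothetical stand-in for an RDD; it only tracks persist/unpersist calls."""

    def __init__(self):
        self.persist_calls = 0
        self.persisted = False

    def persist(self):
        self.persist_calls += 1
        self.persisted = True
        return self

    def unpersist(self):
        self.persisted = False


class CachedChildOutput:
    """Hands out one shared persisted handle, however often it is requested."""

    def __init__(self, make_rdd):
        self._make_rdd = make_rdd      # factory that builds the child output
        self._lock = threading.Lock()  # plays the role of `synchronized`
        self._cached = None

    def get(self):
        # Persist on first use only; later callers reuse the same handle.
        with self._lock:
            if self._cached is None:
                self._cached = self._make_rdd().persist()
            return self._cached

    def cleanup(self):
        # Mirrors the cleanup hook: release the cache once the query is done.
        with self._lock:
            if self._cached is not None:
                self._cached.unpersist()
                self._cached = None


holder = CachedChildOutput(FakeRdd)
first = holder.get()
second = holder.get()   # same handle, no second persist()
holder.cleanup()
```

Holding the lock in both get() and cleanup() keeps a late caller from observing a handle that is halfway through being released.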
a50a94a to fe3bf16
…tedSequence config
The original implementation persisted the child's columnar output via
ColumnarCachedBatchSerializer to avoid executing the child plan twice.
That path fails on zero-column batches that can result from column
pruning (e.g. df.select("id") prunes every input column away):
ensureVeloxBatch -> isVeloxBatch -> getIndicatorVector throws because the
batch is neither LIGHT nor HEAVY.
Switch to the simpler vanilla-Spark-style approach (matches the
pandas-on-Spark cache="NONE" option): run the child once to count rows
per partition for [0, numPartitions - 1), then run it again to attach
the id column. One extra child execution; full robustness across
arbitrary projections.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
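The count-then-attach approach described in this commit message can be sketched as a small plain-Python model. It is illustrative only: run_child, start_offsets and attach_ids are hypothetical names, a batch is represented by its row count alone (so a zero-column batch needs no special handling), and the real operator works on Spark partitions and columnar batches rather than lists.

```python
from itertools import accumulate


def run_child(partition_sizes, batch_size=2):
    """Hypothetical child plan: per partition, yields a list of batch row counts."""
    for size in partition_sizes:
        yield [min(batch_size, size - start) for start in range(0, size, batch_size)]


def start_offsets(partition_sizes):
    # Pass 1: count rows for partitions [0, numPartitions - 1).
    # The last partition's count is never needed for any offset.
    counts = [sum(batches) for batches in run_child(partition_sizes[:-1])]
    return [0] + list(accumulate(counts))


def attach_ids(partition_sizes):
    offsets = start_offsets(partition_sizes)
    result = []
    # Pass 2: run the child again and number rows from each partition's offset.
    for offset, batches in zip(offsets, run_child(partition_sizes)):
        ids = []
        next_id = offset
        for num_rows in batches:
            ids.extend(range(next_id, next_id + num_rows))
            next_id += num_rows
        result.append(ids)
    return result


ids_per_partition = attach_ids([3, 0, 2])
single_partition = attach_ids([4])
```

With a single partition the first pass has nothing to count and the only offset is 0, which matches the note that single-partition queries need no extra work.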
…t batch

Round-3 (drop-cache) made our output batch flow through
OffloadArrowDataExec when a downstream Velox consumer (e.g. shuffle after
repartition) follows our op. That path calls ColumnarBatches.offload ->
getRefCntHeavy, which asserts every column in the heavy batch shares the
same reference count. Our previous output mixed a freshly allocated id
column (refCnt=1) with retain'd input columns (refCnt=2), tripping the
assertion -- which surfaced as the SPARK-36338 regression in the inherited
GlutenDataFrameSuite.

Allocate fresh ArrowWritableColumnVectors for every output column and copy
input values per row via ValueVector.copyFromSafe. All output columns then
have refCnt=1, the input batch is untouched, and the offload path works
regardless of which transition the planner inserts.

Test additions:
- New 'output survives a downstream Velox shuffle (offload path)' test
  reproduces the bug locally (repartition after attach).
- Set spark.sql.ansi.enabled=false in suite sparkConf so the columnar exec
  is actually selected under Spark 4.x where ANSI is default true (Gluten
  falls back the whole plan otherwise).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
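The reference-count mismatch described in this commit can be reproduced with a toy model. Everything below is hypothetical: Vec is not the Arrow or Gluten vector API, and shared_ref_cnt only imitates the invariant attributed above to getRefCntHeavy (every column in the batch shares one reference count).

```python
class Vec:
    """Hypothetical ref-counted column vector (not the Arrow or Gluten API)."""

    def __init__(self, values):
        self.values = list(values)
        self.ref_cnt = 1

    def retain(self):
        self.ref_cnt += 1
        return self

    def copy(self):
        # A fresh vector with its own buffer and a ref count of 1.
        return Vec(self.values)


def shared_ref_cnt(columns):
    """Imitates the invariant: all columns of a batch share one ref count."""
    counts = {col.ref_cnt for col in columns}
    if len(counts) > 1:
        raise AssertionError(f"mixed reference counts: {sorted(counts)}")
    return counts.pop() if counts else 0


def attach_by_retain(input_columns, ids):
    # Previous behaviour in this model: new id column plus retained inputs.
    return [Vec(ids)] + [col.retain() for col in input_columns]


def attach_by_copy(input_columns, ids):
    # Fixed behaviour in this model: every output column is freshly allocated.
    return [Vec(ids)] + [col.copy() for col in input_columns]


inputs = [Vec(["a", "b"]), Vec([1.5, 2.5])]

copied = attach_by_copy(inputs, [0, 1])
copied_ref_cnt = shared_ref_cnt(copied)
input_ref_cnts_after_copy = [col.ref_cnt for col in inputs]

try:
    shared_ref_cnt(attach_by_retain(inputs, [0, 1]))
    retain_failed = False
except AssertionError:
    retain_failed = True
```

Copying trades extra memory traffic for an output batch that owns all of its columns, which is why the input batch's counts are unchanged after attach_by_copy.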
What changes were proposed in this pull request?
Closes #12187.
Adds a Velox implementation of Spark's AttachDistributedSequenceExec (prepends a contiguous Long id column to child output). Used by pandas-on-Spark distributed-sequence index and DataFrame.zipWithIndex.
How is this implemented?
Plan-level
- New abstract base ColumnarAttachDistributedSequenceBaseExec in gluten-substrait/ with factory from(plan) delegating to the backend API.
- New offload rule case + validator gate in OffloadSingleNodeRules / Validators.
- New backend hook genColumnarAttachDistributedSequenceExec on SparkPlanExecApi. Velox override returns the columnar impl; the CH override throws GlutenNotSupportException until that backend is ported.
- Config spark.gluten.sql.columnar.attachDistributedSequence (default true) lets users disable the offload.
Velox runtime (ColumnarAttachDistributedSequenceExec)
For >1 partition:
- Materialize the child output once via Gluten's existing ColumnarCachedBatchSerializer, persisted at MEMORY_AND_DISK_SER. The cache blob is Velox-native serialization (CachedColumnarBatch): kryo-friendly and typically much more compact than unsafe-row SER.
- Count pass reads CachedColumnarBatch.numRows of partitions [0, numPartitions - 1), with no native deserialization.
- Assign pass: convertCachedBatchToColumnarBatch → Velox-native batch → ColumnarBatches.load (zero-copy Arrow C-Data ABI handoff) → prepend one ArrowWritableColumnVector with the id column.
Single-partition queries skip caching entirely (startOffset = 0).
Memory hygiene
- doColumnarCleanup() hook called from cleanupResources(). The Velox impl uses it to unpersist the cached RDD when the query finishes, so BlockManager does not hold the serialized batches beyond the operator's lifetime.
- The persisted RDD is cached behind a synchronized accessor so repeated doExecuteColumnar() calls share a single persist() handle.
- assignIds wraps the per-batch build in a try/catch that closes the freshly-loaded heavy batch on failure, so a mid-build OOM (e.g. while allocating the id vector) cannot leak Arrow buffers.
Known overhead: cache write/read crosses the heap boundary
Per batch, the cache path does two full data copies:
- serialize copies the off-heap Velox batch into an on-heap Array[Byte] (CachedColumnarBatch.bytes).
- deserialize copies the bytes back into a fresh off-heap Velox batch.
This is the price for zipWithIndex's two-pass semantics (count + assign) without re-executing the child plan. The Velox↔Arrow hand-offs elsewhere in the operator are zero-copy ABI transfers and not relevant to this cost.
Alternative considered
We considered the row-mode pipeline Velox → C2R → RDD[InternalRow] cached → R2C. For Gluten that costs a full C2R/R2C transition on every row, and the unsafe-row serialized cache is typically 2–5× larger than Velox-native serialization for wide / nested data. The columnar path keeps everything columnar and pays serialization only once.
How was this patch tested?
New VeloxAttachDistributedSequenceExecSuite in backends-velox.
Does this PR introduce any user-facing change?
Yes, a new config: spark.gluten.sql.columnar.attachDistributedSequence (default true).
When enabled, df.zipWithIndex and pandas-on-Spark distributed-sequence index materialize the id column columnarly on Velox instead of falling back.