Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions common/src/main/scala/org/apache/comet/vector/CDataUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider}
import org.apache.arrow.memory.BufferAllocator
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

/**
* Import-only C Data Interface bridge for Comet's shaded Arrow side.
*
* The caller (in the spark module) provides an export callback that fills pre-allocated
* ArrowArray/ArrowSchema structs at given memory addresses. This object allocates those structs
* using the shaded allocator and imports the resulting vectors as CometVectors.
*
* This design eliminates the need for reflection to cross the shading boundary: the spark module
* calls unshaded Arrow directly, and the common module calls shaded Arrow directly. The two sides
* communicate through Long memory addresses only.
*/
object CDataUtil {

  /**
   * Imports a columnar batch from the C Data Interface using a child of the global
   * [[CometArrowAllocator]]. This is the preferred entry point from the spark module since it
   * avoids passing a shaded allocator type across the shading boundary.
   *
   * If the import fails, the child allocator is closed before rethrowing so it does not leak.
   * Closing is safe at that point because the 4-arg overload releases every buffer it
   * allocated before propagating the failure.
   */
  def importBatch(
      numCols: Int,
      numRows: Int,
      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
    val allocator =
      CometArrowAllocator.newChildAllocator("CDataUtil-import", 0, Long.MaxValue)
    try {
      importBatch(numCols, numRows, allocator, exportFn)
    } catch {
      case e: Exception =>
        // All buffers have been released by the failure path of the 4-arg overload,
        // so the child allocator can be closed without tripping its leak check.
        try {
          allocator.close()
        } catch {
          case suppressed: Exception => e.addSuppressed(suppressed)
        }
        throw e
    }
  }

  /**
   * Imports a columnar batch from the C Data Interface.
   *
   * Allocates shaded ArrowArray/ArrowSchema structs for each column, invokes the provided export
   * function to fill them (using unshaded Arrow on the caller side), then imports the vectors
   * into CometVectors.
   *
   * On failure, any vectors already imported for earlier columns are closed before the
   * exception is rethrown, so no Arrow buffers leak.
   *
   * @param numCols
   *   number of columns to import
   * @param numRows
   *   row count for the resulting ColumnarBatch
   * @param allocator
   *   shaded BufferAllocator for struct and vector allocation
   * @param exportFn
   *   callback (colIndex, arrayAddr, schemaAddr) => Unit that exports the unshaded vector into
   *   the struct memory at the given addresses
   * @return
   *   a ColumnarBatch with CometVector columns
   */
  def importBatch(
      numCols: Int,
      numRows: Int,
      allocator: BufferAllocator,
      exportFn: (Int, Long, Long) => Unit): ColumnarBatch = {
    // Vectors imported so far; needed so we can release them if a later column fails.
    val imported = new scala.collection.mutable.ArrayBuffer[CometVector](numCols)
    try {
      var idx = 0
      while (idx < numCols) {
        val arrowArray = ArrowArray.allocateNew(allocator)
        val arrowSchema = ArrowSchema.allocateNew(allocator)
        try {
          exportFn(idx, arrowArray.memoryAddress(), arrowSchema.memoryAddress())
          val importer = new ArrowImporter(allocator)
          val dictionaryProvider = new CDataDictionaryProvider()
          // importVector takes ownership of the structs and releases them on success.
          val vector = importer.importVector(arrowArray, arrowSchema, dictionaryProvider)
          imported += CometVector.getVector(vector, true, dictionaryProvider)
        } catch {
          case e: Exception =>
            // The structs were not consumed by a successful import; free them here.
            arrowArray.close()
            arrowSchema.close()
            throw e
        }
        idx += 1
      }
      new ColumnarBatch(imported.toArray, numRows)
    } catch {
      case e: Exception =>
        // Release vectors imported for earlier columns; best-effort, keep the original error.
        imported.foreach { v =>
          try v.close()
          catch { case suppressed: Exception => e.addSuppressed(suppressed) }
        }
        throw e
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarArray, ColumnarBatch}

import org.apache.comet.CometArrowAllocator
import org.apache.comet.vector.NativeUtil
import org.apache.comet.vector.{CDataUtil, NativeUtil}

object CometArrowConverters extends Logging {
// This is similar how Spark converts internal row to Arrow format except that it is transforming
Expand Down Expand Up @@ -145,7 +145,8 @@ object CometArrowConverters extends Logging {
schema: StructType,
maxRecordsPerBatch: Int,
timeZoneId: String,
context: TaskContext)
context: TaskContext,
zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None)
extends ArrowBatchIterBase(schema, timeZoneId, context)
with AutoCloseable {

Expand All @@ -159,6 +160,21 @@ object CometArrowConverters extends Logging {
override protected def nextBatch(): ColumnarBatch = {
val rowsInBatch = colBatch.numRows()
if (rowsProduced < rowsInBatch) {
// On the first call, try zero-copy if an export function was provided
if (rowsProduced == 0 && zeroCopyExportFn.isDefined) {
try {
val zeroCopy = CDataUtil.importBatch(
colBatch.numCols(),
rowsInBatch,
allocator,
zeroCopyExportFn.get)
rowsProduced = rowsInBatch
return zeroCopy
} catch {
case e: Exception =>
logWarning("Zero-copy C Data import failed, falling back to copy", e)
}
}
// the arrow writer shall be reset before writing the next batch
arrowWriter.reset()
val rowsToProduce =
Expand Down Expand Up @@ -190,7 +206,14 @@ object CometArrowConverters extends Logging {
schema: StructType,
maxRecordsPerBatch: Int,
timeZoneId: String,
context: TaskContext): Iterator[ColumnarBatch] = {
new ColumnBatchToArrowBatchIter(colBatch, schema, maxRecordsPerBatch, timeZoneId, context)
context: TaskContext,
zeroCopyExportFn: Option[(Int, Long, Long) => Unit] = None): Iterator[ColumnarBatch] = {
new ColumnBatchToArrowBatchIter(
colBatch,
schema,
maxRecordsPerBatch,
timeZoneId,
context,
zeroCopyExportFn)
}
}
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ under the License.
<exclude>dev/release/rat_exclude_files.txt</exclude>
<exclude>dev/release/requirements.txt</exclude>
<exclude>native/proto/src/generated/**</exclude>
<exclude>pixi.lock</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -1166,6 +1167,20 @@ under the License.
<ignoreClass>com.google.thirdparty.publicsuffix.PublicSuffixType</ignoreClass>
</ignoreClasses>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<ignoreClasses>
<!-- These JNI classes are excluded from shading in comet-common because
they must keep their original package for native library loading.
arrow-c-data is also declared as provided in comet-spark for
zero-copy columnar conversion via ArrowCDataExport. -->
<ignoreClass>org.apache.arrow.c.jni.JniWrapper</ignoreClass>
<ignoreClass>org.apache.arrow.c.jni.PrivateData</ignoreClass>
<ignoreClass>org.apache.arrow.c.jni.CDataJniException</ignoreClass>
<ignoreClass>org.apache.arrow.c.ArrayStreamExporter$ExportedArrayStreamPrivateData</ignoreClass>
</ignoreClasses>
</dependency>
</dependencies>
<findAllDuplicates>true</findAllDuplicates>
<ignoreWhenIdentical>true</ignoreWhenIdentical>
Expand Down
8 changes: 5 additions & 3 deletions spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ under the License.
to provide InMemoryKMS class that is shaded below, to make Spark test happy. -->
</dependency>
<!-- We shade & relocate Arrow dependencies in comet-common, so comet-spark module no longer
depends on Arrow. However, when running `mvn test` we still need Arrow classes in the
classpath, since the Maven shading happens in 'package' phase which is after 'test' -->
depends on arrow-vector. However, arrow-c-data is used by ArrowCDataExport for zero-copy
conversion (provided scope since it may not be available in all runtime environments).
arrow-memory-unsafe is still needed for tests (Maven shading in common happens in
'package' phase which is after 'test'). -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-unsafe</artifactId>
Expand All @@ -139,7 +141,7 @@ under the License.
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.comet

import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}

/**
* Creates export functions for zero-copy transfer of unshaded Arrow vectors through the C Data
* Interface.
*
* This lives in the spark module where unshaded Arrow types are directly available, eliminating
* the need for reflection. The export function writes into pre-allocated C Data struct addresses
* provided by the common module's shaded side.
*
* At runtime, arrow-c-data may not be on the classpath (Spark does not bundle it). The
* [[cDataAvailable]] check ensures graceful degradation to the copy-based path.
*/
object ArrowCDataExport {

  /**
   * Whether unshaded arrow-c-data classes are available on the runtime classpath. Computed
   * lazily and only once; `Class.forName` throws ClassNotFoundException when Spark's runtime
   * does not bundle arrow-c-data, in which case callers fall back to the copy-based path.
   */
  private lazy val cDataAvailable: Boolean = {
    try {
      Class.forName("org.apache.arrow.c.Data") // scalastyle:ignore classforname
      true
    } catch {
      case _: ClassNotFoundException => false
    }
  }

  /**
   * Returns an export function if the batch is entirely backed by [[ArrowColumnVector]] and the
   * arrow-c-data library is available at runtime. Returns [[None]] otherwise.
   *
   * The returned function has signature `(colIndex, arrayAddr, schemaAddr) => Unit`. When called,
   * it exports the unshaded Arrow vector at the given column index into the C Data structs at the
   * provided memory addresses.
   */
  def makeExportFn(batch: ColumnarBatch): Option[(Int, Long, Long) => Unit] = {
    // Check availability first so we never scan columns (or touch CDataExporter, which would
    // trigger classloading of arrow-c-data types) when the library is absent. An empty batch
    // has nothing to export zero-copy, so it also falls back.
    if (cDataAvailable &&
      batch.numCols() > 0 &&
      (0 until batch.numCols()).forall(batch.column(_).isInstanceOf[ArrowColumnVector])) {
      Some(CDataExporter.exportFn(batch))
    } else {
      None
    }
  }

  /**
   * Isolated object that references arrow-c-data classes. The JVM will not load this object (and
   * therefore will not attempt to resolve [[org.apache.arrow.c.Data]] etc.) until it is first
   * accessed, which only happens after [[cDataAvailable]] confirms the classes exist.
   */
  private object CDataExporter {
    /** Builds the (colIndex, arrayAddr, schemaAddr) => Unit zero-copy export closure. */
    def exportFn(batch: ColumnarBatch): (Int, Long, Long) => Unit = {
      (colIdx: Int, arrayAddr: Long, schemaAddr: Long) =>
        {
          // Safe cast: makeExportFn verified every column is an ArrowColumnVector.
          val arrowCol = batch.column(colIdx).asInstanceOf[ArrowColumnVector]
          val fv =
            arrowCol.getValueVector.asInstanceOf[org.apache.arrow.vector.FieldVector]
          // Export into caller-allocated C Data structs; `wrap` does not take ownership
          // of the struct memory, the shaded importer releases it after import.
          org.apache.arrow.c.Data.exportVector(
            fv.getAllocator,
            fv,
            null,
            org.apache.arrow.c.ArrowArray.wrap(arrayAddr),
            org.apache.arrow.c.ArrowSchema.wrap(schemaAddr))
        }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,17 @@ case class CometSparkToColumnarExec(child: SparkPlan)
child
.executeColumnar()
.mapPartitionsInternal { sparkBatches =>
val context = TaskContext.get()
val arrowBatches =
sparkBatches.flatMap { sparkBatch =>
val context = TaskContext.get()
val exportFn = ArrowCDataExport.makeExportFn(sparkBatch)
CometArrowConverters.columnarBatchToArrowBatchIter(
sparkBatch,
schema,
maxRecordsPerBatch,
timeZoneId,
context)
context,
exportFn)
}
createTimingIter(arrowBatches, numInputRows, numOutputBatches, conversionTime)
}
Expand Down
Loading