@@ -727,14 +727,20 @@ object FileSourceMetadataAttribute {
    *
    * The set of supported types is limited by [[ColumnVectorUtils.populate]], which the constant
    * file metadata implementation relies on. In general, types that can be partition columns are
-   * supported (including most primitive types). Notably unsupported types include [[ObjectType]],
-   * [[UserDefinedType]], and the complex types ([[StructType]], [[MapType]], [[ArrayType]]).
+   * supported (including most primitive types), plus the complex types [[ArrayType]],
+   * [[MapType]], and [[StructType]] (recursively, as long as their element types are supported).
+   * Notably unsupported types include [[ObjectType]] and [[UserDefinedType]].
    */
-  def isSupportedType(dataType: DataType): Boolean = PhysicalDataType(dataType) match {
-    // PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
-    case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
-    case PhysicalBinaryType | PhysicalStringType(_) | PhysicalCalendarIntervalType => true
-    case _ => false
+  def isSupportedType(dataType: DataType): Boolean = dataType match {
+    case ArrayType(elementType, _) => isSupportedType(elementType)
+    case MapType(keyType, valueType, _) => isSupportedType(keyType) && isSupportedType(valueType)
+    case st: StructType => st.fields.forall(f => isSupportedType(f.dataType))
+    case _ => PhysicalDataType(dataType) match {
+      // PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
+      case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
+      case PhysicalBinaryType | PhysicalStringType(_) | PhysicalCalendarIntervalType => true
+      case _ => false
+    }
   }
 
   /** Returns the type unchanged if valid; otherwise throws [[IllegalArgumentException]]. */
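To make the recursive rule concrete, here is a small spot-check, a sketch rather than part of the diff (the import path for FileSourceMetadataAttribute is assumed):

// Sketch: a nested type is supported iff every leaf type is
// partition-column-compatible; ObjectType and UserDefinedType leaves are not.
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions.FileSourceMetadataAttribute

FileSourceMetadataAttribute.isSupportedType(
  ArrayType(MapType(IntegerType, StringType)))                     // true
FileSourceMetadataAttribute.isSupportedType(
  StructType(Seq(StructField("o", ObjectType(classOf[Object])))))  // false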
@@ -182,7 +182,8 @@ public void initBatch(
       DataType dt = requiredFields[i].dataType();
       if (requestedPartitionColIds[i] != -1) {
         ConstantColumnVector partitionCol = new ConstantColumnVector(capacity, dt);
-        ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
+        ColumnVectorUtils.populate(
+          partitionCol, partitionValues, requestedPartitionColIds[i], memoryMode);
         orcVectorWrappers[i] = partitionCol;
       } else {
         int colId = requestedDataColIds[i];
@@ -303,7 +303,7 @@ private void initBatch(
     int partitionIdx = sparkSchema.fields().length;
     for (int i = 0; i < partitionColumns.fields().length; i++) {
       ColumnVectorUtils.populate(
-        (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i);
+        (ConstantColumnVector) vectors[i + partitionIdx], partitionValues, i, MEMORY_MODE);
     }
   }
 
@@ -31,8 +31,10 @@
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.types.*;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.RowToColumnConverter;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -49,9 +51,22 @@
 public class ColumnVectorUtils {
 
   /**
-   * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`.
+   * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`. For complex types
+   * (array / map) this allocates a small backing `WritableColumnVector` on-heap by default. Use
+   * the {@link #populate(ConstantColumnVector, InternalRow, int, MemoryMode)} overload to control
+   * the backing memory mode.
    */
   public static void populate(ConstantColumnVector col, InternalRow row, int fieldIdx) {
+    populate(col, row, fieldIdx, MemoryMode.ON_HEAP);
+  }
+
+  /**
+   * Populates the value of `row[fieldIdx]` into `ConstantColumnVector`. For array / map values,
+   * `memMode` selects on-heap vs off-heap allocation for the backing `WritableColumnVector` that
+   * holds the constant element data; it has no effect on primitive types.
+   */
+  public static void populate(
+      ConstantColumnVector col, InternalRow row, int fieldIdx, MemoryMode memMode) {
     DataType t = col.dataType();
     PhysicalDataType pdt = PhysicalDataType.apply(t);
 
@@ -93,6 +108,34 @@ public static void populate(ConstantColumnVector col, InternalRow row, int field
       col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
     } else if (pdt instanceof PhysicalVariantType) {
       col.setVariant((VariantVal)row.get(fieldIdx, t));
+    } else if (pdt instanceof PhysicalStructType) {
+      StructType st = (StructType) t;
+      InternalRow inner = row.getStruct(fieldIdx, st.fields().length);
+      InternalRow tmpRow = new GenericInternalRow(1);
+      for (int i = 0; i < st.fields().length; i++) {
+        StructField field = st.fields()[i];
+        tmpRow.update(0, inner.isNullAt(i) ? null : inner.get(i, field.dataType()));
+        // ConstantColumnVector's constructor pre-allocates struct children, so the recursive
+        // populate call below has a target vector to write into.
+        populate((ConstantColumnVector) col.getChild(i), tmpRow, 0, memMode);
+      }
+    } else if (pdt instanceof PhysicalArrayType || pdt instanceof PhysicalMapType) {
+      // Allocate a 1-row backing vector (on-heap or off-heap per `memMode`) to hold the
+      // constant complex value.
+      WritableColumnVector backing = memMode == MemoryMode.OFF_HEAP
+        ? new OffHeapColumnVector(1, t)
+        : new OnHeapColumnVector(1, t);
+      // Reuse RowToColumnConverter by wrapping `t` as a single-field struct schema and
+      // converting the one-row input. This recursively handles all element types correctly.
+      StructType wrapperSchema = new StructType().add("v", t, true);
+      RowToColumnConverter converter = new RowToColumnConverter(wrapperSchema);
+      InternalRow wrapped = new GenericInternalRow(new Object[]{row.get(fieldIdx, t)});
+      converter.convert(wrapped, new WritableColumnVector[]{backing});
+      if (pdt instanceof PhysicalArrayType) {
+        col.setArrayWithBacking(backing.getArray(0), backing);
+      } else {
+        col.setMapWithBacking(backing.getMap(0), backing);
+      }
     } else {
       throw new RuntimeException(String.format("DataType %s is not supported" +
         " in column vectorized reader.", t.sql()));
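For orientation, a minimal usage sketch of the new overload (not part of the PR; it mirrors the test suite further down and assumes direct use of these internal classes):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.execution.vectorized.{ColumnVectorUtils, ConstantColumnVector}
import org.apache.spark.sql.types.{ArrayType, IntegerType}

val vector = new ConstantColumnVector(1024, ArrayType(IntegerType))
// The 4-arg overload places the backing element data off-heap; the 3-arg
// overload defaults to MemoryMode.ON_HEAP.
ColumnVectorUtils.populate(
  vector, InternalRow(new GenericArrayData(Array[Any](1, 2, 3))), 0, MemoryMode.OFF_HEAP)
assert(vector.getArray(0).toIntArray.sameElements(Array(1, 2, 3)))
vector.close() // also frees the owned backing vector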
@@ -49,6 +49,8 @@ public class ConstantColumnVector extends ColumnVector {
   private ConstantColumnVector[] childData;
   private ColumnarArray arrayData;
   private ColumnarMap mapData;
+  // Optionally owned backing storage for arrayData / mapData. Closed by close().
+  private WritableColumnVector ownedBacking;
 
   private final int numRows;
 
@@ -62,6 +64,9 @@ public ConstantColumnVector(int numRows, DataType type) {
 
     if (type instanceof StructType structType) {
       this.childData = new ConstantColumnVector[structType.fields().length];
+      for (int i = 0; i < structType.fields().length; i++) {
+        this.childData[i] = new ConstantColumnVector(1, structType.fields()[i].dataType());
+      }
     } else if (type instanceof CalendarIntervalType) {
       // Three columns. Months as int. Days as Int. Microseconds as Long.
       this.childData = new ConstantColumnVector[3];
@@ -97,6 +102,10 @@ public void close() {
     }
     arrayData = null;
     mapData = null;
+    if (ownedBacking != null) {
+      ownedBacking.close();
+      ownedBacking = null;
+    }
   }
 
   @Override
@@ -218,24 +227,51 @@ public ColumnarArray getArray(int rowId) {
   }
 
   /**
-   * Sets the `ColumnarArray` `value` for all rows
+   * Sets the `ColumnarArray` `value` for all rows. The caller retains ownership of the backing
+   * storage for `value`; use `setArrayWithBacking` if this vector should own and close it.
    */
   public void setArray(ColumnarArray value) {
     arrayData = value;
   }
 
+  /**
+   * Sets the `ColumnarArray` `value` for all rows and takes ownership of `backing`, which will be
+   * closed when this vector is closed.
+   */
+  public void setArrayWithBacking(ColumnarArray value, WritableColumnVector backing) {
+    arrayData = value;
+    replaceOwnedBacking(backing);
+  }
+
   @Override
   public ColumnarMap getMap(int ordinal) {
     return mapData;
   }
 
   /**
-   * Sets the `ColumnarMap` `value` for all rows
+   * Sets the `ColumnarMap` `value` for all rows. The caller retains ownership of the backing
+   * storage for `value`; use `setMapWithBacking` if this vector should own and close it.
    */
   public void setMap(ColumnarMap value) {
     mapData = value;
   }
 
+  /**
+   * Sets the `ColumnarMap` `value` for all rows and takes ownership of `backing`, which will be
+   * closed when this vector is closed.
+   */
+  public void setMapWithBacking(ColumnarMap value, WritableColumnVector backing) {
+    mapData = value;
+    replaceOwnedBacking(backing);
+  }
+
+  private void replaceOwnedBacking(WritableColumnVector backing) {
+    if (ownedBacking != null && ownedBacking != backing) {
+      ownedBacking.close();
+    }
+    ownedBacking = backing;
+  }
+
   @Override
   public Decimal getDecimal(int rowId, int precision, int scale) {
     // copy and modify from WritableColumnVector
@@ -303,9 +339,14 @@ public ColumnVector getChild(int ordinal) {
   }
 
   /**
-   * Sets the child `ConstantColumnVector` `value` at the given ordinal for all rows
+   * Sets the child `ConstantColumnVector` `value` at the given ordinal for all rows. Closes any
+   * previously-set child at this ordinal (e.g., one auto-allocated by the constructor) to avoid
+   * leaking its backing storage.
    */
   public void setChild(int ordinal, ConstantColumnVector value) {
+    if (childData[ordinal] != null && childData[ordinal] != value) {
+      childData[ordinal].close();
+    }
     childData[ordinal] = value;
   }
 
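To spell out the ownership contract the comments above describe, a sketch (not from the PR) that fills a backing vector the same way populate() does and then hands it over:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}

val t = ArrayType(IntegerType)
val col = new ConstantColumnVector(10, t)
val backing = new OnHeapColumnVector(1, t)
// Fill the single backing row as populate() does: wrap `t` in a one-field
// struct schema and convert one row. (RowToColumnConverter is private[sql],
// so this only compiles from inside the org.apache.spark.sql package.)
new RowToColumnConverter(new StructType().add("v", t, true))
  .convert(InternalRow(new GenericArrayData(Array[Any](7, 8, 9))),
    Array[WritableColumnVector](backing))
// Hand over ownership: col.close() now closes `backing` too. With plain
// setArray the caller would have to keep `backing` alive and close it itself.
col.setArrayWithBacking(backing.getArray(0), backing)
col.close()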
@@ -27,6 +27,7 @@ import org.apache.hadoop.security.AccessControlException
 import org.apache.spark.{Partition => RDDPartition, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PATH}
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
@@ -167,6 +168,13 @@ class FileScanRDD(
           metadataRow, metadataColumns.map(_.name), currentFile, metadataExtractors)
       }
 
+      private val memoryMode: MemoryMode =
+        if (sparkSession.sessionState.conf.offHeapColumnVectorEnabled) {
+          MemoryMode.OFF_HEAP
+        } else {
+          MemoryMode.ON_HEAP
+        }
+
       /**
        * Create an array of constant column vectors containing all required metadata columns
        */
@@ -183,7 +191,7 @@
         }
 
         val columnVector = new ConstantColumnVector(c.numRows(), attr.dataType)
-        ColumnVectorUtils.populate(columnVector, tmpRow, 0)
+        ColumnVectorUtils.populate(columnVector, tmpRow, 0, memoryMode)
         columnVector
       }.toArray
     }
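The memory mode simply follows the session conf that already governs off-heap column vectors elsewhere. A hedged example (the conf key is assumed from SQLConf's offHeapColumnVectorEnabled; `spark` is an active SparkSession):

// Assumption: offHeapColumnVectorEnabled is backed by this key.
spark.conf.set("spark.sql.columnVector.offheap.enabled", "true")
// Scans from here on populate constant metadata/partition columns
// with off-heap backing vectors.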
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.unsafe.types.UTF8String
@@ -134,30 +135,108 @@ class ColumnVectorUtilsSuite extends SparkFunSuite {
     }
   }
 
-  testConstantColumnVector("not supported: fill map", 10,
+  testConstantColumnVector("fill array of ints", 10, ArrayType(IntegerType)) { vector =>
+    val arr = new GenericArrayData(Array[Any](1, 2, 3, 4, 5))
+    ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+    (0 until 10).foreach { i =>
+      assert(vector.getArray(i).toIntArray === Array(1, 2, 3, 4, 5))
+    }
+  }
+
+  testConstantColumnVector("fill array of strings", 10, ArrayType(StringType)) { vector =>
+    val arr = new GenericArrayData(Array[Any](
+      UTF8String.fromString("a"),
+      UTF8String.fromString("bb"),
+      UTF8String.fromString("ccc")))
+    ColumnVectorUtils.populate(vector, InternalRow(arr), 0)
+    (0 until 10).foreach { i =>
+      val a = vector.getArray(i)
+      assert(a.numElements() == 3)
+      assert(a.getUTF8String(0) == UTF8String.fromString("a"))
+      assert(a.getUTF8String(1) == UTF8String.fromString("bb"))
+      assert(a.getUTF8String(2) == UTF8String.fromString("ccc"))
+    }
+  }
+
+  testConstantColumnVector("fill map of int -> boolean", 10,
     MapType(IntegerType, BooleanType)) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeMap"), 0)
-    }.getMessage
-    assert(message == "DataType MAP<INT, BOOLEAN> is not supported in column vectorized reader.")
+    val keys = new GenericArrayData(Array[Any](1, 2, 3))
+    val values = new GenericArrayData(Array[Any](true, false, true))
+    val map = new ArrayBasedMapData(keys, values)
+    ColumnVectorUtils.populate(vector, InternalRow(map), 0)
+    (0 until 10).foreach { i =>
+      val m = vector.getMap(i)
+      assert(m.numElements() == 3)
+      assert(m.keyArray().toIntArray === Array(1, 2, 3))
+      assert(m.valueArray().toBooleanArray === Array(true, false, true))
+    }
   }
 
-  testConstantColumnVector("not supported: fill struct", 10,
+  testConstantColumnVector("fill struct", 10,
     new StructType()
       .add(StructField("name", StringType))
       .add(StructField("age", IntegerType))) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeStruct"), 0)
-    }.getMessage
-    assert(message ==
-      "DataType STRUCT<name: STRING, age: INT> is not supported in column vectorized reader.")
-  }
-
-  testConstantColumnVector("not supported: fill array", 10,
-    ArrayType(IntegerType)) { vector =>
-    val message = intercept[RuntimeException] {
-      ColumnVectorUtils.populate(vector, InternalRow("fakeArray"), 0)
-    }.getMessage
-    assert(message == "DataType ARRAY<INT> is not supported in column vectorized reader.")
+    val row = InternalRow(UTF8String.fromString("jack"), 27)
+    ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+    (0 until 10).foreach { i =>
+      assert(vector.getChild(0).getUTF8String(i) == UTF8String.fromString("jack"))
+      assert(vector.getChild(1).getInt(i) == 27)
+    }
   }
 
+  testConstantColumnVector("fill struct with null field", 10,
+    new StructType()
+      .add(StructField("name", StringType, nullable = true))
+      .add(StructField("age", IntegerType))) { vector =>
+    val row = InternalRow(null, 27)
+    ColumnVectorUtils.populate(vector, InternalRow(row), 0)
+    (0 until 10).foreach { i =>
+      assert(vector.getChild(0).isNullAt(i))
+      assert(vector.getChild(1).getInt(i) == 27)
+    }
+  }
+
+  testConstantColumnVector("fill nested struct", 10,
+    new StructType()
+      .add(StructField("inner",
+        new StructType()
+          .add(StructField("k", StringType))
+          .add(StructField("v", IntegerType))))
+      .add(StructField("flag", BooleanType))) { vector =>
+    val inner = InternalRow(UTF8String.fromString("a"), 1)
+    val outer = InternalRow(inner, true)
+    ColumnVectorUtils.populate(vector, InternalRow(outer), 0)
+    (0 until 10).foreach { i =>
+      val s = vector.getChild(0)
+      assert(s.getChild(0).getUTF8String(i) == UTF8String.fromString("a"))
+      assert(s.getChild(1).getInt(i) == 1)
+      assert(vector.getChild(1).getBoolean(i))
+    }
+  }
+
+  testConstantColumnVector("fill nested array<struct>", 10,
+    ArrayType(new StructType()
+      .add(StructField("k", StringType))
+      .add(StructField("v", IntegerType)))) { vector =>
+    val structs = new GenericArrayData(Array[Any](
+      InternalRow(UTF8String.fromString("a"), 1),
+      InternalRow(UTF8String.fromString("bb"), 2),
+      InternalRow(null, 3)))
+    ColumnVectorUtils.populate(vector, InternalRow(structs), 0)
+    (0 until 10).foreach { i =>
+      val a = vector.getArray(i)
+      assert(a.numElements() == 3)
+      assert(a.getStruct(0, 2).getUTF8String(0) == UTF8String.fromString("a"))
+      assert(a.getStruct(0, 2).getInt(1) == 1)
+      assert(a.getStruct(1, 2).getUTF8String(0) == UTF8String.fromString("bb"))
+      assert(a.getStruct(1, 2).getInt(1) == 2)
+      assert(a.getStruct(2, 2).isNullAt(0))
+      assert(a.getStruct(2, 2).getInt(1) == 3)
+    }
+  }
+
+  testConstantColumnVector("fill null array", 10, ArrayType(IntegerType)) { vector =>
+    ColumnVectorUtils.populate(vector, InternalRow(null), 0)
+    assert(vector.hasNull)
+  }
}