@@ -446,11 +446,11 @@ private[this] final class ScalarReader(
}

case object SparkShreddingUtils {
-val VariantValueFieldName = "value";
-val TypedValueFieldName = "typed_value";
-val MetadataFieldName = "metadata";
+private val VARIANT_VALUE_FIELD_NAME = "value"
+private val TYPED_VALUE_FIELD_NAME = "typed_value"
+private val METADATA_FIELD_NAME = "metadata"

-val VARIANT_WRITE_SHREDDING_KEY: String = "__VARIANT_WRITE_SHREDDING_KEY"
+private val VARIANT_WRITE_SHREDDING_KEY: String = "__VARIANT_WRITE_SHREDDING_KEY"

def buildVariantSchema(schema: DataType): VariantSchema = {
schema match {
@@ -481,40 +481,42 @@ case object SparkShreddingUtils {
// Always set containsNull to false. One of value or typed_value must always be set for
// array elements.
val arrayShreddingSchema =
-ArrayType(variantShreddingSchema(elementType, false, false), containsNull = false)
+ArrayType(variantShreddingSchema(elementType, isTopLevel = false,
+  isObjectField = false), containsNull = false)
Seq(
-StructField(VariantValueFieldName, BinaryType, nullable = true),
-StructField(TypedValueFieldName, arrayShreddingSchema, nullable = true)
+StructField(VARIANT_VALUE_FIELD_NAME, BinaryType, nullable = true),
+StructField(TYPED_VALUE_FIELD_NAME, arrayShreddingSchema, nullable = true)
)
case StructType(fields) =>
// The field name level is always non-nullable: Variant null values are represented in the
// "value" columna as "00", and missing values are represented by setting both "value" and
// "value" column as "00", and missing values are represented by setting both "value" and
// "typed_value" to null.
val objectShreddingSchema = StructType(fields.map(f =>
-f.copy(dataType = variantShreddingSchema(f.dataType, false, true), nullable = false)))
+f.copy(dataType = variantShreddingSchema(f.dataType, isTopLevel = false,
+  isObjectField = true), nullable = false)))
Seq(
-StructField(VariantValueFieldName, BinaryType, nullable = true),
-StructField(TypedValueFieldName, objectShreddingSchema, nullable = true)
+StructField(VARIANT_VALUE_FIELD_NAME, BinaryType, nullable = true),
+StructField(TYPED_VALUE_FIELD_NAME, objectShreddingSchema, nullable = true)
)
case VariantType =>
// For Variant, we don't need a typed column. If there is no typed column, value is required
// for array elements or top-level fields, but optional for objects (where a null represents
// a missing field).
Seq(
-StructField(VariantValueFieldName, BinaryType, nullable = isObjectField)
+StructField(VARIANT_VALUE_FIELD_NAME, BinaryType, nullable = isObjectField)
)
case _: NumericType | BooleanType | _: StringType | BinaryType | _: DatetimeType =>
Seq(
-StructField(VariantValueFieldName, BinaryType, nullable = true),
-StructField(TypedValueFieldName, dataType, nullable = true)
+StructField(VARIANT_VALUE_FIELD_NAME, BinaryType, nullable = true),
+StructField(TYPED_VALUE_FIELD_NAME, dataType, nullable = true)
)
case _ =>
// No other types have a corresponding shreddings schema.
throw QueryCompilationErrors.invalidVariantShreddingSchema(dataType)
}

if (isTopLevel) {
-StructType(StructField(MetadataFieldName, BinaryType, nullable = false) +: fields)
+StructType(StructField(METADATA_FIELD_NAME, BinaryType, nullable = false) +: fields)
} else {
StructType(fields)
}
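
Note for readers: every branch of variantShreddingSchema above assembles its result from the same three column names, with metadata prepended only at the top level. A minimal hand-built sketch of the layout this should yield for a top-level LongType (the shape is inferred from the branches above, not from documented API output):

import org.apache.spark.sql.types._

// Illustration only: the struct variantShreddingSchema is expected to
// produce for a shredded top-level long, per the scalar branch above.
val topLevelLongShredding = StructType(Seq(
  StructField("metadata", BinaryType, nullable = false),  // Variant metadata, top level only
  StructField("value", BinaryType, nullable = true),      // fallback for unshreddable values
  StructField("typed_value", LongType, nullable = true)   // the shredded typed column
))
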
@@ -560,7 +562,7 @@ case object SparkShreddingUtils {
}
schema.fields.zipWithIndex.foreach { case (f, i) =>
f.name match {
-case TypedValueFieldName =>
+case TYPED_VALUE_FIELD_NAME =>
if (typedIdx != -1) {
throw QueryCompilationErrors.invalidVariantShreddingSchema(schema)
}
@@ -585,7 +587,7 @@ case object SparkShreddingUtils {
case s: StructType => arraySchema = buildVariantSchema(s, topLevel = false)
case _ => throw QueryCompilationErrors.invalidVariantShreddingSchema(schema)
}
-case t => scalarSchema = (t match {
+case t => scalarSchema = t match {
case BooleanType => new VariantSchema.BooleanType
case ByteType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE)
case ShortType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.SHORT)
@@ -600,14 +602,14 @@ case object SparkShreddingUtils {
case TimestampNTZType => new VariantSchema.TimestampNTZType
case d: DecimalType => new VariantSchema.DecimalType(d.precision, d.scale)
case _ => throw QueryCompilationErrors.invalidVariantShreddingSchema(schema)
-})
+}
}
-case VariantValueFieldName =>
+case VARIANT_VALUE_FIELD_NAME =>
if (variantIdx != -1 || f.dataType != BinaryType) {
throw QueryCompilationErrors.invalidVariantShreddingSchema(schema)
}
variantIdx = i
-case MetadataFieldName =>
+case METADATA_FIELD_NAME =>
if (topLevelMetadataIdx != -1 || f.dataType != BinaryType) {
throw QueryCompilationErrors.invalidVariantShreddingSchema(schema)
}
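
Reader aid for the validation logic in this hunk: buildVariantSchema accepts only the three recognized field names, each at most once, and requires both value and metadata to be BinaryType. A hedged sketch of inputs it should accept and reject (constructed for illustration, not taken from the test suite):

import org.apache.spark.sql.types._

// Should validate: each recognized name appears once with a legal type.
val validShredding = StructType(Seq(
  StructField("metadata", BinaryType, nullable = false),
  StructField("value", BinaryType, nullable = true),
  StructField("typed_value", LongType, nullable = true)
))

// Should be rejected: "value" is not BinaryType, so the checks above raise
// QueryCompilationErrors.invalidVariantShreddingSchema.
val invalidShredding = StructType(Seq(
  StructField("metadata", BinaryType, nullable = false),
  StructField("value", StringType, nullable = true)
))
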
@@ -651,7 +653,8 @@ case object SparkShreddingUtils {
converter.convertField(column.getChild(0)).sparkType
}

-class SparkShreddedResult(schema: VariantSchema) extends VariantShreddingWriter.ShreddedResult {
+private class SparkShreddedResult(schema: VariantSchema)
+  extends VariantShreddingWriter.ShreddedResult {
// Result is stored as an InternalRow.
val row = new GenericInternalRow(schema.numFields)

@@ -662,8 +665,8 @@ case object SparkShreddingUtils {
}

override def addObject(values: Array[VariantShreddingWriter.ShreddedResult]): Unit = {
-val innerRow = new GenericInternalRow(schema.objectSchema.size)
-for (i <- 0 until values.length) {
+val innerRow = new GenericInternalRow(schema.objectSchema.length)
+for (i <- values.indices) {
innerRow.update(i, values(i).asInstanceOf[SparkShreddedResult].row)
}
row.update(schema.typedIdx, innerRow)
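
Side note on the loop rewrite above: values.indices is the idiomatic Scala spelling of 0 until values.length and iterates exactly the same range, as this standalone snippet shows:

// Both loops visit indices 0, 1, 2; `indices` avoids explicit length arithmetic.
val values = Array("a", "b", "c")
for (i <- 0 until values.length) println(values(i))
for (i <- values.indices) println(values(i))
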
@@ -688,7 +691,7 @@ case object SparkShreddingUtils {
}
}

-class SparkShreddedResultBuilder() extends VariantShreddingWriter.ShreddedResultBuilder {
+private class SparkShreddedResultBuilder extends VariantShreddingWriter.ShreddedResultBuilder {
override def createEmpty(schema: VariantSchema): VariantShreddingWriter.ShreddedResult = {
new SparkShreddedResult(schema)
}