@@ -18,9 +18,11 @@

package org.apache.paimon.spark;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.spark.util.shim.TypeUtils$;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.UriReader;
import org.apache.paimon.utils.UriReaderFactory;

import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,43 +45,43 @@
import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.types.TimestampType;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

/** Wrapper to fetch value from the spark internal row. */
/**
* An {@link InternalRow} that wraps a Spark {@link org.apache.spark.sql.catalyst.InternalRow} for the
* v2 write path.
*/
public class SparkInternalRowWrapper implements InternalRow, Serializable {

private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
private final int length;
private final int rowKindIdx;
private final StructType tableSchema;
private int[] fieldIndexMap = null;
private final int length;
private final boolean blobAsDescriptor;
@Nullable private final UriReaderFactory uriReaderFactory;
@Nullable private final int[] fieldIndexMap;

public SparkInternalRowWrapper(
org.apache.spark.sql.catalyst.InternalRow internalRow,
int rowKindIdx,
StructType tableSchema,
int length) {
this.internalRow = internalRow;
this.rowKindIdx = rowKindIdx;
this.length = length;
this.tableSchema = tableSchema;
}
private transient org.apache.spark.sql.catalyst.InternalRow internalRow;

public SparkInternalRowWrapper(int rowKindIdx, StructType tableSchema, int length) {
this.rowKindIdx = rowKindIdx;
this.length = length;
this.tableSchema = tableSchema;
public SparkInternalRowWrapper(StructType tableSchema, int length) {
this(tableSchema, length, null, false, null);
}

public SparkInternalRowWrapper(
int rowKindIdx, StructType tableSchema, StructType dataSchema, int length) {
this.rowKindIdx = rowKindIdx;
this.length = length;
StructType tableSchema,
int length,
StructType dataSchema,
boolean blobAsDescriptor,
CatalogContext catalogContext) {
this.tableSchema = tableSchema;
this.fieldIndexMap = buildFieldIndexMap(tableSchema, dataSchema);
this.length = length;
this.fieldIndexMap =
dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
}

public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -128,12 +132,6 @@ public int getFieldCount() {

@Override
public RowKind getRowKind() {
if (rowKindIdx != -1) {
int actualPos = getActualFieldPosition(rowKindIdx);
if (actualPos != -1) {
return RowKind.fromByteValue(internalRow.getByte(actualPos));
}
}
return RowKind.INSERT;
}

@@ -244,7 +242,13 @@ public Variant getVariant(int pos) {

@Override
public Blob getBlob(int pos) {
return new BlobData(internalRow.getBinary(pos));
if (blobAsDescriptor) {
BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
return new BlobData(internalRow.getBinary(pos));
}
}
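
As a standalone sketch (not taken from the PR), the new getBlob branch amounts to the decision below. Every call it uses — BlobDescriptor.deserialize, BlobDescriptor.uri, UriReaderFactory.create, Blob.fromDescriptor, and the BlobData constructor — appears in this hunk or its imports; the helper name toBlob and its parameters are illustrative only.

import org.apache.paimon.data.{Blob, BlobData, BlobDescriptor}
import org.apache.paimon.utils.UriReaderFactory

// Sketch of the two blob paths above: when blob-as-descriptor is enabled the binary column
// carries a serialized BlobDescriptor pointing at external data, which is read lazily through
// a UriReader; otherwise the column carries the blob bytes inline.
def toBlob(bytes: Array[Byte], blobAsDescriptor: Boolean, readers: UriReaderFactory): Blob =
  if (blobAsDescriptor) {
    val descriptor = BlobDescriptor.deserialize(bytes)
    Blob.fromDescriptor(readers.create(descriptor.uri()), descriptor)
  } else {
    new BlobData(bytes)
  }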

@Override
@@ -276,10 +280,8 @@ public InternalRow getRow(int pos, int numFields) {
return null;
}
return new SparkInternalRowWrapper(
internalRow.getStruct(actualPos, numFields),
-1,
(StructType) tableSchema.fields()[actualPos].dataType(),
numFields);
(StructType) tableSchema.fields()[actualPos].dataType(), numFields)
.replace(internalRow.getStruct(actualPos, numFields));
}

private static Timestamp convertToTimestamp(DataType dataType, long micros) {
@@ -434,8 +436,8 @@ public InternalMap getMap(int pos) {

@Override
public InternalRow getRow(int pos, int numFields) {
return new SparkInternalRowWrapper(
arrayData.getStruct(pos, numFields), -1, (StructType) elementType, numFields);
return new SparkInternalRowWrapper((StructType) elementType, numFields)
.replace(arrayData.getStruct(pos, numFields));
}
}
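
Across the later call sites in this diff (BucketFunction, FormatTableDataWriter, PaimonV2DataWriter), the removed rowKindIdx-based constructors give way to one pattern: build the wrapper once from the schema and field count, then bind each incoming Spark row with replace(). A minimal sketch of that pattern, assuming only the two-argument constructor and replace() shown above; the converter name and writeSchema value are placeholders.

import org.apache.spark.sql.types.StructType
import org.apache.paimon.spark.SparkInternalRowWrapper

// Build one reusable wrapper per task and rebind it per record; row-kind handling no longer
// flows through the wrapper, so only the schema and field count are needed up front.
def makeConverter(writeSchema: StructType)
    : org.apache.spark.sql.catalyst.InternalRow => org.apache.paimon.data.InternalRow = {
  val wrapper = new SparkInternalRowWrapper(writeSchema, writeSchema.fields.length)
  row => wrapper.replace(row)
}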

@@ -55,7 +55,7 @@

import scala.collection.JavaConverters;

/** A {@link InternalRow} wraps spark {@link Row}. */
/** An {@link InternalRow} that wraps a Spark {@link Row} for the v1 write path. */
public class SparkRow implements InternalRow, Serializable {

private final RowType type;
Expand All @@ -78,7 +78,7 @@ public SparkRow(
this.row = row;
this.rowKind = rowkind;
this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
}

@Override
@@ -25,7 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
import org.apache.paimon.spark.function.{DescriptorToStringFunction, DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
import org.apache.paimon.spark.function.{DescriptorToStringUnbound, PathToDescriptorUnbound}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
import org.apache.paimon.utils.ProjectedRow
@@ -90,7 +90,7 @@ class BucketFunction(NAME: String, bucketFunctionType: BucketFunctionType) exten
val serializer = new InternalRowSerializer(bucketKeyRowType)
val mapping = (1 to bucketKeyRowType.getFieldCount).toArray
val reusedRow =
new SparkInternalRowWrapper(-1, inputType, inputType.fields.length)
new SparkInternalRowWrapper(inputType, inputType.fields.length)
val bucketFunc: bucket.BucketFunction =
bucket.BucketFunction.create(bucketFunctionType, bucketKeyRowType)
new ScalarFunction[Int]() {
@@ -145,7 +145,7 @@ private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeS
private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
val numFields = writeSchema.fields.length
record => {
new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
}
}

@@ -117,7 +117,7 @@ abstract class abstractInnerTableDataWrite[T] extends InnerTableDataWrite[T] wit
/** For batch write, batchId is None, for streaming write, batchId is the current batch id (>= 0). */
val batchId: Option[Long]

private val needFullCompaction: Boolean = {
private lazy val needFullCompaction: Boolean = {
fullCompactionDeltaCommits match {
case Some(deltaCommits) if deltaCommits > 0 =>
batchId match {
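
One likely reason for the val-to-lazy-val change above (the diff itself does not state the motivation): needFullCompaction reads fullCompactionDeltaCommits, which in the new PaimonV2DataWriter further below becomes a plain val computed in the class body rather than a constructor parameter, and trait initializers run before subclass bodies in Scala. A self-contained illustration of that initialization-order hazard, with illustrative names:

// Illustrative only: a strict val here would evaluate during trait initialization, before the
// subclass has assigned deltaCommits, and could observe null; a lazy val defers evaluation
// until first use, after construction has finished.
trait DataWriteLike {
  val deltaCommits: Option[Int]                                    // abstract, like fullCompactionDeltaCommits
  lazy val needFullCompaction: Boolean = deltaCommits.exists(_ > 0)
}
class WriterLike extends DataWriteLike {
  override val deltaCommits: Option[Int] = Some(5)                 // assigned after trait init
}
// new WriterLike().needFullCompaction == true; with a strict val the trait initializer would
// have thrown a NullPointerException.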
@@ -54,11 +54,15 @@ case class PaimonBatchWrite(
}

override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
val fullCompactionDeltaCommits: Option[Int] =
Option.apply(coreOptions.fullCompactionDeltaCommits())
(_: Int, _: Long) => {
PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, fullCompactionDeltaCommits)
}
(_: Int, _: Long) =>
{
PaimonV2DataWriter(
batchWriteBuilder,
writeSchema,
dataSchema,
coreOptions,
table.catalogEnvironment().catalogContext())
}
}

override def useCommitCoordinator(): Boolean = false
@@ -18,6 +18,8 @@

package org.apache.paimon.spark.write

import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl}
@@ -32,15 +34,19 @@ case class PaimonV2DataWriter(
writeBuilder: BatchWriteBuilder,
writeSchema: StructType,
dataSchema: StructType,
fullCompactionDeltaCommits: Option[Int],
coreOptions: CoreOptions,
catalogContext: CatalogContext,
batchId: Option[Long] = None)
extends abstractInnerTableDataWrite[InternalRow]
with InnerTableV2DataWrite {

private val ioManager = SparkUtils.createIOManager()

private val metricRegistry = SparkMetricRegistry()

val fullCompactionDeltaCommits: Option[Int] =
Option.apply(coreOptions.fullCompactionDeltaCommits())
val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()

val write: TableWriteImpl[InternalRow] = {
writeBuilder
.newWrite()
Expand All @@ -51,7 +57,12 @@ case class PaimonV2DataWriter(

private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, dataSchema, numFields)
val reusableWrapper = new SparkInternalRowWrapper(
writeSchema,
numFields,
dataSchema,
blobAsDescriptor,
catalogContext)
record => reusableWrapper.replace(record)
}

@@ -19,14 +19,14 @@
package org.apache.paimon.spark.sql

import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.data.Blob
import org.apache.paimon.data.BlobDescriptor
import org.apache.paimon.data.{Blob, BlobDescriptor}
import org.apache.paimon.fs.Path
import org.apache.paimon.fs.local.LocalFileIO
import org.apache.paimon.options.Options
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.UriReaderFactory

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

import java.util
@@ -36,6 +36,10 @@ class BlobTestBase extends PaimonSparkTestBase {

private val RANDOM = new Random

override def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
}

test("Blob: test basic") {
withTable("t") {
sql(
@@ -216,11 +220,17 @@

def bytesToHex(bytes: Array[Byte]): String = {
val hexChars = new Array[Char](bytes.length * 2)
for (j <- 0 until bytes.length) {
for (j <- bytes.indices) {
val v = bytes(j) & 0xff
hexChars(j * 2) = HEX_ARRAY(v >>> 4)
hexChars(j * 2 + 1) = HEX_ARRAY(v & 0x0f)
}
new String(hexChars)
}
}

class BlobTestWithV2Write extends BlobTestBase {
override def sparkConf: SparkConf = {
super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
}
}