142 changes: 142 additions & 0 deletions core/src/test/java/org/apache/iceberg/variants/VariantTestHelper.java
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.variants;

import java.math.BigDecimal;
import java.nio.ByteBuffer;

public final class VariantTestHelper {

private VariantTestHelper() {}

public static final VariantPrimitive<?>[] PRIMITIVES =
new VariantPrimitive[] {
Variants.ofNull(),
Variants.of(true),
Variants.of(false),
Variants.of((byte) 34),
Variants.of((byte) -34),
Variants.of((short) 1234),
Variants.of((short) -1234),
Variants.of(12345),
Variants.of(-12345),
Variants.of(9876543210L),
Variants.of(-9876543210L),
Variants.of(10.11F),
Variants.of(-10.11F),
Variants.of(14.3D),
Variants.of(-14.3D),
Variants.ofIsoDate("2024-11-07"),
Variants.ofIsoDate("1957-11-07"),
Variants.ofIsoTimestamptz("2024-11-07T12:33:54.123456+00:00"),
Variants.ofIsoTimestamptz("1957-11-07T12:33:54.123456+00:00"),
Variants.ofIsoTimestampntz("2024-11-07T12:33:54.123456"),
Variants.ofIsoTimestampntz("1957-11-07T12:33:54.123456"),
Variants.of(new BigDecimal("12345.6789")), // decimal4
Variants.of(new BigDecimal("-12345.6789")), // decimal4
Variants.of(new BigDecimal("123456789.987654321")), // decimal8
Variants.of(new BigDecimal("-123456789.987654321")), // decimal8
Variants.of(new BigDecimal("9876543210.123456789")), // decimal16
Variants.of(new BigDecimal("-9876543210.123456789")), // decimal16
Variants.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})),
Variants.of("iceberg"),
Variants.ofUUID("f24f9b64-81fa-49d1-b74e-8c09a6e31c56"),
};

public static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES =
new VariantPrimitive[] {
Variants.ofIsoTime("12:33:54.123456"),
Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"),
Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"),
};
Comment on lines +62 to +67
Contributor
In what sense are these unsupported? Only in Spark?

Contributor Author
@Guosmilesmile Feb 12, 2026
These are also unsupported in Flink.

https://github.com/apache/flink/blob/1779d20a28f5d43410138314fe3292d0e64f3148/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java#L315-L376

The assertions in this test case weren't written well, so I've adjusted them.
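
Roughly, the adjusted assertion looks like this (a simplified sketch, assuming AssertJ; roundTrip stands in for the engine-specific serialization helper):

// Simplified sketch: unsupported primitives should fail the engine round trip
// rather than silently pass. roundTrip is a hypothetical engine-specific helper;
// the exact exception type may differ per engine.
for (VariantPrimitive<?> primitive : VariantTestHelper.UNSUPPORTED_PRIMITIVES) {
  assertThatThrownBy(() -> roundTrip(Variants.emptyMetadata(), primitive))
      .isInstanceOf(UnsupportedOperationException.class);
}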


/**
* Tests round-trip conversion of all primitive variant types.
*
* @param testFunction the engine-specific test function
* @param primitive the primitive variant value to test
*/
public static void testVariantPrimitiveRoundTrip(
VariantTestFunction testFunction, VariantPrimitive<?> primitive) {
testFunction.test(Variants.emptyMetadata(), primitive);
}

/**
* Tests round-trip conversion of array variants.
*
* @param testFunction the engine-specific test function
*/
public static void testVariantArrayRoundTrip(VariantTestFunction testFunction) {
VariantMetadata metadata = Variants.emptyMetadata();
ValueArray array = Variants.array();
array.add(Variants.of("hello"));
array.add(Variants.of((byte) 42));
array.add(Variants.ofNull());

testFunction.test(metadata, array);
}

/**
* Tests round-trip conversion of object variants.
*
* @param testFunction the engine-specific test function
*/
public static void testVariantObjectRoundTrip(VariantTestFunction testFunction) {
VariantMetadata metadata = Variants.metadata("name", "age", "active");
ShreddedObject object = Variants.object(metadata);
object.put("name", Variants.of("John Doe"));
object.put("age", Variants.of((byte) 30));
object.put("active", Variants.of(true));

testFunction.test(metadata, object);
}

/**
* Tests round-trip conversion of nested variant structures.
*
* @param testFunction the engine-specific test function
*/
public static void testVariantNestedStructures(VariantTestFunction testFunction) {
VariantMetadata metadata = Variants.metadata("user", "scores", "address", "city", "state");

// Create nested object: address
ShreddedObject address = Variants.object(metadata);
address.put("city", Variants.of("Anytown"));
address.put("state", Variants.of("CA"));

// Create array of scores
ValueArray scores = Variants.array();
scores.add(Variants.of((byte) 95));
scores.add(Variants.of((byte) 87));
scores.add(Variants.of((byte) 92));

// Create main object
ShreddedObject mainObject = Variants.object(metadata);
mainObject.put("user", Variants.of("Jane"));
mainObject.put("scores", scores);
mainObject.put("address", address);

testFunction.test(metadata, mainObject);
}

@FunctionalInterface
public interface VariantTestFunction {
void test(VariantMetadata metadata, VariantValue value);
}
}
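
For reference, a minimal usage sketch (illustrative only, assuming AssertJ and a hypothetical engine-side writeAndRead helper) of how an engine module plugs into VariantTestFunction:

// Illustrative usage only: the engine module supplies the round trip as a lambda.
// writeAndRead is hypothetical and stands in for engine-specific serialization;
// the comparison assumes the engine test defines value equality for variants.
for (VariantPrimitive<?> primitive : VariantTestHelper.PRIMITIVES) {
  VariantTestHelper.testVariantPrimitiveRoundTrip(
      (metadata, value) -> assertThat(writeAndRead(metadata, value)).isEqualTo(value),
      primitive);
}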
@@ -40,6 +40,7 @@
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.VariantType;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -200,4 +201,9 @@ public Type visit(RowType rowType) {

return Types.StructType.of(newFields);
}

@Override
public Type visit(VariantType variantType) {
return Types.VariantType.get();
}
}
@@ -37,6 +37,7 @@
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.VariantType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
@@ -83,6 +84,11 @@ public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType val
return new MapType(keyResult.copy(false), valueResult.copy(map.isValueOptional()));
}

@Override
public LogicalType variant(Types.VariantType variant) {
return new VariantType();
}

@Override
public LogicalType primitive(Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
@@ -21,6 +21,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -34,12 +35,16 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.flink.types.variant.Variant;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetVariantReaders.DelegatingValueReader;
import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VariantReaderBuilder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -138,6 +143,17 @@ private ParquetValueReader<?> defaultReader(
throw new IllegalArgumentException(String.format("Missing required field: %s", field.name()));
}

@Override
public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
}

@Override
public ParquetValueReader<?> variant(
Types.VariantType expected, GroupType variant, ParquetValueReader<?> variantReader) {
return new VariantReader(variantReader);
}

@Override
public ParquetValueReader<?> list(
Types.ListType expectedList, GroupType array, ParquetValueReader<?> elementReader) {
@@ -853,4 +869,28 @@ public double[] toDoubleArray() {
return ArrayUtil.toPrimitive((Double[]) values);
}
}

/** Variant reader that converts an Iceberg Variant to a Flink Variant. */
private static class VariantReader
extends DelegatingValueReader<org.apache.iceberg.variants.Variant, Variant> {
@SuppressWarnings("unchecked")
private VariantReader(ParquetValueReader<?> reader) {
super((ParquetValueReader<org.apache.iceberg.variants.Variant>) reader);
}

@Override
public Variant read(Variant reuse) {
org.apache.iceberg.variants.Variant icebergVariant = super.readFromDelegate(null);

byte[] metadataBytes = new byte[icebergVariant.metadata().sizeInBytes()];
ByteBuffer metadataBuffer = ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN);
icebergVariant.metadata().writeTo(metadataBuffer, 0);

byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()];
ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN);
icebergVariant.value().writeTo(valueBuffer, 0);

return new BinaryVariant(valueBytes, metadataBytes);
}
}
}
@@ -18,11 +18,15 @@
*/
package org.apache.iceberg.flink.data;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.MapData;
@@ -37,15 +41,25 @@
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VariantType;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.flink.types.variant.Variant;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.flink.FlinkRowData;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TripleWriter;
import org.apache.iceberg.parquet.VariantWriterBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -85,6 +99,14 @@ public ParquetValueWriter<?> message(
return struct(sStruct, message.asGroupType(), fields);
}

@Override
public ParquetValueWriter<?> variant(VariantType sVariant, GroupType variant) {
ParquetValueWriter<?> writer =
ParquetVariantVisitor.visit(
variant, new VariantWriterBuilder(type, Arrays.asList(currentPath())));
return new VariantWriter(writer);
}

@Override
public ParquetValueWriter<?> struct(
RowType sStruct, GroupType struct, List<ParquetValueWriter<?>> fieldWriters) {
@@ -588,6 +610,49 @@ public Map.Entry<K, V> next() {
}
}

/** Variant writer that converts a Flink Variant to an Iceberg Variant. */
private static class VariantWriter implements ParquetValueWriter<Variant> {
private final ParquetValueWriter<org.apache.iceberg.variants.Variant> writer;

@SuppressWarnings("unchecked")
private VariantWriter(ParquetValueWriter<?> writer) {
this.writer = (ParquetValueWriter<org.apache.iceberg.variants.Variant>) writer;
}

@Override
public void write(int repetitionLevel, Variant variant) {
Preconditions.checkArgument(
variant instanceof BinaryVariant,
"Expected BinaryVariant but got: %s",
variant.getClass().getSimpleName());

BinaryVariant binaryVariant = (BinaryVariant) variant;
VariantMetadata metadata =
VariantMetadata.from(
ByteBuffer.wrap(binaryVariant.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
VariantValue value =
VariantValue.from(
metadata, ByteBuffer.wrap(binaryVariant.getValue()).order(ByteOrder.LITTLE_ENDIAN));

writer.write(repetitionLevel, org.apache.iceberg.variants.Variant.of(metadata, value));
}

@Override
public List<TripleWriter<?>> columns() {
return writer.columns();
}

@Override
public void setColumnStore(ColumnWriteStore columnStore) {
writer.setColumnStore(columnStore);
}

@Override
public Stream<FieldMetrics<?>> metrics() {
return writer.metrics();
}
}

private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
private final RowData.FieldGetter[] fieldGetter;
