
Flink: Support Variant to Flink 2.1#15265

Open
Guosmilesmile wants to merge 3 commits into apache:main from Guosmilesmile:variant

Conversation

@Guosmilesmile
Contributor

@Guosmilesmile Guosmilesmile commented Feb 8, 2026

Flink has supported the Variant type since version 2.1. This PR adds Variant support on that basis.

Overall, the logic in this PR relies on the existing Parquet variant reader/writer to handle most of the work.

For test cases, this PR adds validations for type conversions and an end-to-end verification from the Flink SQL side.

This PR is developed based on #14259.

Here’s a summary of the changes:

  1. Type Mapping Additions:

    • Added support for converting between Flink’s VariantType and Iceberg’s VariantType in both directions (FlinkTypeToType and TypeToFlinkType).
  2. Parquet Read/Write Integration:

    • Implemented Parquet readers and writers for the Variant type. Variant data is read and written as BinaryVariant objects on the Flink side and as Variant objects in Iceberg's internal representation.
  3. Schema Visitor Enhancements:

    • Updated Parquet schema visitors (ParquetWithFlinkSchemaVisitor) to recognize and properly handle Variant logical types and annotations.
  4. Enhanced Testing:

    • Introduced new and improved tests for Variant support in Flink, including:
      • End-to-end tests for reading and writing Variant columns via Flink SQL (TestFlinkVariantType).
      • Comprehensive Parquet round-trip and type-conversion tests (TestFlinkVariants), parameterized for multiple Variant subtypes.
      • Created a shared VariantTestHelper for reusable Variant test cases across both Flink and Spark integration tests.
    • Refactored Spark tests to use the new shared Variant test data.

@Guosmilesmile Guosmilesmile marked this pull request as ready for review February 8, 2026 13:36
@pvary
Contributor

pvary commented Feb 8, 2026

CC: @mxm, @gyfora

Contributor

@mxm mxm left a comment

Thanks for the PR @Guosmilesmile! I'll take a look. Just checking: did we confirm with @talatuyarer that we can continue his work?

@Guosmilesmile
Contributor Author

I noticed it has been closed for a while. So I opened a new PR to continue this work. I will ask him whether it’s appropriate for me to keep pushing this forward.

@talatuyarer
Contributor

It was only closed due to inactivity; I was waiting for someone like @mxm to review it :) You can use my code and build on top of it. Having variant type support is important to me. I am also happy to give a review, whatever works for you @Guosmilesmile.

@Guosmilesmile
Contributor Author

@talatuyarer Thank you for your reply. I noticed that the issues raised in Steven's and Aihuaxu's reviews of the previous PR were not resolved, and that it was closed after prolonged inactivity, so I created a new PR to continue this feature. This PR fixes those issues and some bugs, and also adds some end-to-end test cases.

Which PR to continue this feature in is up to you.

@mxm
Contributor

mxm commented Feb 12, 2026

Thank you two for your replies! Looks like @talatuyarer is fine with continuing here. I'll continue reviewing.

Contributor

@mxm mxm left a comment

Looks mostly good to me @Guosmilesmile / @talatuyarer! Nice work.

Comment on lines +62 to +67
public static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES =
    new VariantPrimitive[] {
      Variants.ofIsoTime("12:33:54.123456"),
      Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"),
      Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"),
    };
Contributor

In which sense are those unsupported? Only in Spark?

Contributor Author

@Guosmilesmile Guosmilesmile Feb 12, 2026

They are also unsupported in Flink.

https://github.com/apache/flink/blob/1779d20a28f5d43410138314fe3292d0e64f3148/flink-core/src/main/java/org/apache/flink/types/variant/BinaryVariantUtil.java#L315-L376

The assertions in this test case weren’t written well, so I have adjusted them.

icebergVariant.metadata().writeTo(metadataBuffer, 0);

byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()];
ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN);
Contributor

Good catch on the encoding.
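
For readers following along, here is a minimal, stdlib-only sketch of why the explicit byte order in the snippet above matters. It assumes the point of the comment is that `ByteBuffer.wrap` defaults to big-endian, so multi-byte values come out byte-reversed unless `LITTLE_ENDIAN` is set. The class `ByteOrderSketch` and the helper `encodeInt` are hypothetical names for illustration and are not part of this PR.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical illustration class; not part of the PR.
public class ByteOrderSketch {

  // Writes a 4-byte int into a fresh array using the given byte order.
  static byte[] encodeInt(int value, ByteOrder order) {
    byte[] bytes = new byte[Integer.BYTES];
    ByteBuffer.wrap(bytes).order(order).putInt(0, value);
    return bytes;
  }

  // Returns the byte order a freshly wrapped buffer uses when none is set.
  static ByteOrder defaultOrder() {
    return ByteBuffer.wrap(new byte[Integer.BYTES]).order();
  }

  public static void main(String[] args) {
    int value = 0x01020304;

    // A wrapped buffer is big-endian unless told otherwise.
    System.out.println(defaultOrder());

    // The same int produces reversed byte sequences under the two orders.
    System.out.println(Arrays.toString(encodeInt(value, ByteOrder.BIG_ENDIAN)));
    System.out.println(Arrays.toString(encodeInt(value, ByteOrder.LITTLE_ENDIAN)));
  }
}
```

If the consumer of the bytes expects little-endian values, as the snippet under review appears to assume, leaving out `.order(ByteOrder.LITTLE_ENDIAN)` would silently produce the first layout instead of the second.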
