Flink: Support Variant to Flink 2.1 #15265
Conversation
mxm left a comment:
Thanks for the PR @Guosmilesmile! I'll take a look. Just checking, did we check with @talatuyarer that we can continue his work?
Review comment on flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/ParquetWithFlinkSchemaVisitor.java (outdated, resolved).
@Guosmilesmile: I noticed it had been closed for a while, so I opened a new PR to continue this work. I will ask him whether it's appropriate for me to keep pushing this forward.
@talatuyarer: It was just closed due to inactivity; I was waiting for someone like @mxm to review it :) You can build on top of my code. Having variant type support is important for me, and I am happy to give a review as well, whatever works for you @Guosmilesmile.
@Guosmilesmile: Thank you for your reply, @talatuyarer. I noticed that the issues raised in Steven's and Aihuaxu's reviews on the previous PR were not resolved, and it was closed due to prolonged inactivity, so I created a new PR to continue this feature. This PR fixes those issues and some bugs, and also adds some end-to-end test cases. Which PR to continue this feature in is up to you.
mxm: Thank you both for your replies! Looks like @talatuyarer is fine with continuing here. I'll continue reviewing.
mxm left a comment:
Looks mostly good to me @Guosmilesmile / @talatuyarer! Nice work.
Review comment on flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java (outdated, resolved).
```java
public static final VariantPrimitive<?>[] UNSUPPORTED_PRIMITIVES =
    new VariantPrimitive[] {
      Variants.ofIsoTime("12:33:54.123456"),
      Variants.ofIsoTimestamptzNanos("2024-11-07T12:33:54.123456789+00:00"),
      Variants.ofIsoTimestampntzNanos("2024-11-07T12:33:54.123456789"),
    };
```
mxm: In which sense are those unsupported? Only in Spark?
@Guosmilesmile: They are also unsupported in Flink. The assertions in this test case weren't written well, so I have adjusted them.
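For illustration, here is a minimal sketch of the kind of assertion being discussed, written as if it lived in the same test class as the `UNSUPPORTED_PRIMITIVES` array above (JUnit 5 and AssertJ assumed). The `writeAndRead` helper is hypothetical, standing in for a round trip through the Flink Parquet writer and reader, and the expected exception type is an assumption rather than the PR's actual behavior.

```java
// Hedged sketch only: writeAndRead(...) is a hypothetical helper standing in for a
// round trip through the Flink Parquet writer/reader, and UnsupportedOperationException
// is an assumed failure mode, not confirmed against the PR.
@Test
public void unsupportedPrimitivesShouldFailToWrite() {
  for (VariantPrimitive<?> primitive : UNSUPPORTED_PRIMITIVES) {
    assertThatThrownBy(() -> writeAndRead(primitive)) // hypothetical helper
        .isInstanceOf(UnsupportedOperationException.class);
  }
}
```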
```java
icebergVariant.metadata().writeTo(metadataBuffer, 0);

byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()];
ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN);
```
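For context, here is a hedged sketch of the conversion the snippet above appears to belong to: serializing an Iceberg Variant's metadata and value into little-endian byte arrays so they can be wrapped in Flink's binary variant representation. The Iceberg calls mirror the snippet; the Flink `BinaryVariant` class, its constructor, and the (value, metadata) argument order are assumptions, not confirmed against Flink 2.1's API.

```java
// Hedged sketch, not the PR's actual code. The Iceberg calls (metadata(), value(),
// sizeInBytes(), writeTo()) mirror the snippet above; the Flink BinaryVariant class,
// its constructor, and the (value, metadata) argument order are assumptions.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.flink.types.variant.BinaryVariant;
import org.apache.iceberg.variants.Variant;

class VariantConversionSketch {
  static BinaryVariant toFlinkVariant(Variant icebergVariant) {
    // Serialize the variant metadata into a little-endian byte array.
    byte[] metadataBytes = new byte[icebergVariant.metadata().sizeInBytes()];
    ByteBuffer metadataBuffer = ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN);
    icebergVariant.metadata().writeTo(metadataBuffer, 0);

    // Serialize the variant value the same way.
    byte[] valueBytes = new byte[icebergVariant.value().sizeInBytes()];
    ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN);
    icebergVariant.value().writeTo(valueBuffer, 0);

    // Assumed constructor and argument order; verify against Flink 2.1's variant API.
    return new BinaryVariant(valueBytes, metadataBytes);
  }
}
```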
Starting from Flink 2.1, the variant type is supported; this PR adds variant support on top of that.
Overall, the logic in this PR relies on the existing Parquet variant reader/writer to handle most of the work.
For test cases, this PR adds validations for type conversions and an end-to-end verification from the Flink SQL side.
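As an illustration of what such an end-to-end check could look like from the SQL side, here is a hedged sketch using Flink's TableEnvironment. It assumes Flink 2.1's VARIANT type and the PARSE_JSON built-in, plus an already-configured Iceberg catalog; the catalog, database, table, and column names are illustrative and not taken from the PR.

```java
// Hedged sketch, not the PR's test code. Assumes Flink 2.1's VARIANT type and
// PARSE_JSON built-in, plus an Iceberg catalog already registered as
// `iceberg_catalog`; all names below are illustrative.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VariantSqlSketch {
  public static void main(String[] args) throws Exception {
    TableEnvironment env =
        TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

    env.executeSql(
        "CREATE TABLE iceberg_catalog.db.events (id BIGINT, payload VARIANT)");
    env.executeSql(
            "INSERT INTO iceberg_catalog.db.events "
                + "SELECT CAST(1 AS BIGINT), PARSE_JSON('{\"user\": \"alice\", \"clicks\": 3}')")
        .await();
    env.executeSql("SELECT id, payload FROM iceberg_catalog.db.events").print();
  }
}
```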
This PR is developed based on #14259.
Here’s a summary of the changes:
- Type Mapping Additions: maps Flink's `VariantType` to Iceberg's `VariantType` and back (`FlinkTypeToType` and `TypeToFlinkType`); a conceptual sketch follows this list.
- Parquet Read/Write Integration: variants are read and written as `BinaryVariant` objects on the Flink side and as Variant objects in Iceberg's internal representation.
- Schema Visitor Enhancements: extends the Parquet schema visitor (`ParquetWithFlinkSchemaVisitor`) to recognize and properly handle Variant logical types and annotations.
- Enhanced Testing: adds type conversion tests (`TestFlinkVariantType`), Parquet read/write tests (`TestFlinkVariants`) parameterized for multiple Variant subtypes, and a `VariantTestHelper` for reusable Variant test cases across both Flink and Spark integration tests.
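The following is a hedged sketch of what the two-way type mapping amounts to conceptually. The PR implements it inside the `FlinkTypeToType` and `TypeToFlinkType` visitors rather than in a standalone helper like this, and the availability of `Type.TypeID.VARIANT`, `Types.VariantType.get()`, and Flink's `VariantType` constructor in the versions used is assumed.

```java
// Hedged sketch of the two-way mapping described above; illustrative only, not the
// PR's visitor code. Assumes Iceberg exposes Type.TypeID.VARIANT / Types.VariantType
// and Flink 2.1 exposes org.apache.flink.table.types.logical.VariantType.
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VariantType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class VariantTypeMappingSketch {
  // Iceberg -> Flink direction (handled by TypeToFlinkType in the PR).
  static LogicalType toFlink(Type icebergType) {
    if (icebergType.typeId() == Type.TypeID.VARIANT) {
      return new VariantType();
    }
    throw new UnsupportedOperationException("Not covered by this sketch: " + icebergType);
  }

  // Flink -> Iceberg direction (handled by FlinkTypeToType in the PR).
  static Type toIceberg(LogicalType flinkType) {
    if (flinkType instanceof VariantType) {
      return Types.VariantType.get();
    }
    throw new UnsupportedOperationException("Not covered by this sketch: " + flinkType);
  }
}
```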