[lake/iceberg] Support MAP type in Iceberg tables #2367
Pull request overview
This pull request adds support for MAP data types in Iceberg tables for Fluss, addressing issue #2258. The implementation enables conversion of Fluss MAP types to Iceberg MapType and provides runtime support for reading and writing map data.
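One detail of this conversion is that an Iceberg map type carries its own field IDs for the key and the value, so a converter has to reserve IDs as it descends into nested maps. The sketch below only illustrates that idea with plain Java: `SourceType`, `MapFieldIdSketch`, and the string rendering are hypothetical stand-ins, and the allocation order (two IDs reserved before visiting children) is an assumption, not necessarily what `FlussDataTypeToIcebergDataType` does.

```java
public class MapFieldIdSketch {
    /** Hypothetical source type: a named primitive, or a map of two source types. */
    public static final class SourceType {
        public final String primitiveName; // null when this is a map
        public final SourceType key;       // null when this is a primitive
        public final SourceType value;     // null when this is a primitive

        private SourceType(String primitiveName, SourceType key, SourceType value) {
            this.primitiveName = primitiveName;
            this.key = key;
            this.value = value;
        }

        public static SourceType primitive(String name) {
            return new SourceType(name, null, null);
        }

        public static SourceType map(SourceType key, SourceType value) {
            return new SourceType(null, key, value);
        }
    }

    // Next unused field ID; assumed to start after the IDs of the top-level columns.
    private int nextId;

    public MapFieldIdSketch(int firstFreeId) {
        this.nextId = firstFreeId;
    }

    /** Renders the converted type, reserving two fresh IDs per map before visiting its children. */
    public String convert(SourceType type) {
        if (type.primitiveName != null) {
            return type.primitiveName;
        }
        int keyId = nextId++;
        int valueId = nextId++;
        String key = convert(type.key);
        String value = convert(type.value);
        return "map<" + keyId + ": " + key + ", " + valueId + ": " + value + ">";
    }
}
```

With a first free ID of 10, converting `map(string, map(int, long))` renders `map<10: string, 11: map<12: int, 13: long>>`, so no ID is reused across nesting levels.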
Changes:
- Implemented MAP type conversion from Fluss to Iceberg format in FlussDataTypeToIcebergDataType
- Added FlussMapAsIcebergMap adapter class for converting Fluss InternalMap to Java Map for Iceberg
- Enhanced FlussArrayAsIcebergList and FlussRowAsIcebergRecord to support nested MAP types
- Added FLOAT and DOUBLE support to IcebergBinaryRowWriter.createFieldWriter
- Added explicit error handling for MAP and ARRAY types in bucket key writer
- Comprehensive test coverage for MAP type conversions, including nested structures
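To make the adapter idea in the list above concrete, here is a minimal sketch of turning an internal map into a `java.util.Map` with per-type converters. It assumes the internal map is stored as parallel key and value arrays; `ParallelArrayMap` and `toJavaMap` are hypothetical names, and the real `InternalMap` API and `FlussMapAsIcebergMap` implementation are not shown on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class MapAdapterSketch {
    /** Hypothetical stand-in for an internal map stored as parallel key/value arrays. */
    public static final class ParallelArrayMap {
        public final Object[] keys;
        public final Object[] values;

        public ParallelArrayMap(Object[] keys, Object[] values) {
            if (keys.length != values.length) {
                throw new IllegalArgumentException("keys and values must have the same length");
            }
            this.keys = keys;
            this.values = values;
        }
    }

    /**
     * Copies entries into a Java map, converting each key and each non-null value.
     * Null values are passed through; insertion order follows the source arrays.
     */
    public static <K, V> Map<K, V> toJavaMap(
            ParallelArrayMap source,
            Function<Object, K> keyConverter,
            Function<Object, V> valueConverter) {
        Map<K, V> result = new LinkedHashMap<>();
        for (int i = 0; i < source.keys.length; i++) {
            Object rawValue = source.values[i];
            V value = rawValue == null ? null : valueConverter.apply(rawValue);
            result.put(keyConverter.apply(source.keys[i]), value);
        }
        return result;
    }
}
```

Passing the converters in as functions keeps the copy loop independent of the element types, which is one way nested maps or arrays could be handled by supplying a converter that recurses.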
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| FlussDataTypeToIcebergDataType.java | Replaced UnsupportedOperationException with actual MAP type conversion logic, handling field ID allocation |
| FlussMapAsIcebergMap.java | New adapter class for converting Fluss InternalMap to Java Map, with support for all scalar types and nested collections |
| FlussRowAsIcebergRecord.java | Added MAP type handling in field converter creation |
| FlussArrayAsIcebergList.java | Added support for MAP elements within arrays |
| IcebergBinaryRowWriter.java | Added FLOAT/DOUBLE type support and explicit error messages for non-scalar bucket key types |
| FlussDataTypeToIcebergDataTypeMapTest.java | New comprehensive test suite covering MAP type conversions with various key-value type combinations |
| FlussRowAsIcebergRecordTest.java | Added integration tests for MAP data conversion including nested structures |
| IcebergBinaryRowWriterTest.java | Added tests to verify MAP and ARRAY types are properly rejected as bucket keys |
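The bucket key behaviour summarized above (scalar types such as FLOAT and DOUBLE get a writer, MAP and ARRAY are rejected with an explicit message) can be sketched roughly as follows. This is an illustration only: `TypeRoot`, `FieldWriter`, the exception type, and the message text are assumptions, and the byte layout used by the real `IcebergBinaryRowWriter` is not reproduced here.

```java
import java.nio.ByteBuffer;

public class BucketKeyWriterSketch {
    /** Hypothetical subset of type roots relevant to the example. */
    public enum TypeRoot { INT, BIGINT, FLOAT, DOUBLE, ARRAY, MAP }

    /** Writes one field value into the output buffer. */
    public interface FieldWriter {
        void write(ByteBuffer out, Object value);
    }

    /** Returns a writer for scalar types and fails fast for non-scalar bucket key types. */
    public static FieldWriter createFieldWriter(TypeRoot type) {
        switch (type) {
            case INT:
                return (out, v) -> out.putInt((Integer) v);
            case BIGINT:
                return (out, v) -> out.putLong((Long) v);
            case FLOAT:
                return (out, v) -> out.putFloat((Float) v);
            case DOUBLE:
                return (out, v) -> out.putDouble((Double) v);
            case ARRAY:
            case MAP:
                // Assumed wording; the point is an explicit, early error.
                throw new IllegalArgumentException(
                        type + " cannot be used as a bucket key; only scalar types are supported");
            default:
                throw new IllegalArgumentException("Unsupported type for bucket key: " + type);
        }
    }
}
```

Failing when the writer is created, rather than when the first row is written, surfaces a misconfigured bucket key before any data is processed.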
Comments suppressed due to low confidence (1)
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java:118
- The get method is missing support for RowType, which is a supported nested type in Arrays. When an Array contains Row values (e.g., Array<Row<id: Int, name: String>>), this method will throw an UnsupportedOperationException. Add a case to handle RowType by retrieving the InternalRow from the array and converting it appropriately, similar to how it's done in FlussRowAsIcebergRecord.createTypeConverter.
    public Object get(int index) {
        if (flussArray.isNullAt(index)) {
            return null;
        }
        if (elementType instanceof BooleanType) {
            return flussArray.getBoolean(index);
        } else if (elementType instanceof TinyIntType) {
            return (int) flussArray.getByte(index);
        } else if (elementType instanceof SmallIntType) {
            return (int) flussArray.getShort(index);
        } else if (elementType instanceof IntType) {
            return flussArray.getInt(index);
        } else if (elementType instanceof BigIntType) {
            return flussArray.getLong(index);
        } else if (elementType instanceof FloatType) {
            return flussArray.getFloat(index);
        } else if (elementType instanceof DoubleType) {
            return flussArray.getDouble(index);
        } else if (elementType instanceof StringType) {
            return flussArray.getString(index).toString();
        } else if (elementType instanceof CharType) {
            CharType charType = (CharType) elementType;
            return flussArray.getChar(index, charType.getLength()).toString();
        } else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
            return ByteBuffer.wrap(flussArray.getBytes(index));
        } else if (elementType instanceof DecimalType) {
            DecimalType decimalType = (DecimalType) elementType;
            return flussArray
                    .getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
                    .toBigDecimal();
        } else if (elementType instanceof LocalZonedTimestampType) {
            LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
            return toIcebergTimestampLtz(
                    flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
        } else if (elementType instanceof TimestampType) {
            TimestampType tsType = (TimestampType) elementType;
            return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
        } else if (elementType instanceof DateType) {
            return DateTimeUtils.toLocalDate(flussArray.getInt(index));
        } else if (elementType instanceof TimeType) {
            return DateTimeUtils.toLocalTime(flussArray.getInt(index));
        } else if (elementType instanceof ArrayType) {
            InternalArray innerArray = flussArray.getArray(index);
            return innerArray == null
                    ? null
                    : new FlussArrayAsIcebergList(
                            innerArray, ((ArrayType) elementType).getElementType());
        } else if (elementType instanceof MapType) {
            MapType mapType = (MapType) elementType;
            InternalMap internalMap = flussArray.getMap(index);
            return new FlussMapAsIcebergMap(
                    internalMap, mapType.getKeyType(), mapType.getValueType());
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported array element type conversion for Fluss type: "
                            + elementType.getClass().getSimpleName());
        }
    }
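The suppressed comment asks for a row branch in this element dispatch. As a rough, self-contained illustration of what such a branch could look like, the sketch below uses a lazily converting list view in which row elements become field-name-to-value maps. Everything here is hypothetical: `SimpleRow`, `ConvertingList`, and `rowToRecord` are stand-ins, the dispatch is on the runtime value rather than on a declared element type, and none of it is the Fluss or Iceberg API.

```java
import java.util.AbstractList;
import java.util.LinkedHashMap;
import java.util.Map;

public class ArrayRowElementSketch {
    /** Hypothetical stand-in for a row: ordered field names plus positional values. */
    public static final class SimpleRow {
        public final String[] fieldNames;
        public final Object[] values;

        public SimpleRow(String[] fieldNames, Object[] values) {
            this.fieldNames = fieldNames;
            this.values = values;
        }
    }

    /** Read-only list view that converts each element when it is accessed. */
    public static final class ConvertingList extends AbstractList<Object> {
        private final Object[] elements;

        public ConvertingList(Object[] elements) {
            this.elements = elements;
        }

        @Override
        public Object get(int index) {
            Object element = elements[index];
            if (element == null) {
                return null;
            } else if (element instanceof SimpleRow) {
                // The branch the review comment asks for: nested rows become records.
                return rowToRecord((SimpleRow) element);
            } else if (element instanceof Object[]) {
                return new ConvertingList((Object[]) element);
            } else if (element instanceof Byte || element instanceof Short) {
                // Small integers are widened to int, as in the method quoted above.
                return ((Number) element).intValue();
            }
            return element;
        }

        @Override
        public int size() {
            return elements.length;
        }
    }

    /** Copies a row's fields into an insertion-ordered map keyed by field name. */
    public static Map<String, Object> rowToRecord(SimpleRow row) {
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < row.fieldNames.length; i++) {
            record.put(row.fieldNames[i], row.values[i]);
        }
        return record;
    }
}
```

In the real class the new branch would presumably read the nested row from the array and delegate to the existing row adapter, as the comment suggests; the map-based record here only keeps the example free of external dependencies.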
...ss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
...lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
ea54108 to c5156b9
All issues raised by Copilot have been resolved. @luoyuxia

@MehulBatra Could you please help review this PR?

Already on my radar, will try to review by EOD! @pithecuse527 @luoyuxia

@pithecuse527 can you run
c5156b9 to 62696c5
@MehulBatra |
62696c5 to 572accb
luoyuxia left a comment
@pithecuse527 Thanks for the PR. I left some comments. PTAL.
Also, please don't forget to add an integration test (IT) in FlinkUnionReadLogTableITCase. Refer to PR #2278.
...eberg/src/test/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataTypeMapTest.java
Outdated
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
Outdated
572accb to 300c831
I've added the integration test for iceberg map in |
luoyuxia left a comment
@pithecuse527 Thanks for the PR. LGTM overall. I left some minor comments.
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
Outdated
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
Outdated
38f09dd to 739696c
739696c to 2de888c
luoyuxia left a comment
@pithecuse527 Thanks for the PR. LGTM!
@pithecuse527 But the CI still fails. Could you please also update the doc for
Purpose
Linked issue: #2258
Brief change log
Tests
API and Format
Documentation