feat: implement FlussMap support for compacted rows and binary encoding#530
qzyu999 wants to merge 11 commits into apache:main
@charlesdong1991 @fresh-borzoni Appreciate if you guys can have a look as well.
leekeiabstraction
left a comment
TY for the PR! Left an early comment.
fn write_array(&mut self, value: &[u8]);

fn write_map(&mut self, value: &[u8]);
Appreciate that you are following convention from write_array.
I'm thinking about the ease of read/use of this signature. The caller would have to know in advance how to get the map into the form of a single byte array.
Why not pass FlussMap? Save some .as_bytes() calls and get the benefit of stronger compile time check vs. passing byte array.
Hi @leekeiabstraction, thanks for the suggestion, made appropriate changes in 9f995f8.
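For readers following this thread, the trade-off between `&[u8]` and a typed parameter can be sketched roughly as below. `MapBytes`, `Writer`, and `VecWriter` are hypothetical stand-ins invented for illustration, not the crate's actual `FlussMap` or `BinaryWriter`; the point is only that validation happens once at construction, so the writer cannot be handed an arbitrary byte slice.

```rust
/// Hypothetical stand-in for a validated map value (not the real `FlussMap`).
#[derive(Debug, Clone, PartialEq)]
pub struct MapBytes {
    data: Vec<u8>,
}

impl MapBytes {
    /// Validation happens once, at construction, instead of at every call site.
    pub fn from_bytes(data: &[u8]) -> Result<Self, String> {
        if data.len() < 4 {
            return Err(format!("map needs a 4-byte header, got {} bytes", data.len()));
        }
        Ok(Self { data: data.to_vec() })
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.data
    }
}

/// Hypothetical writer trait that takes the typed value rather than `&[u8]`.
pub trait Writer {
    fn write_map(&mut self, value: &MapBytes);
}

/// Toy writer that appends the payload to an in-memory buffer.
#[derive(Default)]
pub struct VecWriter {
    pub buf: Vec<u8>,
}

impl Writer for VecWriter {
    fn write_map(&mut self, value: &MapBytes) {
        // The payload was already validated when `MapBytes` was built.
        self.buf.extend_from_slice(value.as_bytes());
    }
}
```

With this shape, a caller that only holds raw bytes has to go through `MapBytes::from_bytes` first, and the compiler rejects passing an unrelated slice to `write_map`.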
leekeiabstraction
left a comment
TY for the PR! Added comments, PTAL @qzyu999
use crate::record::from_arrow_type;
use crate::row::binary_array::FlussArrayWriter;
use crate::row::binary_map::FlussMap;
use arrow::array::MapArray;
Let's follow current code convention and have use at the top of the file.
Hi @leekeiabstraction, this has been addressed in 558b7ee.
Let's follow current code convention and have use at the top of the file.
Hi @leekeiabstraction, this has been addressed in 558b7ee.
    writer.complete()
}

fn get_map(&self, pos: usize) -> Result<crate::row::binary_map::FlussMap> {
We can make this less verbose by declaring use at the top of the file
fn get_map(&self, pos: usize) -> Result<FlussMap>
Hi @leekeiabstraction, this has been addressed in 558b7ee.
We can make this less verbose by declaring use at the top of the file
fn get_array(&self, pos: usize) -> Result<FlussArray>
Hi @leekeiabstraction, this has been addressed in 558b7ee.
fn write_array(&mut self, value: &crate::row::FlussArray);

fn write_map(&mut self, value: &crate::row::FlussMap);
We can make this less verbose by declaring use at the top of the file.
fn write_array(&mut self, value: &FlussArray);
fn write_map(&mut self, value: &FlussMap);
Hi @leekeiabstraction, this has been addressed in 558b7ee.
let mut bad_data2 = vec![];
bad_data2.extend_from_slice(&large_size);
bad_data2.extend_from_slice(&[0, 0, 0, 0]);
assert!(FlussMap::from_bytes(&bad_data2).is_err());
What's the error message in this case?
Hi @leekeiabstraction, this has been addressed in 471016f.
// Get the entries for this specific row
let entries = map_arr.value(self.row_id);
let struct_arr = entries
    .as_any()
    .downcast_ref::<arrow::array::StructArray>()
    .ok_or_else(|| IllegalArgument {
        message: "Expected StructArray inside MapArray entries".to_string(),
    })?;

if struct_arr.num_columns() != 2 {
    return Err(IllegalArgument {
        message: format!(
            "Expected 2 columns in Map entries struct, got {}",
            struct_arr.num_columns()
        ),
    });
}

let keys_arrow = struct_arr.column(0);
let values_arrow = struct_arr.column(1);
let key_fluss_type = from_arrow_type(keys_arrow.data_type())?;
let value_fluss_type = from_arrow_type(values_arrow.data_type())?;

let len = keys_arrow.len();

// Convert Arrow keys → FlussArray
let mut key_writer = FlussArrayWriter::new(len, &key_fluss_type);
write_arrow_values_to_fluss_array(&**keys_arrow, &key_fluss_type, &mut key_writer)?;
let key_array = key_writer.complete()?;

// Convert Arrow values → FlussArray
let mut value_writer = FlussArrayWriter::new(len, &value_fluss_type);
write_arrow_values_to_fluss_array(&**values_arrow, &value_fluss_type, &mut value_writer)?;
let value_array = value_writer.complete()?;

FlussMap::from_arrays(&key_array, &value_array)
These lines which parse from StructArray to FlussMap seem duplicated, let's apply DRY and refactor.
Hi @leekeiabstraction, this has been addressed in 558b7ee.
let entries = map_arr.value(i);
let struct_arr = entries
    .as_any()
    .downcast_ref::<arrow::array::StructArray>()
    .ok_or_else(|| IllegalArgument {
        message: "Expected StructArray inside MapArray entries".to_string(),
    })?;
let keys_arrow = struct_arr.column(0);
let values_arrow = struct_arr.column(1);
let key_fluss_type = from_arrow_type(keys_arrow.data_type())?;
let value_fluss_type = from_arrow_type(values_arrow.data_type())?;
let entry_len = keys_arrow.len();

let mut key_w = FlussArrayWriter::new(entry_len, &key_fluss_type);
write_arrow_values_to_fluss_array(&**keys_arrow, &key_fluss_type, &mut key_w)?;
let key_array = key_w.complete()?;

let mut val_w = FlussArrayWriter::new(entry_len, &value_fluss_type);
write_arrow_values_to_fluss_array(
    &**values_arrow,
    &value_fluss_type,
    &mut val_w,
)?;
let value_array = val_w.complete()?;

let fluss_map =
    crate::row::binary_map::FlussMap::from_arrays(&key_array, &value_array)?;
These lines which parse from StructArray to FlussMap seem duplicated, let's apply DRY and refactor.
+1, parsing logic seems very similar if not identical, can be wrapped to a helper function
Hi @leekeiabstraction and @charlesdong1991, this has been addressed in 558b7ee.
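As a rough illustration of the refactor requested in this thread, the sketch below routes two call sites through one helper. It deliberately avoids Arrow: `Entries`, `ToyMap`, `entries_to_map`, `get_map`, and `all_maps` are hypothetical stand-ins, and the real helper in the PR operates on Arrow's `StructArray` instead.

```rust
/// Stand-in for the two-column entries struct of one map row
/// (NOT Arrow's `StructArray`).
pub struct Entries {
    pub columns: Vec<Vec<i64>>,
}

/// Stand-in for the converted map value.
#[derive(Debug, PartialEq)]
pub struct ToyMap {
    pub pairs: Vec<(i64, i64)>,
}

/// The shared helper: shape checks and conversion live in exactly one place.
pub fn entries_to_map(entries: &Entries) -> Result<ToyMap, String> {
    if entries.columns.len() != 2 {
        return Err(format!(
            "Expected 2 columns in Map entries struct, got {}",
            entries.columns.len()
        ));
    }
    let (keys, values) = (&entries.columns[0], &entries.columns[1]);
    if keys.len() != values.len() {
        return Err("key and value columns differ in length".to_string());
    }
    let pairs = keys.iter().copied().zip(values.iter().copied()).collect();
    Ok(ToyMap { pairs })
}

/// Call site 1: the map of a single row.
pub fn get_map(rows: &[Entries], row_id: usize) -> Result<ToyMap, String> {
    let entries = rows
        .get(row_id)
        .ok_or_else(|| format!("row {row_id} out of range"))?;
    entries_to_map(entries)
}

/// Call site 2: the map of every row, as when filling an array of maps.
pub fn all_maps(rows: &[Entries]) -> Result<Vec<ToyMap>, String> {
    rows.iter().map(entries_to_map).collect()
}
```

Both call sites now share the same column-count check, so a fix to the validation only has to be made once.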
pub fn from_bytes(data: &[u8]) -> Result<Self> {
    let (key_array, value_array) = Self::validate(data)?;
    Ok(FlussMap {
        data: Bytes::copy_from_slice(data),
        key_array,
        value_array,
    })
}

/// Creates a FlussMap from owned bytes without copying.
pub fn from_owned_bytes(data: Bytes) -> Result<Self> {
Do these need to be pub? Can they be pub(crate)?
Hi @leekeiabstraction, this has been addressed in 471016f.
/// Creates a FlussMap by combining a key array and a value array.
///
/// Copies both arrays into a new contiguous buffer.
pub fn from_arrays(key_array: &FlussArray, value_array: &FlussArray) -> Result<Self> {
IIUC, this is the main method through which we expect end user to use FlussMap. This method signature is rather clunky for end user to use because they need to initialise two FlussArray.
Let's use a more user friendly signature e.g. something that accepts HashMap
Hi @leekeiabstraction, thanks for your reply I think you bring up a valid point.
My understanding is that because FlussMap writes to a strict binary layout, we need the exact DataType schema (like DataTypes::int()) at runtime to initialize the FlussArrayWriters.
If we tried to accept a standard HashMap<K, V>, we wouldn't have that Fluss schema metadata inside the method to know how to properly pack the bytes. Because of that, it seemed like from_arrays had to be the required low-level API since the caller has already explicitly built the arrays with the correct schemas.
IIUC, it would make sense to keep from_arrays for the core API, but maybe add a helper method in test_utils like from_hashmap(map, key_schema, val_schema) to make the test UX nicer? Please let me know what your thoughts are about this.
Hello @qzyu999 ,
I realise I should elaborate more. Suppose a client wants to set a map field on generic row with current pattern, if I understand correctly the pattern that they can use with current shape of API is below:
let mut row = GenericRow::new(2);
row.set_field(0, i as i32);
let keyWriter = FlussArrayWriter::new(2, &DataType::String());
keyWriter.write_string(0,"hello");
keyWriter.write_string(1,"bonjour");
let valueWriter = FlussArrayWriter::new(2, &DataType::Int());
valueWriter.write_int(0,1);
valueWriter.write_int(1,2);
row.set_field(1, FlussMap::from_arrays(keyWriter.complete()?, valueWriter.complete()?));
This is rather verbose and error prone. We could instead have a FlussMapWriter that allows clients to use a more map-like write pattern, e.g. mapWriter.write_entry(A, B) or mapWriter.write_map(map). Type checking, conversion to FlussArray, etc. can happen within FlussMapWriter.
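A minimal sketch of what such a writer could look like, assuming a schema is supplied up front so each entry can be type-checked as it is written. Everything here (`ToyType`, `ToyValue`, `ToyMapWriter`, `write_entry`, `complete`) is hypothetical and stands in for the real `DataType`, `Datum`, and `FlussArrayWriter` machinery; it is not the API that was merged.

```rust
/// Toy schema types; the real client would use its own `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToyType {
    Int,
    Str,
}

/// Toy values; the real client would use its own datum representation.
#[derive(Debug, Clone, PartialEq)]
pub enum ToyValue {
    Int(i32),
    Str(String),
}

impl ToyValue {
    fn type_of(&self) -> ToyType {
        match self {
            ToyValue::Int(_) => ToyType::Int,
            ToyValue::Str(_) => ToyType::Str,
        }
    }
}

/// Hypothetical map writer: keys and values are collected pairwise, so the
/// two sides cannot drift out of sync the way two separate array writers can.
pub struct ToyMapWriter {
    key_type: ToyType,
    value_type: ToyType,
    keys: Vec<ToyValue>,
    values: Vec<ToyValue>,
}

impl ToyMapWriter {
    pub fn new(key_type: ToyType, value_type: ToyType) -> Self {
        Self { key_type, value_type, keys: Vec::new(), values: Vec::new() }
    }

    /// Type-checks one entry against the schema before storing it.
    pub fn write_entry(&mut self, key: ToyValue, value: ToyValue) -> Result<(), String> {
        if key.type_of() != self.key_type {
            return Err(format!(
                "key type mismatch: expected {:?}, got {:?}",
                self.key_type,
                key.type_of()
            ));
        }
        if value.type_of() != self.value_type {
            return Err(format!(
                "value type mismatch: expected {:?}, got {:?}",
                self.value_type,
                value.type_of()
            ));
        }
        self.keys.push(key);
        self.values.push(value);
        Ok(())
    }

    /// Hands back the collected key and value columns.
    pub fn complete(self) -> (Vec<ToyValue>, Vec<ToyValue>) {
        (self.keys, self.values)
    }
}
```

The caller writes `("hello", 1)` and `("bonjour", 2)` as entries and never sizes or fills two arrays by hand; the schema argument also addresses the earlier concern that a plain `HashMap<K, V>` carries no Fluss type information.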
fresh-borzoni
left a comment
@qzyu999 Ty for the PR, GH is acting weird, so I'll paste my review like this:
crates/fluss/src/row/column_writer.rs:563(append_null)
When Datum::Null is appended to a Map column, the path falls through with_builder! and crashes at runtime.
Probably we need an arm for Map as well
crates/fluss/src/row/binary_map.rs:172 and probably in datatype.rs
Shall we check that key is not null?
charlesdong1991
left a comment
Thanks for the nice PR 👍 left some comments!
}

#[test]
fn test_write_map_type() {
can you add a test where a NULL Map value is written?
Hi @charlesdong1991, this has been addressed in 21e8ca9.
    builder: &mut dyn ArrayBuilder,
    data_type: &arrow_schema::DataType,
) -> Result<()> {
    use crate::record::from_arrow_type;
Does Arrow type support Map? can we add a test to see if it does or not?
Hi @charlesdong1991, this has been addressed in fa3adfa.
}

fn write_map(&mut self, _value: &crate::row::FlussMap) {
    panic!("Iceberg key columns do not support map values");
On second thought, I feel it might be better to return an error instead of panicking; the same applies to array.
Hi @charlesdong1991, I addressed this in 94fccea where I return an unreachable!() instead, which I believe should work since the function returns void. To return an error my understanding is that it would require rewriting the BinaryWriter trait which would lead to a much larger refactor of all other code that touches it.
        assert!(FlussMap::from_arrays(&key_array, &value_array).is_err());
    }
}
I wonder if we can add a test for nested Map?
Hi @charlesdong1991, this has been addressed in 471016f.
…p parsing
- Move inline `use` statements for `FlussArray`, `FlussMap`, and Arrow types to the top of `column.rs` and `binary_writer.rs`.
- Simplify method signatures for `get_map`, `get_array`, `write_map`, and `write_array` to avoid verbose module paths.
- Extract duplicated logic for converting Arrow `StructArray` into `FlussMap` into a new `arrow_map_entry_to_fluss_map` helper in `column.rs`.
- Optimize the new helper to accept `&StructArray` directly, removing redundant downcasts.
- Changed the visibility of from_bytes and from_owned_bytes to pub(crate).
- Added a validation loop inside from_arrays that checks for null keys, safely returning an IllegalArgument error.
- Refined test_invalid_data and test_mismatched_array_sizes to strictly assert their exact error messages.
- Hardcoded the generated map byte sequences in test_round_trip_int_to_string_map and test_map_with_null_values to lock them down from regression.
- Added a comprehensive test_nested_map case that validates a MAP<STRING, MAP<INT, STRING>> structure.
Hi @fresh-borzoni, this has been addressed in 471016f.
- Updated MapType::with_nullable to force the key_type to be non-nullable via Box::new(key_type.as_non_nullable()).
- Updated the Display traits in datatype.rs tests and the Arrow conversion tests in arrow.rs to correctly assert the updated schema behavior (NOT NULL map keys).
- Added the missing TypedWriter::Map arm inside append_null() to properly write an empty map entry (updating offsets and pushing a false validity) whenever Datum::Null is provided for a map type.
- Implemented test_write_null_map_type to verify that Datum::Null converts into a null Map field correctly.
Hi @fresh-borzoni, this has been addressed in 21e8ca9.
Hi @fresh-borzoni, this has been addressed in 4c06cd9.
Hi @leekeiabstraction, @fresh-borzoni, and @charlesdong1991, thank you all for the review suggestions. I've replied to all of your comments and have made corresponding changes, PTAL.
@qzyu999 Ty for the changes, looked through, looks good, left comments, PTAL
Also can we add simple e2e integration test to see if everything is wired correctly?
Scenario:
- Create a table with one Map column
- Insert one row
- Fetch it back
- Assert the map round-trips (size + at least one key/value)
- fn write_array(&mut self, _value: &[u8]) {
-     panic!("Iceberg key columns do not support array values");
+ fn write_array(&mut self, _value: &FlussArray) {
+     unreachable!("Array/Map types are rejected during value writer creation");
It's the same panic under the hood
Hi @fresh-borzoni, I responded to @charlesdong1991 regarding this with the following:
Hi @charlesdong1991, I addressed this in 94fccea where I return an unreachable!() instead, which I believe should work since the function returns void. To return an error my understanding is that it would require rewriting the BinaryWriter trait which would lead to a much larger refactor of all other code that touches it.
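To make the point in this exchange concrete: `unreachable!` expands to a panic, so swapping it in for `panic!` changes the message and the stated intent but not the runtime behavior. The sketch below, which uses hypothetical stand-ins (`ToyType`, `KeyWriter`, `for_type`, `write_map_panics`) rather than the crate's `BinaryWriter`, shows both halves: the infallible write method still panics if it is ever reached, while a fallible constructor is one place where an `Err` can be surfaced without changing the trait.

```rust
use std::panic;

/// Toy type tag; the real client would use its own `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToyType {
    Int,
    Map,
}

/// Hypothetical key writer, standing in for a writer that cannot encode maps.
pub struct KeyWriter;

impl KeyWriter {
    /// Rejects unsupported key types up front, so callers get an `Err`
    /// before any write method can be reached.
    pub fn for_type(ty: ToyType) -> Result<Self, String> {
        match ty {
            ToyType::Map => Err("map is not supported as a key type".to_string()),
            ToyType::Int => Ok(KeyWriter),
        }
    }

    /// Infallible signature, as in a trait method that returns `()`.
    pub fn write_map(&mut self, _value: &[u8]) {
        unreachable!("map types are rejected during writer creation");
    }
}

/// Returns true if calling `write_map` unwinds, i.e. panics.
pub fn write_map_panics() -> bool {
    panic::catch_unwind(|| {
        let mut w = KeyWriter;
        w.write_map(&[]);
    })
    .is_err()
}
```

Whether the constructor-time rejection is enough depends on every construction path going through it; if a writer can be built another way, the only remaining options are the panic or a fallible trait method.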
impl FlussMap {
    /// Validates the raw bytes and extracts the sub-arrays.
    fn validate(data: &[u8]) -> Result<(FlussArray, FlussArray)> {
        if data.len() < 4 {
validate accepts trailing bytes - FlussArray::from_bytes only checks the header, not that the slice is fully consumed
Could we assert 4 + key_extent + value_extent == data.len() at the end of validate?
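The proposed check can be sketched against a toy layout. The assumptions here are loud ones: the 4-byte header is read as a little-endian key-section length, and each sub-array is a made-up format of a 4-byte element count followed by 4-byte elements. This is NOT the real `FlussArray` encoding, and `read_u32_le`, `toy_array_extent`, and this `validate` are hypothetical helpers written only to show where the final equality test sits.

```rust
/// Reads a little-endian u32 from the start of `data` (toy assumption).
fn read_u32_le(data: &[u8], what: &str) -> Result<usize, String> {
    if data.len() < 4 {
        return Err(format!("{what}: need 4 header bytes, got {}", data.len()));
    }
    Ok(u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize)
}

/// Byte extent of a toy array: 4-byte count, then `count` 4-byte elements.
fn toy_array_extent(data: &[u8], what: &str) -> Result<usize, String> {
    let count = read_u32_le(data, what)?;
    let extent = count
        .checked_mul(4)
        .and_then(|n| n.checked_add(4))
        .ok_or_else(|| format!("{what}: size overflow"))?;
    if extent > data.len() {
        return Err(format!("{what}: needs {extent} bytes, {} available", data.len()));
    }
    Ok(extent)
}

/// Validates `[4B key_size] + [keys] + [values]` and rejects trailing bytes.
pub fn validate(data: &[u8]) -> Result<(usize, usize), String> {
    let key_size = read_u32_le(data, "map")?;
    let rest = &data[4..];
    if key_size > rest.len() {
        return Err(format!("key section of {key_size} bytes exceeds buffer"));
    }
    let (key_bytes, value_bytes) = rest.split_at(key_size);
    let key_extent = toy_array_extent(key_bytes, "key array")?;
    if key_extent != key_size {
        return Err(format!("key array uses {key_extent} of {key_size} bytes"));
    }
    let value_extent = toy_array_extent(value_bytes, "value array")?;
    // The check proposed in the review: the slice must be fully consumed.
    if 4 + key_extent + value_extent != data.len() {
        return Err(format!(
            "trailing bytes: expected {} bytes, got {}",
            4 + key_extent + value_extent,
            data.len()
        ));
    }
    Ok((key_extent, value_extent))
}
```

Without the last comparison, a buffer with one extra byte after the value array would pass, because the value array's own header check only looks at what it needs.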
    });
}

for i in 0..key_array.size() {
The null-key check is in from_arrays but not from_bytes. With non-nullable keys, column_writer skips the null check and writes garbage to Arrow.
Could the check move into validate so all paths share it?
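One way to read this suggestion is that every constructor should funnel through the same validation routine. A minimal sketch under that assumption, using a hypothetical `ToyMap` with `Option<i32>` columns in place of the real binary arrays (`from_decoded` is likewise invented to play the role of the second construction path):

```rust
/// Toy map with nullable slots; `None` models a null element.
#[derive(Debug, PartialEq)]
pub struct ToyMap {
    pub keys: Vec<Option<i32>>,
    pub values: Vec<Option<i32>>,
}

impl ToyMap {
    /// Single place for invariants shared by every construction path.
    fn validate(keys: &[Option<i32>], values: &[Option<i32>]) -> Result<(), String> {
        if keys.len() != values.len() {
            return Err(format!(
                "key count {} does not match value count {}",
                keys.len(),
                values.len()
            ));
        }
        // Null values are allowed; null keys are not.
        if let Some(i) = keys.iter().position(Option::is_none) {
            return Err(format!("map key at index {i} is null"));
        }
        Ok(())
    }

    /// Path 1: built from two already-separate columns.
    pub fn from_arrays(keys: Vec<Option<i32>>, values: Vec<Option<i32>>) -> Result<Self, String> {
        Self::validate(&keys, &values)?;
        Ok(Self { keys, values })
    }

    /// Path 2: built from decoded pairs, standing in for a bytes-based path.
    pub fn from_decoded(pairs: Vec<(Option<i32>, Option<i32>)>) -> Result<Self, String> {
        let (keys, values): (Vec<_>, Vec<_>) = pairs.into_iter().unzip();
        Self::validate(&keys, &values)?;
        Ok(Self { keys, values })
    }
}
```

Because both constructors call `validate`, a null key is rejected regardless of how the map was produced, which is the property downstream code relies on when it skips its own null check.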
    Field::new("key", key_type, map_type.key_type().is_nullable()),
    Field::new("value", value_type, map_type.value_type().is_nullable()),
];
ArrowDataType::Map(
Arrow spec requires the entries field of Map to be non-nullable
Should be Field::new("entries", ..., false).
    }
};

let key_fluss_type = from_arrow_type(&key_arrow_type)?;
Key/value Fluss types come from the Arrow type, not the FlussMap. What if they mismatch?
Purpose
Linked issue: close #387
This pull request introduces support for the MAP data type in the Fluss Rust client. It allows developers to define associative arrays (key-value pairs) in their table schemas and provides the underlying binary representation (FlussMap) necessary to process these fields, ensuring wire compatibility with the Java client's BinaryMap.
Brief change log
- `MapType` to the `DataType` enum and implemented the `DataTypes::map()` factory. Updated `eq_ignore_nullable` to support recursive map comparison.
- `FlussMap` in `crates/fluss/src/row/binary_map.rs`. This provides a zero-copy, wire-compatible wrapper for the `[4B key_size] + [keys] + [values]` binary layout.
- `InternalRow` trait with `get_map()` and implemented it across `GenericRow`, `CompactedRow`, `LookupRow`, `ProjectedRow`, and nested `FlussArray` accessors.
- `write_map` in `BinaryWriter`, `CompactedRowWriter`, and `CompactedKeyWriter`.
- `CompactedRowDeserializer` to handle `MAP` data type reconstruction.
- `FieldGetter` and `InnerFieldGetter` support for dynamically accessing map fields.
- `ColumnarRow::get_map()` to convert Arrow `MapArray` structures into `FlussMap`.
- `Datum::Map::append_to` using Arrow's `MapBuilder` for seamless integration with the Arrow ecosystem.
- `ColumnWriter` to support building Arrow `MapArray` outputs from Fluss rows.
- `Datum::Map` during serialization and row processing.
- `MAP` types as primary or bucketing keys in the `CompactedKeyWriter`.
Tests
Added unit tests covering the new functionality:
- `binary_map.rs` for round-trips, empty maps, null values, and malformed buffer validation.
- `compacted_row.rs` covering basic maps, nullable map columns, nested maps (Map<String, Array<Int>>), and empty maps.
- `ColumnWriter` unit test to verify that `MAP` columns are correctly serialized into valid Arrow `MapArray` structures.
- `test_encode_map_rejected` to ensure `MAP` cannot be incorrectly used as a key type.
API and Format
`DataTypes::map()` for schemas and `row.get_map()` for data retrieval.
Documentation
Yes. This adds the `MAP` data type. User guides will need updating to reflect that `MAP` is now a supported type in the Rust client.