Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ pub fn resolve_row_types(
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
Datum::Array(a) => Datum::Array(a.clone()),
Datum::Map(m) => Datum::Map(m.clone()),
};
out.set_field(idx, resolved);
}
Expand Down Expand Up @@ -519,6 +520,9 @@ pub fn compacted_row_to_owned(
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
fcore::metadata::DataType::Map(mt) => {
Datum::Map(row.get_map(i, mt.key_type(), mt.value_type())?)
}
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,7 @@ fn python_value_to_datum(
}
}
Datum::Array(v) => writer.write_array(i, &v),
Datum::Map(v) => writer.write_map(i, &v),
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion crates/fluss/src/client/table/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ impl AppendWriter {
/// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
let first_row = ColumnarRow::new(Arc::new(batch.clone()));
let first_row = ColumnarRow::new(
Arc::new(batch.clone()),
Arc::new(self.table_info.get_row_type().clone()),
);
Arc::new(get_physical_path(
&self.table_path,
self.partition_getter.as_ref(),
Expand Down
8 changes: 6 additions & 2 deletions crates/fluss/src/client/table/log_fetch_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,11 @@ mod tests {

/// Test helper that builds a `ReadContext` for a single-column row type (`id`, created with
/// `DataTypes::int()`), passing `false` as the final `ReadContext::new` argument.
///
/// Any failure from `to_arrow_schema` is propagated through `?`.
// NOTE(review): this listing appears to be a rendered diff, so the two-argument
// `ReadContext::new` call below is presumably the pre-change line and the three-argument
// call after it the replacement — confirm against the real file.
fn test_read_context() -> Result<ReadContext> {
let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]);
Ok(ReadContext::new(to_arrow_schema(&row_type)?, false))
Ok(ReadContext::new(
to_arrow_schema(&row_type)?,
Arc::new(row_type),
false,
))
}

struct ErrorPendingFetch {
Expand Down Expand Up @@ -921,7 +925,7 @@ mod tests {

let data = builder.build()?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, false);
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false);
let mut fetch = DefaultCompletedFetch::new(
TableBucket::new(1, 0),
log_records,
Expand Down
37 changes: 26 additions & 11 deletions crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::client::table::log_fetch_buffer::{
use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
use crate::error::Error::UnsupportedOperation;
use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo, TablePath};
use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
use crate::record::{
LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
Expand Down Expand Up @@ -654,11 +654,16 @@ impl LogFetcher {
config: &crate::config::Config,
projected_fields: Option<Vec<usize>>,
) -> Result<Self> {
let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
let read_context =
Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?;
let row_type = Arc::new(table_info.get_row_type().clone());
let full_arrow_schema = to_arrow_schema(&row_type)?;
let read_context = Self::create_read_context(
full_arrow_schema.clone(),
row_type.clone(),
projected_fields.clone(),
false,
)?;
let remote_read_context =
Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?;
Self::create_read_context(full_arrow_schema, row_type, projected_fields.clone(), true)?;

let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
Expand Down Expand Up @@ -703,14 +708,22 @@ impl LogFetcher {

/// Builds the `ReadContext` used when reading fetched log data.
///
/// * `full_arrow_schema` - Arrow schema handed to the `ReadContext` constructor.
/// * `row_type` - shared row type forwarded to the `ReadContext` constructor.
/// * `projected_fields` - when `Some`, the indices are forwarded to
///   `ReadContext::with_projection_pushdown`; when `None`, a plain `ReadContext::new` is used.
/// * `is_from_remote` - forwarded unchanged to the `ReadContext` constructor.
///
/// Returns whatever error `ReadContext::with_projection_pushdown` reports; the `None` path
/// always yields `Ok`.
// NOTE(review): this listing appears to be a rendered diff; the first `None`/`Some` arm pair
// (without `row_type`) is presumably the pre-change code and the second pair the replacement.
fn create_read_context(
full_arrow_schema: SchemaRef,
row_type: Arc<RowType>,
projected_fields: Option<Vec<usize>>,
is_from_remote: bool,
) -> Result<ReadContext> {
match projected_fields {
None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
Some(fields) => {
ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
}
None => Ok(ReadContext::new(
full_arrow_schema,
row_type,
is_from_remote,
)),
Some(fields) => ReadContext::with_projection_pushdown(
full_arrow_schema,
row_type,
fields,
is_from_remote,
),
}
}

Expand Down Expand Up @@ -1752,7 +1765,8 @@ mod tests {

let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let row_type = Arc::new(table_info.get_row_type().clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false);
let completed =
DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0);
fetcher.log_fetch_buffer.add(Box::new(completed));
Expand Down Expand Up @@ -1782,7 +1796,8 @@ mod tests {
let bucket = TableBucket::new(1, 0);
let data = build_records(&table_info, Arc::new(table_path))?;
let log_records = LogRecordsBatches::new(data.clone());
let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type())?, false);
let row_type = Arc::new(table_info.get_row_type().clone());
let read_context = ReadContext::new(to_arrow_schema(&row_type)?, row_type, false);
let mut completed: Box<dyn CompletedFetch> = Box::new(DefaultCompletedFetch::new(
bucket,
log_records,
Expand Down
23 changes: 16 additions & 7 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ impl MapType {
/// Creates a map type with an explicit nullability flag for the map itself.
///
/// The value type is boxed and stored as given. In the `as_non_nullable()` variant of the
/// `key_type` initialiser the key type is converted before it is stored, which matches the
/// `MAP<STRING NOT NULL, INT>` rendering expected by the display tests.
// NOTE(review): this listing appears to be a rendered diff, so the plain
// `Box::new(key_type)` line is presumably the pre-change initialiser — confirm.
pub fn with_nullable(nullable: bool, key_type: DataType, value_type: DataType) -> Self {
Self {
nullable,
key_type: Box::new(key_type),
key_type: Box::new(key_type.as_non_nullable()),
value_type: Box::new(value_type),
}
}
Expand Down Expand Up @@ -1384,16 +1384,22 @@ fn test_array_display() {
// Checks the `to_string()` rendering of `MapType` for a default map, a map built with
// `with_nullable(false, ..)`, and a map whose value is itself a map.
// NOTE(review): this listing appears to be a rendered diff; each single-line `assert_eq!`
// without `NOT NULL` on the key is presumably the pre-change expectation, followed by the
// updated expectation in which the key type renders as `NOT NULL` — confirm.
#[test]
fn test_map_display() {
let map_type = MapType::new(DataTypes::string(), DataTypes::int());
assert_eq!(map_type.to_string(), "MAP<STRING, INT>");
assert_eq!(map_type.to_string(), "MAP<STRING NOT NULL, INT>");

// Map constructed with `nullable = false`: the rendering carries a trailing `NOT NULL`.
let map_type_non_null = MapType::with_nullable(false, DataTypes::int(), DataTypes::string());
assert_eq!(map_type_non_null.to_string(), "MAP<INT, STRING> NOT NULL");
assert_eq!(
map_type_non_null.to_string(),
"MAP<INT NOT NULL, STRING> NOT NULL"
);

// Nested case: the inner map's key is rendered the same way as the outer one.
let nested_map = MapType::new(
DataTypes::string(),
DataTypes::map(DataTypes::int(), DataTypes::boolean()),
);
assert_eq!(nested_map.to_string(), "MAP<STRING, MAP<INT, BOOLEAN>>");
assert_eq!(
nested_map.to_string(),
"MAP<STRING NOT NULL, MAP<INT NOT NULL, BOOLEAN>>"
);
}

#[test]
Expand Down Expand Up @@ -1429,7 +1435,7 @@ fn test_datatype_display() {
assert_eq!(DataTypes::array(DataTypes::int()).to_string(), "ARRAY<INT>");
assert_eq!(
DataTypes::map(DataTypes::string(), DataTypes::int()).to_string(),
"MAP<STRING, INT>"
"MAP<STRING NOT NULL, INT>"
);
}

Expand Down Expand Up @@ -1457,7 +1463,7 @@ fn test_complex_nested_display() {
]);
assert_eq!(
row_type.to_string(),
"ROW<id INT, tags ARRAY<STRING>, metadata MAP<STRING, STRING>>"
"ROW<id INT, tags ARRAY<STRING>, metadata MAP<STRING NOT NULL, STRING>>"
);
}

Expand All @@ -1479,7 +1485,10 @@ fn test_deeply_nested_types() {
DataTypes::field("y", DataTypes::int()),
]),
));
assert_eq!(nested.to_string(), "ARRAY<MAP<STRING, ROW<x INT, y INT>>>");
assert_eq!(
nested.to_string(),
"ARRAY<MAP<STRING NOT NULL, ROW<x INT, y INT>>>"
);
}

// ============================================================================
Expand Down
49 changes: 39 additions & 10 deletions crates/fluss/src/record/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ impl LogRecordBatch {
let data = &self.data[RECORDS_OFFSET..];

let record_batch = read_context.record_batch(data)?;
let arrow_reader = ArrowReader::new(Arc::new(record_batch));
let arrow_reader = ArrowReader::new(Arc::new(record_batch), read_context.row_type.clone());
let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator {
reader: arrow_reader,
base_offset: self.base_log_offset(),
Expand All @@ -1015,7 +1015,8 @@ impl LogRecordBatch {
let log_record_iterator = match record_batch {
None => LogRecordIterator::empty(),
Some(record_batch) => {
let arrow_reader = ArrowReader::new(Arc::new(record_batch));
let arrow_reader =
ArrowReader::new(Arc::new(record_batch), read_context.row_type.clone());
LogRecordIterator::Arrow(ArrowLogRecordIterator {
reader: arrow_reader,
base_offset: self.base_log_offset(),
Expand Down Expand Up @@ -1231,7 +1232,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
Arc::new(Field::new(
"entries",
ArrowDataType::Struct(arrow_schema::Fields::from(entry_fields)),
fluss_type.is_nullable(),
false,
)),
false,
)
Expand Down Expand Up @@ -1322,6 +1323,7 @@ pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
/// State carried while decoding record batches: the Arrow schemas, the row type, an optional
/// projection, and a flag recording whether the data comes from remote storage.
pub struct ReadContext {
// Set to a clone of the full schema by `ReadContext::new`; presumably the projected schema
// when a projection is configured — TODO confirm against `with_projection_pushdown`.
target_schema: SchemaRef,
// The Arrow schema passed to the constructor.
full_schema: SchemaRef,
// Shared row type supplied by the caller; handed to `ArrowReader::new` when records are read.
row_type: Arc<RowType>,
// `None` when constructed through `ReadContext::new`.
projection: Option<Projection>,
is_from_remote: bool,
}
Expand All @@ -1337,17 +1339,23 @@ struct Projection {
}

impl ReadContext {
/// Creates a `ReadContext` without projection: `target_schema` and `full_schema` both refer
/// to `arrow_schema`, `projection` is `None`, and `row_type` / `is_from_remote` are stored
/// as given.
// NOTE(review): this listing appears to be a rendered diff, so the single-line signature
// below is presumably the pre-change form and the multi-line one (with `row_type`) the
// replacement — confirm.
pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
pub fn new(
arrow_schema: SchemaRef,
row_type: Arc<RowType>,
is_from_remote: bool,
) -> ReadContext {
ReadContext {
target_schema: arrow_schema.clone(),
full_schema: arrow_schema,
row_type,
projection: None,
is_from_remote,
}
}

pub fn with_projection_pushdown(
arrow_schema: SchemaRef,
row_type: Arc<RowType>,
projected_fields: Vec<usize>,
is_from_remote: bool,
) -> Result<ReadContext> {
Expand Down Expand Up @@ -1412,6 +1420,7 @@ impl ReadContext {
Ok(ReadContext {
target_schema,
full_schema: arrow_schema,
row_type,
projection: Some(project),
is_from_remote,
})
Expand Down Expand Up @@ -1604,19 +1613,23 @@ impl Iterator for ArrowLogRecordIterator {

/// Row-oriented reader over a shared Arrow `RecordBatch`.
pub struct ArrowReader {
// Batch the rows are read from; shared via `Arc` and cloned into every `ColumnarRow`.
record_batch: Arc<RecordBatch>,
// Row type passed along to each `ColumnarRow` created by `read`.
row_type: Arc<RowType>,
}

// NOTE(review): this listing appears to be a rendered diff; in `new` and `read` the line
// without `row_type` is presumably the pre-change code and the following one its
// replacement — confirm.
impl ArrowReader {
/// Creates a reader over `record_batch`, keeping `row_type` for the rows it produces.
pub fn new(record_batch: Arc<RecordBatch>) -> Self {
ArrowReader { record_batch }
pub fn new(record_batch: Arc<RecordBatch>, row_type: Arc<RowType>) -> Self {
ArrowReader {
record_batch,
row_type,
}
}

/// Returns the number of rows in the underlying record batch.
pub fn row_count(&self) -> usize {
self.record_batch.num_rows()
}

/// Returns a `ColumnarRow` for `row_id`, built from clones of the shared `Arc` handles.
pub fn read(&self, row_id: usize) -> ColumnarRow {
ColumnarRow::new_with_row_id(self.record_batch.clone(), row_id)
ColumnarRow::new_with_row_id(self.record_batch.clone(), self.row_type.clone(), row_id)
}
}
pub struct MyVec<T>(pub StreamReader<T>);
Expand Down Expand Up @@ -1741,10 +1754,10 @@ mod tests {
Arc::new(Field::new(
"entries",
ArrowDataType::Struct(arrow_schema::Fields::from(vec![
Field::new("key", ArrowDataType::Utf8, true),
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Int32, true),
])),
true,
false,
)),
false,
)
Expand All @@ -1763,6 +1776,21 @@ mod tests {
);
}

#[test]
fn test_arrow_map_schema_strictness() {
    // Convert a Fluss map type (string keys, int values) to its Arrow counterpart.
    let fluss_map = DataTypes::map(DataTypes::string(), DataTypes::int());
    let converted = to_arrow_type(&fluss_map).unwrap();

    // The conversion must yield an Arrow `Map` whose "entries" field is non-nullable;
    // any other Arrow type is a test failure.
    match converted {
        ArrowDataType::Map(entries_field, _) => {
            let entries_nullable = entries_field.is_nullable();
            assert!(
                !entries_nullable,
                "Arrow Map 'entries' field must be strictly non-nullable"
            );
        }
        other => panic!("Expected ArrowDataType::Map, got {:?}", other),
    }
}

#[test]
fn test_parse_ipc_message() {
let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]);
Expand Down Expand Up @@ -1820,7 +1848,8 @@ mod tests {
DataField::new("name", DataTypes::string(), None),
]);
let schema = to_arrow_schema(&row_type).unwrap();
let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false);
let result =
ReadContext::with_projection_pushdown(schema, Arc::new(row_type), vec![0, 2], false);

assert!(matches!(result, Err(IllegalArgument { .. })));
}
Expand Down
8 changes: 7 additions & 1 deletion crates/fluss/src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,16 @@ mod tests {
use std::sync::Arc;

/// Test helper: wraps `values` in a one-column (`v`, non-nullable `Int32`) record batch and
/// returns a `ColumnarRow` for `row_id`.
///
/// Panics with "record batch" if the batch cannot be constructed.
// NOTE(review): this listing appears to be a rendered diff, so the two-argument
// `ColumnarRow::new_with_row_id` call is presumably the pre-change line and the
// three-argument call at the end its replacement — confirm.
fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
use crate::metadata::{DataField, DataTypes, RowType};
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))])
.expect("record batch");
ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
// Row type with a single `v` field, mirroring the single-column Arrow schema above.
let row_type = Arc::new(RowType::new(vec![DataField::new(
"v",
DataTypes::int(),
None,
)]));
ColumnarRow::new_with_row_id(Arc::new(batch), row_type, row_id)
}

#[test]
Expand Down
Loading