1 change: 1 addition & 0 deletions bindings/cpp/src/types.rs
@@ -529,6 +529,7 @@ pub fn resolve_row_types(
             Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
             Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
             Datum::Array(a) => Datum::Array(a.clone()),
+            Datum::Row(_) => return Err(anyhow!("Row datum is not yet supported in C++ bindings")),
         };
         out.set_field(idx, resolved);
     }
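Why this one-line arm is needed: `Datum` gained a `Row` variant in this PR, and Rust's exhaustive matching means every `match` over `Datum` stops compiling until the new variant is handled; the bindings opt for an explicit "not yet supported" error rather than a silent fallback. A minimal sketch of the pattern, using a trimmed-down stand-in enum (the real `Datum` and its row payload live in the fluss crate):

    use anyhow::{Result, anyhow};

    // Trimmed-down stand-in; the real Datum has many more variants and a
    // concrete nested-row payload type.
    enum Datum {
        Int(i32),
        Array(Vec<i32>),
        Row(Vec<i32>), // the newly added nested-row variant
    }

    fn resolve(datum: &Datum) -> Result<Datum> {
        // Exhaustive match: the compiler forces a decision for Datum::Row.
        Ok(match datum {
            Datum::Int(v) => Datum::Int(*v),
            Datum::Array(a) => Datum::Array(a.clone()),
            Datum::Row(_) => return Err(anyhow!("Row datum is not yet supported in C++ bindings")),
        })
    }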
5 changes: 5 additions & 0 deletions bindings/python/src/table.rs
@@ -1322,6 +1322,11 @@ fn python_value_to_datum(
                 }
             }
             Datum::Array(v) => writer.write_array(i, &v),
+            Datum::Row(_) => {
+                return Err(FlussError::new_err(
+                    "Row datum is not supported as an array element",
+                ));
+            }
         }
     }
 }
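The same guard on the Python side, surfaced as a Python exception. `FlussError::new_err` is the standard pyo3 pattern for raising a custom exception from Rust; a sketch assuming the bindings define `FlussError` via `create_exception!` (the actual definition lives elsewhere in bindings/python):

    use pyo3::create_exception;
    use pyo3::exceptions::PyException;
    use pyo3::prelude::*;

    // Assumed definition; mirrors the common pyo3 custom-exception setup.
    create_exception!(fluss, FlussError, PyException);

    fn reject_nested_row() -> PyResult<()> {
        // Surfaces in Python as:
        //   fluss.FlussError: Row datum is not supported as an array element
        Err(FlussError::new_err(
            "Row datum is not supported as an array element",
        ))
    }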
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/append.rs
@@ -126,7 +126,7 @@ impl AppendWriter {
     /// or dropped for fire-and-forget behavior (use `flush()` to ensure delivery).
     pub fn append_arrow_batch(&self, batch: RecordBatch) -> Result<WriteResultFuture> {
         let physical_table_path = if self.partition_getter.is_some() && batch.num_rows() > 0 {
-            let first_row = ColumnarRow::new(Arc::new(batch.clone()));
+            let first_row = ColumnarRow::new(Arc::new(batch.clone()), 0, None);
             Arc::new(get_physical_path(
                 &self.table_path,
                 self.partition_getter.as_ref(),
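The `ColumnarRow::new` call gains two arguments here. From the call site, the second is the row index into the batch (`0`, since only the first row is needed to derive the partition path), and the third is a new optional argument passed as `None`; its payload type is not visible in this diff. A hedged reconstruction of the shape (field names and the `Option`'s contents are assumptions):

    use std::sync::Arc;
    use arrow_array::RecordBatch;

    // Hypothetical mirror of the real ColumnarRow in crates/fluss: a cheap
    // row view over an Arrow batch, now addressable by row index.
    struct ColumnarRow {
        batch: Arc<RecordBatch>,
        row_index: usize,
        nested_projection: Option<Vec<usize>>, // assumed payload
    }

    impl ColumnarRow {
        fn new(
            batch: Arc<RecordBatch>,
            row_index: usize,
            nested_projection: Option<Vec<usize>>,
        ) -> Self {
            Self { batch, row_index, nested_projection }
        }
    }

    // Mirrors the append.rs call site: view row 0 to extract the partition.
    fn first_row(batch: &RecordBatch) -> ColumnarRow {
        ColumnarRow::new(Arc::new(batch.clone()), 0, None)
    }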
58 changes: 37 additions & 21 deletions crates/fluss/src/client/table/scanner.rs
@@ -23,10 +23,13 @@ use crate::client::table::log_fetch_buffer::{
     LogFetchBuffer, RemotePendingFetch,
 };
 use crate::client::table::remote_log::{RemoteLogDownloader, RemoteLogFetchInfo};
+use crate::config::Config;
 use crate::error::Error::UnsupportedOperation;
 use crate::error::{ApiError, Error, FlussError, Result};
-use crate::metadata::{LogFormat, PhysicalTablePath, TableBucket, TableInfo, TablePath};
-use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable};
+use crate::metadata::{LogFormat, PhysicalTablePath, RowType, TableBucket, TableInfo, TablePath};
+use crate::proto::{
+    ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
+};
 use crate::record::{
     LogRecordsBatches, ReadContext, ScanBatch, ScanRecord, ScanRecords, to_arrow_schema,
 };
@@ -273,7 +276,7 @@ impl LogScannerInner {
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc<RpcClient>,
-        config: &crate::config::Config,
+        config: &Config,
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
         let log_scanner_status = Arc::new(LogScannerStatus::new());
@@ -651,14 +654,26 @@ impl LogFetcher {
         conns: Arc<RpcClient>,
         metadata: Arc<Metadata>,
         log_scanner_status: Arc<LogScannerStatus>,
-        config: &crate::config::Config,
+        config: &Config,
         projected_fields: Option<Vec<usize>>,
     ) -> Result<Self> {
-        let full_arrow_schema = to_arrow_schema(table_info.get_row_type())?;
+        let full_row_type = table_info.get_row_type();
+        let full_arrow_schema = to_arrow_schema(full_row_type)?;
+        let projected_row_type = match &projected_fields {
+            None => Arc::new(full_row_type.clone()),
+            Some(fields) => Arc::new(RowType::new(
+                fields
+                    .iter()
+                    .map(|&i| full_row_type.fields()[i].clone())
+                    .collect(),
+            )),
+        };
         let read_context =
-            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?;
+            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?
+                .with_fluss_row_type(projected_row_type.clone());
         let remote_read_context =
-            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?;
+            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?
+                .with_fluss_row_type(projected_row_type);
 
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
         let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
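The substantive change in this hunk: the fetcher now computes a projected `RowType` (the full row type when no projection is set, otherwise only the selected fields, in projection order) and attaches it to both the local and remote read contexts via `with_fluss_row_type`, so decoded records carry a schema matching the projected columns. The projection step in isolation, with simplified stand-ins for the crate's types:

    use std::sync::Arc;

    // Simplified stand-ins, shaped after the calls in the hunk above.
    #[derive(Clone, Debug)]
    struct DataField { name: String }

    #[derive(Clone, Debug)]
    struct RowType { fields: Vec<DataField> }

    impl RowType {
        fn new(fields: Vec<DataField>) -> Self { Self { fields } }
        fn fields(&self) -> &[DataField] { &self.fields }
    }

    // Same shape as the scanner code: None keeps the full schema,
    // Some(indices) clones only the selected fields, in projection order.
    fn project(full: &RowType, projected: &Option<Vec<usize>>) -> Arc<RowType> {
        match projected {
            None => Arc::new(full.clone()),
            Some(idx) => Arc::new(RowType::new(
                idx.iter().map(|&i| full.fields()[i].clone()).collect(),
            )),
        }
    }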
@@ -928,7 +943,7 @@ impl LogFetcher {
 
     /// Handle fetch response and add completed fetches to buffer
     async fn handle_fetch_response(
-        fetch_response: crate::proto::FetchLogResponse,
+        fetch_response: FetchLogResponse,
         context: FetchResponseContext,
     ) {
         let FetchResponseContext {
@@ -1704,6 +1719,7 @@ mod tests {
         DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
     };
     use crate::metadata::{DataTypes, PhysicalTablePath, Schema, TableInfo, TablePath};
+    use crate::proto::{PbFetchLogRespForBucket, PbFetchLogRespForTable};
     use crate::record::MemoryLogRecordsArrowBuilder;
     use crate::row::{Datum, GenericRow};
     use crate::rpc::FlussError;
@@ -1743,7 +1759,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status.clone(),
-            &crate::config::Config::default(),
+            &Config::default(),
             None,
         )?;
 
@@ -1775,7 +1791,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status,
-            &crate::config::Config::default(),
+            &Config::default(),
             None,
         )?;
 
@@ -1811,7 +1827,7 @@ mod tests {
             Arc::new(RpcClient::new()),
             metadata,
             status,
-            &crate::config::Config::default(),
+            &Config::default(),
             None,
         )?;
 
@@ -1835,14 +1851,14 @@
             Arc::new(RpcClient::new()),
             metadata.clone(),
             status.clone(),
-            &crate::config::Config::default(),
+            &Config::default(),
             None,
         )?;
 
-        let response = crate::proto::FetchLogResponse {
-            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+        let response = FetchLogResponse {
+            tables_resp: vec![PbFetchLogRespForTable {
                 table_id: 1,
-                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                buckets_resp: vec![PbFetchLogRespForBucket {
                     partition_id: None,
                     bucket_id: 0,
                     error_code: Some(FlussError::AuthorizationException.code()),
@@ -1885,17 +1901,17 @@
             Arc::new(RpcClient::new()),
             metadata.clone(),
             status.clone(),
-            &crate::config::Config::default(),
+            &Config::default(),
             None,
         )?;
 
         let bucket = TableBucket::new(1, 0);
         assert!(metadata.leader_for(&table_path, &bucket).await?.is_some());
 
-        let response = crate::proto::FetchLogResponse {
-            tables_resp: vec![crate::proto::PbFetchLogRespForTable {
+        let response = FetchLogResponse {
+            tables_resp: vec![PbFetchLogRespForTable {
                 table_id: 1,
-                buckets_resp: vec![crate::proto::PbFetchLogRespForBucket {
+                buckets_resp: vec![PbFetchLogRespForBucket {
                     partition_id: None,
                     bucket_id: 0,
                     error_code: Some(FlussError::NotLeaderOrFollower.code()),
@@ -2002,12 +2018,12 @@
         let status = Arc::new(LogScannerStatus::new());
         status.assign_scan_bucket(TableBucket::new(1, 0), 0);
 
-        let config = crate::config::Config {
+        let config = Config {
             scanner_log_fetch_max_bytes: 1234,
             scanner_log_fetch_min_bytes: 7,
             scanner_log_fetch_wait_max_time_ms: 89,
             scanner_log_fetch_max_bytes_for_bucket: 512,
-            ..crate::config::Config::default()
+            ..Config::default()
         };
 
         let fetcher = LogFetcher::new(
72 changes: 70 additions & 2 deletions crates/fluss/src/metadata/datatype.rs
@@ -531,7 +531,6 @@ impl DecimalType {
             });
         }
         // Validate scale
-        // Note: MIN_SCALE is 0, and scale is u32, so scale >= MIN_SCALE is always true
         if scale > precision {
             return Err(IllegalArgument {
                 message: format!(
@@ -1220,11 +1219,61 @@ impl DataTypes {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub const UNASSIGNED_FIELD_ID: i32 = -1;
+
+pub fn reassign_field_ids(data_type: &DataType, counter: &mut i32) -> DataType {
+    match data_type {
+        DataType::Array(at) => DataType::Array(ArrayType::with_nullable(
+            at.nullable,
+            reassign_field_ids(at.get_element_type(), counter),
+        )),
+        DataType::Map(mt) => DataType::Map(MapType::with_nullable(
+            mt.nullable,
+            reassign_field_ids(mt.key_type(), counter),
+            reassign_field_ids(mt.value_type(), counter),
+        )),
+        DataType::Row(rt) => {
+            let new_fields: Vec<DataField> = rt
+                .fields()
+                .iter()
+                .map(|f| {
+                    *counter += 1;
+                    let id = *counter;
+                    let new_inner = reassign_field_ids(&f.data_type, counter);
+                    DataField::with_field_id(f.name.clone(), new_inner, f.description.clone(), id)
+                })
+                .collect();
+            DataType::Row(RowType::with_nullable(rt.nullable, new_fields))
+        }
+        _ => data_type.clone(),
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DataField {
     pub name: String,
     pub data_type: DataType,
     pub description: Option<String>,
+    pub field_id: i32,
 }
 
+// field_id is excluded from PartialEq/Eq/Hash to match Java's DataField.equals/hashCode.
+impl PartialEq for DataField {
+    fn eq(&self, other: &Self) -> bool {
+        self.name == other.name
+            && self.data_type == other.data_type
+            && self.description == other.description
+    }
+}
+
+impl Eq for DataField {}
+
+impl std::hash::Hash for DataField {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.name.hash(state);
+        self.data_type.hash(state);
+        self.description.hash(state);
+    }
+}
+
 impl DataField {
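How `reassign_field_ids` numbers fields: for a `Row`, each field takes the next counter value before its own children are visited, so IDs come out in pre-order (parent before nested children), while `Array` and `Map` recurse into their element/key/value types without consuming an ID themselves. An illustrative trace, not a test from this PR (assuming a `row_type: DataType` already in scope):

    // Starting with counter = 0 on
    //
    //   ROW< a INT, b ROW< c INT, d INT >, e ARRAY<INT> >
    //
    // the assignments are: a -> 1, b -> 2, c -> 3, d -> 4, e -> 5.
    // `b` claims 2 before its children are numbered, and the ARRAY under `e`
    // recurses into INT without taking an ID of its own. Typical call shape:
    let mut counter = 0;
    let retyped = reassign_field_ids(&row_type, &mut counter);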
@@ -1237,6 +1286,21 @@ impl DataField {
             name: name.into(),
             data_type,
             description,
+            field_id: UNASSIGNED_FIELD_ID,
         }
     }
 
+    pub fn with_field_id<N: Into<String>>(
+        name: N,
+        data_type: DataType,
+        description: Option<String>,
+        field_id: i32,
+    ) -> DataField {
+        DataField {
+            name: name.into(),
+            data_type,
+            description,
+            field_id,
+        }
+    }
+
@@ -1247,6 +1311,10 @@
     pub fn data_type(&self) -> &DataType {
         &self.data_type
     }
+
+    pub fn field_id(&self) -> i32 {
+        self.field_id
+    }
 }
 
 impl Display for DataField {
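Putting the new `DataField` API together: `new` leaves `field_id` at `UNASSIGNED_FIELD_ID` (-1), `with_field_id` pins it, `field_id()` reads it back, and equality deliberately ignores it. A usage sketch; the import path and the `DataTypes::int()` helper are assumptions based on the diff's `crate::metadata` imports:

    use fluss::metadata::{DataField, DataTypes, UNASSIGNED_FIELD_ID};

    fn main() {
        let unassigned = DataField::new("a", DataTypes::int(), None);
        let assigned = DataField::with_field_id("a", DataTypes::int(), None, 7);

        assert_eq!(unassigned.field_id(), UNASSIGNED_FIELD_ID);
        assert_eq!(assigned.field_id(), 7);

        // field_id is excluded from PartialEq/Hash, matching Java's
        // DataField.equals/hashCode, so these still compare equal.
        assert_eq!(unassigned, assigned);
    }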