Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
- [x] array_repeat
- [x] array_union
- [x] arrays_overlap
- [ ] arrays_zip
- [x] arrays_zip
- [x] element_at
- [ ] flatten
- [x] get
Expand Down
22 changes: 20 additions & 2 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SparkArraysZipFunc,
SumInteger, ToCsv,
};
use iceberg::expr::Bind;

Expand All @@ -93,7 +94,6 @@ use datafusion::physical_expr::window::WindowExpr;
use datafusion::physical_expr::LexOrdering;

use crate::parquet::parquet_exec::init_datasource_exec;

use arrow::array::{
new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array,
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray,
Expand Down Expand Up @@ -682,6 +682,24 @@ impl PhysicalPlanner {
csv_write_options,
)))
}
ExprStruct::ArraysZip(expr) => {
if expr.values.is_empty() {
return Err(GeneralError(
"arrays_zip requires at least one argument".to_string(),
));
}

let children = expr
.values
.iter()
.map(|child| self.create_expr(child, Arc::clone(&input_schema)))
.collect::<Result<Vec<_>, _>>()?;

Ok(Arc::new(SparkArraysZipFunc::new(
children,
expr.names.clone(),
)))
}
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
}
}
Expand Down
2 changes: 2 additions & 0 deletions native/core/src/execution/planner/expression_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub enum ExpressionType {
Randn,
SparkPartitionId,
MonotonicallyIncreasingId,
ArraysZip,

// Time functions
Hour,
Expand Down Expand Up @@ -381,6 +382,7 @@ impl ExpressionRegistry {
Some(ExprStruct::MonotonicallyIncreasingId(_)) => {
Ok(ExpressionType::MonotonicallyIncreasingId)
}
Some(ExprStruct::ArraysZip(_)) => Ok(ExpressionType::ArraysZip),

Some(ExprStruct::Hour(_)) => Ok(ExpressionType::Hour),
Some(ExprStruct::Minute(_)) => Ok(ExpressionType::Minute),
Expand Down
8 changes: 8 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ message Expr {
FromJson from_json = 66;
ToCsv to_csv = 67;
HoursTransform hours_transform = 68;
ArraysZip arrays_zip = 69;
}

// Optional QueryContext for error reporting (contains SQL text and position)
Expand Down Expand Up @@ -489,3 +490,10 @@ message ArrayJoin {
message Rand {
int64 seed = 1;
}

// Serialized form of Spark's ArraysZip expression (the `arrays_zip` SQL function),
// which merges N input arrays into a single array of N-field structs.
// Spark's ArraysZip takes children: Seq[Expression] and names: Seq[Expression]
// https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L296
// NOTE(review): unlike Spark, `names` here is `repeated string` rather than
// expressions — presumably the name expressions are resolved to literals on the
// JVM side before serialization; confirm against the Scala serde code.
message ArraysZip {
// The array-typed child expressions to zip together element-wise.
repeated Expr values = 1;
// Struct field names for the zipped output; expected to pair 1:1 with `values`.
repeated string names = 2;
}
Loading
Loading