Skip to content

Commit be5dfce

Browse files
committed
Null checks
1 parent e49b3e3 commit be5dfce

1 file changed

Lines changed: 229 additions & 20 deletions

File tree

native/spark-expr/src/array_funcs/arrays_zip.rs

Lines changed: 229 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,23 @@
1616
// under the License.
1717

1818
use arrow::array::RecordBatch;
19-
use arrow::array::{Array, ArrayRef, StringArray};
19+
use arrow::array::{
20+
new_null_array, Array, ArrayRef, Capacities, ListArray, MutableArrayData, StructArray,
21+
};
22+
use arrow::buffer::{NullBuffer, OffsetBuffer};
2023
use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
2124
use arrow::datatypes::Schema;
2225
use arrow::datatypes::{DataType, Field, Fields};
26+
use datafusion::common::cast::{as_fixed_size_list_array, as_large_list_array, as_list_array};
2327
use datafusion::common::{exec_err, Result, ScalarValue};
2428
use datafusion::logical_expr::ColumnarValue;
2529
use datafusion::physical_expr::PhysicalExpr;
26-
use datafusion::functions_nested::arrays_zip::arrays_zip_inner;
2730
use std::any::Any;
2831
use std::fmt::{Display, Formatter};
2932
use std::sync::Arc;
33+
// use datafusion::functions_nested::utils::make_scalar_function;
34+
// use datafusion::functions_nested::arrays_zip::arrays_zip_inner;
35+
// use datafusion::functions_nested::arrays_zip::StructOrdinal;
3036

3137
#[derive(Debug, Eq, Hash, PartialEq)]
3238
pub struct SparkArraysZipFunc {
@@ -81,7 +87,7 @@ impl PhysicalExpr for SparkArraysZipFunc {
8187
}
8288

8389
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
    // A row of arrays_zip is NULL whenever every input list in that row is
    // NULL, so the output is nullable regardless of the input schema.
    Ok(true)
}
8692

8793
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
@@ -91,7 +97,36 @@ impl PhysicalExpr for SparkArraysZipFunc {
9197
.map(|e| e.evaluate(batch))
9298
.collect::<datafusion::common::Result<Vec<_>>>()?;
9399

94-
let len = values
100+
make_scalar_function(|arr| arrays_zip_inner(arr, self.names.clone()))(&*values)
101+
}
102+
103+
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
    // The child expressions are exactly the list columns being zipped.
    Vec::from_iter(self.values.iter())
}
106+
107+
fn with_new_children(
108+
self: Arc<Self>,
109+
children: Vec<Arc<dyn PhysicalExpr>>,
110+
) -> Result<Arc<dyn PhysicalExpr>> {
111+
Ok(Arc::new(SparkArraysZipFunc::new(
112+
children.clone(),
113+
self.names.clone(),
114+
)))
115+
}
116+
117+
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
118+
Display::fmt(self, f)
119+
}
120+
}
121+
122+
pub fn make_scalar_function<F>(inner: F) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue>
123+
where
124+
F: Fn(&[ArrayRef]) -> Result<ArrayRef>,
125+
{
126+
move |args: &[ColumnarValue]| {
127+
// first, identify if any of the arguments is an Array. If yes, store its `len`,
128+
// as any scalar will need to be converted to an array of len `len`.
129+
let len = args
95130
.iter()
96131
.fold(Option::<usize>::None, |acc, arg| match arg {
97132
ColumnarValue::Scalar(_) => acc,
@@ -100,11 +135,9 @@ impl PhysicalExpr for SparkArraysZipFunc {
100135

101136
let is_scalar = len.is_none();
102137

103-
let arrays = ColumnarValue::values_to_arrays(&values)?;
104-
let names = vec![Arc::new(StringArray::from(self.names.clone())) as ArrayRef];
138+
let args = ColumnarValue::values_to_arrays(args)?;
105139

106-
// TODO: replace this with DF's function
107-
let result = arrays_zip_inner(&arrays, &names);
140+
let result = (inner)(&args);
108141

109142
if is_scalar {
110143
// If all inputs are scalar, keeps output as scalar
@@ -114,22 +147,198 @@ impl PhysicalExpr for SparkArraysZipFunc {
114147
result.map(ColumnarValue::Array)
115148
}
116149
}
150+
}
117151

118-
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
119-
self.values.iter().collect()
152+
struct ListColumnView {
153+
/// The flat values array backing this list column.
154+
values: ArrayRef,
155+
/// Pre-computed per-row start offsets (length = num_rows + 1).
156+
offsets: Vec<usize>,
157+
/// Pre-computed null bitmap: true means the row is null.
158+
is_null: Vec<bool>,
159+
}
160+
161+
pub fn arrays_zip_inner(args: &[ArrayRef], names: Vec<String>) -> Result<ArrayRef> {
162+
if args.is_empty() {
163+
return exec_err!("arrays_zip requires at least one argument");
120164
}
121165

122-
fn with_new_children(
123-
self: Arc<Self>,
124-
children: Vec<Arc<dyn PhysicalExpr>>,
125-
) -> Result<Arc<dyn PhysicalExpr>> {
126-
Ok(Arc::new(SparkArraysZipFunc::new(
127-
children.clone(),
128-
self.names.clone(),
129-
)))
166+
// let (start_ordinal, end_ordinal) = match struct_ordinal {
167+
// StructOrdinal::ZeroBased => (0, args.len()),
168+
// StructOrdinal::OneBased => (1, args.len() + 1),
169+
// };
170+
// let names: Vec<String> = (start_ordinal..end_ordinal)
171+
// .map(|i| i.to_string())
172+
// .collect();
173+
174+
let num_rows = args[0].len();
175+
176+
// Build a type-erased ListColumnView for each argument.
177+
// None means the argument is Null-typed (all nulls, no backing data).
178+
let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
179+
let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());
180+
181+
for (i, arg) in args.iter().enumerate() {
182+
match arg.data_type() {
183+
List(field) => {
184+
let arr = as_list_array(arg)?;
185+
let raw_offsets = arr.value_offsets();
186+
let offsets: Vec<usize> = raw_offsets.iter().map(|&o| o as usize).collect();
187+
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
188+
element_types.push(field.data_type().clone());
189+
views.push(Some(ListColumnView {
190+
values: Arc::clone(arr.values()),
191+
offsets,
192+
is_null,
193+
}));
194+
}
195+
LargeList(field) => {
196+
let arr = as_large_list_array(arg)?;
197+
let raw_offsets = arr.value_offsets();
198+
let offsets: Vec<usize> = raw_offsets.iter().map(|&o| o as usize).collect();
199+
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
200+
element_types.push(field.data_type().clone());
201+
views.push(Some(ListColumnView {
202+
values: Arc::clone(arr.values()),
203+
offsets,
204+
is_null,
205+
}));
206+
}
207+
FixedSizeList(field, size) => {
208+
let arr = as_fixed_size_list_array(arg)?;
209+
let size = *size as usize;
210+
let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();
211+
let is_null = (0..num_rows).map(|row| arr.is_null(row)).collect();
212+
element_types.push(field.data_type().clone());
213+
views.push(Some(ListColumnView {
214+
values: Arc::clone(arr.values()),
215+
offsets,
216+
is_null,
217+
}));
218+
}
219+
Null => {
220+
element_types.push(Null);
221+
views.push(None);
222+
}
223+
dt => {
224+
return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
225+
}
226+
}
130227
}
131228

132-
fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
133-
Display::fmt(self, f)
229+
// Collect per-column values data for MutableArrayData builders.
230+
let values_data: Vec<_> = views
231+
.iter()
232+
.map(|v| v.as_ref().map(|view| view.values.to_data()))
233+
.collect();
234+
235+
let struct_fields: Fields = element_types
236+
.iter()
237+
.enumerate()
238+
.map(|(i, dt)| Field::new(names[i].to_string(), dt.clone(), true))
239+
.collect::<Vec<_>>()
240+
.into();
241+
242+
// Create a MutableArrayData builder per column. For None (Null-typed)
243+
// args we only need extend_nulls, so we track them separately.
244+
let mut builders: Vec<Option<MutableArrayData>> = values_data
245+
.iter()
246+
.map(|vd| {
247+
vd.as_ref().map(|data| {
248+
MutableArrayData::with_capacities(vec![data], true, Capacities::Array(0))
249+
})
250+
})
251+
.collect();
252+
253+
let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);
254+
offsets.push(0);
255+
let mut null_mask: Vec<bool> = Vec::with_capacity(num_rows);
256+
let mut total_values: usize = 0;
257+
258+
// Process each row: compute per-array lengths, then copy values
259+
// and pad shorter arrays with NULLs.
260+
for row_idx in 0..num_rows {
261+
let mut max_len: usize = 0;
262+
let mut all_null = true;
263+
264+
for view in views.iter().flatten() {
265+
if !view.is_null[row_idx] {
266+
all_null = false;
267+
let len = view.offsets[row_idx + 1] - view.offsets[row_idx];
268+
max_len = max_len.max(len);
269+
}
270+
}
271+
272+
if all_null {
273+
null_mask.push(true);
274+
offsets.push(*offsets.last().unwrap());
275+
continue;
276+
}
277+
null_mask.push(false);
278+
279+
// Extend each column builder for this row.
280+
for (col_idx, view) in views.iter().enumerate() {
281+
match view {
282+
Some(v) if !v.is_null[row_idx] => {
283+
let start = v.offsets[row_idx];
284+
let end = v.offsets[row_idx + 1];
285+
let len = end - start;
286+
let builder = builders[col_idx].as_mut().unwrap();
287+
builder.extend(0, start, end);
288+
if len < max_len {
289+
builder.extend_nulls(max_len - len);
290+
}
291+
}
292+
_ => {
293+
// Null list entry or None (Null-typed) arg — all nulls.
294+
if let Some(builder) = builders[col_idx].as_mut() {
295+
builder.extend_nulls(max_len);
296+
}
297+
}
298+
}
299+
}
300+
301+
total_values += max_len;
302+
let last = *offsets.last().unwrap();
303+
offsets.push(last + max_len as i32);
134304
}
305+
306+
// Assemble struct columns from builders.
307+
let struct_columns: Vec<ArrayRef> = builders
308+
.into_iter()
309+
.zip(element_types.iter())
310+
.map(|(builder, elem_type)| match builder {
311+
Some(b) => arrow::array::make_array(b.freeze()),
312+
None => new_null_array(
313+
if elem_type.is_null() {
314+
&Null
315+
} else {
316+
elem_type
317+
},
318+
total_values,
319+
),
320+
})
321+
.collect();
322+
323+
let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;
324+
325+
let null_buffer = if null_mask.iter().any(|&v| v) {
326+
Some(NullBuffer::from(
327+
null_mask.iter().map(|v| !v).collect::<Vec<bool>>(),
328+
))
329+
} else {
330+
None
331+
};
332+
333+
let result = ListArray::try_new(
334+
Arc::new(Field::new_list_field(
335+
struct_array.data_type().clone(),
336+
true,
337+
)),
338+
OffsetBuffer::new(offsets.into()),
339+
Arc::new(struct_array),
340+
null_buffer,
341+
)?;
342+
343+
Ok(Arc::new(result))
135344
}

0 commit comments

Comments
 (0)