Skip to content
Draft
41 changes: 27 additions & 14 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl FallbackEncoder {
}

/// Encode `values` to the in-progress page
fn encode<T>(&mut self, values: T, indices: &[usize])
fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
where
T: ArrayAccessor + Copy,
T::Item: AsRef<[u8]>,
Expand All @@ -174,7 +174,7 @@ impl FallbackEncoder {
match &mut self.encoder {
FallbackEncoderImpl::Plain { buffer } => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
buffer.extend_from_slice((value.len() as u32).as_bytes());
buffer.extend_from_slice(value);
Expand All @@ -183,7 +183,7 @@ impl FallbackEncoder {
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
Expand All @@ -197,7 +197,7 @@ impl FallbackEncoder {
suffix_lengths,
} => {
for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let value = value.as_ref();
let mut prefix_length = 0;

Expand Down Expand Up @@ -343,15 +343,15 @@ struct DictEncoder {

impl DictEncoder {
/// Encode `values` to the in-progress page
fn encode<T>(&mut self, values: T, indices: &[usize])
fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = usize>)
where
T: ArrayAccessor + Copy,
T::Item: AsRef<[u8]>,
{
self.indices.reserve(indices.len());

for idx in indices {
let value = values.value(*idx);
let value = values.value(idx);
let interned = self.interner.intern(value.as_ref());
self.indices.push(interned);
self.variable_length_bytes += value.as_ref().len() as i64;
Expand Down Expand Up @@ -466,12 +466,25 @@ impl ColumnValueEncoder for ByteArrayEncoder {
})
}

fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) -> Result<()> {
unreachable!("should call write_gather instead")
fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()> {
downcast_op!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this code actually callable now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! The code is now able to distinguish between Dense { offset, len } and Sparse(Vec<usize>). When the column has no nulls, write_leaf produces Dense directly without materializing a vec like previously and write_mini_batch then calls encoder.write(values, offset, len) based on that. Neat!

values.data_type(),
values,
encode,
offset..offset + len,
self
);
Ok(())
}

fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
downcast_op!(values.data_type(), values, encode, indices, self);
downcast_op!(
values.data_type(),
values,
encode,
indices.iter().copied(),
self
);
Ok(())
}

Expand Down Expand Up @@ -554,15 +567,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {
/// Encodes the provided `values` and `indices` to `encoder`
///
/// This is a free function so it can be used with `downcast_op!`
fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
where
T: ArrayAccessor + Copy,
T::Item: Copy + Ord + AsRef<[u8]>,
I: ExactSizeIterator<Item = usize> + Clone,
{
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
} else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.clone());
} else if let Some((min, max)) = compute_min_max(values, indices.clone()) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}
Expand All @@ -575,8 +589,7 @@ where

// encode the values into bloom filter if enabled
if let Some(bloom_filter) = &mut encoder.bloom_filter {
let valid = indices.iter().cloned();
for idx in valid {
for idx in indices.clone() {
bloom_filter.insert(values.value(idx).as_ref());
}
}
Expand Down
Loading
Loading