Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b66d452
feat(parquet): add content defined chunking for arrow writer
kszucs Feb 17, 2026
26364c5
feat(parquet): add `repeated_ancestor_def_level` to `ColumnDescriptor…
kszucs Feb 18, 2026
cf48df7
chore: cargo format
kszucs Feb 20, 2026
b05da4d
chore: fix clippy errors
kszucs Feb 20, 2026
ea0e344
refactor(parquet): maintain field for better encapsulation
kszucs Feb 20, 2026
04711e2
refactor(parquet): simplify the CDC implementation
kszucs Feb 22, 2026
c2b31ff
refactor(parquet): hold the cdc chunkers in ArrowWriter
kszucs Feb 22, 2026
ad4d2c6
chore(parquet): remove redundant flush_current_page() method
kszucs Feb 23, 2026
2553575
doc(parquet): remove content defined chunking example from docstrings
kszucs Feb 23, 2026
5facebb
chore(parquet): remove unnecessary mut row_group_writer_factory assig…
kszucs Feb 25, 2026
b25f206
fix(parquet): incorporate primitive array offset when calculating cdc…
kszucs Feb 25, 2026
a699aef
chore(parquet): add benchmark for cdc chunking
kszucs Feb 25, 2026
94d2efc
chore(parquet): fix clippy errors
kszucs Feb 25, 2026
caef92e
refactor(parquet): do not store the chunker in the row group writer
kszucs Feb 25, 2026
947bfdf
chore(parquet): spell out cdc as content_defined_chunking in properties
kszucs Feb 25, 2026
7622f22
chore(parquet): apply suggestions from code review
kszucs Mar 14, 2026
3f087aa
chore: address review comments
kszucs Mar 14, 2026
255bec8
chore: address review comments
kszucs Mar 14, 2026
7094008
chore: add constants for default cdc parameters
kszucs Mar 14, 2026
3b45dc8
refactor(parquet): rename Chunk to CdcChunk to avoid confusion with c…
kszucs Mar 16, 2026
8dc0e5b
test(parquet): closely port the CDC tests from the C++ implementation
kszucs Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 14 additions & 29 deletions parquet/benches/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
extern crate criterion;

use criterion::{Bencher, Criterion, Throughput};
use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};

extern crate arrow;
Expand All @@ -33,10 +33,8 @@ use arrow::datatypes::*;
use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
use arrow::{record_batch::RecordBatch, util::data_gen::*};
use arrow_array::RecordBatchOptions;
use parquet::arrow::ArrowSchemaConverter;
use parquet::errors::Result;
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::file::writer::SerializedFileWriter;
use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion};

fn create_primitive_bench_batch(
size: usize,
Expand Down Expand Up @@ -342,39 +340,21 @@ fn write_batch_with_option(
batch: &RecordBatch,
props: Option<WriterProperties>,
) -> Result<()> {
let mut file = Empty::default();
let props = Arc::new(props.unwrap_or_default());
let parquet_schema = ArrowSchemaConverter::new()
.with_coerce_types(props.coerce_types())
.convert(batch.schema_ref())?;
let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?;
let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());
let props = props.unwrap_or_default();

bench.iter(|| {
let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap();

let mut writers = row_group.iter_mut();
for (field, column) in batch
.schema()
.fields()
.iter()
.zip(black_box(batch).columns())
{
for leaf in compute_leaves(field.as_ref(), column).unwrap() {
writers.next().unwrap().write(&leaf).unwrap()
}
}

for writer in row_group.into_iter() {
black_box(writer.close()).unwrap();
}
let mut file = Empty::default();
let mut writer =
ArrowWriter::try_new(&mut file, batch.schema(), Some(props.clone())).unwrap();
writer.write(black_box(batch)).unwrap();
black_box(writer.close()).unwrap();
});

Ok(())
}

fn create_batches() -> Vec<(&'static str, RecordBatch)> {
const BATCH_SIZE: usize = 4096;
const BATCH_SIZE: usize = 1024 * 1024;

let mut batches = vec![];

Expand Down Expand Up @@ -440,6 +420,11 @@ fn create_writer_props() -> Vec<(&'static str, WriterProperties)> {
.build();
props.push(("zstd_parquet_2", prop));

let prop = WriterProperties::builder()
.set_content_defined_chunking(Some(CdcOptions::default()))
.build();
props.push(("cdc", prop));

props
}

Expand Down
196 changes: 196 additions & 0 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)

use crate::column::chunker::CdcChunk;
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
Expand Down Expand Up @@ -801,11 +802,58 @@ impl ArrayLevels {
pub fn non_null_indices(&self) -> &[usize] {
&self.non_null_indices
}

/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
///
/// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy),
/// while `array` is sliced without copying.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
let level_offset = chunk.level_offset;
let num_levels = chunk.num_levels;
let value_offset = chunk.value_offset;
let num_values = chunk.num_values;
let def_levels = self
.def_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
let rep_levels = self
.rep_levels
.as_ref()
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());

// Filter non_null_indices to [value_offset, value_offset + num_values)
// and shift by -value_offset. Use binary search since the slice is sorted.
let value_end = value_offset + num_values;
let start = self
.non_null_indices
.partition_point(|&idx| idx < value_offset);
let end = self
.non_null_indices
.partition_point(|&idx| idx < value_end);
let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
.iter()
.map(|&idx| idx - value_offset)
.collect();

let array = self.array.slice(value_offset, num_values);
let logical_nulls = array.logical_nulls();

Self {
def_levels,
rep_levels,
non_null_indices,
max_def_level: self.max_def_level,
max_rep_level: self.max_rep_level,
array,
logical_nulls,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::column::chunker::CdcChunk;

use arrow_array::builder::*;
use arrow_array::types::Int32Type;
Expand Down Expand Up @@ -2096,4 +2144,152 @@ mod tests {
let v = Arc::new(array) as ArrayRef;
LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
}

#[test]
fn test_slice_for_chunk_flat() {
// Case 1: required field (max_def_level=0, no def/rep levels stored).
// Array has 6 values; all are non-null so non_null_indices covers every position.
// The chunk selects value_offset=2, num_values=3 → the sub-array [3, 4, 5].
// Since there are no levels, num_levels=0 and level_offset are irrelevant.
// non_null_indices [0,1,2,3,4,5] filtered to [2,4) and shifted by -2 → [0,1,2].
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: None,
rep_levels: None,
non_null_indices: vec![0, 1, 2, 3, 4, 5],
max_def_level: 0,
max_rep_level: 0,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&CdcChunk {
level_offset: 0,
num_levels: 0,
value_offset: 2,
num_values: 3,
});
assert!(sliced.def_levels.is_none());
assert!(sliced.rep_levels.is_none());
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);

// Case 2: optional field (max_def_level=1, def levels present, no rep levels).
// Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
// def_levels: [1, 0, 1, 0, 1, 1] (1=non-null, 0=null)
// non_null_indices: [0, 2, 4, 5] (array positions of the four non-null values)
//
// The chunk selects level_offset=1, num_levels=3, value_offset=1, num_values=3:
// - def_levels[1..4] = [0, 1, 0] → null, non-null, null
// - sub-array slice(1, 3) = [None, Some(3), None]
// - non_null_indices filtered to [value_offset=1, value_end=4): only index 2 qualifies,
// shifted by -1 → [1] (position of Some(3) within the sliced sub-array)
let array: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
None,
Some(3),
None,
Some(5),
Some(6),
]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
rep_levels: None,
non_null_indices: vec![0, 2, 4, 5],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 3,
value_offset: 1,
num_values: 3,
});
assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
assert!(sliced.rep_levels.is_none());
assert_eq!(sliced.non_null_indices, vec![1]);
assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_nested() {
// [[1,2],[3],[4,5]]: def=[2,2,2,2,2], rep=[0,1,0,0,1]
// Slice levels 2..5 (def=[2,2,2], rep=[0,0,1]), values 2..5
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![2, 2, 2, 2, 2]),
rep_levels: Some(vec![0, 1, 0, 0, 1]),
non_null_indices: vec![0, 1, 2, 3, 4],
max_def_level: 2,
max_rep_level: 1,
array,
logical_nulls,
};
let sliced = levels.slice_for_chunk(&CdcChunk {
level_offset: 2,
num_levels: 3,
value_offset: 2,
num_values: 3,
});
assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
// [0,1,2,3,4] filtered to [2,5) → [2,3,4] → shifted -2 → [0,1,2]
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);
}

#[test]
fn test_slice_for_chunk_non_null_indices_boundary() {
// [1, null, 3]: non_null_indices=[0, 2]; test inclusive lower / exclusive upper bounds
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
def_levels: Some(vec![1, 0, 1]),
rep_levels: None,
non_null_indices: vec![0, 2],
max_def_level: 1,
max_rep_level: 0,
array,
logical_nulls,
};
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 0,
num_levels: 1,
value_offset: 0,
num_values: 1
})
.non_null_indices,
vec![0]
);
// idx 2 in range [1,3), shifted -1 → 1
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 2,
value_offset: 1,
num_values: 2
})
.non_null_indices,
vec![1]
);
// idx 2 excluded from [1,2)
assert_eq!(
levels
.slice_for_chunk(&CdcChunk {
level_offset: 1,
num_levels: 1,
value_offset: 1,
num_values: 1
})
.non_null_indices,
Vec::<usize>::new()
);
}
}
Loading
Loading