Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions parquet/src/encodings/decoding/byte_stream_split_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,32 @@ impl<T: DataType> ByteStreamSplitDecoder<T> {
// Here we assume src contains the full data (which it must, since we're
// can only know where to split the streams once all data is collected),
// but dst can be just a slice starting from the given index.
// We iterate over the output bytes and fill them in from their strided
// input byte locations.
fn join_streams_const<const TYPE_SIZE: usize>(
src: &[u8],
dst: &mut [u8],
stride: usize,
values_decoded: usize,
) {
let sub_src = &src[values_decoded..];
for i in 0..dst.len() / TYPE_SIZE {
for j in 0..TYPE_SIZE {
dst[i * TYPE_SIZE + j] = sub_src[i + j * stride];
// Process values in blocks to improve cache locality when rebuilding
// values from the split byte streams.
const BLOCK: usize = 32;

let values = dst.len() / TYPE_SIZE;
let src = &src[values_decoded..];

for base in (0..values).step_by(BLOCK) {
// Handle the final partial block without branching in the inner loop.
let len = (values - base).min(BLOCK);

for byte_idx in 0..TYPE_SIZE {
let src_start = byte_idx * stride + base;
let src_block = &src[src_start..src_start + len];

// Read contiguous bytes from each byte stream and write them
// back into the original value layout.
for (idx, value) in src_block.iter().copied().enumerate() {
dst[(base + idx) * TYPE_SIZE + byte_idx] = value;
}
}
}
}
Expand All @@ -71,10 +85,21 @@ fn join_streams_variable(
type_size: usize,
values_decoded: usize,
) {
let sub_src = &src[values_decoded..];
for i in 0..dst.len() / type_size {
for j in 0..type_size {
dst[i * type_size + j] = sub_src[i + j * stride];
const BLOCK: usize = 32;

let values = dst.len() / type_size;
let src = &src[values_decoded..];

for base in (0..values).step_by(BLOCK) {
let len = (values - base).min(BLOCK);

for byte_idx in 0..type_size {
let src_start = byte_idx * stride + base;
let src_block = &src[src_start..src_start + len];

for (idx, value) in src_block.iter().copied().enumerate() {
dst[(base + idx) * type_size + byte_idx] = value;
}
}
}
}
Expand Down
Loading