81 changes: 42 additions & 39 deletions encodings/fastlanes/src/bitpacking/compute/filter.rs
@@ -97,19 +97,26 @@ fn filter_primitive_without_patches<U: UnsignedPType + BitPacking>(
array: &BitPackedArray,
selection: &Arc<MaskValues>,
) -> VortexResult<(Buffer<U>, Validity)> {
let values = filter_with_indices(array, selection.indices());
let selection_buffer = selection.bit_buffer();

let values = filter_with_indices(
array,
selection_buffer.set_indices(),
selection_buffer.true_count(),
);
let validity = array.validity()?.filter(&Mask::Values(selection.clone()))?;

Ok((values.freeze(), validity))
}

fn filter_with_indices<T: NativePType + BitPacking>(
fn filter_with_indices<T: NativePType + BitPacking, I: Iterator<Item = usize>>(
array: &BitPackedArray,
indices: &[usize],
indices: I,
indices_len: usize,
) -> BufferMut<T> {
let offset = array.offset() as usize;
let bit_width = array.bit_width() as usize;
let mut values = BufferMut::with_capacity(indices.len());
let mut values = BufferMut::with_capacity(indices_len);

// Some re-usable memory to store per-chunk indices.
let mut unpacked = [const { MaybeUninit::<T>::uninit() }; 1024];
@@ -118,43 +125,39 @@ fn filter_with_indices<T: NativePType + BitPacking>(
// Group the indices by the FastLanes chunk they belong to.
let chunk_size = 128 * bit_width / size_of::<T>();

chunked_indices(
indices.iter().copied(),
offset,
|chunk_idx, indices_within_chunk| {
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

if indices_within_chunk.len() == 1024 {
// Unpack the entire chunk.
unsafe {
let values_len = values.len();
values.set_len(values_len + 1024);
BitPacking::unchecked_unpack(
bit_width,
packed,
&mut values.as_mut_slice()[values_len..],
);
}
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
// Unpack into a temporary chunk and then copy the values.
unsafe {
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
let dst: &mut [T] = std::mem::transmute(dst);
BitPacking::unchecked_unpack(bit_width, packed, dst);
}
values.extend_trusted(
indices_within_chunk
.iter()
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

if indices_within_chunk.len() == 1024 {
// Unpack the entire chunk.
unsafe {
let values_len = values.len();
values.set_len(values_len + 1024);
BitPacking::unchecked_unpack(
bit_width,
packed,
&mut values.as_mut_slice()[values_len..],
);
} else {
// Otherwise, unpack each element individually.
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
}));
}
},
);
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
// Unpack into a temporary chunk and then copy the values.
unsafe {
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
let dst: &mut [T] = std::mem::transmute(dst);
BitPacking::unchecked_unpack(bit_width, packed, dst);
}
values.extend_trusted(
indices_within_chunk
.iter()
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
);
} else {
// Otherwise, unpack each element individually.
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
}));
}
});

values
}
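Note on the signature change above: `filter_with_indices` now accepts any `Iterator<Item = usize>` instead of a `&[usize]`, which is why the caller passes `true_count()` separately — an arbitrary iterator cannot report its length cheaply, but the output buffer still wants an exact capacity up front. A minimal, self-contained sketch of that call shape (the `Bitmap` type, `set_indices()`, and `true_count()` here are stand-ins for illustration, not the Vortex API):

```rust
/// Stand-in for a selection bitmap; only illustrates the call shape.
struct Bitmap {
    bits: Vec<bool>,
}

impl Bitmap {
    /// Positions of set bits, analogous to a bit buffer's `set_indices()`.
    fn set_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.bits.iter().enumerate().filter(|(_, b)| **b).map(|(i, _)| i)
    }

    /// Number of set bits, analogous to `true_count()`.
    fn true_count(&self) -> usize {
        self.bits.iter().filter(|b| **b).count()
    }
}

/// Gather `values[i]` for each selected index. The count is passed separately
/// because a generic iterator has no cheap exact size.
fn filter_with_indices<T: Copy, I: Iterator<Item = usize>>(
    values: &[T],
    indices: I,
    indices_len: usize,
) -> Vec<T> {
    let mut out = Vec::with_capacity(indices_len);
    out.extend(indices.map(|i| values[i]));
    out
}

fn main() {
    let mask = Bitmap { bits: vec![true, false, true, true] };
    let filtered = filter_with_indices(&[10, 20, 30, 40], mask.set_indices(), mask.true_count());
    assert_eq!(filtered, vec![10, 30, 40]);
}
```

The real kernel additionally groups the incoming indices by 1024-element FastLanes chunk and unpacks whole chunks when enough indices land in one; only the input type changes here.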
2 changes: 1 addition & 1 deletion encodings/runend/src/compute/filter.rs
@@ -41,7 +41,7 @@ impl FilterKernel for RunEndVTable {
if runs_ratio < FILTER_TAKE_THRESHOLD || mask_values.true_count() < 25 {
Ok(Some(take_indices_unchecked(
array,
mask_values.indices(),
mask_values.bit_buffer().set_indices(),
&Validity::NonNullable,
)?))
} else {
12 changes: 8 additions & 4 deletions encodings/runend/src/compute/take.rs
@@ -49,14 +49,19 @@ impl TakeExecute for RunEndVTable {
.collect::<VortexResult<Vec<_>>>()?
});

take_indices_unchecked(array, &checked_indices, primitive_indices.validity()).map(Some)
take_indices_unchecked(
array,
checked_indices.into_iter(),
primitive_indices.validity(),
)
.map(Some)
}
}

/// Perform a take operation on a RunEndArray by binary searching for each of the indices.
pub fn take_indices_unchecked<T: AsPrimitive<usize>>(
pub fn take_indices_unchecked<T: AsPrimitive<usize>, I: Iterator<Item = T>>(
array: &RunEndArray,
indices: &[T],
indices: I,
validity: &Validity,
) -> VortexResult<ArrayRef> {
let ends = array.ends().to_primitive();
@@ -66,7 +71,6 @@ pub fn take_indices_unchecked<T: AsPrimitive<usize>>(
let physical_indices = match_each_integer_ptype!(ends.ptype(), |I| {
let end_slices = ends.as_slice::<I>();
let physical_indices_vec: Vec<u64> = indices
.iter()
.map(|idx| idx.as_() + array.offset())
.map(|idx| {
match <I as NumCast>::from(idx) {
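The run-end take path keeps the same algorithm: each requested logical index is mapped to the run that contains it by binary-searching the run `ends`; only the input moves from a slice to an iterator. A rough standalone sketch of that index mapping (names are illustrative, not the Vortex helpers; the real function also applies the array's slicing offset and carries validity through):

```rust
/// Map logical indices into a run-end encoded array onto physical run indices.
/// `ends` holds the exclusive end offset of each run, so run `r` covers
/// `ends[r - 1]..ends[r]` (with an implicit 0 before the first run).
fn take_run_indices(ends: &[u64], indices: impl Iterator<Item = u64>) -> Vec<u64> {
    indices
        .map(|idx| {
            // First run whose end is strictly greater than the index.
            ends.partition_point(|&end| end <= idx) as u64
        })
        .collect()
}

fn main() {
    // Runs covering [0..3), [3..5), [5..9).
    let ends = [3u64, 5, 9];
    let physical = take_run_indices(&ends, [0u64, 4, 8].into_iter());
    assert_eq!(physical, vec![0, 1, 2]);
}
```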
2 changes: 1 addition & 1 deletion encodings/sequence/src/compute/filter.rs
@@ -37,7 +37,7 @@ fn filter_impl<T: NativePType>(mul: T, base: T, mask: &Mask, validity: Validity)
.values()
.vortex_expect("FilterKernel precondition: mask is Mask::Values");
let mut buffer = BufferMut::<T>::with_capacity(mask_values.true_count());
buffer.extend(mask_values.indices().iter().map(|&idx| {
buffer.extend(mask_values.bit_buffer().set_indices().map(|idx| {
let i = T::from_usize(idx).vortex_expect("all valid indices fit");
base + i * mul
}));
34 changes: 15 additions & 19 deletions encodings/sparse/src/lib.rs
@@ -44,7 +44,6 @@ use vortex_error::VortexExpect as _;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_mask::AllOr;
use vortex_mask::Mask;
use vortex_scalar::Scalar;
use vortex_scalar::ScalarValue;
Expand Down Expand Up @@ -311,23 +310,16 @@ impl SparseArray {
} else if mask.false_count() as f64 > (0.9 * mask.len() as f64) {
// Array is dominated by NULL but has non-NULL values
let non_null_values = filter(array, &mask)?;
let non_null_indices = match mask.indices() {
AllOr::All => {
// We already know that the mask is 90%+ false
unreachable!("Mask is mostly null")
}
AllOr::None => {
// we know there are some non-NULL values
unreachable!("Mask is mostly null but not all null")
}
AllOr::Some(values) => {
let buffer: Buffer<u32> = values
.iter()
.map(|&v| v.try_into().vortex_expect("indices must fit in u32"))
.collect();

buffer.into_array()
}
let non_null_indices = if let Some(mask_values) = mask.values() {
let buffer: Buffer<u32> = mask_values
.bit_buffer()
.set_indices()
.map(|v| v.try_into().vortex_expect("indices must fit in u32"))
.collect();

buffer.into_array()
} else {
unreachable!()
};

return Ok(SparseArray::try_new(
Expand Down Expand Up @@ -370,7 +362,11 @@ impl SparseArray {
// All values are equal to the top value
return Ok(fill_array);
}
Mask::Values(values) => values.indices().iter().map(|v| *v as u64).collect(),
Mask::Values(values) => values
.bit_buffer()
.set_indices()
.map(|v| v as u64)
.collect(),
};

SparseArray::try_new(indices.into_array(), non_top_values, array.len(), fill)
51 changes: 26 additions & 25 deletions encodings/zstd/src/array.rs
@@ -251,27 +251,25 @@ fn collect_valid_primitive(parray: &PrimitiveArray) -> VortexResult<PrimitiveArr

fn collect_valid_vbv(vbv: &VarBinViewArray) -> VortexResult<(ByteBuffer, Vec<usize>)> {
let mask = vbv.validity_mask()?;
let buffer_and_value_byte_indices = match mask.bit_buffer() {
AllOr::None => (Buffer::empty(), Vec::new()),
_ => {
let mut buffer = BufferMut::with_capacity(
usize::try_from(vbv.nbytes()).vortex_expect("must fit into buffer")
+ mask.true_count() * size_of::<ViewLen>(),
);
let mut value_byte_indices = Vec::new();
vbv.with_iterator(|iterator| {
// by flattening, we should omit nulls
for value in iterator.flatten() {
value_byte_indices.push(buffer.len());
// here's where we write the string lengths
buffer
.extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter());
buffer.extend_from_slice(value);
}
Ok::<_, VortexError>(())
})?;
(buffer.freeze(), value_byte_indices)
}
let buffer_and_value_byte_indices = if mask.all_false() {
(Buffer::empty(), Vec::new())
} else {
let mut buffer = BufferMut::with_capacity(
usize::try_from(vbv.nbytes()).vortex_expect("must fit into buffer")
+ mask.true_count() * size_of::<ViewLen>(),
);
let mut value_byte_indices = Vec::new();
vbv.with_iterator(|iterator| {
// by flattening, we should omit nulls
for value in iterator.flatten() {
value_byte_indices.push(buffer.len());
// here's where we write the string lengths
buffer.extend_trusted(ViewLen::try_from(value.len())?.to_le_bytes().into_iter());
buffer.extend_from_slice(value);
}
Ok::<_, VortexError>(())
})?;
(buffer.freeze(), value_byte_indices)
};
Ok(buffer_and_value_byte_indices)
}
@@ -719,7 +717,9 @@ impl ZstdArray {
Ok(primitive.into_array())
}
DType::Binary(_) | DType::Utf8(_) => {
match slice_validity.to_mask(slice_n_rows).indices() {
let mask = slice_validity.to_mask(slice_n_rows);

match mask.bit_buffer() {
AllOr::All => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
@@ -745,7 +745,7 @@ impl ZstdArray {
slice_n_rows,
)
.into_array()),
AllOr::Some(valid_indices) => {
AllOr::Some(mask_bits) => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
// views into those strings by passing through the buffer.
@@ -755,8 +755,9 @@ impl ZstdArray {
);

let mut views = BufferMut::<BinaryView>::zeroed(slice_n_rows);
for (view, index) in valid_views.into_iter().zip_eq(valid_indices) {
views[*index] = view
for (view, index) in valid_views.into_iter().zip_eq(mask_bits.set_indices())
{
views[index] = view
}

// SAFETY: we properly construct the views inside `reconstruct_views`
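In the ZSTD path the decompressed views only exist for valid rows, so they are scattered back into a full-length, zero-initialized view buffer at the mask's set positions. A stripped-down illustration of that scatter pattern (plain `u64`s standing in for `BinaryView`s, and a plain iterator standing in for `set_indices()`):

```rust
/// Scatter `valid_values` into a zeroed output of length `len`, placing the
/// i-th valid value at the i-th set index of the mask.
fn scatter_by_mask(
    valid_values: &[u64],
    set_indices: impl Iterator<Item = usize>,
    len: usize,
) -> Vec<u64> {
    let mut out = vec![0u64; len];
    let mut values = valid_values.iter().copied();
    for index in set_indices {
        // zip_eq in the real code asserts the two streams have equal length.
        out[index] = values.next().expect("one value per set index");
    }
    out
}

fn main() {
    let out = scatter_by_mask(&[7, 8], [1usize, 3].into_iter(), 5);
    assert_eq!(out, vec![0, 7, 0, 8, 0]);
}
```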
16 changes: 8 additions & 8 deletions vortex-array/src/arrays/bool/compute/filter.rs
@@ -7,7 +7,6 @@ use vortex_buffer::get_bit;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_mask::MaskIter;

use crate::ArrayRef;
use crate::ExecutionCtx;
@@ -32,17 +31,18 @@ impl FilterKernel for BoolVTable {
.values()
.vortex_expect("AllTrue and AllFalse are handled by filter fn");

let buffer = match mask_values.threshold_iter(FILTER_SLICES_DENSITY_THRESHOLD) {
MaskIter::Indices(indices) => filter_indices(
let buffer = if mask_values.density() >= FILTER_SLICES_DENSITY_THRESHOLD {
filter_slices(
&array.to_bit_buffer(),
mask.true_count(),
indices.iter().copied(),
),
MaskIter::Slices(slices) => filter_slices(
mask_values.bit_buffer().set_slices(),
)
} else {
filter_indices(
&array.to_bit_buffer(),
mask.true_count(),
slices.iter().copied(),
),
mask_values.bit_buffer().set_indices(),
)
};

Ok(Some(BoolArray::new(buffer, validity).into_array()))
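This is the pattern repeated across these kernels: instead of asking the mask for a pre-materialized `MaskIter`, the kernel compares the mask's density against a threshold and then pulls either `set_slices()` (dense masks, where copying contiguous runs wins) or `set_indices()` (sparse masks, where per-element gathers win) straight off the bit buffer. A toy version of that dispatch, with made-up types and an illustrative threshold value standing in for the Vortex ones:

```rust
// Illustrative value only; the crate defines its own threshold constant.
const FILTER_SLICES_DENSITY_THRESHOLD: f64 = 0.8;

struct Bitmap {
    bits: Vec<bool>,
}

impl Bitmap {
    fn true_count(&self) -> usize {
        self.bits.iter().filter(|b| **b).count()
    }

    fn density(&self) -> f64 {
        self.true_count() as f64 / self.bits.len() as f64
    }

    /// Positions of set bits.
    fn set_indices(&self) -> impl Iterator<Item = usize> + '_ {
        self.bits.iter().enumerate().filter(|(_, b)| **b).map(|(i, _)| i)
    }

    /// Maximal runs of consecutive set bits as `start..end` ranges.
    fn set_slices(&self) -> Vec<std::ops::Range<usize>> {
        let mut slices = Vec::new();
        let mut start = None;
        for (i, &b) in self.bits.iter().enumerate() {
            match (b, start) {
                (true, None) => start = Some(i),
                (false, Some(s)) => {
                    slices.push(s..i);
                    start = None;
                }
                _ => {}
            }
        }
        if let Some(s) = start {
            slices.push(s..self.bits.len());
        }
        slices
    }
}

fn filter<T: Copy>(values: &[T], mask: &Bitmap) -> Vec<T> {
    let mut out = Vec::with_capacity(mask.true_count());
    if mask.density() >= FILTER_SLICES_DENSITY_THRESHOLD {
        // Dense selection: copy whole runs at a time.
        for range in mask.set_slices() {
            out.extend_from_slice(&values[range]);
        }
    } else {
        // Sparse selection: gather element by element.
        out.extend(mask.set_indices().map(|i| values[i]));
    }
    out
}

fn main() {
    let mask = Bitmap { bits: vec![true, true, false, true] };
    assert_eq!(filter(&[1, 2, 3, 4], &mask), vec![1, 2, 4]);
}
```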
12 changes: 5 additions & 7 deletions vortex-array/src/arrays/chunked/compute/filter.rs
@@ -5,7 +5,6 @@ use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_mask::MaskIter;

use crate::Array;
use crate::ArrayRef;
@@ -32,12 +31,11 @@ impl FilterKernel for ChunkedVTable {
.values()
.vortex_expect("AllTrue and AllFalse are handled by filter fn");

// Based on filter selectivity, we take the values between a range of slices, or
// we take individual indices.
let chunks = match mask_values.threshold_iter(FILTER_SLICES_SELECTIVITY_THRESHOLD) {
MaskIter::Indices(indices) => filter_indices(array, indices.iter().copied()),
MaskIter::Slices(slices) => filter_slices(array, slices.iter().copied()),
}?;
let chunks = if mask_values.density() >= FILTER_SLICES_SELECTIVITY_THRESHOLD {
filter_slices(array, mask_values.bit_buffer().set_slices())?
} else {
filter_indices(array, mask_values.bit_buffer().set_indices())?
};

// SAFETY: Filter operation preserves the dtype of each chunk.
// All filtered chunks maintain the same dtype as the original array.
20 changes: 9 additions & 11 deletions vortex-array/src/arrays/chunked/compute/mask.rs
@@ -5,10 +5,9 @@ use itertools::Itertools as _;
use vortex_buffer::BitBuffer;
use vortex_buffer::BitBufferMut;
use vortex_dtype::DType;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::AllOr;
use vortex_mask::Mask;
use vortex_mask::MaskIter;
use vortex_scalar::Scalar;

use super::filter::ChunkFilter;
@@ -31,13 +30,12 @@ impl MaskKernel for ChunkedVTable {
impl MaskKernel for ChunkedVTable {
fn mask(&self, array: &ChunkedArray, mask: &Mask) -> VortexResult<ArrayRef> {
let new_dtype = array.dtype().as_nullable();
let new_chunks = match mask.threshold_iter(FILTER_SLICES_SELECTIVITY_THRESHOLD) {
AllOr::All => unreachable!("handled in top-level mask"),
AllOr::None => unreachable!("handled in top-level mask"),
AllOr::Some(MaskIter::Indices(indices)) => mask_indices(array, indices, &new_dtype),
AllOr::Some(MaskIter::Slices(slices)) => {
mask_slices(array, slices.iter().cloned(), &new_dtype)
}
let mask_values = mask.values().vortex_expect("handled in top-level mask");

let new_chunks = if mask_values.density() >= FILTER_SLICES_SELECTIVITY_THRESHOLD {
mask_indices(array, mask_values.bit_buffer().set_indices(), &new_dtype)
} else {
mask_slices(array, mask_values.bit_buffer().set_slices(), &new_dtype)
}?;
debug_assert_eq!(new_chunks.len(), array.nchunks());
debug_assert_eq!(
Expand All @@ -52,7 +50,7 @@ register_kernel!(MaskKernelAdapter(ChunkedVTable).lift());

fn mask_indices(
array: &ChunkedArray,
indices: &[usize],
indices: impl Iterator<Item = usize>,
new_dtype: &DType,
) -> VortexResult<Vec<ArrayRef>> {
let mut new_chunks = Vec::with_capacity(array.nchunks());
@@ -61,7 +59,7 @@ fn mask_indices(

let chunk_offsets = array.chunk_offsets();

for &set_index in indices {
for set_index in indices {
let (chunk_id, index) = find_chunk_idx(set_index, &chunk_offsets)?;
if chunk_id != current_chunk_id {
let chunk = array.chunk(current_chunk_id).clone();
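`mask_indices` now consumes the set indices straight off the iterator, locating the owning chunk for each one via the cumulative chunk offsets. A simplified sketch of that lookup, using `partition_point` in place of the crate's `find_chunk_idx` helper (whose exact behavior may differ):

```rust
/// Locate the chunk containing a global index, given cumulative chunk offsets
/// where `chunk_offsets[i]` is the first global index of chunk `i` and the last
/// entry is the total length. Returns (chunk id, index within that chunk).
fn find_chunk_idx(index: usize, chunk_offsets: &[usize]) -> (usize, usize) {
    // The last offset that is <= index marks the owning chunk.
    let chunk_id = chunk_offsets.partition_point(|&off| off <= index) - 1;
    (chunk_id, index - chunk_offsets[chunk_id])
}

fn main() {
    // Three chunks of lengths 4, 2 and 3.
    let chunk_offsets = [0usize, 4, 6, 9];
    for idx in [1usize, 5, 8] {
        let (chunk, local) = find_chunk_idx(idx, &chunk_offsets);
        println!("global {idx} -> chunk {chunk}, local {local}");
    }
    assert_eq!(find_chunk_idx(5, &chunk_offsets), (1, 1));
}
```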