Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions encodings/fastlanes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ harness = false
name = "cast_bitpacked"
harness = false
required-features = ["_test-harness"]

[[bench]]
name = "patched_unpack_locality"
harness = false
176 changes: 176 additions & 0 deletions encodings/fastlanes/benches/patched_unpack_locality.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Microbenchmark isolating the cache-locality question for patched bitpacked arrays:
//! is it faster to unpack every 1024-element block and *then* scatter all patches
//! (`unpack_then_patch`), or to unpack one block and immediately patch it while the
//! freshly-decoded block is still hot in cache (`fused`)?
//!
//! Both strategies perform identical total work (same unpack kernel calls, same number of
//! patch stores); only the loop ordering differs, so any delta is attributable to locality.

#![expect(clippy::cast_possible_truncation)]

use divan::Bencher;
use fastlanes::BitPacking;

fn main() {
// Correctness guard: both strategies must produce identical output.
let case = (1usize << 18, 9u8, 50u32);
let data = Setup::new(case.0, case.1, case.2);
let a = {
let mut out = vec![0u32; data.n_padded];
unpack_then_patch(&data, &mut out);
out
};
let b = {
let mut out = vec![0u32; data.n_padded];
fused(&data, &mut out);
out
};
assert_eq!(a, b, "fused and unpack_then_patch must agree");

divan::main();
}

/// (num_values, bit_width, patch_stride) — one patch every `patch_stride` elements.
const CASES: &[(usize, u8, u32)] = &[
(1 << 16, 9, 200),
(1 << 16, 9, 20),
(1 << 16, 9, 5),
];

struct Setup {
bit_width: usize,
elems_per_chunk: usize,
num_chunks: usize,
n_padded: usize,
packed: Vec<u32>,
/// Patch indices, globally sorted.
indices: Vec<usize>,
/// Patch values, parallel to `indices`.
values: Vec<u32>,
/// `chunk_offsets[c]..chunk_offsets[c + 1]` is the patch range for chunk `c`.
chunk_offsets: Vec<usize>,
}

impl Setup {
fn new(n: usize, bit_width: u8, patch_stride: u32) -> Self {
let bit_width = bit_width as usize;
let num_chunks = n.div_ceil(1024);
let n_padded = num_chunks * 1024;
let elems_per_chunk = 1024 * bit_width / 32;
let mask = if bit_width == 32 {
u32::MAX
} else {
(1u32 << bit_width) - 1
};

// Deterministic, low-entropy values that fit in `bit_width` bits.
let values_in: Vec<u32> = (0..n_padded as u32)
.map(|i| i.wrapping_mul(2654435761) & mask)
.collect();

let mut packed = vec![0u32; num_chunks * elems_per_chunk];
for c in 0..num_chunks {
// SAFETY: input is exactly 1024 elements, output exactly `elems_per_chunk`.
unsafe {
BitPacking::unchecked_pack(
bit_width,
&values_in[c * 1024..][..1024],
&mut packed[c * elems_per_chunk..][..elems_per_chunk],
);
}
}

// Uniformly-spread patches: one every `patch_stride` elements.
let stride = patch_stride as usize;
let mut indices = Vec::new();
let mut values = Vec::new();
let mut chunk_offsets = vec![0usize; num_chunks + 1];
let mut idx = 0usize;
while idx < n_padded {
indices.push(idx);
values.push(0xDEAD_BEEF ^ idx as u32);
idx += stride;
}
// Build chunk offsets from the sorted indices.
let mut p = 0usize;
for c in 0..num_chunks {
let chunk_end = (c + 1) * 1024;
while p < indices.len() && indices[p] < chunk_end {
p += 1;
}
chunk_offsets[c + 1] = p;
}

Self {
bit_width,
elems_per_chunk,
num_chunks,
n_padded,
packed,
indices,
values,
chunk_offsets,
}
}
}

/// Approach A: unpack every block into the output, then scatter all patches in a second pass.
#[inline]
fn unpack_then_patch(s: &Setup, output: &mut [u32]) {
for c in 0..s.num_chunks {
// SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024.
unsafe {
BitPacking::unchecked_unpack(
s.bit_width,
&s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk],
&mut output[c * 1024..][..1024],
);
}
}
for (i, &idx) in s.indices.iter().enumerate() {
output[idx] = s.values[i];
}
}

/// Approach B: unpack one block and immediately patch it while still hot in cache.
#[inline]
fn fused(s: &Setup, output: &mut [u32]) {
for c in 0..s.num_chunks {
// SAFETY: packed slice is `elems_per_chunk`, output range is exactly 1024.
unsafe {
BitPacking::unchecked_unpack(
s.bit_width,
&s.packed[c * s.elems_per_chunk..][..s.elems_per_chunk],
&mut output[c * 1024..][..1024],
);
}
for p in s.chunk_offsets[c]..s.chunk_offsets[c + 1] {
output[s.indices[p]] = s.values[p];
}
}
}

#[divan::bench(args = CASES)]
fn unpack_then_patch_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) {
let setup = Setup::new(n, bit_width, stride);
bencher
.with_inputs(|| vec![0u32; setup.n_padded])
.bench_local_values(|mut output| {
unpack_then_patch(&setup, &mut output);
divan::black_box(output);
});
}

#[divan::bench(args = CASES)]
fn fused_bench(bencher: Bencher, (n, bit_width, stride): (usize, u8, u32)) {
let setup = Setup::new(n, bit_width, stride);
bencher
.with_inputs(|| vec![0u32; setup.n_padded])
.bench_local_values(|mut output| {
fused(&setup, &mut output);
divan::black_box(output);
});
}
2 changes: 2 additions & 0 deletions encodings/sparse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ mod canonical;
mod compute;
mod kernel;
mod ops;
mod plugin;
mod rules;
mod slice;

pub use plugin::SparsePatchedPlugin;
use vortex_array::aggregate_fn::AggregateFnVTable as _;
use vortex_array::aggregate_fn::fns::is_constant::IsConstant;
use vortex_array::aggregate_fn::fns::min_max::MinMax;
Expand Down
201 changes: 201 additions & 0 deletions encodings/sparse/src/plugin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! A custom [`ArrayPlugin`] that lets you load in and deserialize a `Sparse` array as a
//! `PatchedArray` that wraps a constant fill array.
//!
//! A `Sparse` array is logically a set of patches applied on top of a constant fill value, which
//! is exactly what a `Patched` array over a [`ConstantArray`] represents. This plugin externalizes
//! that representation on deserialize when the array is primitive with non-null patches, which is
//! the subset that `Patched` can represent. All other sparse arrays are returned unchanged.

use vortex_array::Array;
use vortex_array::ArrayId;
use vortex_array::ArrayPlugin;
use vortex_array::ArrayRef;
use vortex_array::ArrayVTable;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::Patched;
use vortex_array::buffer::BufferHandle;
use vortex_array::dtype::DType;
use vortex_array::serde::ArrayChildren;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use crate::Sparse;
use crate::SparseExt;

/// Custom deserialization plugin that converts a primitive `Sparse` array into a `PatchedArray`
/// holding a [`ConstantArray`] fill.
#[derive(Debug, Clone)]
pub struct SparsePatchedPlugin;

impl ArrayPlugin for SparsePatchedPlugin {
fn id(&self) -> ArrayId {
// We reuse the existing `Sparse` ID so that we can take over its deserialization pathway.
ArrayVTable::id(&Sparse)
}

fn serialize(
&self,
array: &ArrayRef,
session: &VortexSession,
) -> VortexResult<Option<Vec<u8>>> {
// Delegate to the Sparse VTable for serialization.
Sparse.serialize(array, session)
}

fn deserialize(
&self,
dtype: &DType,
len: usize,
metadata: &[u8],
buffers: &[BufferHandle],
children: &dyn ArrayChildren,
session: &VortexSession,
) -> VortexResult<ArrayRef> {
let sparse = Array::<Sparse>::try_from_parts(ArrayVTable::deserialize(
&Sparse, dtype, len, metadata, buffers, children, session,
)?)
.map_err(|_| vortex_err!("Sparse plugin should only deserialize vortex.sparse"))?;

// `Patched` can only represent primitive inners with non-null patch values, so anything
// else (bool, varbin, struct, fixed-size-list, nullable patches) stays a Sparse array.
if !dtype.is_primitive() {
return Ok(sparse.into_array());
}

let patches = sparse.patches();
let mut ctx = session.create_execution_ctx();
if !patches.values().all_valid(&mut ctx)? {
return Ok(sparse.into_array());
}

let fill = ConstantArray::new(sparse.fill_scalar().clone(), len).into_array();
let patched = Patched::from_array_and_patches(fill, &patches, &mut ctx)?;

Ok(patched.into_array())
}

fn is_supported_encoding(&self, id: &ArrayId) -> bool {
id == ArrayVTable::id(&Sparse) || id == ArrayVTable::id(&Patched)
}
}

#[cfg(test)]
mod tests {
use std::sync::LazyLock;

use vortex_array::ArrayPlugin;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PatchedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::patched::PatchedArraySlotsExt;
use vortex_array::buffer::BufferHandle;
use vortex_array::patches::Patches;
use vortex_array::scalar::Scalar;
use vortex_array::session::ArraySession;
use vortex_array::session::ArraySessionExt;
use vortex_error::VortexResult;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use super::SparsePatchedPlugin;
use crate::Sparse;

static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
let session = VortexSession::empty().with::<ArraySession>();
session.arrays().register(SparsePatchedPlugin);
session
});

fn primitive_sparse() -> VortexResult<crate::SparseArray> {
let patches = Patches::new(
10,
0,
PrimitiveArray::from_iter([1u32, 3, 7]).into_array(),
PrimitiveArray::from_iter([10u32, 30, 70]).into_array(),
None,
)?;
Sparse::try_new_from_patches(
patches,
Scalar::primitive(0u32, vortex_array::dtype::Nullability::NonNullable),
)
}

fn round_trip(array: &vortex_array::ArrayRef) -> VortexResult<vortex_array::ArrayRef> {
let metadata = SESSION.array_serialize(array)?.unwrap();
let children = array.children();
let buffers = array
.buffers()
.into_iter()
.map(BufferHandle::new_host)
.collect::<Vec<_>>();

SparsePatchedPlugin.deserialize(
array.dtype(),
array.len(),
&metadata,
&buffers,
&children,
&SESSION,
)
}

#[test]
fn primitive_sparse_becomes_patched() -> VortexResult<()> {
let sparse = primitive_sparse()?.into_array();
let deserialized = round_trip(&sparse)?;

let patched: PatchedArray = deserialized
.try_downcast()
.map_err(|a| vortex_err!("Expected Patched, got {}", a.encoding_id()))?;

// The inner is the constant fill.
assert!(patched.inner().as_constant().is_some());

// The decoded values must match the original sparse array.
let mut ctx = SESSION.create_execution_ctx();
let expected = sparse
.execute::<PrimitiveArray>(&mut ctx)?
.into_buffer::<u32>();
let actual = patched
.into_array()
.execute::<PrimitiveArray>(&mut ctx)?
.into_buffer::<u32>();
assert_eq!(expected, actual);

Ok(())
}

#[test]
fn non_primitive_sparse_stays_sparse() -> VortexResult<()> {
use vortex_array::arrays::BoolArray;

let patches = Patches::new(
5,
0,
PrimitiveArray::from_iter([1u32, 3]).into_array(),
BoolArray::from_iter([true, false]).into_array(),
None,
)?;
let sparse = Sparse::try_new_from_patches(
patches,
Scalar::bool(false, vortex_array::dtype::Nullability::NonNullable),
)?
.into_array();

let deserialized = round_trip(&sparse)?;
assert!(
deserialized.is::<Sparse>(),
"non-primitive sparse should stay sparse, got {}",
deserialized.encoding_id()
);

Ok(())
}
}
Loading
Loading