Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 120 additions & 3 deletions parquet/benches/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use std::hint;

use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use parquet::bloom_filter::Sbbf;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

/// Build a bloom filter sized for `initial_ndv` at `fpp`, insert `num_values` distinct values,
/// and return it ready for folding.
Expand Down Expand Up @@ -46,7 +50,7 @@ fn bench_fold_to_target_fpp(c: &mut Criterion) {
f.fold_to_target_fpp(fpp);
f
},
criterion::BatchSize::SmallInput,
BatchSize::SmallInput,
);
});
}
Expand Down Expand Up @@ -104,10 +108,123 @@ fn bench_insert_only(c: &mut Criterion) {
group.finish();
}

/// Benchmark `Sbbf::insert` across the same three cache regimes as
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice for the bench changes to be a separate PR.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly for conciseness? Or should we push the bench changes first, then this Sbbf probe change second that way its easy to compare? Otherwise I'd lean towards keeping it in here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or should we push the bench changes first, then this Sbbf probe change second that way its easy to compare?

This (see the contributing guide). Thanks!

/// `bench_check`. The filter is allocated once per regime and reused
/// across iterations — bloom inserts are idempotent on identical
/// input, so this measures the pure insert kernel cost (hash + mask +
/// load + OR + store) without per-iteration allocation noise.
fn bench_insert(c: &mut Criterion) {
let regimes: [(&str, usize); 3] = [
("s_128KiB", 128 * 1024),
("m_2MiB", 2 * 1024 * 1024),
("l_32MiB", 32 * 1024 * 1024),
];
const NUM_INSERTS: usize = 50_000;

let mut group = c.benchmark_group("insert");

for (label, num_bytes) in regimes {
let mut rng = StdRng::seed_from_u64(0xC0FFEE);
let keys: Vec<[u8; 16]> = (0..NUM_INSERTS)
.map(|_| {
let mut k = [0u8; 16];
rng.fill(&mut k);
k
})
.collect();

let mut filter = Sbbf::new_with_num_of_bytes(num_bytes);

group.throughput(Throughput::Elements(NUM_INSERTS as u64));
group.bench_with_input(BenchmarkId::new("ins", label), &keys, |b, keys| {
b.iter(|| {
for k in keys {
filter.insert(hint::black_box(k.as_slice()));
}
});
});
}
group.finish();
}

/// Benchmark `Sbbf::check` across three cache regimes and both miss-heavy
/// (the common case for Parquet row-group skipping) and hit-heavy workloads.
///
/// The three filter sizes span the cache hierarchy on a typical server:
/// 128 KB fits in L2, 2 MB lives in L3, 32 MB spills out of L3.
fn bench_check(c: &mut Criterion) {
let regimes: [(&str, usize, usize); 3] = [
("s_128KiB", 128 * 1024, 10_000),
("m_2MiB", 2 * 1024 * 1024, 200_000),
("l_32MiB", 32 * 1024 * 1024, 3_000_000),
];
const NUM_QUERIES: usize = 50_000;

let mut group = c.benchmark_group("check");

for (label, num_bytes, ndv) in regimes {
let mut rng = StdRng::seed_from_u64(0xC0FFEE);

// Clear the high bit of byte 0 on inserted keys so the miss set below
// — which forces the same bit to 1 — is genuinely disjoint by
// construction (not just disjoint at birthday-paradox probability).
let mut filter = Sbbf::new_with_num_of_bytes(num_bytes);
let mut inserted: Vec<[u8; 16]> = Vec::with_capacity(ndv);
for _ in 0..ndv {
let mut k = [0u8; 16];
rng.fill(&mut k);
k[0] &= 0x7f;
filter.insert(k.as_slice());
inserted.push(k);
}

// Disjoint miss set: high bit of byte 0 is 1 here and 0 in `inserted`,
// so no miss key can equal an inserted key.
let miss_keys: Vec<[u8; 16]> = (0..NUM_QUERIES)
.map(|_| {
let mut k = [0u8; 16];
rng.fill(&mut k);
k[0] |= 0x80;
k
})
.collect();
let hit_keys: Vec<[u8; 16]> = (0..NUM_QUERIES)
.map(|_| inserted[rng.random_range(0..inserted.len())])
.collect();

group.throughput(Throughput::Elements(NUM_QUERIES as u64));
group.bench_with_input(BenchmarkId::new("miss", label), &miss_keys, |b, keys| {
b.iter(|| {
let mut found = 0u64;
for k in keys {
if hint::black_box(filter.check(k.as_slice())) {
found += 1;
}
}
hint::black_box(found)
});
});
group.bench_with_input(BenchmarkId::new("hit", label), &hit_keys, |b, keys| {
b.iter(|| {
let mut found = 0u64;
for k in keys {
if hint::black_box(filter.check(k.as_slice())) {
found += 1;
}
}
hint::black_box(found)
});
});
}
group.finish();
}

criterion_group!(
benches,
bench_fold_to_target_fpp,
bench_insert_and_fold,
bench_insert_only
bench_insert_only,
bench_insert,
bench_check,
);
criterion_main!(benches);
92 changes: 52 additions & 40 deletions parquet/src/bloom_filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,13 @@ pub struct BloomFilterHeader {
/// (8 bits total), and those bits are OR'd in. When checking, we verify
/// all 8 bits are set.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
#[repr(C, align(32))]
struct Block([u32; 8]);

// The autovectorized 256-bit load/store in `Block::{check,insert}` assumes
// `Block` is 32 contiguous bytes, 32-byte aligned. Assert it.
const _: () = assert!(std::mem::size_of::<Block>() == 32);
const _: () = assert!(std::mem::align_of::<Block>() == 32);
impl Block {
const ZERO: Block = Block([0; 8]);

Expand All @@ -192,6 +197,10 @@ impl Block {
/// Key property: the mask depends *only* on `x` (a u32) and the fixed
/// SALT constants — it is independent of the filter size. This is why
/// folding preserves bit patterns (see Lemma 2 in tests).
///
/// `#[inline]` so this folds into [`Block::check`] / [`Block::insert`]
/// where the autovectorizer needs it as part of the same loop body.
#[inline]
fn mask(x: u32) -> Self {
let mut result = [0_u32; 8];
for i in 0..8 {
Expand Down Expand Up @@ -229,52 +238,24 @@ impl Block {
fn insert(&mut self, hash: u32) {
let mask = Self::mask(hash);
for i in 0..8 {
self[i] |= mask[i];
self.0[i] |= mask.0[i];
}
}

/// Check membership: returns `true` when *every* bit from `mask(hash)` is
/// already set in this block (`block[i] & mask[i] != 0` for all 8 words).
/// Check membership: `true` iff every bit in `mask(hash)` is set in this
/// block (i.e. the value was probably inserted). `false` is definitive.
///
/// A `true` result means "probably present" (other inserts may have set
/// the same bits). A `false` is definitive — the value was never inserted.
/// Branchless `acc |= !block & mask; acc == 0` — the "testc" reduction
/// shape LLVM autovectorizes. A short-circuiting `.all()` would win a
/// few lanes on miss but defeat vectorization.
#[inline]
fn check(&self, hash: u32) -> bool {
let mask = Self::mask(hash);
let mut acc = 0u32;
for i in 0..8 {
if self[i] & mask[i] == 0 {
return false;
}
}
true
}
}

impl std::ops::Index<usize> for Block {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these impls removed? I think that makes this a breaking API change.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Block is private type and only used here so I don't think so. I ran cargo public-api diff for this branch vs main and didn't see any differences

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right you are. Had Sbbf in my head 😅

type Output = u32;

#[inline]
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}

impl std::ops::IndexMut<usize> for Block {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
self.0.index_mut(index)
}
}

impl std::ops::BitOr for Block {
type Output = Self;

#[inline]
fn bitor(self, rhs: Self) -> Self {
let mut result = [0u32; 8];
for (i, item) in result.iter_mut().enumerate() {
*item = self.0[i] | rhs.0[i];
acc |= !self.0[i] & mask.0[i];
}
Self(result)
acc == 0
}
}

Expand Down Expand Up @@ -406,7 +387,7 @@ impl Sbbf {
.map(|chunk| {
let mut block = Block::ZERO;
for (i, word) in chunk.chunks_exact(4).enumerate() {
block[i] = u32::from_le_bytes(word.try_into().unwrap());
block.0[i] = u32::from_le_bytes(word.try_into().unwrap());
}
block
})
Expand Down Expand Up @@ -787,6 +768,37 @@ mod tests {
}
}

/// Autovec'd `Block::check` vs a trivial short-circuit reference across
/// 10K random `(block, hash)` pairs. Guards against autovec divergence
/// on any target the crate is built for.
#[test]
fn test_check_matches_reference() {
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

// Independent impl: short-circuit form, does not autovectorize.
fn reference_check(block: &Block, hash: u32) -> bool {
let mask = Block::mask(hash);
for i in 0..8 {
if block.0[i] & mask.0[i] == 0 {
return false;
}
}
true
}

let mut rng = StdRng::seed_from_u64(0xC0FFEE);
for _ in 0..10_000 {
let block = Block(std::array::from_fn(|_| rng.random()));
let hash: u32 = rng.random();
assert_eq!(
block.check(hash),
reference_check(&block, hash),
"branchless vs reference check differ for hash {hash:#x}"
);
}
}

#[test]
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
Expand Down
Loading