Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d870171
perf(decode): 4-slot software prefetch pipeline for sequence execution
polaz May 22, 2026
06ebf23
docs(decode): clarify prefetch L2 deviation + comment direction
polaz May 22, 2026
3d5f040
perf(decode): gate prefetch on offset >= 32 KiB + drop dead prefill g…
polaz May 22, 2026
07d0e26
Merge branch 'main' into perf/#208-match-prefetch
polaz May 22, 2026
a3f4a78
fix(decode): use wrapping_add for prefetch position math to stay fuzz…
polaz May 22, 2026
01be919
perf(decode): exact actual_offset via shadow_hist + short-offset fast…
polaz May 22, 2026
55fb901
refactor(decode): align prefetch pipeline comments + drop redundant t…
polaz May 22, 2026
d8e69a0
perf(decode): gate prefetch pipeline by FSE-table long-offset share (…
polaz May 22, 2026
2e174e1
fix(decode): prefetch across s1/s2 boundary when match source spans t…
polaz May 22, 2026
4760fd9
perf(decode): bump prefetch ADVANCE 4→8 + switch lookahead hint T1→T0…
polaz May 22, 2026
2187c97
perf(decode): short-circuit FSE offsets walk by num_sequences first
polaz May 22, 2026
be9cb20
refactor(decode): pointer-width-aware long-offset gate + accurate for…
polaz May 22, 2026
d8e65ed
docs(decode): align prefetch pipeline comments with ADVANCE=8 + T0/L1…
polaz May 22, 2026
bbd648d
fix(decode): prefetch the match-start cache line even when s1/s2 tail…
polaz May 22, 2026
bf70615
docs(decode): drop redundant 'STORED_SEQS = 8' echo in ADVANCE power-…
polaz May 22, 2026
6dc8465
perf(decode): cache offsets-long-share in FSEScratch, recompute only …
polaz May 22, 2026
54dd9ef
docs+test(decode): align review comments + add prefetch_lookahead_mat…
polaz May 22, 2026
b0a1116
perf(decode): pipeline-aware repeat variant + pre-resolved offset in …
polaz May 22, 2026
f4005ae
fix(decode): keep buffer.reserve in pipelined repeat + scratch reset …
polaz May 22, 2026
45be345
Merge branch 'main' into perf/#208-match-prefetch
polaz May 22, 2026
2aa2751
docs+test(decode): align pipeline comments with actual ring/reserve s…
polaz May 22, 2026
72e0754
fix(decode): recompute offsets_long_share in FSEScratch::reinit_from
polaz May 22, 2026
64ea74a
fix(decode): rollback offset_hist + buffer on mid-loop pipelined Err
polaz May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 197 additions & 11 deletions zstd/src/decoding/decode_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,38 @@ impl<B: BufferBackend> DecodeBuffer<B> {
}

/// Executes a match copy of `match_length` bytes whose source lies
/// `offset` bytes behind the current end of the buffer.
///
/// Delegates to `repeat_inner::<false>`, i.e. the variant that does NOT
/// skip the in-loop `prefetch_match_source` hint. Callers that already
/// issued a lookahead prefetch for the same match source should use
/// `repeat_lookahead_prefetched` instead.
///
/// # Errors
///
/// Returns `DecodeBufferError::ZeroOffset` when `offset == 0`; any other
/// error is whatever `repeat_inner` reports.
pub fn repeat(&mut self, offset: usize, match_length: usize) -> Result<(), DecodeBufferError> {
self.repeat_inner::<false>(offset, match_length)
}

/// Variant of [`Self::repeat`] for callers that already prefetched the
/// match source.
///
/// The caller asserts a lookahead prefetch was already issued for this
/// match source ADVANCE iterations ago, so the in-loop
/// `prefetch_match_source` would be redundant issue-port pressure on
/// top of the L1 line that's by now warm. Implemented as
/// `repeat_inner::<true>` (`SKIP_PREFETCH = true`).
///
/// Per-call `reserve` is KEPT — on malformed input the
/// `extend_from_within_unchecked*` writes assume the buffer has the
/// required free capacity (only `debug_assert` checks in release), and
/// a single missing reserve here would turn a fuzz-corrupt block into
/// out-of-bounds UB. The reserve is already amortised by the caller's
/// upfront `reserve(MAX_BLOCK_SIZE)`, so this is a cheap capacity-check
/// branch, not a real allocation.
///
/// Used exclusively by the pipelined sequence executor in
/// [`crate::decoding::sequence_section_decoder`].
///
/// # Errors
///
/// Same as [`Self::repeat`]: both share `repeat_inner`, so e.g.
/// `offset == 0` yields `DecodeBufferError::ZeroOffset`.
#[inline(always)]
pub(crate) fn repeat_lookahead_prefetched(
&mut self,
offset: usize,
match_length: usize,
) -> Result<(), DecodeBufferError> {
self.repeat_inner::<true>(offset, match_length)
}

#[inline(always)]
fn repeat_inner<const SKIP_PREFETCH: bool>(
&mut self,
offset: usize,
match_length: usize,
) -> Result<(), DecodeBufferError> {
if offset == 0 {
return Err(DecodeBufferError::ZeroOffset);
}
Expand All @@ -212,21 +244,25 @@ impl<B: BufferBackend> DecodeBuffer<B> {
let start_idx = buf_len - offset;
let end_idx = start_idx + match_length;

// Reserve unconditionally — `extend_from_within_unchecked*`
// assumes the required free capacity exists; skipping it
// would turn a malformed block (match_length past the
// upfront `reserve(MAX_BLOCK_SIZE)`) into release-build
// UB. The pipelined caller already reserved MAX_BLOCK_SIZE
// up front, so this is a cheap no-op branch in the hot
// path.
self.buffer.reserve(match_length);
self.prefetch_match_source(start_idx, match_length);
if !SKIP_PREFETCH {
self.prefetch_match_source(start_idx, match_length);
}
if end_idx > buf_len {
self.repeat_overlapping(offset, match_length, start_idx);
} else {
// can just copy parts of the existing buffer
// SAFETY: Requirements checked:
// 1. start_idx + match_length must be <= self.buffer.len()
// We know that:
// 1. start_idx = self.buffer.len() - offset
// 2. end_idx = start_idx + match_length
// 3. end_idx <= self.buffer.len()
// Thus follows: start_idx + match_length <= self.buffer.len()
//
// 2. explicitly reserved enough memory for the whole match_length
// SAFETY: start_idx + match_length <= self.buffer.len()
// (start_idx = buf_len - offset, end_idx = start_idx +
// match_length, end_idx <= buf_len). The `reserve`
// above guarantees the destination has enough free
// capacity for `match_length` more bytes.
unsafe {
if offset >= 16 && use_branchless_wildcopy() {
self.buffer
Expand Down Expand Up @@ -385,6 +421,97 @@ impl<B: BufferBackend> DecodeBuffer<B> {
}
}

/// Lookahead-friendly prefetch issued ahead of execute. The
/// in-loop `prefetch_match_source` above fires at the moment of
/// the copy, so it can't hide DRAM latency for cold long-distance
/// match sources. Pipelined callers compute the match source
/// logical index 3-4 sequences in advance and call this helper —
/// by the time the corresponding `repeat()` reaches the actual
/// load, the line is already in-flight.
///
/// `start_idx` is a logical index into the current buffer (same
/// frame as `buffer.len()`). Indices outside `[0, buffer.len())`
/// are silently dropped — the cases this guards against include
/// intra-block self-overlap (source falls past the not-yet-
/// written cursor), `wrapping_sub` underflow on a caller that
/// computed `match_start - offset` with an offset larger than
/// match_start (e.g. a stale or malformed sequence), and
/// dictionary-sourced matches whose logical position predates
/// the buffer's current frame. The donor (`PREFETCH_L1` in
/// `ZSTD_prefetchMatch` — we mirror that with `prefetch_slice`
/// → `_MM_HINT_T0` / `pldl1keep`, see the body comment) tolerates
/// invalid addresses by spec, but in
/// safe Rust the cheapest equivalent is to bound-check the
/// logical position before chasing the slice.
#[inline(always)]
pub(crate) fn prefetch_lookahead_match_source(&self, start_idx: usize) {
if start_idx >= self.buffer.len() {
return;
}
Comment thread
polaz marked this conversation as resolved.
// Donor's `ZSTD_prefetchMatch` issues two `PREFETCH_L1` hints
// per match — one at `match`, one at `match + CACHELINE_SIZE`.
// We mirror that intent via `prefetch_slice` (`_MM_HINT_T0` on
// x86 / `pldl1keep` on aarch64 → L1 destination) with extent
// capped at 2 × 64 B = 128 B. In the contiguous case the helper
// emits at most two prefetch instructions, matching donor
// exactly. In the wrap-boundary case the same 128 B budget is
// split across `s1_tail` and `s2[0..]`, which can emit up to
// four cache-line prefetches total (two per slice when each
// side covers a full 64 B) — still bounded, still L1, still
// less than the helper's MAX_LINES = 4 ceiling. The lookahead
// depth (ADVANCE) is small enough that L1 should hold the line
// across the gap; if profiling later shows L1 eviction
// pressure we can revisit T1/L2.
Comment thread
polaz marked this conversation as resolved.
const PREFETCH_EXTENT: usize = 128;
const CACHE_LINE: usize = 64;
let (s1, s2) = self.buffer.as_slices();
if start_idx < s1.len() {
let s1_tail = &s1[start_idx..];
let s1_bound = core::cmp::min(s1_tail.len(), PREFETCH_EXTENT);
// `prefetch_slice` no-ops on slices shorter than one cache
// line — sensible for bulk prefetch, but wrong for the
// wrap-boundary case where the cache line containing
// `start_idx` IS the line we need warmed even if the
// remaining contiguous extent is < 64 B. Fall back to the
// single-line variant in that case so the match-start
// line is always hinted.
if s1_bound >= CACHE_LINE {
prefetch::prefetch_slice(&s1_tail[..s1_bound]);
} else {
prefetch::prefetch_first_line_l1(&s1_tail[..s1_bound]);
}
// Wrap continuation: when the match source straddles the
// s1/s2 boundary and the s1 tail is shorter than the
// PREFETCH_EXTENT we asked for, top up the rest from
// s2[0..]. Without this the donor's "up to two cache
// lines" intent silently collapses to one (or zero if
// s1_tail is the last sub-line of s1).
if s1_bound < PREFETCH_EXTENT {
let remaining = PREFETCH_EXTENT - s1_bound;
let s2_bound = core::cmp::min(s2.len(), remaining);
if s2_bound >= CACHE_LINE {
prefetch::prefetch_slice(&s2[..s2_bound]);
} else if s2_bound > 0 {
prefetch::prefetch_first_line_l1(&s2[..s2_bound]);
}
}
} else {
Comment thread
polaz marked this conversation as resolved.
// `start_idx < self.buffer.len()` from the early return,
// `buffer.len() == s1.len() + s2.len()`, and the else
// branch establishes `start_idx >= s1.len()`. So
// `idx = start_idx - s1.len() < s2.len()` by construction
// — no explicit `idx < s2.len()` guard needed.
let idx = start_idx - s1.len();
let tail = &s2[idx..];
let bound = core::cmp::min(tail.len(), PREFETCH_EXTENT);
if bound >= CACHE_LINE {
prefetch::prefetch_slice(&tail[..bound]);
} else {
prefetch::prefetch_first_line_l1(&tail[..bound]);
}
Comment thread
polaz marked this conversation as resolved.
}
Comment thread
polaz marked this conversation as resolved.
}

#[cold]
fn repeat_from_dict(
&mut self,
Expand Down Expand Up @@ -942,4 +1069,63 @@ mod tests {
}
}
}

#[test]
fn prefetch_lookahead_in_range_does_not_panic() {
    // Every probed index lies inside `[0, buffer.len())`, so the helper
    // takes the hint-issuing path. Prefetch hints have no effect that
    // Rust code can observe; the only thing checked is that each call
    // returns without panic / UB.
    let mut buf = DecodeBuffer::<RingBuffer>::new(1024);
    buf.reserve(512);
    buf.push(&[0xAA; 256]);

    let last = buf.len() - 1;
    for start_idx in [0, 128, last] {
        buf.prefetch_lookahead_match_source(start_idx);
    }
}

#[test]
fn prefetch_lookahead_out_of_range_returns_without_panic() {
    // Wrap-derived garbage, dictionary-sourced matches and intra-block
    // self-overlap all surface as `start_idx >= buffer.len()`. The
    // helper has to bail out on its bound check and must never index a
    // slice past the live region.
    let mut buf = DecodeBuffer::<RingBuffer>::new(1024);
    buf.reserve(64);
    buf.push(&[0x55; 32]);

    let live = buf.len();
    for start_idx in [live, live + 1, usize::MAX] {
        buf.prefetch_lookahead_match_source(start_idx);
    }

    // With nothing pushed, every index is out of range.
    let empty: DecodeBuffer<RingBuffer> = DecodeBuffer::new(1024);
    for start_idx in [0, 7] {
        empty.prefetch_lookahead_match_source(start_idx);
    }
}

#[test]
fn prefetch_lookahead_at_wrap_boundary() {
    // Goal: drive the RingBuffer towards a wrapped layout in which
    // `as_slices()` yields two non-empty halves — push, drain down to
    // the window, then push again so the write cursor can wrap.
    // The probes below are the first, middle and last live index plus
    // one past the end; whether any of them lands exactly on the s1/s2
    // seam depends on the ring's internal layout.
    let mut buf = DecodeBuffer::<RingBuffer>::new(256);
    let payload = [0xCD_u8; 320];

    // First pass fills the buffer.
    buf.push(&payload);
    // Draining frees read-side capacity so the next push can wrap.
    let _ = buf.drain_to_window_size();
    // Second pass is the one expected to wrap.
    buf.push(&payload);

    let n = buf.len();
    if n == 0 {
        return;
    }
    // The final entry (`n`) is out of range and exercises the
    // early-return path on the same buffer.
    for start_idx in [0, n / 2, n - 1, n] {
        buf.prefetch_lookahead_match_source(start_idx);
    }
}
}
56 changes: 56 additions & 0 deletions zstd/src/decoding/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,25 @@ pub(crate) fn prefetch_slice(slice: &[u8]) {
prefetch_slice_impl_l1(slice);
}

/// Hints the cache line containing the first byte of `slice` into L1,
/// whatever `slice.len()` is; an empty slice issues nothing.
///
/// Intended for callers that know the slice is short (< CACHE_LINE) but
/// still need the line at `slice.as_ptr()` warmed because the consumer
/// is about to read it.
///
/// `prefetch_slice` returns early on slices below one cache line. That
/// suits bulk prefetch (a partial buffer is not worth hinting) but not
/// the wrap-boundary match-source case in
/// `prefetch_lookahead_match_source`, where e.g. a 16-byte s1 tail is
/// exactly the line that has to be warm.
#[inline(always)]
pub(crate) fn prefetch_first_line_l1(slice: &[u8]) {
    // Without at least one byte there is no address to anchor the hint.
    if !slice.is_empty() {
        prefetch_first_line_l1_impl(slice.as_ptr());
    }
}

#[inline(always)]
pub(crate) fn prefetch_slice_t1(slice: &[u8]) {
prefetch_slice_impl_t1(slice);
Expand All @@ -15,6 +34,15 @@ fn prefetch_slice_impl_l1(slice: &[u8]) {
prefetch_stride_x86_64::<{ _MM_HINT_T0 }>(slice);
}

/// x86_64 backend: a single `_mm_prefetch` with the `_MM_HINT_T0`
/// locality hint at `ptr`.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn prefetch_first_line_l1_impl(ptr: *const u8) {
    use core::arch::x86_64::{_MM_HINT_T0, _mm_prefetch};
    // The intrinsic takes `*const i8`; only the address matters.
    let line_addr = ptr.cast::<i8>();
    // SAFETY: `_mm_prefetch` accepts any address — prefetching an
    // invalid pointer is a no-op by the ISA spec, not UB.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(line_addr) };
}

#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn prefetch_slice_impl_t1(slice: &[u8]) {
Expand Down Expand Up @@ -48,6 +76,13 @@ fn prefetch_slice_impl_l1(slice: &[u8]) {
prefetch_stride_x86::<{ _MM_HINT_T0 }>(slice);
}

/// 32-bit x86 backend (SSE required): a single `_mm_prefetch` with the
/// `_MM_HINT_T0` locality hint at `ptr`.
#[cfg(all(target_arch = "x86", target_feature = "sse"))]
#[inline(always)]
fn prefetch_first_line_l1_impl(ptr: *const u8) {
    use core::arch::x86::{_MM_HINT_T0, _mm_prefetch};
    // The intrinsic takes `*const i8`; only the address matters.
    let line_addr = ptr.cast::<i8>();
    // SAFETY: `_mm_prefetch` is only a hint and accepts any address;
    // the `target_feature = "sse"` gate above guarantees the
    // instruction is available.
    unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(line_addr) };
}

#[cfg(all(target_arch = "x86", target_feature = "sse"))]
#[inline(always)]
fn prefetch_slice_impl_t1(slice: &[u8]) {
Expand Down Expand Up @@ -80,6 +115,19 @@ fn prefetch_slice_impl_l1(slice: &[u8]) {
prefetch_stride_aarch64::<true>(slice);
}

/// aarch64 backend: emits one `prfm pldl1keep` hint for the address in
/// `ptr` via inline assembly.
#[cfg(target_arch = "aarch64")]
#[inline(always)]
fn prefetch_first_line_l1_impl(ptr: *const u8) {
use core::arch::asm;
// SAFETY: the asm consists of a single prefetch hint that only takes
// `ptr` as an input register; the declared options state that it uses
// no stack, leaves the flags alone and does not write memory.
// NOTE(review): like the x86 variant, this relies on the prefetch being
// harmless for arbitrary addresses — confirm against the Arm
// architecture reference for `PRFM`.
unsafe {
asm!(
// Prefetch-for-load, L1 target, "keep" policy.
"prfm pldl1keep, [{ptr}]",
ptr = in(reg) ptr,
options(nostack, preserves_flags, readonly)
);
}
}

#[cfg(target_arch = "aarch64")]
#[inline(always)]
fn prefetch_slice_impl_t1(slice: &[u8]) {
Expand Down Expand Up @@ -129,6 +177,14 @@ fn prefetch_stride_aarch64<const L1: bool>(slice: &[u8]) {
#[inline(always)]
fn prefetch_slice_impl_l1(_slice: &[u8]) {}

/// Fallback backend for targets other than x86_64, x86 with SSE, and
/// aarch64: intentionally a no-op, so `prefetch_first_line_l1` compiles
/// everywhere and simply issues no hint on these targets.
#[cfg(not(any(
target_arch = "x86_64",
all(target_arch = "x86", target_feature = "sse"),
target_arch = "aarch64",
)))]
#[inline(always)]
fn prefetch_first_line_l1_impl(_ptr: *const u8) {}

#[cfg(not(any(
target_arch = "x86_64",
all(target_arch = "x86", target_feature = "sse"),
Expand Down
31 changes: 31 additions & 0 deletions zstd/src/decoding/scratch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl<B: BufferBackend> DecoderScratch<B> {
ll_rle: None,
match_lengths: AlignedFSETable::new(MAX_MATCH_LENGTH_CODE),
ml_rle: None,
offsets_long_share: 0,
},
buffer: DecodeBuffer::new(window_size),
offset_hist: [1, 4, 8],
Expand All @@ -71,6 +72,13 @@ impl<B: BufferBackend> DecoderScratch<B> {
self.fse.ll_rle = None;
self.fse.ml_rle = None;
self.fse.of_rle = None;
// Reset the cached pipeline-gate signal alongside the FSE
// table reset — otherwise scratch reuse across frames could
// engage the long pipeline on a new frame's Repeat-mode
// header based on the previous frame's offset distribution
// (or vice versa: skip the pipeline when the new frame
// actually has long offsets).
self.fse.offsets_long_share = 0;

self.huf.table.reset();
}
Expand Down Expand Up @@ -111,6 +119,16 @@ pub struct FSEScratch {
pub ll_rle: Option<u8>,
pub match_lengths: AlignedFSETable,
pub ml_rle: Option<u8>,
/// Cached "share of offset codes strictly > LONG_OFFSET_CODE_THRESHOLD
/// (i.e. codes ≥ 23 when the threshold is 22)" scaled to donor's
/// `OffFSELog = 8` (256-entry reference).
/// Updated by [`crate::decoding::sequence_section_decoder`] when
/// the offsets FSE table is rebuilt (FSE / Predefined modes);
/// stale-but-correct on Repeat-mode blocks where the table was
/// not touched — the share is identical to the previous block's.
/// The sequence-section pipeline gate reads this directly instead
/// of re-walking `offsets.decode` per block.
pub offsets_long_share: u32,
Comment thread
polaz marked this conversation as resolved.
}

impl FSEScratch {
Expand All @@ -122,6 +140,7 @@ impl FSEScratch {
ll_rle: None,
match_lengths: AlignedFSETable::new(MAX_MATCH_LENGTH_CODE),
ml_rle: None,
offsets_long_share: 0,
}
}

Expand All @@ -132,6 +151,18 @@ impl FSEScratch {
self.of_rle = other.of_rle;
self.ll_rle = other.ll_rle;
self.ml_rle = other.ml_rle;
// Recompute the share from the just-copied offsets table
// rather than trusting `other.offsets_long_share`. Two source
// shapes produce a populated `offsets` table but a still-zero
// cached share: (a) `Dictionary::decode_dict` rebuilds the
// offsets FSE table from the dictionary's entropy section
// without ever calling the sequence-decoder path that updates
// the cache, and (b) any future caller that mutates the table
// directly. Recomputing here keeps the pipeline gate aligned
// with the actual table shape regardless of how the table got
// there.
self.offsets_long_share =
super::sequence_section_decoder::compute_offsets_long_share(&self.offsets);
}
}

Expand Down
Loading
Loading