1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions diskann-garnet/Cargo.toml
@@ -16,8 +16,9 @@ dashmap = { workspace = true, features = ["inline"] }
diskann.workspace = true
diskann-quantization.workspace = true
diskann-providers.workspace = true
diskann-utils.workspace = true
diskann-vector.workspace = true
foldhash = "0.2.0"
rand.workspace = true
thiserror.workspace = true
tokio.workspace = true
diskann-utils.workspace = true
tokio = { workspace = true, features = ["sync"] }
2 changes: 1 addition & 1 deletion diskann-garnet/src/alloc.rs
@@ -9,7 +9,7 @@ use std::ptr::NonNull;
/// Custom allocator that over-aligns to 8 bytes. This is needed since Garnet will hand us byte slices for f32 data
/// that may be unaligned, so we need an allocator to make owned, aligned byte containers.
#[derive(Debug, Clone, Copy)]
pub(crate) struct AlignToEight;
pub struct AlignToEight;

unsafe impl AllocatorCore for AlignToEight {
#[inline]
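For context on the visibility change above: the doc comment explains that Garnet may hand over byte slices for f32 data that are not 4-byte aligned, which is why an over-aligning allocator is needed to build owned, aligned copies. Below is a minimal, hypothetical illustration of the underlying problem, not the PR's allocator-based approach; the helper name `to_f32_owned` and the little-endian assumption are mine.

```rust
/// Hypothetical helper (not from this PR): borrow the bytes as f32 when they are
/// already aligned, otherwise fall back to an owned copy (a Vec<f32> is always
/// aligned for f32). Assumes the `bytemuck` crate, which this code already uses.
fn to_f32_owned(bytes: &[u8]) -> Vec<f32> {
    match bytemuck::try_cast_slice::<u8, f32>(bytes) {
        Ok(floats) => floats.to_vec(),
        // Misaligned (or wrong length): decode manually, assuming little-endian storage.
        Err(_) => bytes
            .chunks_exact(4)
            .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
            .collect(),
    }
}
```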
36 changes: 24 additions & 12 deletions diskann-garnet/src/dyn_index.rs
@@ -7,7 +7,7 @@ use crate::{
SearchResults,
garnet::{Context, GarnetId},
labels::GarnetQueryLabelProvider,
provider::{self, GarnetProvider},
provider::{self, DynamicQuantization, GarnetProvider},
};
use diskann::{
ANNError, ANNResult,
@@ -16,8 +16,7 @@ use diskann::{
utils::VectorRepr,
};
use diskann_providers::{
index::wrapped_async::DiskANNIndex,
model::graph::provider::{async_::common::FullPrecision, layers::BetaFilter},
index::wrapped_async::DiskANNIndex, model::graph::provider::layers::BetaFilter,
};
use std::sync::Arc;

@@ -55,6 +54,10 @@ pub trait DynIndex: Send + Sync {
fn internal_id_exists(&self, context: &Context, id: u32) -> bool;

fn external_id_exists(&self, context: &Context, id: &GarnetId) -> bool;

fn train_quantizer(&self, context: &Context) -> bool;

fn backfill_quant_vectors(&self, context: &Context, task_idx: usize, task_count: usize);
}

impl<T: VectorRepr> DynIndex for DiskANNIndex<GarnetProvider<T>> {
@@ -63,7 +66,7 @@ impl<T: VectorRepr> DynIndex for DiskANNIndex<GarnetProvider<T>> {
/// The data slice here must be aligned to `T` or this will panic.
fn insert(&self, context: &Context, id: &GarnetId, data: &[u8]) -> ANNResult<()> {
self.insert(
FullPrecision,
DynamicQuantization,
context,
id,
bytemuck::cast_slice::<u8, T>(data),
@@ -87,10 +90,10 @@
) -> ANNResult<SearchStats> {
let query = bytemuck::cast_slice::<u8, T>(data);
if let Some((labels, beta)) = filter {
let beta_filter = BetaFilter::new(FullPrecision, Arc::new(labels.clone()), beta);
let beta_filter = BetaFilter::new(DynamicQuantization, Arc::new(labels.clone()), beta);
self.search(*params, &beta_filter, context, query, output)
} else {
self.search(*params, &FullPrecision, context, query, output)
self.search(*params, &DynamicQuantization, context, query, output)
}
}

@@ -105,23 +108,22 @@ impl<T: VectorRepr> DynIndex for DiskANNIndex<GarnetProvider<T>> {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.map_err(|e| ANNError::new(diskann::ANNErrorKind::Opaque, e))?;
let mut accessor: provider::FullAccessor<'_, T> =
<FullPrecision as SearchStrategy<_, _>>::search_accessor(
&FullPrecision,
let mut accessor: provider::DynamicAccessor<'_, T> =
<DynamicQuantization as SearchStrategy<_, _>>::search_accessor(
&DynamicQuantization,
self.inner.provider(),
context,
)?;

// Look up internal ID
let iid = self.inner.provider().to_internal_id(context, id)?;
let data = rt.block_on(accessor.get_element(iid))?;
let data_bytes = bytemuck::cast_slice::<T, u8>(&data);
self.search_vector(context, data_bytes, params, filter, output)
self.search_vector(context, &data, params, filter, output)
}

fn remove(&self, context: &Context, id: &GarnetId) -> ANNResult<()> {
self.inplace_delete(
FullPrecision,
DynamicQuantization,
context,
id,
3,
@@ -147,4 +149,14 @@
fn external_id_exists(&self, context: &Context, id: &GarnetId) -> bool {
self.inner.provider().vector_id_exists(context, id)
}

fn train_quantizer(&self, context: &Context) -> bool {
self.inner.provider().train_quantizer(context)
}

fn backfill_quant_vectors(&self, context: &Context, task_idx: usize, task_count: usize) {
self.inner
.provider()
.backfill_quant_vectors(context, task_idx, task_count);
}
}
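The new `train_quantizer` and `backfill_quant_vectors` trait methods take a `task_idx` / `task_count` pair, which suggests the backfill is meant to be split across several workers. A hypothetical driver is sketched below; the PR does not show the caller, so the sequencing (train first, then run every stripe) and the meaning of the bool returned by `train_quantizer` are assumptions.

```rust
// Hypothetical caller, not part of this PR.
fn run_quantization_backfill(index: &dyn DynIndex, context: &Context, task_count: usize) {
    // Assumption: `true` means the quantizer was trained and backfill can proceed.
    if !index.train_quantizer(context) {
        return;
    }
    // Assumption: each (task_idx, task_count) pair covers a disjoint slice of the
    // index, so running every stripe backfills each quantized vector exactly once.
    for task_idx in 0..task_count {
        index.backfill_quant_vectors(context, task_idx, task_count);
    }
}
```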
101 changes: 96 additions & 5 deletions diskann-garnet/src/fsm.rs
@@ -26,7 +26,7 @@
use crossbeam::queue::ArrayQueue;
use std::sync::{
RwLock,
atomic::{AtomicBool, AtomicU32, Ordering},
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
};
use thiserror::Error;

@@ -47,11 +47,23 @@ pub enum FsmError {
}

pub struct FreeSpaceMap {
/// Garnet callbacks for reading/writing FSM keys
callbacks: Callbacks,
/// A flag to signal whether there are free IDs in the FSM.
/// This is set after a scan of the FSM, and is used to prevent extraneous reads
/// of FSM blocks.
has_free_ids: AtomicBool,
/// A queue of previously deleted IDs to prevent excessive reads to the FSM
fast_free_list: ArrayQueue<u32>,
/// The maximum block ID stored in the FSM
max_block: RwLock<u32>,
/// The next fresh ID to mint when no previously deleted ID is available for reuse
next_id: AtomicU32,
/// The total number of IDs marked used in the FSM
total_used: AtomicUsize,
/// Lock that prevents reuse of previously used IDs.
/// This is used to disable ID reuse during quantization backfill.
reuse_lock: AtomicUsize,
}

impl FreeSpaceMap {
@@ -60,13 +72,16 @@ impl FreeSpaceMap {
let fast_free_list = ArrayQueue::new(FAST_SIZE);
let max_block = RwLock::new(u32::MAX);
let next_id = AtomicU32::new(0);
let total_used = AtomicUsize::new(0);

let mut this = Self {
callbacks,
has_free_ids,
fast_free_list,
max_block,
next_id,
total_used,
reuse_lock: AtomicUsize::new(0),
};

// Attempt to load state from Garnet.
@@ -97,6 +112,7 @@

let mut block = vec![0u8; BLOCK_SIZE_BYTES];
let mut last_used_id = -1i64;
let mut total_used = 0usize;

for block_id in (0..max_block_id).rev() {
let block_key = Self::block_key(block_id);
@@ -115,8 +131,9 @@
let used = bit_used(byte, bidx);
if used {
last_used_id = last_used_id.max(id as i64);
} else if (id as i64) < last_used_id && self.fast_free_list.push(id).is_err() {
break;
total_used += 1;
} else if (id as i64) < last_used_id {
let _ = self.fast_free_list.push(id);
}

id = id.saturating_sub(1);
Expand All @@ -130,6 +147,8 @@ impl FreeSpaceMap {
self.next_id
.store((last_used_id + 1) as u32, Ordering::Release);

self.total_used.store(total_used, Ordering::Release);

if !self.fast_free_list.is_empty() {
self.has_free_ids.store(true, Ordering::Release);
}
@@ -174,6 +193,14 @@
return Err(FsmError::Garnet(GarnetError::Write));
}

if changed {
if used {
self.total_used.fetch_add(1, Ordering::AcqRel);
} else {
self.total_used.fetch_sub(1, Ordering::AcqRel);
}
}

// NOTE: We don't modify the free list if the id was already free.
if !used && changed {
// Push the id onto the fast free list. If the queue is full, ignore it.
@@ -214,7 +241,7 @@
/// This may be a fresh ID larger than all the others, or it may be a reused ID that
/// previously belonged to a deleted element. The returned ID is marked as used.
pub fn next_id(&self, ctx: Context) -> Result<u32, FsmError> {
if self.has_free_ids.load(Ordering::Acquire) {
if self.can_reuse() && self.has_free_ids.load(Ordering::Acquire) {
Contributor:

Is this check correct? `self.can_reuse` can return true before any quantization happens, causing this check to pass, and quantization can then be initiated. That means we could be inside the loop below while quantization is running. I assume that isn't the intended behavior, since the check exists, but is there a mechanism by which `next_id` broadcasts "I'm running"?

// We retry reusing a freed ID until there are none or we get one and marking it used
// succeeds in changing the value.
loop {
@@ -270,6 +297,10 @@ impl FreeSpaceMap {
self.next_id.load(Ordering::Acquire).saturating_sub(1)
}

pub fn total_used(&self) -> usize {
self.total_used.load(Ordering::Acquire)
}

/// Return the FSM block number, byte index, and bit index for a given ID.
/// The block number is the block which stores this ID, the byte index is the byte offset
/// within the block which contains the status bits, and the bit index is the bit index
@@ -301,9 +332,9 @@ impl FreeSpaceMap {

let mut has_free_ids = false;
let mut id = 0u32;
let mut block = vec![0u8; BLOCK_SIZE_BYTES];
'scan: for block_id in 0..*max_block {
let block_key = Self::block_key(block_id);
let mut block = vec![0u8; BLOCK_SIZE_BYTES];
if !self
.callbacks
.read_single_wid(ctx.term(Term::Metadata), block_key, &mut block)
@@ -363,6 +394,66 @@ impl FreeSpaceMap {

Ok(())
}

/// Visit each used ID in the FSM, invoking `f` on it.
pub fn visit_used<F>(&self, ctx: Context, mut f: F) -> Result<(), FsmError>
where
F: FnMut(u32) -> bool,
{
let max_block = { *self.max_block.read().unwrap() };
let mut block = vec![0u8; BLOCK_SIZE_BYTES];
let mut id = 0u32;

for block_id in 0..max_block + 1 {
let block_key = Self::block_key(block_id);
if !self
.callbacks
.read_single_wid(ctx.term(Term::Metadata), block_key, &mut block)
{
return Err(FsmError::Garnet(GarnetError::Read));
}

for &byte in &block {
if byte == 0x00 {
id += 8;
continue;
}

for bidx in 0..8 {
if bit_used(byte, bidx) {
let keep_going = f(id);
if !keep_going {
return Ok(());
}
}
id += 1;
}
}
}

Ok(())
}

/// Returns whether previously deleted IDs may be reused.
fn can_reuse(&self) -> bool {
self.reuse_lock.load(Ordering::Acquire) == 0
}

/// Prevent the reuse of previously deleted IDs.
/// Each call to this increments a counter, and only once the counter is back to zero
/// will reuse be allowed again.
pub fn lock_reuse(&self) {
self.reuse_lock.fetch_add(1, Ordering::AcqRel);
}

/// Resume reuse of previously deleted IDs.
/// Each call to this decrements a counter, and only once the counter is back to zero
/// will reuse be allowed again. Returns whether this call actually re-enabled reuse.
pub fn unlock_reuse(&self) -> bool {
let prev = self.reuse_lock.fetch_sub(1, Ordering::AcqRel);
debug_assert_ne!(prev, 0);
prev == 1
}
}
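To illustrate how the counting lock above is presumably meant to bracket the quantization backfill, here is a hypothetical caller (not in this PR); the `quantize` callback is a stand-in for the real per-vector work done by the provider.

```rust
// Hypothetical usage of lock_reuse / unlock_reuse, not part of this PR.
fn backfill_with_reuse_disabled(
    fsm: &FreeSpaceMap,
    ctx: Context,
    mut quantize: impl FnMut(u32),
) -> Result<(), FsmError> {
    // Stop next_id from handing out recycled IDs while we walk the FSM.
    fsm.lock_reuse();
    let result = fsm.visit_used(ctx, |id| {
        quantize(id);
        true // keep visiting
    });
    // Always drop the lock, even if the visit failed, so reuse is not left disabled.
    fsm.unlock_reuse();
    result
}
```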

/// Return whether the `bidx`th bit is set in byte, where bits are labeled from left to right.
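The body of `bit_used` is collapsed in this diff. Based on its doc comment ("bits are labeled from left to right", i.e. index 0 is the most significant bit), an implementation consistent with that description would look roughly like the sketch below; treat it as an inference, not the PR's actual code.

```rust
// Sketch of bit_used, inferred from the doc comment above; bidx 0 is the MSB.
fn bit_used(byte: u8, bidx: u32) -> bool {
    byte & (0x80 >> bidx) != 0
}

#[test]
fn bit_used_labels_bits_left_to_right() {
    assert!(bit_used(0b1000_0000, 0));
    assert!(bit_used(0b0000_0001, 7));
    assert!(!bit_used(0b0000_0001, 0));
}
```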
8 changes: 8 additions & 0 deletions diskann-garnet/src/garnet.rs
@@ -86,6 +86,10 @@ impl Callbacks {
self.rmw_callback
}

#[expect(
dead_code,
reason = "currently unused, but may be needed in the future"
)]
pub fn exists_iid(&self, ctx: Context, id: u32) -> bool {
let key = [4, id];
// SAFETY: Key bytes are preceded by 4 bytes of space.
@@ -100,6 +104,10 @@
unsafe { self.exists_raw(ctx, &key_bytes[4..]) }
}

#[expect(
dead_code,
reason = "currently unused, but may be needed in the future"
)]
pub fn exists_eid(&self, ctx: Context, id: &GarnetId) -> bool {
// SAFETY: GarnetId ensures there are 4 bytes preceding the key bytes.
unsafe { self.exists_raw(ctx, id) }
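The two attributes added above use `#[expect(...)]` rather than `#[allow(...)]`. A small standalone example of the difference, assuming a recent Rust toolchain (1.81+): an `expect` that stops being fulfilled produces a warning instead of silently going stale.

```rust
// Not from this PR: minimal demonstration of #[expect(dead_code)].
#[expect(dead_code, reason = "currently unused, but may be needed in the future")]
fn currently_unused() {}

fn main() {
    // As long as nothing calls `currently_unused`, the dead_code lint fires and the
    // expectation is fulfilled, so the build stays clean. If a caller is added later,
    // the compiler emits `unfulfilled_lint_expectations`, prompting removal of the attribute.
}
```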