Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dash-spv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub enum StorageError {
#[error("Read failed: {0}")]
ReadFailed(String),

#[error("Invalid argument: {0}")]
InvalidArgument(String),

#[error("IO error: {0}")]
Io(#[from] io::Error),

Expand Down
75 changes: 75 additions & 0 deletions dash-spv/src/storage/block_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ pub trait BlockHeaderStorage: Send + Sync + 'static {
height: u32,
) -> StorageResult<()>;

/// Load a contiguous range of headers by height.
///
/// Returns `StorageError::InvalidArgument` when the range extends into a
/// segment queued for deletion by a prior `truncate_above` (before the next
/// `persist`). Callers must clamp the range to at most `get_tip_height`.
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>>;

async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
Expand Down Expand Up @@ -95,6 +100,17 @@ pub trait BlockHeaderStorage: Send + Sync + 'static {
&self,
hash: &dashcore::BlockHash,
) -> StorageResult<Option<u32>>;

/// Drop all headers with `height > target_height`.
///
/// Truncating above the current tip is a no-op, truncating below
/// `start_height` returns an error. Changes are applied in-memory and
/// flushed on the next `persist`.
///
/// The truncation is not durable until the next successful `persist` call.
/// A crash between `truncate_above` and `persist` may leave orphaned segment
/// files on disk and cause the storage to reopen at the pre-truncation tip.
async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()>;
}

pub struct PersistentBlockHeaderStorage {
Expand Down Expand Up @@ -234,6 +250,17 @@ impl BlockHeaderStorage for PersistentBlockHeaderStorage {
) -> StorageResult<Option<u32>> {
Ok(self.header_hash_index.get(hash).copied())
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
let mut block_headers = self.block_headers.write().await;
let needs_index_prune = block_headers.tip_height().is_some_and(|tip| target_height < tip);
block_headers.truncate_above(target_height).await?;
drop(block_headers);
if needs_index_prune {
self.header_hash_index.retain(|_, h| *h <= target_height);
}
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -261,4 +288,52 @@ mod tests {
assert_eq!(tip, expected_tip);
assert_eq!(storage.get_tip_height().await, Some(4));
}

#[tokio::test]
async fn test_truncate_above_drops_index_entries_and_allows_restore() {
let tmp_dir = TempDir::new().unwrap();
let mut storage = PersistentBlockHeaderStorage::open(tmp_dir.path()).await.unwrap();

let headers = BlockHeader::dummy_batch(0..10);
storage.store_headers(&headers).await.unwrap();

let orphaned_hash = headers[7].block_hash();
assert_eq!(storage.get_header_height_by_hash(&orphaned_hash).await.unwrap(), Some(7));

storage.truncate_above(100).await.unwrap();
assert_eq!(storage.get_tip_height().await, Some(9));
assert_eq!(
storage.get_header_height_by_hash(&headers[4].block_hash()).await.unwrap(),
Some(4)
);

storage.truncate_above(5).await.unwrap();

assert_eq!(storage.get_tip_height().await, Some(5));
assert_eq!(storage.get_header_height_by_hash(&orphaned_hash).await.unwrap(), None);

let kept_hash = headers[3].block_hash();
assert_eq!(storage.get_header_height_by_hash(&kept_hash).await.unwrap(), Some(3));

let replacement = BlockHeader::dummy_batch(100..105);
storage.store_headers_at_height(&replacement, 6).await.unwrap();
assert_eq!(storage.get_tip_height().await, Some(10));

let reloaded = storage.load_headers(6..11).await.unwrap();
assert_eq!(reloaded, replacement);

let new_hash = replacement[0].block_hash();
assert_eq!(storage.get_header_height_by_hash(&new_hash).await.unwrap(), Some(6));

// Exercise the durability contract: persist, drop, reopen, and verify
// the rebuilt index does not resurrect orphaned hashes from stale files.
storage.persist(tmp_dir.path()).await.unwrap();
drop(storage);

let reopened = PersistentBlockHeaderStorage::open(tmp_dir.path()).await.unwrap();
assert_eq!(reopened.get_tip_height().await, Some(10));
assert_eq!(reopened.get_header_height_by_hash(&orphaned_hash).await.unwrap(), None);
assert_eq!(reopened.get_header_height_by_hash(&kept_hash).await.unwrap(), Some(3));
assert_eq!(reopened.get_header_height_by_hash(&new_hash).await.unwrap(), Some(6));
}
}
33 changes: 32 additions & 1 deletion dash-spv/src/storage/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ pub trait BlockStorage: Send + Sync + 'static {

/// Load a single block by height.
async fn load_block(&self, height: CoreBlockHeight) -> StorageResult<Option<HashedBlock>>;

/// Drop all blocks with `height > target_height`.
///
/// Truncating above the current tip is a no-op, truncating below
/// `start_height` returns an error. Changes are applied in-memory and
/// flushed on the next `persist`.
///
/// The truncation is not durable until the next successful `persist` call.
/// A crash between `truncate_above` and `persist` may leave orphaned segment
/// files on disk and cause the storage to reopen at the pre-truncation tip.
async fn truncate_above(&mut self, target_height: CoreBlockHeight) -> StorageResult<()>;
}

/// Persistent storage for full blocks using segmented files.
Expand Down Expand Up @@ -66,13 +77,18 @@ impl BlockStorage for PersistentBlockStorage {
async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
self.blocks.write().await.get_item(height).await
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.blocks.write().await.truncate_above(target_height).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

use super::*;

#[tokio::test]
async fn test_store_and_load_block() {
let temp_dir = TempDir::new().unwrap();
Expand Down Expand Up @@ -112,6 +128,21 @@ mod tests {
assert!(loaded.is_none());
}

#[tokio::test]
async fn test_truncate_above_wrapper_smoke() {
let temp_dir = TempDir::new().unwrap();
let mut storage = PersistentBlockStorage::open(temp_dir.path()).await.unwrap();

for height in 0..5 {
storage.store_block(height, HashedBlock::dummy(height, vec![])).await.unwrap();
}

storage.truncate_above(2).await.unwrap();

assert_eq!(storage.load_block(2).await.unwrap(), Some(HashedBlock::dummy(2, vec![])));
assert_eq!(storage.load_block(3).await.unwrap(), None);
}

#[tokio::test]
async fn test_returns_none_for_gaps() {
let temp_dir = TempDir::new().unwrap();
Expand Down
40 changes: 40 additions & 0 deletions dash-spv/src/storage/filter_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ pub trait FilterHeaderStorage: Send + Sync + 'static {
height: u32,
) -> StorageResult<()>;

/// Load a contiguous range of filter headers by height.
///
/// Returns `StorageError::InvalidArgument` when the range extends into a
/// segment queued for deletion by a prior `truncate_above` (before the next
/// `persist`). Callers must clamp the range to at most `get_filter_tip_height`.
async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>>;

async fn get_filter_header(&self, height: u32) -> StorageResult<Option<FilterHeader>> {
Expand All @@ -42,6 +47,17 @@ pub trait FilterHeaderStorage: Send + Sync + 'static {
async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>>;

async fn get_filter_start_height(&self) -> Option<u32>;

/// Drop all filter headers with `height > target_height`.
///
/// Truncating above the current tip is a no-op, truncating below
/// `start_height` returns an error. Changes are applied in-memory and
/// flushed on the next `persist`.
///
/// The truncation is not durable until the next successful `persist` call.
/// A crash between `truncate_above` and `persist` may leave orphaned segment
/// files on disk and cause the storage to reopen at the pre-truncation tip.
async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()>;
}

pub struct PersistentFilterHeaderStorage {
Expand Down Expand Up @@ -100,4 +116,28 @@ impl FilterHeaderStorage for PersistentFilterHeaderStorage {
async fn get_filter_start_height(&self) -> Option<u32> {
self.filter_headers.read().await.start_height()
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.filter_headers.write().await.truncate_above(target_height).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

#[tokio::test]
async fn test_truncate_above_wrapper_smoke() {
let tmp_dir = TempDir::new().unwrap();
let mut storage = PersistentFilterHeaderStorage::open(tmp_dir.path()).await.unwrap();

let headers = FilterHeader::dummy_batch(0..5);
storage.store_filter_headers(&headers).await.unwrap();

storage.truncate_above(2).await.unwrap();

assert_eq!(storage.get_filter_tip_height().await.unwrap(), Some(2));
assert_eq!(storage.get_filter_header(3).await.unwrap(), None);
}
}
45 changes: 45 additions & 0 deletions dash-spv/src/storage/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,25 @@ use crate::{
pub trait FilterStorage: Send + Sync + 'static {
async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()>;

/// Load a contiguous range of filters by height.
///
/// Returns `StorageError::InvalidArgument` when the range extends into a
/// segment queued for deletion by a prior `truncate_above` (before the next
/// `persist`). Callers must clamp the range to at most `filter_tip_height`.
async fn load_filters(&self, range: Range<u32>) -> StorageResult<Vec<Vec<u8>>>;

async fn filter_tip_height(&self) -> StorageResult<u32>;

/// Drop all filters with `height > target_height`.
///
/// Truncating above the current tip is a no-op, truncating below
/// `start_height` returns an error. Changes are applied in-memory and
/// flushed on the next `persist`.
///
/// The truncation is not durable until the next successful `persist` call.
/// A crash between `truncate_above` and `persist` may leave orphaned segment
/// files on disk and cause the storage to reopen at the pre-truncation tip.
async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()>;
}

pub struct PersistentFilterStorage {
Expand Down Expand Up @@ -62,4 +78,33 @@ impl FilterStorage for PersistentFilterStorage {
async fn filter_tip_height(&self) -> StorageResult<u32> {
Ok(self.filters.read().await.tip_height().unwrap_or(0))
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.filters.write().await.truncate_above(target_height).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;

fn filter_bytes(seed: u8) -> Vec<u8> {
vec![seed; 8]
}

#[tokio::test]
async fn test_truncate_above_wrapper_smoke() {
let tmp_dir = TempDir::new().unwrap();
let mut storage = PersistentFilterStorage::open(tmp_dir.path()).await.unwrap();

for height in 0..5 {
storage.store_filter(height, &filter_bytes(height as u8)).await.unwrap();
}

storage.truncate_above(2).await.unwrap();

assert_eq!(storage.filter_tip_height().await.unwrap(), 2);
assert!(storage.load_filters(3..4).await.is_err());
}
}
37 changes: 37 additions & 0 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,10 @@ impl BlockHeaderStorage for DiskStorageManager {
) -> StorageResult<Option<u32>> {
self.block_headers.read().await.get_header_height_by_hash(hash).await
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.block_headers.write().await.truncate_above(target_height).await
}
}

#[async_trait]
Expand Down Expand Up @@ -346,6 +350,10 @@ impl FilterHeaderStorage for DiskStorageManager {
async fn get_filter_start_height(&self) -> Option<u32> {
self.filter_headers.read().await.get_filter_start_height().await
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.filter_headers.write().await.truncate_above(target_height).await
}
}

#[async_trait]
Expand All @@ -361,6 +369,10 @@ impl filters::FilterStorage for DiskStorageManager {
async fn filter_tip_height(&self) -> StorageResult<u32> {
self.filters.read().await.filter_tip_height().await
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.filters.write().await.truncate_above(target_height).await
}
}

#[async_trait]
Expand All @@ -372,6 +384,10 @@ impl BlockStorage for DiskStorageManager {
async fn load_block(&self, height: u32) -> StorageResult<Option<HashedBlock>> {
self.blocks.read().await.load_block(height).await
}

async fn truncate_above(&mut self, target_height: u32) -> StorageResult<()> {
self.blocks.write().await.truncate_above(target_height).await
}
}

#[async_trait]
Expand Down Expand Up @@ -568,6 +584,27 @@ mod tests {
assert!(storage.get_header_height_by_hash(&hash).await.unwrap().is_none());
}

#[tokio::test]
async fn test_disk_storage_manager_truncate_above_block_headers() {
let temp_dir = TempDir::new().unwrap();
let config = ClientConfig::regtest().with_storage_path(temp_dir.path());
let mut mgr = DiskStorageManager::new(&config).await.unwrap();

let headers = BlockHeader::dummy_batch(0..10);
mgr.store_headers(&headers).await.unwrap();

let orphaned_hash = headers[7].block_hash();
assert_eq!(mgr.get_header_height_by_hash(&orphaned_hash).await.unwrap(), Some(7));

<DiskStorageManager as BlockHeaderStorage>::truncate_above(&mut mgr, 5).await.unwrap();

assert_eq!(mgr.get_tip_height().await, Some(5));
assert_eq!(mgr.get_header_height_by_hash(&orphaned_hash).await.unwrap(), None);

let kept_hash = headers[3].block_hash();
assert_eq!(mgr.get_header_height_by_hash(&kept_hash).await.unwrap(), Some(3));
}

#[tokio::test]
async fn test_lock_lifecycle() {
let temp_dir = TempDir::new().expect("Failed to create temp directory");
Expand Down
Loading
Loading