Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion dash-spv/src/storage/disk/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::ops::Range;
use dashcore::hash_types::FilterHeader;
use dashcore_hashes::Hash;

use crate::error::StorageResult;
use crate::error::{StorageError, StorageResult};
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;

use super::manager::DiskStorageManager;
use super::segments::SegmentState;
Expand Down Expand Up @@ -216,6 +217,17 @@ impl DiskStorageManager {
}
tokio::fs::create_dir_all(&filters_dir).await?;

// Remove trusted checkpoint predecessor filter header metadata if present
let metadata_path =
self.base_path.join(format!("state/{}.dat", CHECKPOINT_PREV_FILTER_HEADER_KEY));
if metadata_path.exists() {
if let Err(e) = tokio::fs::remove_file(&metadata_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(StorageError::Io(e));
}
}
}

// Restart background worker for future operations
self.start_worker().await;

Expand Down
2 changes: 2 additions & 0 deletions dash-spv/src/storage/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::ops::Range;
use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid};

use crate::error::{StorageError, StorageResult};
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
use crate::storage::{MasternodeState, StorageManager, StorageStats};
use crate::types::{ChainState, MempoolState, UnconfirmedTransaction};

Expand Down Expand Up @@ -310,6 +311,7 @@ impl StorageManager for MemoryStorageManager {
/// Remove all stored compact filters and filter headers.
///
/// Also drops the trusted checkpoint predecessor header stored under
/// `CHECKPOINT_PREV_FILTER_HEADER_KEY`, mirroring the disk backend which deletes the
/// corresponding metadata file when filters are cleared.
async fn clear_filters(&mut self) -> StorageResult<()> {
    self.filter_headers.clear();
    self.filters.clear();
    // Drop the checkpoint predecessor metadata together with the filter data it anchors,
    // so a fresh filter sync does not reuse a stale trusted header.
    self.metadata.remove(CHECKPOINT_PREV_FILTER_HEADER_KEY);
    Ok(())
}

Expand Down
4 changes: 4 additions & 0 deletions dash-spv/src/storage/metadata_keys.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Common metadata keys stored by storage backends.

/// Metadata key storing the filter header for the block immediately before a trusted checkpoint.
///
/// The `_v1` suffix versions the serialized format (4-byte LE height + 32-byte header) so a
/// future layout change can use a new key without ambiguity. The disk backend persists this
/// under `state/<key>.dat`; the memory backend keeps it in its metadata map under the same key.
pub const CHECKPOINT_PREV_FILTER_HEADER_KEY: &str = "checkpoint_prev_filter_header_v1";
1 change: 1 addition & 0 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod disk;
pub mod memory;
pub mod metadata_keys;
pub mod sync_state;
pub mod sync_storage;
pub mod types;
Expand Down
67 changes: 56 additions & 11 deletions dash-spv/src/sync/filters/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use dashcore::{
BlockHash,
};

use super::manager::TrustedCheckpointFilterHeader;
use super::types::*;
use crate::error::{SyncError, SyncResult};
use crate::network::NetworkManager;
Expand All @@ -38,19 +39,50 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
return Ok(true);
}

// Load previous and expected headers
let prev_header = storage.get_filter_header(height - 1).await.map_err(|e| {
SyncError::Storage(format!("Failed to load previous filter header: {}", e))
})?;
let prev_height = height - 1;
let prev_header = match storage.get_filter_header(prev_height).await.map_err(|e| {
SyncError::Storage(format!(
"Failed to load previous filter header at height {}: {}",
prev_height, e
))
})? {
Some(header) => header,
None if self.sync_base_height > 0 && height == self.sync_base_height => {
match self.load_checkpoint_prev_filter_header(storage).await? {
Some(trusted) if trusted.height == prev_height => trusted.header,
Some(trusted) => {
tracing::error!(
"Checkpoint predecessor header height mismatch: expected {}, stored {}",
prev_height,
trusted.height
);
return Ok(false);
}
None => {
tracing::warn!(
"Missing trusted checkpoint predecessor filter header at height {}",
prev_height
);
return Ok(false);
}
}
}
None => {
tracing::warn!(
"Missing filter header at height {} required for verifying cfilter at {}",
prev_height,
height
);
return Ok(false);
}
};

let expected_header = storage.get_filter_header(height).await.map_err(|e| {
SyncError::Storage(format!("Failed to load expected filter header: {}", e))
})?;

let (Some(prev_header), Some(expected_header)) = (prev_header, expected_header) else {
tracing::warn!(
"Missing filter headers in storage for height {} (prev and/or expected)",
height
);
let Some(expected_header) = expected_header else {
tracing::warn!("Missing filter headers in storage for height {}", height);
return Ok(false);
};

Expand Down Expand Up @@ -582,8 +614,21 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
);
}

// If this is the first batch after a checkpoint, store the checkpoint filter header
if self.sync_base_height > 0
// If this is the first batch that begins at the checkpoint base, persist the
// trusted predecessor filter header so we can verify the checkpoint filter.
if self.sync_base_height > 0 && start_height == self.sync_base_height {
if let Some(prev_height) = self.sync_base_height.checked_sub(1) {
let record = TrustedCheckpointFilterHeader {
height: prev_height,
header: cfheaders.previous_filter_header,
};
self.persist_checkpoint_prev_filter_header(storage, record).await?;
tracing::info!(
"Stored trusted checkpoint predecessor filter header at height {}",
prev_height
);
}
} else if self.sync_base_height > 0
&& start_height == self.sync_base_height + 1
&& current_filter_tip < self.sync_base_height
{
Expand Down
86 changes: 86 additions & 0 deletions dash-spv/src/sync/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,51 @@
use dashcore::{hash_types::FilterHeader, network::message_filter::CFHeaders, BlockHash};
use dashcore_hashes::{sha256d, Hash};
use std::collections::{HashMap, HashSet, VecDeque};
use tokio::sync::Mutex;

use crate::client::ClientConfig;
use crate::error::{SyncError, SyncResult};
use crate::network::NetworkManager;
use crate::storage::metadata_keys::CHECKPOINT_PREV_FILTER_HEADER_KEY;
use crate::storage::StorageManager;
use crate::types::SharedFilterHeights;

// Import types and constants from the types module
use super::types::*;

/// Filter header for the block immediately before the configured checkpoint base.
///
/// Captured from `previous_filter_header` of the first post-checkpoint `cfheaders`
/// batch and persisted via storage metadata so the checkpoint-height filter can be
/// verified even after a restart.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) struct TrustedCheckpointFilterHeader {
    /// Height of the predecessor block (checkpoint base height minus one).
    pub(super) height: u32,
    /// The filter header recorded at `height`.
    pub(super) header: FilterHeader,
}

impl TrustedCheckpointFilterHeader {
    /// Serialize as the 4-byte little-endian height followed by the 32 raw header bytes.
    fn to_bytes(self) -> [u8; 36] {
        let mut out = [0u8; 36];
        out[..4].copy_from_slice(&self.height.to_le_bytes());
        out[4..].copy_from_slice(self.header.as_byte_array());
        out
    }

    /// Parse the 36-byte layout written by [`Self::to_bytes`].
    ///
    /// Returns `None` for any other input length.
    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        // Reject anything that is not exactly height (4) + header (32) bytes.
        let (height_raw, header_raw) = match bytes.len() {
            36 => bytes.split_at(4),
            _ => return None,
        };

        // Both conversions are infallible here given the length check above,
        // but `try_into` keeps the code panic-free regardless.
        let height = u32::from_le_bytes(height_raw.try_into().ok()?);
        let header_arr: [u8; 32] = header_raw.try_into().ok()?;

        Some(Self {
            height,
            header: FilterHeader::from_byte_array(header_arr),
        })
    }
}

/// Manages BIP157 compact block filter synchronization.
///
/// # Generic Parameters
Expand Down Expand Up @@ -102,6 +137,8 @@ pub struct FilterSyncManager<S: StorageManager, N: NetworkManager> {
pub(super) max_concurrent_cfheader_requests: usize,
/// Timeout for CFHeaders requests
pub(super) cfheader_request_timeout: std::time::Duration,
/// Trusted predecessor filter header for the configured checkpoint base
checkpoint_prev_filter_header: Mutex<Option<TrustedCheckpointFilterHeader>>,
}

impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
Expand Down Expand Up @@ -148,6 +185,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
cfheader_request_timeout: std::time::Duration::from_secs(
config.cfheaders_request_timeout_secs,
),
checkpoint_prev_filter_header: Mutex::new(None),
_phantom_s: std::marker::PhantomData,
_phantom_n: std::marker::PhantomData,
}
Expand Down Expand Up @@ -264,6 +302,54 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
Ok(new_filter_headers)
}

/// Persist the trusted checkpoint predecessor filter header to storage metadata,
/// then refresh the in-memory cache so later lookups skip the storage round-trip.
///
/// The cache is only updated after the durable write succeeds; on storage failure
/// a [`SyncError::Storage`] is returned and the cache is left untouched.
pub(super) async fn persist_checkpoint_prev_filter_header(
    &self,
    storage: &mut S,
    header: TrustedCheckpointFilterHeader,
) -> SyncResult<()> {
    let write_result =
        storage.store_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY, &header.to_bytes()).await;

    if let Err(e) = write_result {
        return Err(SyncError::Storage(format!(
            "Failed to persist checkpoint predecessor filter header: {}",
            e
        )));
    }

    // Cache the record for subsequent load_checkpoint_prev_filter_header calls.
    *self.checkpoint_prev_filter_header.lock().await = Some(header);
    Ok(())
}

/// Return the trusted checkpoint predecessor filter header, if one is known.
///
/// Consults the in-memory cache first; on a miss it falls back to storage
/// metadata, caching any successfully parsed record. A stored value with an
/// unexpected byte length is logged and treated as absent (returns `Ok(None)`).
pub(super) async fn load_checkpoint_prev_filter_header(
    &self,
    storage: &S,
) -> SyncResult<Option<TrustedCheckpointFilterHeader>> {
    let mut cached = self.checkpoint_prev_filter_header.lock().await;

    // Fast path: already loaded (or persisted) during this session.
    if let Some(record) = *cached {
        return Ok(Some(record));
    }

    let loaded =
        storage.load_metadata(CHECKPOINT_PREV_FILTER_HEADER_KEY).await.map_err(|e| {
            SyncError::Storage(format!(
                "Failed to load checkpoint predecessor filter header: {}",
                e
            ))
        })?;

    let Some(bytes) = loaded else {
        return Ok(None);
    };

    match TrustedCheckpointFilterHeader::from_bytes(&bytes) {
        Some(record) => {
            *cached = Some(record);
            Ok(Some(record))
        }
        None => {
            // Malformed payload: warn but do not fail the sync; callers treat
            // this the same as a missing record.
            tracing::warn!(
                "Stored checkpoint predecessor filter header has unexpected format ({} bytes)",
                bytes.len()
            );
            Ok(None)
        }
    }
}

/// Handle overlapping filter headers by skipping already processed ones.
pub fn has_pending_downloads(&self) -> bool {
!self.pending_block_downloads.is_empty() || !self.downloading_blocks.is_empty()
Expand Down
Loading
Loading