Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions dash-spv/src/storage/disk/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,69 @@ impl DiskStorageManager {
Ok(*self.cached_filter_tip_height.read().await)
}

/// Get the highest stored compact filter height by scanning the filters directory.
/// This checks which filters are actually persisted on disk, not just filter headers.
///
/// Returns None if no filters are stored, otherwise returns the highest height found.
///
/// Note: This only counts individual filter files ({height}.dat), not segment files.
pub async fn get_stored_filter_height(&self) -> StorageResult<Option<u32>> {
let filters_dir = self.base_path.join("filters");

// If filters directory doesn't exist, no filters are stored
if !filters_dir.exists() {
return Ok(None);
}

let mut max_height: Option<u32> = None;

// Read directory entries
let mut entries = tokio::fs::read_dir(&filters_dir).await?;

while let Some(entry) = entries.next_entry().await? {
let path = entry.path();

// Skip if not a file
if !path.is_file() {
continue;
}

// Check if it's a .dat file
if path.extension().and_then(|e| e.to_str()) != Some("dat") {
continue;
}

// Extract height from filename (format: "{height}.dat")
// Only parse if filename is PURELY numeric (not "filter_segment_0001")
// This ensures we only count individual filter files, not segments
let filename = match path.file_stem().and_then(|f| f.to_str()) {
Some(name) if name.chars().all(|c| c.is_ascii_digit()) => name,
_ => continue, // Skip non-numeric names like "filter_segment_0001"
};

// Since filename only contains digits, this can never fail and can be optimized
// but we'll keep it to ensure correctness
let height: u32 = match filename.parse() {
Ok(h) => h,
Err(_) => continue,
};

// Sanity check - testnet/mainnet should never exceed 2M
if height > 2_000_000 {
tracing::warn!(
"Found suspiciously high filter file: {}.dat (height {}), ignoring",
filename,
height
);
continue;
}

max_height = Some(max_height.map_or(height, |current| current.max(height)));
}

Ok(max_height)
}

/// Store a compact filter.
pub async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()> {
let path = self.base_path.join(format!("filters/{}.dat", height));
Expand Down
4 changes: 4 additions & 0 deletions dash-spv/src/storage/disk/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,10 @@ impl StorageManager for DiskStorageManager {
Self::get_filter_tip_height(self).await
}

async fn get_stored_filter_height(&self) -> StorageResult<Option<u32>> {
Self::get_stored_filter_height(self).await
}

async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> {
Self::store_masternode_state(self, state).await
}
Expand Down
9 changes: 9 additions & 0 deletions dash-spv/src/storage/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ impl StorageManager for MemoryStorageManager {
}
}

async fn get_stored_filter_height(&self) -> StorageResult<Option<u32>> {
// For memory storage, find the highest filter in the HashMap
if self.filters.is_empty() {
Ok(None)
} else {
Ok(self.filters.keys().max().copied())
}
}

async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()> {
self.masternode_state = Some(state.clone());
Ok(())
Expand Down
4 changes: 4 additions & 0 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ pub trait StorageManager: Send + Sync {
/// Get the current filter tip blockchain height.
async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>>;

/// Get the highest stored compact filter height by checking which filters are persisted.
/// This is distinct from filter header tip - it shows which filters are actually downloaded.
async fn get_stored_filter_height(&self) -> StorageResult<Option<u32>>;

/// Store masternode state.
async fn store_masternode_state(&mut self, state: &MasternodeState) -> StorageResult<()>;

Expand Down
102 changes: 72 additions & 30 deletions dash-spv/src/sync/phase_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,42 +149,84 @@ impl<
.map_err(|e| SyncError::Storage(format!("Failed to get filter tip: {}", e)))?
.unwrap_or(0);

if filter_header_tip > 0 {
// Download all filters for complete blockchain history
// This ensures the wallet can find transactions from any point in history
let start_height = self.header_sync.get_sync_base_height().max(1);
let count = filter_header_tip - start_height + 1;
// No filter headers available, skip to next phase
if filter_header_tip == 0 {
self.transition_to_next_phase(storage, network, "No filter headers available")
.await?;
return Ok(());
}

tracing::info!(
"Starting filter download from height {} to {} ({} filters)",
start_height,
filter_header_tip,
count
);
tracing::info!(
"🔍 Filter download check: filter_header_tip={}, sync_base_height={}",
filter_header_tip,
self.header_sync.get_sync_base_height()
);

// Update the phase to track the expected total
if let SyncPhase::DownloadingFilters {
total_filters,
..
} = &mut self.current_phase
{
*total_filters = count;
}
// Check what filters are already stored to resume download
let stored_filter_height =
storage.get_stored_filter_height().await.map_err(|e| {
SyncError::Storage(format!("Failed to get stored filter height: {}", e))
})?;

// Use the filter sync manager to download filters
self.filter_sync
.sync_filters_with_flow_control(
network,
storage,
Some(start_height),
Some(count),
)
.await?;
tracing::info!(
"🔍 Stored filter height from disk scan: {:?}",
stored_filter_height
);

// Resume from the next height after the last stored filter
// If no filters are stored, start from sync_base_height or 1
let start_height = if let Some(stored_height) = stored_filter_height {
tracing::info!(
"Found stored filters up to height {}, resuming from height {}",
stored_height,
stored_height + 1
);
stored_height + 1
} else {
// No filter headers available, skip to next phase
self.transition_to_next_phase(storage, network, "No filter headers available")
let base_height = self.header_sync.get_sync_base_height().max(1);
tracing::info!("No stored filters found, starting from height {}", base_height);
base_height
};

// If we've already downloaded all filters, skip to next phase
if start_height > filter_header_tip {
tracing::info!(
"All filters already downloaded (stored up to {}, tip is {}), skipping to next phase",
start_height - 1,
filter_header_tip
);
self.transition_to_next_phase(storage, network, "Filters already synced")
.await?;
return Ok(());
}

let count = filter_header_tip - start_height + 1;

tracing::info!(
"Starting filter download from height {} to {} ({} filters)",
start_height,
filter_header_tip,
count
);

// Update the phase to track the expected total
if let SyncPhase::DownloadingFilters {
total_filters,
..
} = &mut self.current_phase
{
*total_filters = count;
}

// Use the filter sync manager to download filters
self.filter_sync
.sync_filters_with_flow_control(
network,
storage,
Some(start_height),
Some(count),
)
.await?;
}

SyncPhase::DownloadingBlocks {
Expand Down