Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dash-spv/src/sync/block_headers/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for BlockHeadersMana
&[MessageType::Headers, MessageType::Inv]
}

fn clear_in_flight_state(&mut self) {
fn on_disconnect(&mut self) {
// Drop only per-peer in-flight bookkeeping. Segment topology and
// validated chain state per segment (current_tip_hash, current_height,
// buffered_headers, complete) are preserved so a reconnect can resume
Expand Down
37 changes: 37 additions & 0 deletions dash-spv/src/sync/blocks/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,41 @@ mod tests {
"wallet_out was not in the routed set, must not be processed"
);
}

/// `on_disconnect` for `BlocksManager` keeps the downloaded buffer, the
/// per-block wallet routing, and the `filters_sync_complete` flag, and
/// moves any in-flight `getdata`s back to pending so the next
/// `send_pending` reissues them. Without this preservation, blocks waiting
/// in `downloaded` for height ordering would be dropped, leaving
/// `FiltersManager.tracker` entries that never get decremented.
#[tokio::test]
async fn test_on_disconnect_preserves_pipeline_work() {
use dashcore::block::Header;
use dashcore::{Block, TxMerkleNode};
use dashcore_hashes::Hash;

let mut manager = create_test_manager().await;
manager.filters_sync_complete = true;

let header = Header {
version: dashcore::blockdata::block::Version::from_consensus(1),
prev_blockhash: dashcore::BlockHash::all_zeros(),
merkle_root: TxMerkleNode::all_zeros(),
time: 0,
bits: dashcore::CompactTarget::from_consensus(0),
nonce: 0,
};
let block = Block {
header,
txdata: vec![],
};

// Already-downloaded block sitting in the pipeline.
manager.pipeline.add_from_storage(block.clone(), 200, BTreeSet::from([MOCK_WALLET_ID]));

manager.on_disconnect();

assert!(!manager.pipeline.is_complete(), "downloaded buffer must survive on_disconnect");
assert!(manager.filters_sync_complete);
}
}
37 changes: 37 additions & 0 deletions dash-spv/src/sync/blocks/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ impl BlocksPipeline {
pub(super) fn handle_timeouts(&mut self) {
self.coordinator.check_and_retry_timeouts();
}

/// Move in-flight `getdata` requests back to pending after a peer
/// disconnect so the next `send_pending` reissues them to the new peer.
/// `pending_heights`, `downloaded`, `hash_to_height`, and `hash_to_wallets`
/// are preserved so already-received blocks are not re-fetched and the
/// per-block wallet routing stays intact.
pub(super) fn requeue_in_flight(&mut self) {
self.coordinator.requeue_in_flight();
}
}

#[cfg(test)]
Expand Down Expand Up @@ -308,6 +317,34 @@ mod tests {
assert!(!pipeline.has_pending_requests());
}

#[test]
fn test_requeue_in_flight_preserves_downloaded_and_pending_heights() {
let mut pipeline = BlocksPipeline::new();
let block_a = make_test_block(1);
let block_b = make_test_block(2);
let hash_a = block_a.block_hash();
let hash_b = block_b.block_hash();

// A: queued and sent — will be requeued.
pipeline.queue([(FilterMatchKey::new(100, hash_a), BTreeSet::from([[1u8; 32]]))]);
let sent = pipeline.coordinator.take_pending(1);
pipeline.coordinator.mark_sent(&sent);
assert_eq!(pipeline.coordinator.active_count(), 1);

// B: already received, sitting in `downloaded` — must survive requeue.
pipeline.add_from_storage(block_b.clone(), 200, BTreeSet::from([[2u8; 32]]));

pipeline.requeue_in_flight();

assert_eq!(pipeline.coordinator.active_count(), 0);
assert_eq!(pipeline.coordinator.pending_count(), 1);
assert!(pipeline.pending_heights.contains(&100));
assert_eq!(pipeline.hash_to_height.get(&hash_a), Some(&100));
assert!(pipeline.hash_to_wallets.contains_key(&hash_a));
assert!(pipeline.downloaded.contains_key(&200));
assert!(pipeline.hash_to_wallets.contains_key(&hash_b));
}

#[test]
fn test_timeout_requeues() {
// Create pipeline with very short timeout for testing
Expand Down
22 changes: 16 additions & 6 deletions dash-spv/src/sync/blocks/sync_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::error::SyncResult;
use crate::network::{Message, MessageType, RequestSender};
use crate::storage::{BlockHeaderStorage, BlockStorage};
use crate::sync::blocks::pipeline::BlocksPipeline;
use crate::sync::sync_manager::ensure_not_started;
use crate::sync::{
BlocksManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState,
Expand Down Expand Up @@ -42,14 +41,25 @@ impl<H: BlockHeaderStorage, B: BlockStorage, W: WalletInterface + 'static> SyncM
return Ok(vec![]);
}

// Otherwise wait for BlocksNeeded or FiltersSyncComplete events
self.set_state(SyncState::WaitForEvents);
// If in-progress block work survived a disconnect, resume in Syncing so
// `process_buffered_blocks` can transition to Synced once the pipeline
// drains. Otherwise wait for BlocksNeeded / FiltersSyncComplete events.
if !self.pipeline.is_complete() {
self.set_state(SyncState::Syncing);
} else {
self.set_state(SyncState::WaitForEvents);
}
Ok(vec![])
}

fn clear_in_flight_state(&mut self) {
self.pipeline = BlocksPipeline::new();
self.filters_sync_complete = false;
/// Keep the entire pipeline (downloaded blocks, pending queue, per-block
/// wallet routing) and the `filters_sync_complete` flag, and move in-flight
/// `getdata`s back to the front of `pending` so the next `send_pending`
/// reissues them to the new peer immediately. Without this preservation,
/// `FiltersManager`'s tracker would re-track the same block hashes after a
/// re-scan and leak `pending_blocks` counters that never reach zero.
fn on_disconnect(&mut self) {
self.pipeline.requeue_in_flight();
}

async fn handle_message(
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/chainlock/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl<H: BlockHeaderStorage, M: MetadataStorage> SyncManager for ChainLockManager
&[MessageType::CLSig, MessageType::Inv]
}

fn clear_in_flight_state(&mut self) {
fn on_disconnect(&mut self) {
self.requested_chainlocks.clear();
self.masternode_ready = false;
}
Expand Down
103 changes: 103 additions & 0 deletions dash-spv/src/sync/download_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ impl<K: Hash + Eq + Clone> DownloadCoordinator<K> {
self.last_progress = Instant::now();
}

/// Move all in-flight items back to the front of the pending queue.
///
/// Used on peer disconnect: the requests went to a now-dead peer, but the
/// items themselves are still wanted. Retry counts are preserved so a peer
/// that consistently fails to deliver an item still trips the normal retry
/// budget. Without this hook, items would only be retried once their
/// timeout elapsed.
pub(crate) fn requeue_in_flight(&mut self) {
let items: Vec<K> = self.in_flight.drain().map(|(k, _)| k).collect();
if items.is_empty() {
return;
}
for item in items.into_iter().rev() {
self.pending.push_front(item);
}
}

/// Queue items for download.
pub(crate) fn enqueue(&mut self, items: impl IntoIterator<Item = K>) {
for item in items {
Expand Down Expand Up @@ -148,6 +165,18 @@ impl<K: Hash + Eq + Clone> DownloadCoordinator<K> {
}
}

/// Drop a key from the pending queue without touching in-flight state.
///
/// Used when a pending item is satisfied through a side channel: a late
/// response from a disconnected peer can complete a batch that
/// `requeue_in_flight` just moved from in-flight back to pending. Without
/// this hook, the key would stay in `pending` with no tracker, and the
/// next `take_pending` would resurrect a finished batch.
pub(crate) fn cancel_pending(&mut self, key: &K) {
self.pending.retain(|k| k != key);
self.retry_counts.remove(key);
}

/// Check if an item is currently in-flight.
pub(crate) fn is_in_flight(&self, key: &K) -> bool {
self.in_flight.contains_key(key)
Expand Down Expand Up @@ -315,6 +344,80 @@ mod tests {
assert!(coord.in_flight.is_empty());
}

#[test]
fn test_requeue_in_flight_moves_items_to_pending_front() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
coord.enqueue([10, 11]);
coord.mark_sent(&[1, 2, 3]);

coord.requeue_in_flight();

assert_eq!(coord.active_count(), 0);
// Requeued items go to the front, original pending follows.
let items = coord.take_pending(5);
assert_eq!(items.len(), 5);
assert_eq!(&items[3..], &[10, 11]);
let mut requeued = items[..3].to_vec();
requeued.sort();
assert_eq!(requeued, vec![1, 2, 3]);
}

#[test]
fn test_requeue_in_flight_preserves_retry_counts() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
coord.enqueue_retry(7);
let items = coord.take_pending(1);
coord.mark_sent(&items);
assert_eq!(coord.retry_counts.get(&7), Some(&1));

coord.requeue_in_flight();

assert_eq!(coord.retry_counts.get(&7), Some(&1));
assert!(!coord.is_in_flight(&7));
assert_eq!(coord.pending_count(), 1);
}

#[test]
fn test_requeue_in_flight_no_op_when_empty() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
coord.enqueue([1, 2]);

coord.requeue_in_flight();

assert_eq!(coord.pending_count(), 2);
assert_eq!(coord.active_count(), 0);
}

#[test]
fn test_cancel_pending_removes_from_pending_only() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
coord.enqueue([1, 2, 3]);
coord.mark_sent(&[10]);
coord.enqueue_retry(2);
assert_eq!(coord.retry_counts.get(&2), Some(&1));

coord.cancel_pending(&2);

assert_eq!(coord.pending_count(), 2);
assert!(coord.is_in_flight(&10));
assert_eq!(coord.retry_counts.get(&2), None);

let items = coord.take_pending(2);
assert_eq!(items, vec![1, 3]);
}

#[test]
fn test_cancel_pending_unknown_key_is_noop() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
coord.enqueue([1, 2]);
coord.mark_sent(&[5]);

coord.cancel_pending(&99);

assert_eq!(coord.pending_count(), 2);
assert_eq!(coord.active_count(), 1);
}

#[test]
fn test_clear() {
let mut coord: DownloadCoordinator<u32> = DownloadCoordinator::default();
Expand Down
6 changes: 3 additions & 3 deletions dash-spv/src/sync/filter_headers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,14 @@ mod tests {
}

#[tokio::test]
async fn test_clear_in_flight_state() {
async fn test_on_disconnect() {
let mut manager = create_test_manager().await;

// Set all fields that clear_in_flight_state resets
// Set all fields that on_disconnect resets
manager.block_headers_synced = true;
manager.checkpoint_start_height = Some(500);

manager.clear_in_flight_state();
manager.on_disconnect();

assert!(!manager.block_headers_synced);
assert!(manager.checkpoint_start_height.is_none());
Expand Down
2 changes: 1 addition & 1 deletion dash-spv/src/sync/filter_headers/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage> SyncManager for FilterHeade
&[MessageType::CFHeaders]
}

fn clear_in_flight_state(&mut self) {
fn on_disconnect(&mut self) {
self.pipeline = FilterHeadersPipeline::default();
self.checkpoint_start_height = None;
self.block_headers_synced = false;
Expand Down
Loading
Loading