Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dash-spv/src/network/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,4 @@ pub const PEER_DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); // Discov

// DNS and polling intervals
pub const DNS_DISCOVERY_DELAY: Duration = Duration::from_secs(10);
pub const MESSAGE_POLL_INTERVAL: Duration = Duration::from_millis(10);
pub const MESSAGE_RECEIVE_TIMEOUT: Duration = Duration::from_millis(100);
27 changes: 13 additions & 14 deletions dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,26 +459,19 @@ impl PeerNetworkManager {
}
};

// Read message with minimal lock time
// Read with only a shared peer lock so senders can run concurrently. The
// inner read state has its own mutex, and `receive_message` is fully
// waker-driven on the read half. No polling sleep needed.
let msg_result = {
// Try to get a read lock first to check if peer is available
let peer_guard = peer.read().await;
if !peer_guard.is_connected() {
tracing::warn!("Breaking peer reader loop for {} - peer no longer connected (iteration {})", addr, loop_iteration);
drop(peer_guard);
break;
}
drop(peer_guard);

// Now get write lock only for the duration of the read
let mut peer_guard = peer.write().await;
tokio::select! {
message = peer_guard.receive_message() => {
message
},
_ = tokio::time::sleep(MESSAGE_POLL_INTERVAL) => {
Ok(None)
},
_ = shutdown_token.cancelled() => {
tracing::info!("Breaking peer reader loop for {} - shutdown signal received while reading (iteration {})", addr, loop_iteration);
break;
Expand Down Expand Up @@ -519,7 +512,7 @@ impl PeerNetworkManager {
);
// Send our known addresses
let response = addrv2_handler.build_addr_response().await;
let mut peer_guard = peer.write().await;
let peer_guard = peer.read().await;
if let Err(e) = peer_guard.send_message(response).await {
tracing::error!(
"Failed to send addr response to {}: {}",
Expand All @@ -531,12 +524,16 @@ impl PeerNetworkManager {
}
NetworkMessage::Ping(nonce) => {
// Handle ping directly
let mut peer_guard = peer.write().await;
let peer_guard = peer.read().await;
if let Err(e) = peer_guard.handle_ping(*nonce).await {
tracing::error!("Failed to handle ping from {}: {}", addr, e);
// If we can't send pong, connection is likely broken
if matches!(e, NetworkError::ConnectionFailed(_)) {
tracing::warn!("Breaking peer reader loop for {} - failed to send pong response (iteration {})", addr, loop_iteration);
// Drop the read guard before acquiring the write
// guard on the same RwLock to avoid a self-deadlock.
drop(peer_guard);
peer.write().await.mark_disconnected();
break;
}
}
Expand Down Expand Up @@ -688,6 +685,7 @@ impl PeerNetworkManager {
match e {
NetworkError::PeerDisconnected => {
tracing::info!("Peer {} disconnected", addr);
peer.write().await.mark_disconnected();
break;
}
NetworkError::Timeout => {
Expand Down Expand Up @@ -757,6 +755,7 @@ impl PeerNetworkManager {
}
}

peer.write().await.mark_disconnected();
break;
}
}
Expand Down Expand Up @@ -1237,7 +1236,7 @@ impl PeerNetworkManager {
other => other,
};

let mut peer_guard = peer.write().await;
let peer_guard = peer.read().await;
peer_guard
.send_message(message)
.await
Expand All @@ -1263,7 +1262,7 @@ impl PeerNetworkManager {
let msg = message.clone();

let handle = tokio::spawn(async move {
let mut peer_guard = peer.write().await;
let peer_guard = peer.read().await;
peer_guard.send_message(msg).await.map_err(Error::Network)
});
handles.push(handle);
Expand Down
Loading
Loading