Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 53 additions & 9 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer, request_blocks_by_range_from_peer,
},
swarm_adapter::SwarmHandle,
};
Expand All @@ -59,12 +59,22 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
pub(crate) failed_peers: HashSet<PeerId>,
}

pub(crate) enum PendingRequestKind {
Root(H256),
Range {
start_slot: u64,
end_slot: u64,
total_end_slot: u64,
},
}

// --- Swarm construction ---

/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining identify, Gossipsub
Expand Down Expand Up @@ -300,8 +310,9 @@ impl P2P {
block_topic: built.block_topic,
aggregation_topic: built.aggregation_topic,
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
pending_root_requests: HashMap::new(),
outbound_requests: HashMap::new(),
pending_range_requests: HashSet::new(),
bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -336,8 +347,9 @@ pub struct P2PServer {
pub(crate) aggregation_topic: libp2p::gossipsub::IdentTopic,

pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) pending_root_requests: HashMap<H256, PendingRequest>,
pub(crate) outbound_requests: HashMap<OutboundRequestId, PendingRequestKind>,
pub(crate) pending_range_requests: HashSet<(u64, u64)>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand All @@ -359,6 +371,13 @@ pub(crate) trait P2PProtocol: Send + Sync {
fn retry_block_fetch(&self, root: H256) -> Result<(), ActorError>;
#[allow(dead_code)] // invoked via send_after, not called directly
fn retry_peer_redial(&self, peer_id: PeerId) -> Result<(), ActorError>;
#[allow(dead_code)]
fn retry_range_sync(
&self,
start_slot: u64,
end_slot: u64,
peer_id: PeerId,
) -> Result<(), ActorError>;
}

#[actor(protocol = P2PProtocol)]
Expand All @@ -371,7 +390,7 @@ impl P2PServer {
) {
let root = msg.root;
// Check if still pending (might have succeeded during backoff)
if !self.pending_requests.contains_key(&root) {
if !self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch completed during backoff, skipping retry");
return;
}
Expand All @@ -380,7 +399,7 @@ impl P2PServer {

if !fetch_block_from_peer(self, root).await {
tracing::error!(%root, "Failed to retry block fetch, giving up");
self.pending_requests.remove(&root);
self.pending_root_requests.remove(&root);
}
}

Expand All @@ -403,6 +422,31 @@ impl P2PServer {
self.swarm_handle.dial(addr.clone());
}
}

#[send_handler]
async fn handle_retry_range_sync(
&mut self,
msg: p2p_protocol::RetryRangeSync,
_ctx: &Context<Self>,
) {
let start_slot = msg.start_slot;
let end_slot = msg.end_slot;
let peer = msg.peer_id;

// safety check: if already synced, skip retry
let still_needed = !self
.pending_range_requests
.contains(&(start_slot, end_slot));

if still_needed {
tracing::trace!(%peer, start_slot, end_slot, "Skipping retry, range already resolved");
return;
}

info!(%peer, start_slot, end_slot, "Retrying BlocksByRange sync");

request_blocks_by_range_from_peer(self, peer, start_slot, end_slot).await;
}
}

// --- Manual Handler impls for network-api messages ---
Expand Down Expand Up @@ -436,7 +480,7 @@ impl Handler<FetchBlock> for P2PServer {
async fn handle(&mut self, msg: FetchBlock, _ctx: &Context<Self>) {
let root = msg.root;
// Deduplicate - if already pending, ignore
if self.pending_requests.contains_key(&root) {
if self.pending_root_requests.contains_key(&root) {
trace!(%root, "Block fetch already in progress, ignoring duplicate");
return;
}
Expand Down
Loading