Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 86 additions & 5 deletions recon/src/libp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ use ceramic_core::{EventId, PeerKey};
use futures::{future::BoxFuture, FutureExt};
use libp2p::{
core::ConnectedPoint,
swarm::{CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm},
swarm::{
CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm,
},
};
use libp2p_identity::PeerId;
use std::{
cmp::min,
collections::{btree_map::Entry, BTreeMap},
collections::{btree_map::Entry, BTreeMap, HashSet},
task::Poll,
time::Duration,
};
Expand All @@ -38,8 +40,10 @@ use tracing::{debug, trace, warn};

use crate::{
libp2p::handler::{FromBehaviour, FromHandler, Handler},
metrics::{BlockedConnection, InboundSyncRejected},
Sha256a,
};
use ceramic_metrics::Recorder;

/// Name of the Recon protocol for synchronizing peers
pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer";
Expand All @@ -57,6 +61,9 @@ pub struct Config {
/// Maximum delay between synchronization attempts.
/// Defaults to 10 minutes
pub per_peer_maximum_sync_delay: Duration,
/// Set of PeerIds that are permanently blocked.
/// Connections from these peers will be denied.
pub blocked_peers: HashSet<PeerId>,
}

impl Default for Config {
Expand All @@ -65,6 +72,7 @@ impl Default for Config {
per_peer_sync_delay: Duration::from_millis(1000),
per_peer_sync_backoff: 2.0,
per_peer_maximum_sync_delay: Duration::from_secs(60 * 10),
blocked_peers: HashSet::new(),
}
}
}
Expand All @@ -79,6 +87,9 @@ pub struct Behaviour<P, M> {
model: M,
config: Config,
peers: BTreeMap<PeerId, PeerInfo>,
/// Tracks backoff state for peers, persisting across disconnections.
/// Maps peer ID to the time until which incoming syncs should be rejected.
backoff_registry: BTreeMap<PeerId, Instant>,
swarm_events_sender: tokio::sync::mpsc::Sender<ToSwarm<Event, FromBehaviour>>,
swarm_events_receiver: tokio::sync::mpsc::Receiver<ToSwarm<Event, FromBehaviour>>,
next_sync: Option<BoxFuture<'static, ()>>,
Expand All @@ -94,6 +105,7 @@ where
.field("model", &self.model)
.field("config", &self.config)
.field("peers", &self.peers)
.field("backoff_registry", &self.backoff_registry)
.field("swarm_events_sender", &self.swarm_events_sender)
.field("swarm_events_receiver", &self.swarm_events_receiver)
.field("next_sync", &"_")
Expand Down Expand Up @@ -156,6 +168,7 @@ impl<P, M> Behaviour<P, M> {
model,
config,
peers: BTreeMap::new(),
backoff_registry: BTreeMap::new(),
swarm_events_sender: tx,
swarm_events_receiver: rx,
next_sync: None,
Expand All @@ -169,6 +182,16 @@ impl<P, M> Behaviour<P, M> {
handle.block_on(async move { tx.send(event).await })
});
}

/// Calculate the reject_until timestamp for a peer
fn calculate_reject_until(peer_info: &PeerInfo) -> Option<Instant> {
peer_info
.next_sync
.values()
.max()
.copied()
.filter(|t| *t > Instant::now())
}
}

impl<P, M> NetworkBehaviour for Behaviour<P, M>
Expand All @@ -188,6 +211,8 @@ where
id: info.connection_id,
dialer: matches!(info.endpoint, ConnectedPoint::Dialer { .. }),
};

// Get or create peer info
self.peers
.entry(info.peer_id)
.and_modify(|peer_info| peer_info.connections.push(connection_info))
Expand Down Expand Up @@ -311,9 +336,28 @@ where
),
);
info.status = PeerStatus::Failed { stream_set };
// Collect data before releasing borrow
let reject_until = Self::calculate_reject_until(info);
let status = info.status;
let connection_ids: Vec<_> = info.connections.iter().map(|c| c.id).collect();

// Update backoff registry and notify handlers
if let Some(reject_until) = reject_until {
self.backoff_registry.insert(peer_id, reject_until);
// Notify handlers so they can reject incoming syncs
for conn_id in connection_ids {
self.send_event(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(conn_id),
event: FromBehaviour::UpdateRejectUntil {
reject_until: Some(reject_until),
},
});
}
}
Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent {
remote_peer_id: peer_id,
status: info.status,
status,
})))
} else {
tracing::warn!(%peer_id, ?connection_id, "peer not found in peers map when failed synchronizing? closing connectoin");
Expand All @@ -323,6 +367,13 @@ where
})
}
}

// An incoming sync was rejected due to backoff - just log it, no state change needed
FromHandler::InboundRejected { stream_set } => {
debug!(%peer_id, ?stream_set, "inbound sync rejected due to backoff");
self.peer.metrics().record(&InboundSyncRejected);
None
}
};

if let Some(ev) = ev {
Expand All @@ -334,6 +385,10 @@ where
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Clean up expired backoff entries
let now = Instant::now();
self.backoff_registry.retain(|_, expires| *expires > now);

if let Poll::Ready(Some(event)) = self.swarm_events_receiver.poll_recv(cx) {
trace!(?event, "swarm event");
return Poll::Ready(event);
Expand Down Expand Up @@ -388,13 +443,26 @@ where
_local_addr: &libp2p::Multiaddr,
_remote_addr: &libp2p::Multiaddr,
) -> std::result::Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
debug!(%peer, ?connection_id, "handle_established_inbound_connection");
// Check if peer is blocked
if self.config.blocked_peers.contains(&peer) {
debug!(%peer, ?connection_id, "rejecting inbound connection from blocked peer");
self.peer.metrics().record(&BlockedConnection);
return Err(ConnectionDenied::new(format!("peer {} is blocked", peer)));
}
// Check backoff registry for this peer
let reject_inbound_until = self
.backoff_registry
.get(&peer)
.copied()
.filter(|t| *t > Instant::now());
debug!(%peer, ?connection_id, ?reject_inbound_until, "handle_established_inbound_connection");
Ok(Handler::new(
peer,
connection_id,
handler::State::WaitingInbound,
self.peer.clone(),
self.model.clone(),
reject_inbound_until,
))
}

Expand All @@ -405,7 +473,19 @@ where
_addr: &libp2p::Multiaddr,
_role_override: libp2p::core::Endpoint,
) -> std::result::Result<libp2p::swarm::THandler<Self>, libp2p::swarm::ConnectionDenied> {
debug!(%peer, ?connection_id, "handle_established_outbound_connection");
// Check if peer is blocked
if self.config.blocked_peers.contains(&peer) {
debug!(%peer, ?connection_id, "rejecting outbound connection to blocked peer");
self.peer.metrics().record(&BlockedConnection);
return Err(ConnectionDenied::new(format!("peer {} is blocked", peer)));
}
// Check backoff registry for this peer
let reject_inbound_until = self
.backoff_registry
.get(&peer)
.copied()
.filter(|t| *t > Instant::now());
debug!(%peer, ?connection_id, ?reject_inbound_until, "handle_established_outbound_connection");
Ok(Handler::new(
peer,
connection_id,
Expand All @@ -415,6 +495,7 @@ where
},
self.peer.clone(),
self.model.clone(),
reject_inbound_until,
))
}
}
Expand Down
37 changes: 36 additions & 1 deletion recon/src/libp2p/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use libp2p::{
},
};
use libp2p_identity::PeerId;
use tokio::time::Instant;
use tracing::{debug, trace};

use crate::{
Expand All @@ -29,6 +30,9 @@ pub struct Handler<P, M> {
model: M,
state: State,
behavior_events_queue: VecDeque<FromHandler>,
/// Timestamp until which incoming syncs should be rejected.
/// If this is in the future, incoming sync attempts will be dropped.
reject_inbound_until: Option<Instant>,
}

impl<P, M> Handler<P, M>
Expand All @@ -42,6 +46,7 @@ where
state: State,
peer: P,
model: M,
reject_inbound_until: Option<Instant>,
) -> Self {
Self {
remote_peer_id: peer_id,
Expand All @@ -50,6 +55,7 @@ where
model,
state,
behavior_events_queue: VecDeque::new(),
reject_inbound_until,
}
}
// Transition the state to a new state.
Expand Down Expand Up @@ -134,7 +140,14 @@ impl std::fmt::Debug for State {

#[derive(Debug)]
pub enum FromBehaviour {
StartSync { stream_set: StreamSet },
StartSync {
stream_set: StreamSet,
},
/// Update the timestamp until which incoming syncs should be rejected.
/// Used to reject incoming syncs from peers we're currently backing off.
UpdateRejectUntil {
reject_until: Option<Instant>,
},
}
#[derive(Debug)]
pub enum FromHandler {
Expand All @@ -150,6 +163,10 @@ pub enum FromHandler {
stream_set: StreamSet,
error: anyhow::Error,
},
/// An incoming sync was rejected because we're backing off from this peer.
InboundRejected {
stream_set: StreamSet,
},
}

impl<P, M> ConnectionHandler for Handler<P, M>
Expand Down Expand Up @@ -239,6 +256,9 @@ where
| State::Outbound(_, _)
| State::Inbound(_, _) => {}
},
FromBehaviour::UpdateRejectUntil { reject_until } => {
self.reject_inbound_until = reject_until;
}
}
}

Expand All @@ -260,6 +280,21 @@ where
) => {
match self.state {
State::Idle | State::WaitingInbound => {
// Check if we should reject this inbound sync due to backoff
if let Some(reject_until) = self.reject_inbound_until {
if reject_until > Instant::now() {
debug!(
%self.remote_peer_id,
?self.connection_id,
?stream_set,
"rejecting inbound sync due to backoff"
);
self.behavior_events_queue
.push_front(FromHandler::InboundRejected { stream_set });
return;
}
}

self.behavior_events_queue
.push_front(FromHandler::Started { stream_set });
let stream = match stream_set {
Expand Down
Loading
Loading