Skip to content

Commit a8bfea2

Browse files
prestwichclaude
andcommitted
refactor(host-rpc): rewrite notifier with hash-based chain walking
Replace fetch-by-number with hash-anchored backward walk, eliminating TOCTOU race conditions in reorg detection. The subscription is now a wake-up signal; on each event the notifier walks from the hint hash backward until it finds overlap with a lightweight (u64, B256) ring buffer. Full blocks are fetched by hash only for the new segment. Key changes: - Hash-based walk algorithm (WalkResult enum: Advance/Reorg/Exhausted) - Parallel block+receipt fetches via tokio::try_join! and FuturesOrdered - Self-healing buffer exhaustion: clears state and re-enters backfill - Backfill stops at (latest - buffer_capacity), frontfill bridges gap - set_backfill_thresholds(None) resets to default per trait contract - fetch_range guards against from > to underflow - Segment: non-empty invariant enforced, pub(crate) visibility - Error: removed dead SubscriptionClosed/ReorgTooDeep variants - Builder: genesis_timestamp validation, uses pub(crate) constructor - Shared constants in lib.rs, private notifier fields - Tracing instrumentation on walk and notification methods Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9b32577 commit a8bfea2

6 files changed

Lines changed: 388 additions & 245 deletions

File tree

crates/host-rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ signet-types.workspace = true
1717
alloy.workspace = true
1818
futures-util.workspace = true
1919
thiserror.workspace = true
20+
tokio.workspace = true
2021
tracing.workspace = true

crates/host-rpc/src/builder.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
use crate::{RpcHostError, RpcHostNotifier};
22
use alloy::providers::Provider;
3-
use std::collections::VecDeque;
4-
5-
/// Default block buffer capacity.
6-
const DEFAULT_BUFFER_CAPACITY: usize = 64;
7-
/// Default backfill batch size.
8-
const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
3+
use tracing::warn;
94

105
/// Builder for [`RpcHostNotifier`].
116
///
@@ -15,6 +10,7 @@ const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
1510
/// let notifier = RpcHostNotifierBuilder::new(provider)
1611
/// .with_buffer_capacity(128)
1712
/// .with_backfill_batch_size(64)
13+
/// .with_genesis_timestamp(1_606_824_023)
1814
/// .build()
1915
/// .await?;
2016
/// ```
@@ -34,8 +30,8 @@ where
3430
pub const fn new(provider: P) -> Self {
3531
Self {
3632
provider,
37-
buffer_capacity: DEFAULT_BUFFER_CAPACITY,
38-
backfill_batch_size: DEFAULT_BACKFILL_BATCH_SIZE,
33+
buffer_capacity: crate::DEFAULT_BUFFER_CAPACITY,
34+
backfill_batch_size: crate::DEFAULT_BACKFILL_BATCH_SIZE,
3935
genesis_timestamp: 0,
4036
}
4137
}
@@ -60,20 +56,18 @@ where
6056

6157
/// Build the notifier, establishing the `newHeads` WebSocket subscription.
6258
pub async fn build(self) -> Result<RpcHostNotifier<P>, RpcHostError> {
59+
if self.genesis_timestamp == 0 {
60+
warn!("genesis_timestamp not set; epoch calculations will use Unix epoch");
61+
}
6362
let sub = self.provider.subscribe_blocks().await?;
6463
let header_sub = sub.into_stream();
6564

66-
Ok(RpcHostNotifier {
67-
provider: self.provider,
65+
Ok(RpcHostNotifier::new(
66+
self.provider,
6867
header_sub,
69-
block_buffer: VecDeque::with_capacity(self.buffer_capacity),
70-
buffer_capacity: self.buffer_capacity,
71-
cached_safe: None,
72-
cached_finalized: None,
73-
last_tag_epoch: None,
74-
backfill_from: None,
75-
backfill_batch_size: self.backfill_batch_size,
76-
genesis_timestamp: self.genesis_timestamp,
77-
})
68+
self.buffer_capacity,
69+
self.backfill_batch_size,
70+
self.genesis_timestamp,
71+
))
7872
}
7973
}

crates/host-rpc/src/error.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,21 @@
1-
use alloy::transports::{RpcError, TransportErrorKind};
1+
use alloy::{
2+
eips::BlockNumberOrTag,
3+
primitives::B256,
4+
transports::{RpcError, TransportErrorKind},
5+
};
26

37
/// Errors from the RPC host notifier.
48
#[derive(Debug, thiserror::Error)]
59
pub enum RpcHostError {
6-
/// The WebSocket subscription was dropped unexpectedly.
7-
#[error("subscription closed")]
8-
SubscriptionClosed,
9-
1010
/// An RPC call failed.
1111
#[error("rpc error: {0}")]
1212
Rpc(#[from] RpcError<TransportErrorKind>),
1313

14-
/// The RPC node returned no block for the requested number.
15-
#[error("missing block {0}")]
16-
MissingBlock(u64),
14+
/// The RPC node returned no block for the requested hash.
15+
#[error("missing block with hash {0}")]
16+
MissingBlockByHash(B256),
1717

18-
/// Reorg deeper than the block buffer.
19-
#[error("reorg depth {depth} exceeds buffer capacity {capacity}")]
20-
ReorgTooDeep {
21-
/// The detected reorg depth.
22-
depth: u64,
23-
/// The configured buffer capacity.
24-
capacity: usize,
25-
},
18+
/// The RPC node returned no block for the requested number or tag.
19+
#[error("missing block {0}")]
20+
MissingBlock(BlockNumberOrTag),
2621
}

crates/host-rpc/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
#![deny(unused_must_use, rust_2018_idioms)]
1212
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
1313

14+
/// Default block buffer capacity.
15+
pub(crate) const DEFAULT_BUFFER_CAPACITY: usize = 64;
16+
/// Default backfill batch size.
17+
pub(crate) const DEFAULT_BACKFILL_BATCH_SIZE: u64 = 32;
18+
1419
mod builder;
1520
pub use builder::RpcHostNotifierBuilder;
1621

0 commit comments

Comments
 (0)