Skip to content

Commit c602aa8

Browse files
prestwichclaude
andauthored
feat: handle ChainEvent::Reorg in SubscriptionTask (#97)
* feat: handle ChainEvent::Reorg in SubscriptionTask with removed log emission Replace the no-op reorg arm in SubscriptionTask::task_future with proper handling that filters removed logs against subscription criteria and emits them with `removed: true` per the Ethereum JSON-RPC spec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: bump signet-storage deps to 0.6.5 and remove patch overrides Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: store removed headers in ReorgNotification for block metadata Address review feedback: - Replace `removed_hashes`/`removed_logs` with per-block `RemovedBlock` structs carrying headers, so removed logs include `block_hash`, `block_number`, and `block_timestamp` per the Ethereum JSON-RPC spec. - Use `let else` instead of match for early return (Evalir). - Take `ReorgNotification` by value in `filter_reorg_for_sub` to avoid intermediate `collect` and per-log clones. - Use `into_inner()` on `SealedHeader` instead of `inner().clone()`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fe9585f commit c602aa8

6 files changed

Lines changed: 152 additions & 26 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ signet-tx-cache = "0.16.0-rc.11"
5353
signet-types = "0.16.0-rc.11"
5454
signet-zenith = "0.16.0-rc.11"
5555
signet-journal = "0.16.0-rc.11"
56-
signet-storage = "0.6.4"
57-
signet-cold = "0.6.4"
58-
signet-hot = "0.6.4"
59-
signet-hot-mdbx = "0.6.4"
60-
signet-cold-mdbx = "0.6.4"
61-
signet-storage-types = "0.6.4"
56+
signet-storage = "0.6.5"
57+
signet-cold = "0.6.5"
58+
signet-hot = "0.6.5"
59+
signet-hot-mdbx = "0.6.5"
60+
signet-cold-mdbx = "0.6.5"
61+
signet-storage-types = "0.6.5"
6262

6363
# ajj
6464
ajj = { version = "0.6.0" }
@@ -114,11 +114,6 @@ url = "2.5.4"
114114
tempfile = "3.17.0"
115115

116116
[patch.crates-io]
117-
signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
118-
signet-hot = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
119-
signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
120-
signet-storage-types = { git = "https://github.com/init4tech/storage.git", branch = "james/eng-1978" }
121-
122117
# signet-bundle = { path = "../sdk/crates/bundle"}
123118
# signet-constants = { path = "../sdk/crates/constants"}
124119
# signet-evm = { path = "../sdk/crates/evm"}

crates/node/src/node.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1};
1515
use signet_evm::EthereumHardfork;
1616
use signet_extract::Extractor;
1717
use signet_node_config::SignetNodeConfig;
18-
use signet_rpc::{ChainNotifier, NewBlockNotification, ReorgNotification, RpcServerGuard};
18+
use signet_rpc::{
19+
ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard,
20+
};
1921
use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage};
2022
use signet_types::{PairedHeights, constants::SignetSystemConstants};
2123
use std::{fmt, sync::Arc};
@@ -363,14 +365,16 @@ where
363365

364366
/// Send a reorg notification on the broadcast channel.
365367
fn notify_reorg(&self, drained: Vec<DrainedBlock>, common_ancestor: u64) {
366-
let removed_hashes = drained.iter().map(|d| d.header.hash()).collect();
367-
let removed_logs = drained
368+
let removed_blocks = drained
368369
.into_iter()
369-
.flat_map(|d| d.receipts)
370-
.flat_map(|r| r.receipt.logs)
371-
.map(|l| l.inner)
370+
.map(|d| {
371+
let header = d.header.into_inner();
372+
let logs =
373+
d.receipts.into_iter().flat_map(|r| r.receipt.logs).map(|l| l.inner).collect();
374+
RemovedBlock { header, logs }
375+
})
372376
.collect();
373-
let notif = ReorgNotification { common_ancestor, removed_hashes, removed_logs };
377+
let notif = ReorgNotification { common_ancestor, removed_blocks };
374378
// Ignore send errors — no subscribers is fine.
375379
let _ = self.chain.send_reorg(notif);
376380
}

crates/rpc/src/interest/kind.rs

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
//! Filter kinds for subscriptions and polling filters.
22
3-
use crate::interest::{NewBlockNotification, filters::FilterOutput, subs::SubscriptionBuffer};
3+
use crate::interest::{
4+
NewBlockNotification, ReorgNotification, filters::FilterOutput, subs::SubscriptionBuffer,
5+
};
46
use alloy::rpc::types::{Filter, Header, Log};
57
use std::collections::VecDeque;
68

@@ -94,4 +96,116 @@ impl InterestKind {
9496
Self::Block => SubscriptionBuffer::Block(VecDeque::new()),
9597
}
9698
}
99+
100+
/// Filter a reorg notification for a subscription, producing a buffer of
101+
/// removed logs (with `removed: true`) that match this filter.
102+
///
103+
/// Block subscriptions return an empty buffer — the Ethereum JSON-RPC
104+
/// spec does not push removed headers for `newHeads` subscriptions.
105+
pub(crate) fn filter_reorg_for_sub(&self, reorg: ReorgNotification) -> SubscriptionBuffer {
106+
let Some(filter) = self.as_filter() else {
107+
return self.empty_sub_buffer();
108+
};
109+
110+
let logs: VecDeque<Log> = reorg
111+
.removed_blocks
112+
.into_iter()
113+
.flat_map(|block| {
114+
let block_hash = block.header.hash_slow();
115+
let block_number = block.header.number;
116+
let block_timestamp = block.header.timestamp;
117+
block.logs.into_iter().filter(move |log| filter.matches(log)).map(move |log| Log {
118+
inner: log,
119+
block_hash: Some(block_hash),
120+
block_number: Some(block_number),
121+
block_timestamp: Some(block_timestamp),
122+
transaction_hash: None,
123+
transaction_index: None,
124+
log_index: None,
125+
removed: true,
126+
})
127+
})
128+
.collect();
129+
130+
SubscriptionBuffer::Log(logs)
131+
}
132+
}
133+
134+
#[cfg(test)]
135+
mod tests {
136+
use super::*;
137+
use crate::interest::RemovedBlock;
138+
use alloy::primitives::{Address, B256, Bytes, LogData, address, b256};
139+
140+
fn test_log(addr: Address, topic: B256) -> alloy::primitives::Log {
141+
alloy::primitives::Log {
142+
address: addr,
143+
data: LogData::new_unchecked(vec![topic], Bytes::new()),
144+
}
145+
}
146+
147+
fn test_header(number: u64) -> alloy::consensus::Header {
148+
alloy::consensus::Header { number, timestamp: 1_000_000 + number, ..Default::default() }
149+
}
150+
151+
fn test_filter(addr: Address) -> Filter {
152+
Filter::new().address(addr)
153+
}
154+
155+
#[test]
156+
fn filter_reorg_for_sub_matches_logs() {
157+
let addr = address!("0x0000000000000000000000000000000000000001");
158+
let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001");
159+
160+
let header = test_header(11);
161+
let kind = InterestKind::Log(Box::new(test_filter(addr)));
162+
let reorg = ReorgNotification {
163+
common_ancestor: 10,
164+
removed_blocks: vec![RemovedBlock {
165+
header: header.clone(),
166+
logs: vec![test_log(addr, topic)],
167+
}],
168+
};
169+
170+
let buf = kind.filter_reorg_for_sub(reorg);
171+
let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") };
172+
173+
assert_eq!(logs.len(), 1);
174+
assert!(logs[0].removed);
175+
assert_eq!(logs[0].inner.address, addr);
176+
assert_eq!(logs[0].block_hash.unwrap(), header.hash_slow());
177+
assert_eq!(logs[0].block_number.unwrap(), 11);
178+
assert_eq!(logs[0].block_timestamp.unwrap(), 1_000_011);
179+
}
180+
181+
#[test]
182+
fn filter_reorg_for_sub_filters_non_matching() {
183+
let addr = address!("0x0000000000000000000000000000000000000001");
184+
let other = address!("0x0000000000000000000000000000000000000002");
185+
let topic = b256!("0x0000000000000000000000000000000000000000000000000000000000000001");
186+
187+
let kind = InterestKind::Log(Box::new(test_filter(addr)));
188+
let reorg = ReorgNotification {
189+
common_ancestor: 10,
190+
removed_blocks: vec![RemovedBlock {
191+
header: test_header(11),
192+
logs: vec![test_log(other, topic)],
193+
}],
194+
};
195+
196+
let buf = kind.filter_reorg_for_sub(reorg);
197+
let SubscriptionBuffer::Log(logs) = buf else { panic!("expected Log buffer") };
198+
assert!(logs.is_empty());
199+
}
200+
201+
#[test]
202+
fn filter_reorg_for_sub_block_returns_empty() {
203+
let reorg = ReorgNotification {
204+
common_ancestor: 10,
205+
removed_blocks: vec![RemovedBlock { header: test_header(11), logs: vec![] }],
206+
};
207+
208+
let buf = InterestKind::Block.filter_reorg_for_sub(reorg);
209+
assert!(buf.is_empty());
210+
}
97211
}

crates/rpc/src/interest/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,20 @@ pub enum ChainEvent {
6969
Reorg(ReorgNotification),
7070
}
7171

72+
/// Data from a single block removed during a chain reorganization.
73+
#[derive(Debug, Clone)]
74+
pub struct RemovedBlock {
75+
/// The header of the removed block.
76+
pub header: alloy::consensus::Header,
77+
/// Logs emitted by the removed block.
78+
pub logs: Vec<alloy::primitives::Log>,
79+
}
80+
7281
/// Notification sent when a chain reorganization is detected.
7382
#[derive(Debug, Clone)]
7483
pub struct ReorgNotification {
7584
/// The block number of the common ancestor (last block still valid).
7685
pub common_ancestor: u64,
77-
/// Hashes of the removed blocks.
78-
pub removed_hashes: Vec<alloy::primitives::B256>,
79-
/// Logs from the removed blocks (needed for `removed: true` emission).
80-
pub removed_logs: Vec<alloy::primitives::Log>,
86+
/// Blocks removed by the reorg, each carrying its header and logs.
87+
pub removed_blocks: Vec<RemovedBlock>,
8188
}

crates/rpc/src/interest/subs.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,14 @@ impl SubscriptionTask {
232232
};
233233
let notif = match event {
234234
ChainEvent::NewBlock(notif) => *notif,
235-
// Reorg handling will be addressed in future PRs (ENG-1968 et al.)
236-
ChainEvent::Reorg(_) => continue,
235+
ChainEvent::Reorg(reorg) => {
236+
let output = filter.filter_reorg_for_sub(reorg);
237+
trace!(count = output.len(), "Reorg filter applied");
238+
if !output.is_empty() {
239+
notif_buffer.extend(output);
240+
}
241+
continue;
242+
}
237243
};
238244

239245
let output = filter.filter_notification_for_sub(&notif);

crates/rpc/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ mod eth;
1818
pub use eth::EthError;
1919

2020
mod interest;
21-
pub use interest::{ChainEvent, NewBlockNotification, ReorgNotification};
21+
pub use interest::{ChainEvent, NewBlockNotification, RemovedBlock, ReorgNotification};
2222

2323
mod debug;
2424
pub use debug::DebugError;

0 commit comments

Comments
 (0)