Skip to content

Commit afec144

Browse files
prestwichclaude
andcommitted
test: add integration tests for reorg tracking in RPC subscriptions and filters
Adds 7 integration tests verifying end-to-end reorg handling across the RPC layer: block tag rewind, polling filter watermark reset, push subscription removed-log emission, filter selectivity, and a no-regression check for normal block progression. Closes ENG-1972 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 93f823c commit afec144

1 file changed

Lines changed: 385 additions & 0 deletions

File tree

crates/node-tests/tests/reorg.rs

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
use alloy::{
2+
primitives::{Address, B256, LogData},
3+
providers::Provider,
4+
rpc::types::eth::{Filter, Log},
5+
sol_types::{SolCall, SolEvent},
6+
};
7+
use serial_test::serial;
8+
use signet_node_tests::{HostBlockSpec, SignetTestContext, rpc_test, run_test, types::Counter};
9+
use std::time::Duration;
10+
11+
const SOME_USER: Address = Address::repeat_byte(0x39);
12+
13+
/// Helper: build and process an increment block using a host system
14+
/// transaction (`simple_transact`). This avoids the transaction pool
15+
/// entirely, which is important for reorg tests where we revert and
16+
/// rebuild blocks.
17+
///
18+
/// Returns the `HostBlockSpec` so it can later be reverted.
19+
fn increment_block(ctx: &SignetTestContext, contract_address: Address) -> HostBlockSpec {
20+
ctx.start_host_block().simple_transact(
21+
ctx.addresses[1],
22+
contract_address,
23+
Counter::incrementCall::SELECTOR,
24+
0,
25+
)
26+
}
27+
28+
/// Process an increment block and return the spec for later revert.
29+
async fn process_increment(ctx: &SignetTestContext, contract_address: Address) -> HostBlockSpec {
30+
let block = increment_block(ctx, contract_address);
31+
let for_revert = block.clone();
32+
ctx.process_block(block).await.unwrap();
33+
for_revert
34+
}
35+
36+
// ---------------------------------------------------------------------------
37+
// 1. Block tags
38+
// ---------------------------------------------------------------------------
39+
40+
#[serial]
41+
#[tokio::test]
42+
async fn test_block_tags_reorg() {
43+
run_test(|ctx| async move {
44+
// Process two blocks via enter events.
45+
let block1 = HostBlockSpec::new(ctx.constants()).enter_token(
46+
SOME_USER,
47+
1000,
48+
ctx.constants().host().tokens().usdc(),
49+
);
50+
let block1_clone = block1.clone();
51+
ctx.process_block(block1).await.unwrap();
52+
53+
let block2 = HostBlockSpec::new(ctx.constants()).enter_token(
54+
SOME_USER,
55+
2000,
56+
ctx.constants().host().tokens().usdc(),
57+
);
58+
let block2_clone = block2.clone();
59+
ctx.process_block(block2).await.unwrap();
60+
61+
assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2);
62+
63+
// Revert block 2.
64+
ctx.revert_block(block2_clone).await.unwrap();
65+
assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1);
66+
67+
// Revert block 1.
68+
ctx.revert_block(block1_clone).await.unwrap();
69+
assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 0);
70+
71+
// Rebuild two new blocks.
72+
let new_block1 = HostBlockSpec::new(ctx.constants()).enter_token(
73+
SOME_USER,
74+
500,
75+
ctx.constants().host().tokens().usdc(),
76+
);
77+
ctx.process_block(new_block1).await.unwrap();
78+
assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 1);
79+
80+
let new_block2 = HostBlockSpec::new(ctx.constants()).enter_token(
81+
SOME_USER,
82+
600,
83+
ctx.constants().host().tokens().usdc(),
84+
);
85+
ctx.process_block(new_block2).await.unwrap();
86+
assert_eq!(ctx.alloy_provider.get_block_number().await.unwrap(), 2);
87+
88+
// Verify the new block 2 is accessible.
89+
let block = ctx.alloy_provider.get_block_by_number(2.into()).await.unwrap();
90+
assert!(block.is_some());
91+
})
92+
.await;
93+
}
94+
95+
// ---------------------------------------------------------------------------
96+
// 2. Block filter + reorg
97+
// ---------------------------------------------------------------------------
98+
99+
#[serial]
100+
#[tokio::test]
101+
async fn test_block_filter_reorg() {
102+
rpc_test(|ctx, contract| async move {
103+
// Install a block filter (starts after block 1, where contract was deployed).
104+
let filter_id = ctx.alloy_provider.new_block_filter().await.unwrap();
105+
106+
// Process block 2 (increment via system tx).
107+
let _block2 = process_increment(&ctx, *contract.address()).await;
108+
109+
// Poll: should have 1 block hash.
110+
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
111+
assert_eq!(hashes.len(), 1);
112+
let block2_hash = hashes[0];
113+
114+
// Process block 3 (increment), keep clone for revert.
115+
let block3 = process_increment(&ctx, *contract.address()).await;
116+
117+
// Revert block 3.
118+
ctx.revert_block(block3).await.unwrap();
119+
120+
// Poll: reorg watermark resets start to ancestor+1 (= 3), but latest
121+
// is now 2, so start > latest -> empty.
122+
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
123+
assert!(hashes.is_empty());
124+
125+
// Process a new block 3.
126+
let _new_block3 = process_increment(&ctx, *contract.address()).await;
127+
128+
// Poll: should return the new block 3 hash.
129+
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
130+
assert_eq!(hashes.len(), 1);
131+
// Verify it is NOT the old block 2 hash (it should be the new block 3).
132+
assert_ne!(hashes[0], block2_hash);
133+
134+
ctx
135+
})
136+
.await;
137+
}
138+
139+
// ---------------------------------------------------------------------------
140+
// 3. Log filter + reorg
141+
// ---------------------------------------------------------------------------
142+
143+
#[serial]
144+
#[tokio::test]
145+
async fn test_log_filter_reorg() {
146+
rpc_test(|ctx, contract| async move {
147+
// Install a log filter on the Counter address.
148+
let filter_id = ctx
149+
.alloy_provider
150+
.new_filter(&Filter::new().address(*contract.address()))
151+
.await
152+
.unwrap();
153+
154+
// Process block 2 (increment -> count=1).
155+
let _block2 = process_increment(&ctx, *contract.address()).await;
156+
157+
// Poll: 1 log.
158+
let logs: Vec<Log<LogData>> =
159+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
160+
assert_eq!(logs.len(), 1);
161+
assert_eq!(logs[0].inner.topics()[0], Counter::Count::SIGNATURE_HASH);
162+
assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1));
163+
164+
// Process block 3 (increment -> count=2), clone for revert.
165+
let block3 = process_increment(&ctx, *contract.address()).await;
166+
167+
// Revert block 3.
168+
ctx.revert_block(block3).await.unwrap();
169+
170+
// Poll: empty (watermark rewinds, but latest < start).
171+
let logs: Vec<Log<LogData>> =
172+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
173+
assert!(logs.is_empty());
174+
175+
// Process a new block 3 (increment -> count=2 again).
176+
let _new_block3 = process_increment(&ctx, *contract.address()).await;
177+
178+
// Poll: 1 log with count=2.
179+
let logs: Vec<Log<LogData>> =
180+
ctx.alloy_provider.get_filter_changes(filter_id).await.unwrap();
181+
assert_eq!(logs.len(), 1);
182+
assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(2));
183+
184+
ctx
185+
})
186+
.await;
187+
}
188+
189+
// ---------------------------------------------------------------------------
190+
// 4. Block subscription + reorg
191+
// ---------------------------------------------------------------------------
192+
193+
#[serial]
194+
#[tokio::test]
195+
async fn test_block_subscription_reorg() {
196+
rpc_test(|ctx, contract| async move {
197+
let mut sub = ctx.alloy_provider.subscribe_blocks().await.unwrap();
198+
199+
// Process block 2.
200+
let block2 = process_increment(&ctx, *contract.address()).await;
201+
202+
let header =
203+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
204+
assert_eq!(header.number, 2);
205+
206+
// Revert block 2. Block subs do not emit anything for reorgs.
207+
ctx.revert_block(block2).await.unwrap();
208+
209+
// Process a new block 2.
210+
let _new_block2 = process_increment(&ctx, *contract.address()).await;
211+
212+
let header =
213+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
214+
assert_eq!(header.number, 2);
215+
216+
ctx
217+
})
218+
.await;
219+
}
220+
221+
// ---------------------------------------------------------------------------
222+
// 5. Log subscription + reorg (removed: true)
223+
// ---------------------------------------------------------------------------
224+
225+
#[serial]
226+
#[tokio::test]
227+
async fn test_log_subscription_reorg() {
228+
rpc_test(|ctx, contract| async move {
229+
let mut sub = ctx
230+
.alloy_provider
231+
.subscribe_logs(&Filter::new().address(*contract.address()))
232+
.await
233+
.unwrap();
234+
235+
// Process block 2 (increment -> count=1).
236+
let block2 = process_increment(&ctx, *contract.address()).await;
237+
238+
// Receive the normal log.
239+
let log = tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
240+
assert!(!log.removed);
241+
assert_eq!(log.inner.address, *contract.address());
242+
assert_eq!(log.inner.topics()[0], Counter::Count::SIGNATURE_HASH);
243+
assert_eq!(log.inner.topics()[1], B256::with_last_byte(1));
244+
245+
// Revert block 2.
246+
ctx.revert_block(block2).await.unwrap();
247+
248+
// Receive the removed log.
249+
let removed_log =
250+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
251+
assert!(removed_log.removed);
252+
assert_eq!(removed_log.inner.address, *contract.address());
253+
assert_eq!(removed_log.inner.topics()[0], Counter::Count::SIGNATURE_HASH);
254+
255+
// Process a new block 2 (increment -> count=1 again).
256+
let _new_block2 = process_increment(&ctx, *contract.address()).await;
257+
258+
// Receive the new log.
259+
let new_log =
260+
tokio::time::timeout(Duration::from_secs(5), sub.recv()).await.unwrap().unwrap();
261+
assert!(!new_log.removed);
262+
assert_eq!(new_log.inner.address, *contract.address());
263+
assert_eq!(new_log.inner.topics()[1], B256::with_last_byte(1));
264+
265+
ctx
266+
})
267+
.await;
268+
}
269+
270+
// ---------------------------------------------------------------------------
271+
// 6. Log subscription filter selectivity during reorg
272+
// ---------------------------------------------------------------------------
273+
274+
#[serial]
275+
#[tokio::test]
276+
async fn test_log_subscription_reorg_filter_selectivity() {
277+
rpc_test(|ctx, contract| async move {
278+
// Subscribe to logs on the Counter address (should receive events).
279+
let mut matching_sub = ctx
280+
.alloy_provider
281+
.subscribe_logs(&Filter::new().address(*contract.address()))
282+
.await
283+
.unwrap();
284+
285+
// Subscribe to logs on a non-matching address (should receive nothing).
286+
let mut non_matching_sub =
287+
ctx.alloy_provider.subscribe_logs(&Filter::new().address(SOME_USER)).await.unwrap();
288+
289+
// Process a block with an increment system tx.
290+
let block2 = process_increment(&ctx, *contract.address()).await;
291+
292+
// The matching subscription should receive the log.
293+
let log = tokio::time::timeout(Duration::from_secs(5), matching_sub.recv())
294+
.await
295+
.unwrap()
296+
.unwrap();
297+
assert!(!log.removed);
298+
assert_eq!(log.inner.address, *contract.address());
299+
300+
// The non-matching subscription should receive nothing.
301+
let extra = tokio::time::timeout(Duration::from_millis(200), non_matching_sub.recv()).await;
302+
assert!(extra.is_err(), "non-matching sub should not receive the log");
303+
304+
// Revert: only the matching subscription should get a removed log.
305+
ctx.revert_block(block2).await.unwrap();
306+
307+
let removed = tokio::time::timeout(Duration::from_secs(5), matching_sub.recv())
308+
.await
309+
.unwrap()
310+
.unwrap();
311+
assert!(removed.removed);
312+
assert_eq!(removed.inner.address, *contract.address());
313+
314+
// The non-matching subscription should still receive nothing.
315+
let extra = tokio::time::timeout(Duration::from_millis(200), non_matching_sub.recv()).await;
316+
assert!(extra.is_err(), "non-matching sub should not receive removed log");
317+
318+
ctx
319+
})
320+
.await;
321+
}
322+
323+
// ---------------------------------------------------------------------------
324+
// 7. No-regression: normal progression with filters and subscriptions
325+
// ---------------------------------------------------------------------------
326+
327+
#[serial]
328+
#[tokio::test]
329+
async fn test_no_regression_filters_and_subscriptions() {
330+
rpc_test(|ctx, contract| async move {
331+
// Install filters.
332+
let block_filter = ctx.alloy_provider.new_block_filter().await.unwrap();
333+
let log_filter = ctx
334+
.alloy_provider
335+
.new_filter(&Filter::new().address(*contract.address()))
336+
.await
337+
.unwrap();
338+
339+
// Subscribe.
340+
let mut block_sub = ctx.alloy_provider.subscribe_blocks().await.unwrap();
341+
let mut log_sub = ctx
342+
.alloy_provider
343+
.subscribe_logs(&Filter::new().address(*contract.address()))
344+
.await
345+
.unwrap();
346+
347+
// Process 2 increments via system transactions.
348+
let _b2 = process_increment(&ctx, *contract.address()).await;
349+
let _b3 = process_increment(&ctx, *contract.address()).await;
350+
351+
// Poll block filter: 2 hashes.
352+
let hashes: Vec<B256> = ctx.alloy_provider.get_filter_changes(block_filter).await.unwrap();
353+
assert_eq!(hashes.len(), 2);
354+
355+
// Poll log filter: 2 logs with sequential counter values.
356+
let logs: Vec<Log<LogData>> =
357+
ctx.alloy_provider.get_filter_changes(log_filter).await.unwrap();
358+
assert_eq!(logs.len(), 2);
359+
assert_eq!(logs[0].inner.topics()[1], B256::with_last_byte(1));
360+
assert_eq!(logs[1].inner.topics()[1], B256::with_last_byte(2));
361+
362+
// Receive 2 block headers.
363+
for expected_num in [2, 3] {
364+
let header = tokio::time::timeout(Duration::from_secs(5), block_sub.recv())
365+
.await
366+
.unwrap()
367+
.unwrap();
368+
assert_eq!(header.number, expected_num);
369+
}
370+
371+
// Receive 2 log events, all removed=false.
372+
for expected_count in [1u8, 2] {
373+
let log = tokio::time::timeout(Duration::from_secs(5), log_sub.recv())
374+
.await
375+
.unwrap()
376+
.unwrap();
377+
assert!(!log.removed);
378+
assert_eq!(log.inner.address, *contract.address());
379+
assert_eq!(log.inner.topics()[1], B256::with_last_byte(expected_count));
380+
}
381+
382+
ctx
383+
})
384+
.await;
385+
}

0 commit comments

Comments
 (0)