Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.sf.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## v0.9.1-fh-1

* Fixed flash blocks to arrive in the right order and be 100% identical to the canonical blocks

## v0.9.1-fh

* bumped upstream to 0.9.1
Expand Down
168 changes: 101 additions & 67 deletions crates/firehose-flashblocks/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ impl std::fmt::Debug for SpeculativeStateRoot {
.field("block_number", &self.block_number)
.field("flashblock_index", &self.flashblock_index)
.field("revision", &self.revision)
.field(
"result",
&self.result.lock().ok().and_then(|guard| *guard),
)
.field("result", &self.result.lock().ok().and_then(|guard| *guard))
.finish_non_exhaustive()
}
}
Expand Down Expand Up @@ -426,11 +423,7 @@ where
pub fn speculative_state_root_status_for_test(&self) -> Option<(u64, u64, u64, bool)> {
let state = self.state.lock().expect("flashblock state mutex poisoned");
let spec = state.speculative_state_root.as_ref()?;
let completed = spec
.result
.lock()
.expect("spec result mutex poisoned")
.is_some();
let completed = spec.result.lock().expect("spec result mutex poisoned").is_some();
Some((spec.block_number, spec.flashblock_index, spec.revision, completed))
}

Expand Down Expand Up @@ -481,10 +474,7 @@ where
///
/// - **None** — `(false, None)`: peek is absent or unrelated; the current flashblock
/// executes and emits as a non-final partial.
fn classify_peek(
current: &Flashblock,
peek: Option<&Flashblock>,
) -> (bool, Option<B256>) {
fn classify_peek(current: &Flashblock, peek: Option<&Flashblock>) -> (bool, Option<B256>) {
let Some(peek) = peek else { return (false, None) };
let cur_block = current.metadata.block_number;
let peek_block = peek.metadata.block_number;
Expand Down Expand Up @@ -697,12 +687,30 @@ where
// divergence between our `BlockAssembler` and reth's canonical
// sealing, etc. — see the diagnostic fields surfaced below).
if !state.final_part_sent && !state.stored_flashblocks.is_empty() {
// Same optimization as `on_canonical_block`: the live
// revision still matches what the speculative precompute
// captured (no flashblock has executed for the previous
// block since the spec was launched), so a hit avoids
// the synchronous trie traversal.
let cached_state_root = Self::cached_state_root_for(
&state,
latest_block,
state.bundle_revision,
);
if cached_state_root.is_some() {
debug!(
block = latest_block,
revision = state.bundle_revision,
"FirstOfNextBlock-fallback: using speculatively-precomputed state_root"
);
}
match Self::build_is_final_emission(
latest_block,
latest_idx,
&state.stored_flashblocks,
state.accumulated_db.as_ref(),
stored_parent_hash,
cached_state_root,
) {
Ok(emission) => {
pending_final_emission = Some(emission);
Expand Down Expand Up @@ -849,7 +857,8 @@ where
if squash {
debug!(
block = block_number,
index, "squashing flashblock execution; accumulated for next non-squashed flashblock"
index,
"squashing flashblock execution; accumulated for next non-squashed flashblock"
);
// Still flush any pending is_final FIRE BLOCK queued by the
// FirstOfNextBlock transition (squashing the new base must not drop
Expand Down Expand Up @@ -1023,8 +1032,7 @@ where
let Ok(rt) = tokio::runtime::Handle::try_current() else {
debug!(
block_number,
index,
"no tokio runtime in scope; skipping speculative state-root precompute"
index, "no tokio runtime in scope; skipping speculative state-root precompute"
);
return;
};
Expand All @@ -1037,19 +1045,18 @@ where
let parent_block = block_number.saturating_sub(1);

let handle = rt.spawn_blocking(move || {
let provider = match client
.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_block))
{
Ok(p) => p,
Err(err) => {
debug!(
parent_block,
error = %err,
"speculative state-root: parent state unavailable"
);
return;
}
};
let provider =
match client.state_by_block_number_or_tag(BlockNumberOrTag::Number(parent_block)) {
Ok(p) => p,
Err(err) => {
debug!(
parent_block,
error = %err,
"speculative state-root: parent state unavailable"
);
return;
}
};
let hashed = provider.hashed_post_state(&bundle);
match provider.state_root(hashed) {
Ok(root) => {
Expand All @@ -1075,12 +1082,7 @@ where
result,
handle,
});
debug!(
block_number,
index,
revision,
"speculative state-root precompute launched"
);
debug!(block_number, index, revision, "speculative state-root precompute launched");
}

/// Returns the speculatively-precomputed state_root if one exists and is still
Expand All @@ -1093,6 +1095,19 @@ where
expected_revision: u64,
) -> Option<B256> {
let state = self.state.lock().ok()?;
Self::cached_state_root_for(&state, block_number, expected_revision)
}

/// Same as [`Self::try_speculative_state_root`] but reads from a `ProcessorState`
/// guard the caller already holds — used by [`Self::on_canonical_block`] and the
/// `FirstOfNextBlock` fallback in [`Self::process_inner`], both of which lock
/// `self.state` for the duration of the is_final-emission decision and would
/// deadlock against re-locking inside [`Self::try_speculative_state_root`].
fn cached_state_root_for(
state: &ProcessorState,
block_number: u64,
expected_revision: u64,
) -> Option<B256> {
let spec = state.speculative_state_root.as_ref()?;
if spec.block_number != block_number || spec.revision != expected_revision {
return None;
Expand Down Expand Up @@ -1165,14 +1180,29 @@ where
state.awaiting_canonical_confirmation = false;
return;
}
let stored_clone = state.stored_flashblocks.clone();
let latest_idx = state.latest_flashblock_index.unwrap_or(0);
// No bundle-changing work has happened since the last flashblock
// execution, so the live revision is the one the speculative
// precompute (if any) was launched against. A hit skips the slow
// synchronous state_root and is the key to beating reth's full-block
// FIRE BLOCK out the door.
let cached_state_root =
Self::cached_state_root_for(&state, canonical_block_number, state.bundle_revision);
if cached_state_root.is_some() {
debug!(
block = canonical_block_number,
revision = state.bundle_revision,
"canonical-driven is_final: using speculatively-precomputed state_root"
);
}
Comment on lines +1184 to +1197
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move that inside emission directly? Would be easier to see that block number is correct. checked for alignment if call was done later, no big deal though, I leave the decision to you.


match Self::build_is_final_emission(
canonical_block_number,
latest_idx,
&stored_clone,
&state.stored_flashblocks,
state.accumulated_db.as_ref(),
canonical_block_hash,
cached_state_root,
) {
Ok(emission) => {
// emit_final_if_pending locks self.tracer only; it does not
Expand Down Expand Up @@ -1332,10 +1362,8 @@ where
};
let all_transactions: Vec<Bytes> =
stored.iter().flat_map(|fb| fb.diff.transactions.clone()).collect();
let merged_index = stored
.last()
.expect("pending_state implies non-empty stored_flashblocks")
.index;
let merged_index =
stored.last().expect("pending_state implies non-empty stored_flashblocks").index;
if let Err(err) = self.execute_flashblock(
&assembled,
merged_index,
Expand Down Expand Up @@ -1445,6 +1473,7 @@ where
flashblocks: &[Flashblock],
accumulated_db: Option<&AccumulatedDb>,
expected_parent_hash: B256,
precomputed_state_root: Option<B256>,
) -> Result<PendingFinalEmission, BuildIsFinalError> {
let assembled = BlockAssembler::assemble(flashblocks)
.map_err(|err| BuildIsFinalError::AssembleFailed(format!("{err:?}")))?;
Expand All @@ -1462,8 +1491,17 @@ where
let assembled_header_hash_with_wire_state_root = assembled.block.header.hash_slow();
let total_transactions: usize =
flashblocks.iter().map(|fb| fb.diff.transactions.len()).sum();
let state_root = Self::compute_state_root(accumulated_db)
.map_err(|err| BuildIsFinalError::StateRootFailed(format!("{err}")))?;
// When the caller supplies a precomputed state_root (typically from the
// speculative cache that fires at index ≥ 10), skip the synchronous
// 100-150 ms trie traversal — this is the entire point of the cache,
// and it's what lets the canonical-driven is_final FIRE BLOCK win the
// race against reth's full-block FIRE BLOCK emitted from `mark_verified`
// after its own state-root validation.
let state_root = match precomputed_state_root {
Some(root) => root,
None => Self::compute_state_root(accumulated_db)
.map_err(|err| BuildIsFinalError::StateRootFailed(format!("{err}")))?,
};
let mut block = assembled.block.clone();
block.header.state_root = state_root;
let recomputed_hash = block.header.hash_slow();
Expand Down Expand Up @@ -1673,28 +1711,29 @@ where
let state = self.state.lock().expect("flashblock state mutex poisoned");
state.bundle_revision + if bundle_changed { 1 } else { 0 }
};
let state_root_result = match self
.try_speculative_state_root(block_number, projected_revision)
{
Some(root) => {
debug!(
block_number,
index,
revision = projected_revision,
state_root = %root,
"is_final: using speculatively-precomputed state_root"
);
Ok(root)
}
None => Self::compute_state_root(Some(&*accumulated_db)),
};
let state_root_result =
match self.try_speculative_state_root(block_number, projected_revision) {
Some(root) => {
debug!(
block_number,
index,
revision = projected_revision,
state_root = %root,
"is_final: using speculatively-precomputed state_root"
);
Ok(root)
}
None => Self::compute_state_root(Some(&*accumulated_db)),
};
match state_root_result {
Ok(state_root) => {
let mut recomputed_header = assembled.block.header.clone();
recomputed_header.state_root = state_root;
let recomputed_hash = recomputed_header.hash_slow();
if recomputed_hash == expected_parent_hash {
block_tracer.tracer_mut().set_final_flash_block(recomputed_hash, state_root);
block_tracer
.tracer_mut()
.set_final_flash_block(recomputed_hash, state_root);
emitted_is_final = true;
debug!(
block = block_number,
Expand All @@ -1719,9 +1758,8 @@ where
}
}
Err(err) => {
let io_err = std::io::Error::other(format!(
"state_root computation failed: {err}"
));
let io_err =
std::io::Error::other(format!("state_root computation failed: {err}"));
warn!(
block = block_number,
index,
Expand Down Expand Up @@ -1809,11 +1847,7 @@ where
self.process(flashblock, false, None);
}

fn on_flashblock_received_with_peek(
&self,
flashblock: Flashblock,
peek: Option<&Flashblock>,
) {
fn on_flashblock_received_with_peek(&self, flashblock: Flashblock, peek: Option<&Flashblock>) {
let (squash, is_final_expected_hash) = Self::classify_peek(&flashblock, peek);
self.process(flashblock, squash, is_final_expected_hash);
}
Expand Down
Loading