Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 219 additions & 113 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"crates/era-downloader",
"crates/era-utils",
"crates/errors/",
"crates/firehose/",
"crates/ethereum/hardforks/",
"crates/ethereum/cli/",
"crates/ethereum/consensus/",
Expand Down Expand Up @@ -351,6 +352,7 @@ reth-era = { path = "crates/era" }
reth-era-downloader = { path = "crates/era-downloader" }
reth-era-utils = { path = "crates/era-utils" }
reth-errors = { path = "crates/errors" }
reth-firehose = { path = "crates/firehose" }
reth-eth-wire = { path = "crates/net/eth-wire" }
reth-eth-wire-types = { path = "crates/net/eth-wire-types" }
reth-ethereum-payload-builder = { path = "crates/ethereum/payload" }
Expand Down Expand Up @@ -505,6 +507,7 @@ dirs-next = "2.0.0"
dyn-clone = "1.0.17"
eyre = "0.6"
fdlimit = "0.3.0"
firehose-tracer = "5.1.0"
fixed-map = { version = "0.9", default-features = false }
humantime = "2.1"
imbl = "7"
Expand Down Expand Up @@ -700,3 +703,11 @@ vergen-git2 = "9.1.0"

# networking
ipnet = "2.11"

[patch.crates-io]
# Patch rbase64 to remove its #[global_allocator] declaration, which conflicts
# with reth's own global allocator (jemalloc / snmalloc).
# The original rbase64 crate sets MiMalloc as the global allocator in its lib.rs,
# which is a library-level bug. This local copy removes that declaration while
# keeping all encoding/decoding behaviour unchanged.
rbase64 = { path = "vendor/rbase64" }
1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ workspace = true
# reth
reth-ethereum-cli.workspace = true
reth-chainspec.workspace = true
reth-firehose.workspace = true
reth-primitives-traits.workspace = true
reth-ethereum-primitives.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
Expand Down
1 change: 1 addition & 0 deletions bin/reth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,5 @@ use aquamarine as _;
// used in main
use clap as _;
use reth_cli_util as _;
use reth_firehose as _;
use tracing as _;
61 changes: 56 additions & 5 deletions bin/reth/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static MALLOC_CONF: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";
use clap::Parser;
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_firehose::FirehoseArgs;
use reth_node_ethereum::EthereumNode;
use tracing::info;

Expand All @@ -25,12 +26,62 @@ fn main() {
unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
}

if let Err(err) = Cli::<EthereumChainSpecParser>::parse().run(async move |builder, _| {
info!(target: "reth::cli", "Launching node");
let handle = builder.node(EthereumNode::default()).launch_with_debug_capabilities().await?;
if let Err(err) = Cli::<EthereumChainSpecParser, FirehoseArgs>::parse().run(
async move |builder, firehose_args| {
info!(target: "reth::cli", "Launching node");

handle.wait_for_node_exit().await
}) {
// Resolve the node data directory so the Firehose hook can derive the
// default cursor file path without needing the full NodeConfig later.
let data_dir = builder.config().datadir().data_dir().to_path_buf();

let handle = builder
.node(EthereumNode::default())
// ── Firehose startup hook ─────────────────────────────────────────
// Runs after all node components are built but BEFORE the sync
// pipeline starts. This is the correct place to:
// 1. Initialise the Firehose tracer with the emission config.
// 2. Detect any gap between the last emitted block (cursor file) and the
// execution stage checkpoint.
// 3. Re-emit the missing blocks before sync resumes.
// 4. Register the async writer's shutdown handle with the task executor so it is
// drained gracefully before process exit.
.on_component_initialized(move |node| {
// Build the tracer config from CLI args and the resolved data dir.
let cfg = firehose_args.to_tracer_config(&data_dir);
let cursor_path = cfg.cursor_path.clone();

// Initialise the global tracer. Returns a ShutdownHandle when
// the emission mode uses a background writer thread.
let shutdown_handle = reth_firehose::init_tracer(cfg);

// Detect and re-emit any blocks missed since the last run.
reth_firehose::check_gap_and_re_trace(&node.provider, cursor_path.as_deref())?;

// Wire the background writer's drain into the node shutdown
// lifecycle. The guard held by the async closure keeps the
// GracefulShutdown alive until drain() completes, ensuring the
// background thread has flushed all buffered blocks before the
// process exits.
if let Some(handle) = shutdown_handle {
node.task_executor.spawn_with_graceful_shutdown_signal(
|shutdown| async move {
// Wait for the node shutdown signal.
let _guard = shutdown.await;
// Drain the async writer (blocks until all queued
// blocks have been written to stdout).
handle.drain();
},
);
}

Ok(())
})
.launch_with_debug_capabilities()
.await?;

handle.wait_for_node_exit().await
},
) {
eprintln!("Error: {err:?}");
std::process::exit(1);
}
Expand Down
28 changes: 28 additions & 0 deletions crates/firehose/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "reth-firehose"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Firehose integration for Reth: async emission, shutdown wiring, and gap detection"

[lints]
workspace = true

[dependencies]
# reth
reth-stages-types.workspace = true
reth-storage-api.workspace = true

# firehose
firehose-tracer.workspace = true

# misc
clap = { workspace = true, features = ["derive", "env"] }
eyre.workspace = true
tracing.workspace = true

[dev-dependencies]
tempfile.workspace = true
130 changes: 130 additions & 0 deletions crates/firehose/src/args.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//! CLI arguments for the Firehose integration.
//!
//! These arguments control how the Firehose tracer emits blocks to stdout,
//! where to write the cursor file, and how async emission is configured.

use clap::Args;
use firehose_tracer::EmissionMode;
use std::{path::PathBuf, time::Duration};

/// Firehose emission mode, mirroring [`EmissionMode`] for CLI parsing.
#[derive(Debug, Clone, Default, clap::ValueEnum)]
pub enum EmissionModeArg {
/// Encode and write blocks inline on the calling thread (legacy behaviour).
Blocking,
/// Encode and write blocks in a dedicated background thread with backpressure.
Async,
/// Switch automatically based on block age (catch-up → async, live → blocking).
#[default]
Auto,
}

/// CLI arguments for the Firehose tracer integration.
///
/// Add `#[command(flatten)]` to include these in a `NodeCommand` extension struct.
#[derive(Debug, Clone, Default, Args)]
pub struct FirehoseArgs {
/// Controls when and how encoded blocks are written to stdout.
///
/// - `blocking`: encode → base64 → write, all inline on the calling thread (legacy).
/// - `async`: encode and write in a background thread; backpressure via channel.
/// - `auto`: use async for blocks older than `--firehose.live-threshold`; use blocking for
/// blocks within the live window (default).
#[arg(
id = "firehose.emission-mode",
long = "firehose.emission-mode",
value_name = "MODE",
default_value = "auto",
verbatim_doc_comment
)]
pub emission_mode: EmissionModeArg,

/// Channel capacity for the async emission path.
///
/// The background writer thread will block producers once this many encoded
/// blocks are waiting, providing backpressure. Only relevant for `async` and
/// `auto` modes.
#[arg(
id = "firehose.channel-capacity",
long = "firehose.channel-capacity",
value_name = "N",
default_value_t = 32
)]
pub channel_capacity: usize,

/// Age threshold in seconds used by `auto` emission mode.
///
/// Blocks with a timestamp more than this many seconds behind wall-clock time
/// are considered historical (catch-up) and will use the async path.
/// Blocks within this window are considered live and will use the blocking path.
#[arg(
id = "firehose.live-threshold",
long = "firehose.live-threshold",
value_name = "SECS",
default_value_t = 60
)]
pub live_threshold_secs: u64,

/// Path to the cursor file that tracks the last block successfully emitted to stdout.
///
/// After each block is written the cursor file is updated atomically so that the
/// node can detect gaps after an unclean shutdown and re-emit the missing blocks
/// on the next startup. Defaults to `<datadir>/firehose.cursor` when not set.
#[arg(id = "firehose.cursor-path", long = "firehose.cursor-path", value_name = "PATH")]
pub cursor_path: Option<PathBuf>,
}

impl FirehoseArgs {
/// Convert the parsed CLI args into a [`firehose_tracer::config::Config`].
///
/// `data_dir` is used to derive the default cursor file path when
/// `--firehose.cursor-path` is not specified.
pub fn to_tracer_config(&self, data_dir: &std::path::Path) -> firehose_tracer::config::Config {
let cursor_path =
self.cursor_path.clone().unwrap_or_else(|| data_dir.join("firehose.cursor"));

let emission_mode = match self.emission_mode {
EmissionModeArg::Blocking => EmissionMode::Blocking,
EmissionModeArg::Async => {
EmissionMode::Async { channel_capacity: self.channel_capacity }
}
EmissionModeArg::Auto => EmissionMode::Auto {
channel_capacity: self.channel_capacity,
live_threshold: Duration::from_secs(self.live_threshold_secs),
},
};

firehose_tracer::config::Config::new()
.with_emission_mode(emission_mode)
.with_cursor_path(cursor_path)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn default_emission_mode_is_auto() {
let args = FirehoseArgs::default();
assert!(matches!(args.emission_mode, EmissionModeArg::Auto));
}

#[test]
fn to_tracer_config_uses_data_dir_for_cursor() {
let args = FirehoseArgs::default();
let tmp = tempfile::tempdir().unwrap();
let cfg = args.to_tracer_config(tmp.path());
let expected_cursor = tmp.path().join("firehose.cursor");
assert_eq!(cfg.cursor_path, Some(expected_cursor));
}

#[test]
fn to_tracer_config_respects_explicit_cursor_path() {
let custom = PathBuf::from("/custom/firehose.cursor");
let args = FirehoseArgs { cursor_path: Some(custom.clone()), ..Default::default() };
let tmp = tempfile::tempdir().unwrap();
let cfg = args.to_tracer_config(tmp.path());
assert_eq!(cfg.cursor_path, Some(custom));
}
}
112 changes: 112 additions & 0 deletions crates/firehose/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! Reth ↔ Firehose integration.
//!
//! This crate wires the [`firehose_tracer`] library into the reth node by
//! providing:
//!
//! * **`FirehoseArgs`** — clap argument group that exposes `--firehose.*` CLI flags (emission mode,
//! channel capacity, live threshold, cursor path).
//! * **`init_tracer`** — one-shot initialisation that stores the tracer in a process-wide
//! `OnceLock` and returns an optional [`ShutdownHandle`] for the async emission background
//! thread.
//! * **`check_gap_and_re_trace`** — startup hook that detects gaps between the cursor file and the
//! execution stage checkpoint, and re-emits the missing blocks before the sync pipeline resumes.
//!
//! # Usage in `bin/reth/src/main.rs`
//!
//! ```rust,ignore
//! use reth_firehose::{FirehoseArgs, init_tracer, check_gap_and_re_trace};
//!
//! // … inside the async run closure …
//! let handle = builder
//! .node(EthereumNode::default())
//! .on_component_initialized(move |node| {
//! let data_dir = node.config.datadir().data_dir().to_path_buf();
//! let cfg = firehose_args.to_tracer_config(&data_dir);
//! let cursor_path = cfg.cursor_path.clone();
//! let shutdown_handle = init_tracer(cfg);
//!
//! check_gap_and_re_trace(&node.provider, cursor_path.as_deref())?;
//!
//! if let Some(handle) = shutdown_handle {
//! node.task_executor.spawn_with_graceful_shutdown_signal(|shutdown| async move {
//! let _guard = shutdown.await;
//! handle.drain();
//! });
//! }
//! Ok(())
//! })
//! .launch_with_debug_capabilities()
//! .await?;
//! ```

#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

pub mod args;
pub mod re_trace;

pub use args::FirehoseArgs;
pub use re_trace::{check_gap_and_re_trace, firehose_re_trace_range};

use firehose_tracer::{config::Config, ShutdownHandle, Tracer};
use std::sync::{Mutex, OnceLock};
use tracing::{debug, info};

/// Process-wide Firehose tracer, initialised at most once per process.
pub static GLOBAL_TRACER: OnceLock<Mutex<Tracer>> = OnceLock::new();

/// Initialise the global Firehose tracer with the given configuration.
///
/// Returns a [`ShutdownHandle`] when the emission mode has an async background
/// thread (i.e. [`EmissionMode::Async`] or [`EmissionMode::Auto`]). The caller
/// is responsible for calling [`ShutdownHandle::drain`] before the process exits
/// so that the background thread can flush all buffered blocks.
///
/// # Panics
///
/// Panics if called more than once in the same process. Double-initialisation is
/// treated as a programming error because it would silently discard the first
/// tracer (and any pending shutdown handle), so it is caught eagerly at startup
/// rather than producing silent data loss later.
pub fn init_tracer(config: Config) -> Option<ShutdownHandle> {
info!(
target: "reth::firehose",
cursor_path = ?config.cursor_path,
"Initialising Firehose tracer"
);

let mut tracer = Tracer::new(config);
let shutdown_handle = tracer.shutdown_handle();

debug!(
target: "reth::firehose",
has_async_thread = shutdown_handle.is_some(),
"Firehose tracer created"
);

GLOBAL_TRACER.set(Mutex::new(tracer)).unwrap_or_else(|_| {
panic!("init_tracer called more than once; Firehose tracer already initialised");
});

shutdown_handle
}

/// Returns a reference to the global [`Mutex<Tracer>`], or `None` if the
/// tracer has not been initialised via [`init_tracer`].
///
/// The lock should be held only for the duration of a single tracer method
/// call to minimise contention.
pub fn get_tracer() -> Option<&'static Mutex<Tracer>> {
GLOBAL_TRACER.get()
}

#[cfg(test)]
mod tests {
// NOTE: Because GLOBAL_TRACER is a process-wide OnceLock, unit tests that
// call init_tracer cannot be run in the same process as each other.
// The args and re_trace modules have their own self-contained tests.
}
Loading