Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ url = { features = ["serde"], version = "2.5" }
# libsqlite3-sys is kept to control the bundled SQLite linkage.
# tonic-prost is used by generated gRPC code rather than handwritten Rust.
#
# TODO(mirko): remove rpc again once we've completed the refactoring.
ignored = ["libsqlite3-sys", "miden-node-rpc", "tonic-prost"]
ignored = ["libsqlite3-sys", "tonic-prost"]

# Lints are set to warn for development, which are promoted to errors in CI.
[workspace.lints.clippy]
Expand Down
34 changes: 10 additions & 24 deletions bin/network-monitor/src/monitor/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! Task management for the network monitor.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use anyhow::Result;
use miden_node_proto::clients::RemoteProverClient;
use miden_node_utils::tasks::Tasks as SupervisedTasks;
use tokio::sync::watch::Receiver;
use tokio::sync::{Mutex, watch};
use tokio::task::{Id, JoinSet};
use tracing::debug;

use crate::COMPONENT;
Expand All @@ -24,20 +23,16 @@ use crate::service::{Service, build_tls_client};
use crate::status::{RpcService, ServiceStatus};
use crate::validator::ValidatorService;

/// Task management structure that encapsulates `JoinSet` and component names.
/// Task management structure that supervises named component tasks.
#[derive(Default)]
pub struct Tasks {
handles: JoinSet<()>,
names: HashMap<Id, String>,
handles: SupervisedTasks,
}

impl Tasks {
/// Create a new Tasks instance.
pub fn new() -> Self {
Self {
handles: JoinSet::new(),
names: HashMap::new(),
}
Self { handles: SupervisedTasks::new() }
}

/// Spawn the RPC status checker task.
Expand Down Expand Up @@ -178,33 +173,24 @@ impl Tasks {
pub fn spawn_service<S: Service>(&mut self, svc: S) -> Receiver<ServiceStatus> {
let (tx, rx) = watch::channel(svc.initial_status());
let service_name = svc.name().to_string();
let id = self.handles.spawn(async move { svc.run(tx).await }).id();
self.handles
.spawn_infallible(service_name.clone(), async move { svc.run(tx).await });
debug!(target: COMPONENT, service = %service_name, "spawned service");
self.names.insert(id, service_name);
rx
}

/// Spawn the HTTP frontend server.
pub fn spawn_http_server(&mut self, server_state: ServerState, config: &MonitorConfig) {
let config = config.clone();
let id = self.handles.spawn(async move { serve(server_state, config).await }).id();
self.names.insert(id, "frontend".to_string());
self.handles
.spawn_infallible("frontend", async move { serve(server_state, config).await });
}

/// Handles the failure of a task.
///
/// Waits for any task to complete or fail and returns an error. Since components are
/// expected to run indefinitely, any task completion is treated as fatal.
pub async fn handle_failure(&mut self) -> Result<()> {
let component_result =
self.handles.join_next_with_id().await.expect("join set is not empty");

let (id, err) = match component_result {
Ok((id, ())) => (id, anyhow::anyhow!("component completed unexpectedly")),
Err(join_err) => (join_err.id(), anyhow::Error::from(join_err)),
};
let component_name = self.names.get(&id).map_or("unknown", String::as_str);

Err(err.context(format!("component {component_name} failed")))
pub async fn handle_failure(mut self) -> Result<()> {
self.handles.join_next_as_error().await
}
}
1 change: 1 addition & 0 deletions bin/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ miden-node-store = { workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { workspace = true }
tokio = { features = ["macros", "net", "rt-multi-thread"], workspace = true }
tonic = { default-features = false, workspace = true }
url = { workspace = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions bin/node/src/commands/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use miden_node_block_producer::{
DEFAULT_BATCH_INTERVAL,
DEFAULT_BLOCK_INTERVAL,
DEFAULT_MAX_BATCHES_PER_BLOCK,
DEFAULT_MAX_CONCURRENT_PROOFS,
DEFAULT_MAX_TXS_PER_BATCH,
};
use miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS;
use miden_node_utils::clap::duration_to_human_readable_string;
use url::Url;

Expand Down Expand Up @@ -74,7 +74,7 @@ mod tests {
block: BlockOptions {
interval: DEFAULT_BLOCK_INTERVAL,
max_batches,
max_concurrent_proofs: miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS,
max_concurrent_proofs: miden_node_block_producer::DEFAULT_MAX_CONCURRENT_PROOFS,
},
block_prover: BlockProverOptions { url: None },
mempool: MempoolOptions {
Expand Down
4 changes: 2 additions & 2 deletions bin/node/src/commands/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::path::{Path, PathBuf};

use anyhow::Context;
use miden_node_store::genesis::GenesisBlock;
use miden_node_store::{DataDirectory, Db, Store};
use miden_node_store::{DataDirectory, Db, State};
use miden_node_utils::fs::ensure_empty_directory;
use miden_protocol::block::SignedBlock;
use miden_protocol::utils::serde::Deserializable;
Expand Down Expand Up @@ -38,7 +38,7 @@ pub fn bootstrap_store(data_directory: &Path, genesis_block_path: &Path) -> anyh
let genesis_block =
GenesisBlock::try_from(signed_block).context("genesis block validation failed")?;

Store::bootstrap(genesis_block, data_directory)
State::bootstrap(genesis_block, data_directory)
}

// MIGRATE
Expand Down
4 changes: 2 additions & 2 deletions bin/node/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ impl Command {
match self {
Command::Bootstrap(bootstrap_command) => bootstrap_command.handle(),
Command::Migrate(migrate_command) => migrate_command.handle().await,
Command::Sequencer(sequencer_command) => sequencer_command.handle(),
Command::Full(full_node_command) => full_node_command.handle(),
Command::Sequencer(sequencer_command) => sequencer_command.handle().await,
Command::Full(full_node_command) => full_node_command.handle().await,
}
}
}
118 changes: 84 additions & 34 deletions bin/node/src/commands/modes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use std::sync::Arc;

use anyhow::Context;
use miden_node_block_producer::{RpcSync, Sequencer};
use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient};
use miden_node_rpc::{Rpc, RpcMode};
use miden_node_store::State;
use miden_node_utils::tasks::Tasks;
use tokio::net::TcpListener;
use url::Url;

use super::block_producer::BlockProducerOptions;
use super::rpc::SyncOptions;
use super::runtime::RuntimeOptions;
use super::runtime::{RuntimeConfig, RuntimeOptions};
use super::store::StoreOptions;

// RUNTIME MODES
Expand All @@ -25,27 +33,43 @@ pub struct SequencerCommand {
}

impl SequencerCommand {
pub fn handle(self) -> anyhow::Result<()> {
pub async fn handle(self) -> anyhow::Result<()> {
let runtime = self.runtime.runtime_config(&self.store);
self.block_producer.validate()?;
let validator = self.external_services.validator_client();
let ntx_builder = self.external_services.ntx_builder_client();
let _ = (
runtime.rpc_listen,
runtime.data_directory,
validator,
ntx_builder,
self.block_producer.block_prover.url,
runtime.database_options,
runtime.external_grpc_options,
runtime.storage_options,
self.block_producer.block.max_concurrent_proofs,
);

anyhow::bail!(
"sequencer mode runtime composition is not implemented yet; this stage only defines \
the CLI"
)
let network_tx_auth = self.runtime.rpc.network_tx_auth()?;
let state = load_state(&runtime).await?;
let _disk_monitor = state.spawn_disk_monitor();

let sequencer = Sequencer {
store: Arc::clone(&state),
validator_url: self.external_services.validator_url.clone(),
batch_prover_url: self.block_producer.batch.prover_url,
block_prover_url: self.block_producer.block_prover.url,
batch_interval: self.block_producer.batch.interval,
block_interval: self.block_producer.block.interval,
max_txs_per_batch: self.block_producer.batch.max_txs,
max_batches_per_block: self.block_producer.block.max_batches,
max_concurrent_proofs: self.block_producer.block.max_concurrent_proofs,
mempool_tx_capacity: self.block_producer.mempool.tx_capacity,
Copy link
Copy Markdown
Collaborator

@sergerad sergerad May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Maybe the self.block_producer and Sequencer could use the same struct for these shared fields?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure how, unless you mean the block producer should own the clap representation directly?

I'd prefer not to do that since how we organise the CLI isn't necessarily how things work in the block producer, so I don't want to couple them.

Or put differently, ideally we would keep the code type separate from the interface boundary, in this case the CLI API.

}
.spawn()
.await
.context("failed to spawn sequencer")?;
let block_producer = sequencer.api();

let rpc = Rpc {
listener: bind_rpc(runtime.rpc_listen).await?,
store: state,
mode: RpcMode::sequencer(block_producer, self.external_services.validator_client()),
ntx_builder: Some(self.external_services.ntx_builder_client()),
grpc_options: runtime.external_grpc_options,
network_tx_auth,
};
let mut tasks = Tasks::new();
tasks.spawn("sequencer", sequencer.wait());
tasks.spawn("RPC server", rpc.serve());

tasks.join_next_as_error().await
}
}

Expand Down Expand Up @@ -95,22 +119,30 @@ pub struct FullNodeCommand {
}

impl FullNodeCommand {
pub fn handle(self) -> anyhow::Result<()> {
pub async fn handle(self) -> anyhow::Result<()> {
let runtime = self.runtime.runtime_config(&self.store);
let source_rpc = self.sync.source_rpc_client();
let _ = (
runtime.rpc_listen,
runtime.data_directory,
runtime.database_options,
runtime.external_grpc_options,
runtime.storage_options,
source_rpc,
);

anyhow::bail!(
"full node mode block-stream sync is not implemented yet; this stage only defines the \
CLI"
)
let network_tx_auth = self.runtime.rpc.network_tx_auth()?;
let state = load_state(&runtime).await?;
let _disk_monitor = state.spawn_disk_monitor();
let sync = RpcSync {
state: Arc::clone(&state),
source_rpc: source_rpc.clone(),
};

let rpc = Rpc {
listener: bind_rpc(runtime.rpc_listen).await?,
store: state,
mode: RpcMode::full_node(source_rpc),
ntx_builder: None,
grpc_options: runtime.external_grpc_options,
network_tx_auth,
};
let mut tasks = Tasks::new();
tasks.spawn("RPC sync", sync.run());
tasks.spawn("RPC server", rpc.serve());

tasks.join_next_as_error().await
}
}

Expand All @@ -125,3 +157,21 @@ impl SyncOptions {
.connect_lazy::<RpcClient>()
}
}

async fn load_state(runtime: &RuntimeConfig) -> anyhow::Result<Arc<State>> {
let state = State::load_with_database_options(
&runtime.data_directory,
runtime.storage_options.clone(),
runtime.database_options,
)
.await
.context("failed to load state")?;

Ok(Arc::new(state))
}

async fn bind_rpc(listen: std::net::SocketAddr) -> anyhow::Result<TcpListener> {
TcpListener::bind(listen)
.await
.with_context(|| format!("failed to bind RPC listener to {listen}"))
}
15 changes: 15 additions & 0 deletions bin/node/src/commands/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::net::SocketAddr;
use std::num::{NonZeroU32, NonZeroU64};
use std::time::Duration;

use anyhow::Context;
use miden_node_rpc::NetworkTxAuth;
use miden_node_utils::clap::{GrpcOptionsExternal, duration_to_human_readable_string};
use tonic::metadata::AsciiMetadataValue;
use url::Url;

// RPC OPTIONS
Expand Down Expand Up @@ -40,6 +43,18 @@ impl RpcOptions {
max_concurrent_connections: self.rate_limit.max_concurrent_connections,
}
}

pub(super) fn network_tx_auth(&self) -> anyhow::Result<Option<NetworkTxAuth>> {
self.network_tx_auth_header_value
.as_deref()
.map(|value| {
value
.parse::<AsciiMetadataValue>()
.map(NetworkTxAuth)
.context("invalid rpc.network-tx-auth-header-value")
})
.transpose()
}
}

#[derive(clap::Args, Clone, Debug)]
Expand Down
Loading
Loading