2 changes: 1 addition & 1 deletion Cargo.toml
@@ -22,7 +22,7 @@ repository = "https://github.com/decipherhub/cipherbft"
[workspace.dependencies]
# Async runtime
tokio = { version = "1.35", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-util = { version = "0.7", features = ["codec", "rt"] }
async-trait = "0.1"

# ABCI & Tendermint types
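The only workspace-level change is the new "rt" feature on tokio-util, which gates tokio_util::task::TaskTracker — the standard primitive for tracking a set of spawned tasks and waiting for them to drain. A minimal sketch of how TaskTracker combines with CancellationToken (background on the library primitives, not code from this PR):

use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker; // requires tokio-util's "rt" feature

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    let token = CancellationToken::new();

    let child = token.clone();
    tracker.spawn(async move {
        // Run until cancelled.
        child.cancelled().await;
        println!("task observed cancellation");
    });

    token.cancel();       // signal shutdown to all token clones
    tracker.close();      // no further tasks will be accepted
    tracker.wait().await; // wait for every tracked task to finish
    let _ = Duration::from_secs(0); // (Duration used in larger examples below)
}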
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
@@ -24,6 +24,7 @@ alloy-primitives = { version = "1", features = ["serde"] }

# Async runtime
tokio = { workspace = true, features = ["full", "signal"] }
tokio-util = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }

2 changes: 2 additions & 0 deletions crates/node/src/lib.rs
@@ -11,6 +11,7 @@ pub mod genesis_bootstrap;
pub mod key_cli;
pub mod network;
pub mod node;
pub mod supervisor;
pub mod util;

pub use client_config::ClientConfig;
@@ -26,3 +27,4 @@ pub use genesis_bootstrap::{
};
pub use key_cli::{execute_keys_command, KeysCommand};
pub use node::Node;
pub use supervisor::{NodeSupervisor, ShutdownError};
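supervisor.rs itself is not shown in this diff, so the new API's shape has to be read off its call sites: new(), clone(), cancellation_token(), spawn() taking a &'static str task name, spawn_cancellable() taking a closure over a child token, and an async shutdown() whose error the callers log as a warning. A hypothetical sketch consistent with those usages, built on the TaskTracker/CancellationToken primitives enabled above — not the PR's actual implementation:

use std::future::Future;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

/// Error returned when supervised tasks fail to stop cleanly.
#[derive(Debug)]
pub struct ShutdownError(String);

impl std::fmt::Display for ShutdownError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "shutdown error: {}", self.0)
    }
}

impl std::error::Error for ShutdownError {}

/// Tracks spawned tasks and fans a cancellation signal out to all of them.
#[derive(Clone)]
pub struct NodeSupervisor {
    tracker: TaskTracker,
    cancel: CancellationToken,
}

impl NodeSupervisor {
    pub fn new() -> Self {
        Self {
            tracker: TaskTracker::new(),
            cancel: CancellationToken::new(),
        }
    }

    /// Token that tasks watch to learn about shutdown.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancel.clone()
    }

    /// Spawn a named task; failures are logged rather than crashing the process.
    pub fn spawn<F>(&self, name: &'static str, fut: F)
    where
        F: Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        self.tracker.spawn(async move {
            if let Err(e) = fut.await {
                tracing::error!("task {name} failed: {e}");
            }
        });
    }

    /// Spawn a task that receives its own child cancellation token.
    pub fn spawn_cancellable<F, Fut>(&self, name: &'static str, f: F)
    where
        F: FnOnce(CancellationToken) -> Fut,
        Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
    {
        self.spawn(name, f(self.cancel.child_token()));
    }

    /// Cancel every task, stop accepting new ones, and wait for them to drain.
    pub async fn shutdown(&self) -> Result<(), ShutdownError> {
        self.cancel.cancel();
        self.tracker.close();
        self.tracker.wait().await;
        Ok(()) // a real implementation would likely time out and report stragglers
    }
}

A production version would presumably bound shutdown() with a deadline and surface tasks that failed to drain through ShutdownError; the sketch returns Ok unconditionally.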
78 changes: 57 additions & 21 deletions crates/node/src/main.rs
@@ -8,8 +8,8 @@ use cipherbft_crypto::{BlsKeyPair, BlsSecretKey, Ed25519KeyPair, Ed25519SecretKe
use cipherd::key_cli::KeyringBackendArg;
use cipherd::{
execute_keys_command, generate_local_configs, GenesisGenerator, GenesisGeneratorConfig,
GenesisLoader, KeysCommand, Node, NodeConfig, ValidatorKeyFile, CIPHERD_HOME_ENV,
DEFAULT_HOME_DIR,
GenesisLoader, KeysCommand, Node, NodeConfig, NodeSupervisor, ValidatorKeyFile,
CIPHERD_HOME_ENV, DEFAULT_HOME_DIR,
};
use clap::{Parser, Subcommand};
use std::path::PathBuf;
@@ -720,7 +720,24 @@ async fn cmd_start(
let mut node = Node::new(config, bls_keypair, ed25519_keypair)?;
node.bootstrap_validators_from_genesis(&genesis)?;

node.run().await?;
// Create a supervisor for structured task management and graceful shutdown
let supervisor = NodeSupervisor::new();

// Set up signal handling for graceful shutdown
let shutdown_supervisor = supervisor.clone();
tokio::spawn(async move {
if let Err(e) = tokio::signal::ctrl_c().await {
tracing::error!("Failed to listen for Ctrl+C: {}", e);
return;
}
info!("Received Ctrl+C, initiating graceful shutdown...");
if let Err(e) = shutdown_supervisor.shutdown().await {
tracing::warn!("Shutdown warning: {}", e);
}
});

// Run node with the supervisor for coordinated task management
node.run_with_supervisor(supervisor).await?;

Ok(())
}
@@ -908,11 +925,13 @@ async fn cmd_testnet_start(num_validators: usize, duration: u64) -> Result<()> {
})
.collect();

// Create and start nodes
let mut handles = Vec::new();
// Create a shared supervisor for coordinated task management across all nodes
// This ensures graceful shutdown propagates to all validators simultaneously
let supervisor = NodeSupervisor::new();

// Create and start nodes under the shared supervisor
for tc in test_configs {
let validator_id = tc
let _validator_id = tc
.config
.validator_id
.expect("test config should have validator_id");
@@ -924,33 +943,50 @@
node.add_validator(*vid, bls_pubkey.clone(), ed25519_pubkey.clone());
}

let handle = tokio::spawn(async move {
if let Err(e) = node.run().await {
tracing::error!("Node {:?} error: {}", validator_id, e);
}
});

handles.push(handle);
// Spawn each node under the shared supervisor for coordinated lifecycle management
let node_supervisor = supervisor.clone();
supervisor.spawn(
// Note: Using a static string for task name as required by spawn()
"validator-node",
async move {
node.run_with_supervisor(node_supervisor).await?;
Ok(())
},
);

// Stagger node startup slightly
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

info!("All {} nodes started", num_validators);
info!("All {} nodes started under shared supervisor", num_validators);

// Run for specified duration or until Ctrl+C
if duration > 0 {
// Wait for shutdown trigger (duration or Ctrl+C)
let shutdown_reason = if duration > 0 {
info!("Running for {} seconds...", duration);
tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await;
info!("Duration elapsed, shutting down...");
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(duration)) => {
"duration elapsed"
}
_ = tokio::signal::ctrl_c() => {
"Ctrl+C received"
}
}
} else {
info!("Press Ctrl+C to stop...");
tokio::signal::ctrl_c().await?;
info!("Received shutdown signal...");
"Ctrl+C received"
};

info!("Shutdown triggered: {}", shutdown_reason);

// Initiate graceful shutdown through the supervisor
// This propagates cancellation to all nodes in the correct order
info!("Initiating coordinated shutdown of all {} validators...", num_validators);
if let Err(e) = supervisor.shutdown().await {
tracing::warn!("Shutdown warning: {}", e);
}

// All handles will be dropped, tasks will be cancelled
info!("Testnet stopped");
info!("Testnet stopped gracefully");

Ok(())
}
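The testnet path above is the shared-supervisor pattern in full: one supervisor cloned into every node task, one cancellation fan-out on shutdown. Condensed into a self-contained toy (hypothetical names, reusing the NodeSupervisor sketch above):

use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let supervisor = NodeSupervisor::new();

    for i in 0..4usize {
        let token = supervisor.cancellation_token();
        supervisor.spawn("toy-validator", async move {
            loop {
                tokio::select! {
                    biased;
                    // Cancellation is checked first, mirroring the node's event loop.
                    _ = token.cancelled() => break,
                    _ = tokio::time::sleep(Duration::from_millis(250)) => {
                        println!("validator {i} tick");
                    }
                }
            }
            Ok(())
        });
    }

    // Stop on Ctrl+C or after a fixed duration, whichever comes first.
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = tokio::time::sleep(Duration::from_secs(2)) => {}
    }

    supervisor.shutdown().await?;
    Ok(())
}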
105 changes: 98 additions & 7 deletions crates/node/src/node.rs
@@ -1,8 +1,24 @@
//! Node runner - ties Primary, Workers, and Network together
//!
//! # Task Supervision
//!
//! The node uses a [`NodeSupervisor`] for structured task management:
//! - All background tasks are tracked and can be cancelled gracefully
//! - Shutdown follows a specific order to ensure clean state
//! - Critical task failures trigger coordinated shutdown
//!
//! # Shutdown Order
//!
//! 1. Stop accepting new network connections
//! 2. Drain in-flight consensus rounds (via cancellation signal)
//! 3. Flush pending storage writes
//! 4. Close database connections
//! 5. Exit

use crate::config::NodeConfig;
use crate::execution_bridge::ExecutionBridge;
use crate::network::TcpPrimaryNetwork;
use crate::supervisor::NodeSupervisor;
use crate::util::validator_id_from_bls;
use anyhow::{Context, Result};
use cipherbft_consensus::{
@@ -27,6 +43,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Validator public key information for both DCL and Consensus layers
@@ -266,13 +283,34 @@ impl Node {
Ok(self)
}

/// Run the node
/// Run the node with a default supervisor.
///
/// Creates a new [`NodeSupervisor`] and runs the node until shutdown is triggered
/// (e.g., via Ctrl+C signal).
pub async fn run(self) -> Result<()> {
let supervisor = NodeSupervisor::new();
self.run_with_supervisor(supervisor).await
}

/// Run the node with a provided supervisor.
///
/// This allows external control over task supervision, useful for:
/// - Testing with custom shutdown timing
/// - Coordinating multiple nodes in a single process
/// - Custom shutdown ordering
///
/// # Arguments
///
/// * `supervisor` - The supervisor that manages task lifecycle
pub async fn run_with_supervisor(self, supervisor: NodeSupervisor) -> Result<()> {
info!("Starting node with validator ID: {:?}", self.validator_id);

// Create data directory if needed
std::fs::create_dir_all(&self.config.data_dir)?;

// Get cancellation token for graceful shutdown
let cancel_token = supervisor.cancellation_token();

// Create channels for Primary
let (primary_incoming_tx, mut primary_incoming_rx) =
mpsc::channel::<(ValidatorId, DclMessage)>(1000);
@@ -295,16 +333,25 @@
)
})?;

// Connect to peers (with retry)
tokio::spawn({
// Connect to peers (with retry) - supervised task
supervisor.spawn_cancellable("peer-connector", {
let network = Arc::clone(&primary_network);
async move {
move |token| async move {
// Initial delay to let other nodes start
tokio::time::sleep(Duration::from_secs(1)).await;
loop {
network.connect_to_all_peers().await;
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::select! {
biased;
_ = token.cancelled() => {
info!("Peer connector shutting down");
break;
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
network.connect_to_all_peers().await;
}
}
}
Ok(())
}
});

Expand Down Expand Up @@ -478,9 +525,53 @@ impl Node {
// Clone execution bridge for use in event loop
let execution_bridge = self.execution_bridge.clone();

// Main event loop
// Run the main event loop with graceful shutdown support
let result = Self::run_event_loop(
cancel_token,
&mut primary_incoming_rx,
&mut primary_handle,
&cut_tx,
&mut decided_rx,
execution_bridge,
)
.await;

// Graceful shutdown sequence
info!("Shutting down node components...");

// Step 1: Signal Primary to stop (it will stop accepting new batches)
info!("Stopping Primary...");
primary_handle.shutdown().await;

// Step 2: Wait for supervisor to complete all tracked tasks
info!("Waiting for supervised tasks to complete...");
if let Err(e) = supervisor.shutdown().await {
warn!("Some tasks did not complete cleanly: {}", e);
}

info!("Node shutdown complete");
result
}

/// Internal event loop that handles messages and can be cancelled.
async fn run_event_loop(
cancel_token: CancellationToken,
primary_incoming_rx: &mut mpsc::Receiver<(ValidatorId, DclMessage)>,
primary_handle: &mut cipherbft_data_chain::primary::PrimaryHandle,
cut_tx: &mpsc::Sender<Cut>,
decided_rx: &mut mpsc::Receiver<(ConsensusHeight, Cut)>,
execution_bridge: Option<Arc<ExecutionBridge>>,
) -> Result<()> {
loop {
tokio::select! {
biased;

// Check for shutdown signal first (high priority)
_ = cancel_token.cancelled() => {
info!("Received shutdown signal, exiting event loop");
return Ok(());
}

// Incoming network messages -> forward to Primary
Some((from, msg)) = primary_incoming_rx.recv() => {
debug!("Received message from {:?}: {:?}", from, msg.type_name());
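One detail worth noting in run_event_loop and the peer-connector task: the `biased;` keyword disables tokio::select!'s default random branch polling and evaluates branches in declaration order, so the cancellation arm is always checked before any pending channel work. That ordering is what keeps shutdown prompt even when the message channels are saturated.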