Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/data-chain/src/primary/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ pub struct PrimaryHandle {
}

impl PrimaryHandle {
/// Returns an additional handle to the Worker→Primary message channel.
///
/// Intended for bridge tasks that run Workers on separate tasks and
/// need to push their messages into the Primary's inbox.
pub fn worker_sender(&self) -> mpsc::Sender<WorkerToPrimary> {
    mpsc::Sender::clone(&self.worker_sender)
}

/// Send a message from Worker
pub async fn send_from_worker(
&self,
Expand Down
126 changes: 123 additions & 3 deletions crates/node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct TcpPrimaryNetwork {
}

/// Connection to a peer
///
/// `Clone` is cheap: the write half is shared behind `Arc<Mutex<_>>`,
/// so every clone refers to the same underlying TCP stream.
#[derive(Clone)]
struct PeerConnection {
    /// Mutex-serialized write access to the outbound half of the stream.
    writer: Arc<Mutex<tokio::io::WriteHalf<TcpStream>>>,
}
Expand Down Expand Up @@ -320,22 +321,28 @@ impl PrimaryNetwork for TcpPrimaryNetwork {
}

/// TCP-based Worker network implementation
///
/// Cheap to `Clone`: the live peer map is shared behind an `Arc`, and the
/// mpsc sender clones by reference count, so all clones observe the same
/// connection state.
#[derive(Clone)]
pub struct TcpWorkerNetwork {
    /// Our validator ID
    #[allow(dead_code)]
    our_id: ValidatorId,
    /// Worker ID
    // NOTE(review): `worker_id` is read by the listener/connect logging in
    // this file, so the allow(dead_code) below may be stale — confirm.
    #[allow(dead_code)]
    worker_id: u8,
    /// Peer worker connections
    // Populated lazily: on outbound connect, and when an inbound
    // BatchRequest identifies its sender.
    peers: Arc<RwLock<HashMap<ValidatorId, PeerConnection>>>,
    /// Peer worker addresses
    peer_addrs: HashMap<ValidatorId, SocketAddr>,
    /// Incoming message channel
    // Carries (sender, message) pairs to the node's bridge task.
    incoming_tx: mpsc::Sender<(ValidatorId, WorkerMessage)>,
}

impl TcpWorkerNetwork {
/// Create a new TCP worker network
pub fn new(our_id: ValidatorId, worker_id: u8, peers: &[PeerConfig]) -> Self {
pub fn new(
our_id: ValidatorId,
worker_id: u8,
peers: &[PeerConfig],
incoming_tx: mpsc::Sender<(ValidatorId, WorkerMessage)>,
) -> Self {
let peer_addrs: HashMap<_, _> = peers
.iter()
.filter_map(|p| {
Expand All @@ -350,6 +357,119 @@ impl TcpWorkerNetwork {
worker_id,
peers: Arc::new(RwLock::new(HashMap::new())),
peer_addrs,
incoming_tx,
}
}

/// Start listening for incoming connections
pub async fn start_listener(self: Arc<Self>, listen_addr: SocketAddr) -> Result<()> {
let listener = TcpListener::bind(listen_addr).await.with_context(|| {
format!(
"Failed to bind worker network listener to {}. \
Port {} may already be in use.",
listen_addr,
listen_addr.port()
)
})?;
info!("Worker {} listening on {}", self.worker_id, listen_addr);

tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
debug!(
"Worker {} accepted connection from {}",
self.worker_id, addr
);
let self_clone = Arc::clone(&self);
tokio::spawn(async move {
if let Err(e) = self_clone.handle_connection(stream).await {
error!("Worker connection error from {}: {}", addr, e);
}
});
}
Err(e) => {
error!("Worker accept error: {}", e);
}
}
}
});

Ok(())
}

/// Handle an incoming connection
///
/// Reads length-prefixed frames (a `u32` length header of `HEADER_SIZE`
/// bytes followed by a bincode-encoded [`NetworkMessage`]) until the peer
/// closes the connection or the local message handler shuts down.
///
/// # Errors
/// Returns an error on socket read failures, or when a frame advertises
/// a length greater than `MAX_MESSAGE_SIZE`.
async fn handle_connection(&self, stream: TcpStream) -> Result<()> {
    let (mut reader, writer) = tokio::io::split(stream);
    let writer = Arc::new(Mutex::new(writer));

    let mut buf = BytesMut::with_capacity(4096);

    loop {
        // Read more data
        let n = reader.read_buf(&mut buf).await?;
        if n == 0 {
            break; // Connection closed
        }

        // Try to parse messages
        while buf.len() >= HEADER_SIZE {
            // Peek the length header without consuming it yet; we only
            // consume once the full frame has arrived.
            let len = (&buf[..HEADER_SIZE]).get_u32() as usize;
            if (len as u64) > MAX_MESSAGE_SIZE {
                anyhow::bail!("Message too large: {}", len);
            }

            if buf.len() < HEADER_SIZE + len {
                break; // Need more data
            }

            buf.advance(HEADER_SIZE);
            let msg_bytes = buf.split_to(len);

            match bincode::deserialize::<NetworkMessage>(&msg_bytes) {
                Ok(NetworkMessage::Worker(worker_msg)) => {
                    // Extract sender from message where available
                    // BatchRequest has requestor field, others use ZERO (not needed for response routing)
                    let from = match &worker_msg {
                        WorkerMessage::BatchRequest { requestor, .. } => *requestor,
                        _ => ValidatorId::ZERO,
                    };

                    // Store connection if sender is known (for potential future responses)
                    if from != ValidatorId::ZERO {
                        let mut peers = self.peers.write().await;
                        peers.entry(from).or_insert_with(|| PeerConnection {
                            writer: Arc::clone(&writer),
                        });
                    }

                    // Forward to handler. If the receiving side has shut
                    // down, stop servicing this connection entirely.
                    // BUG FIX: the previous `break` only exited the inner
                    // parse loop, so the outer read loop kept the dead
                    // connection alive and retried sends forever.
                    if self.incoming_tx.send((from, worker_msg)).await.is_err() {
                        return Ok(());
                    }
                }
                Ok(_) => {
                    warn!("Received non-Worker message on Worker connection");
                }
                Err(e) => {
                    // Framing stays intact (the bad frame was already
                    // consumed), so skip the message and keep reading.
                    error!("Failed to deserialize worker message: {}", e);
                }
            }
        }
    }

    Ok(())
}

/// Attempt an outbound connection to every configured peer worker.
///
/// Failures are logged and skipped so that one unreachable peer does not
/// prevent connections to the rest.
pub async fn connect_to_all_peers(&self) {
    let targets: Vec<_> = self.peer_addrs.keys().cloned().collect();
    for validator_id in targets {
        match self.connect_to_peer(validator_id).await {
            Ok(_) => {}
            Err(e) => warn!(
                "Worker {} failed to connect to peer {:?}: {}",
                self.worker_id, validator_id, e
            ),
        }
    }
}

Expand Down
116 changes: 113 additions & 3 deletions crates/node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::config::NodeConfig;
use crate::execution_bridge::ExecutionBridge;
use crate::network::TcpPrimaryNetwork;
use crate::network::{TcpPrimaryNetwork, TcpWorkerNetwork};
use crate::supervisor::NodeSupervisor;
use crate::util::validator_id_from_bls;
use anyhow::{Context, Result};
Expand All @@ -29,7 +29,8 @@ use cipherbft_consensus::{
use cipherbft_crypto::{BlsKeyPair, BlsPublicKey, Ed25519KeyPair, Ed25519PublicKey};
use cipherbft_data_chain::{
primary::{Primary, PrimaryConfig, PrimaryEvent},
Cut, DclMessage,
worker::{Worker, WorkerConfig},
Cut, DclMessage, WorkerMessage,
};
use cipherbft_execution::ChainConfig;
use cipherbft_types::genesis::Genesis;
Expand Down Expand Up @@ -407,14 +408,123 @@ impl Node {
.collect();

// Spawn Primary task
let (mut primary_handle, _worker_rxs) = Primary::spawn(
let (mut primary_handle, worker_rxs) = Primary::spawn(
primary_config,
bls_pubkeys,
Box::new(TcpPrimaryNetworkAdapter {
network: primary_network,
}),
self.config.num_workers as u8,
);

// Spawn Workers and wire up channels
// Workers receive batches from peers and notify Primary when batches are ready
for (worker_idx, mut from_primary_rx) in worker_rxs.into_iter().enumerate() {
let worker_id = worker_idx as u8;

// Create Worker network with incoming message channel
let (worker_incoming_tx, mut worker_incoming_rx) =
mpsc::channel::<(ValidatorId, WorkerMessage)>(1024);

let worker_network = TcpWorkerNetwork::new(
self.validator_id,
worker_id,
&self.config.peers,
worker_incoming_tx,
);

// Start Worker network listener
if let Some(listen_addr) = self.config.worker_listens.get(worker_id as usize) {
let network_for_listener = Arc::new(worker_network.clone());
let listen_addr = *listen_addr;
tokio::spawn(async move {
if let Err(e) = network_for_listener.start_listener(listen_addr).await {
error!("Failed to start worker {} listener: {}", worker_id, e);
}
});

// Connect to peers after a brief delay to allow listeners to start
let network_for_connect = worker_network.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
network_for_connect.connect_to_all_peers().await;
});
}

// Create Worker config and spawn
let worker_config = WorkerConfig::new(self.validator_id, worker_id);
let mut worker_handle = Worker::spawn(worker_config, Box::new(worker_network));

// Combined bridge task: handles all communication with Worker
// - Primary -> Worker: forward batch requests
// - Worker -> Primary: forward batch digests
// - Network -> Worker: forward peer messages
let token = cancel_token.clone();
let primary_worker_sender = primary_handle.worker_sender();
tokio::spawn(async move {
loop {
tokio::select! {
biased;

_ = token.cancelled() => {
debug!("Worker {} bridge shutting down", worker_id);
break;
}

// Primary -> Worker: forward batch requests and digests
msg = from_primary_rx.recv() => {
match msg {
Some(m) => {
if worker_handle.send_from_primary(m).await.is_err() {
warn!("Worker {} send_from_primary failed", worker_id);
break;
}
}
None => {
debug!("Worker {} primary channel closed", worker_id);
break;
}
}
}

// Worker -> Primary: forward batch availability notifications
msg = worker_handle.recv_from_worker() => {
match msg {
Some(m) => {
if primary_worker_sender.send(m).await.is_err() {
warn!("Worker {} send to primary failed", worker_id);
break;
}
}
None => {
debug!("Worker {} receiver closed", worker_id);
break;
}
}
}

// Network -> Worker: forward peer messages (batches, sync requests)
msg = worker_incoming_rx.recv() => {
match msg {
Some((peer, worker_msg)) => {
if worker_handle.send_from_peer(peer, worker_msg).await.is_err() {
warn!("Worker {} send_from_peer failed", worker_id);
break;
}
}
None => {
debug!("Worker {} network channel closed", worker_id);
break;
}
}
}
}
}
});

info!("Worker {} spawned and wired", worker_id);
}

let (decided_tx, mut decided_rx) = mpsc::channel::<(ConsensusHeight, Cut)>(100);

// Get Ed25519 keypair for Consensus
Expand Down