Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
510 changes: 183 additions & 327 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = [
"k8s-intf",
"k8s-less",
"left-right-tlcache",
"lifecycle",
"lpm",
"mgmt",
"nat",
Expand Down Expand Up @@ -67,7 +68,7 @@ config = { path = "./config", package = "dataplane-config", features = [] }
dpdk = { path = "./dpdk", package = "dataplane-dpdk", features = [] }
dpdk-sys = { path = "./dpdk-sys", package = "dataplane-dpdk-sys", features = [] }
dpdk-sysroot-helper = { path = "./dpdk-sysroot-helper", package = "dataplane-dpdk-sysroot-helper", features = [] }
dplane-rpc = { git = "https://github.com/githedgehog/dplane-rpc.git", rev = "e8fc33db10e1d00785f2a2b90cbadcad7900f200", features = [] }
dplane-rpc = { git = "https://github.com/githedgehog/dplane-rpc.git", branch = "pr/daniel-noland/bumps", features = [] }
errno = { path = "./errno", package = "dataplane-errno", features = [] }
flow-entry = { path = "./flow-entry", package = "dataplane-flow-entry", features = [] }
flow-filter = { path = "./flow-filter", package = "dataplane-flow-filter", features = [] }
Expand All @@ -78,6 +79,7 @@ interface-manager = { path = "./interface-manager", package = "dataplane-interfa
k8s-intf = { path = "./k8s-intf", package = "dataplane-k8s-intf", default-features = false, features = [] }
k8s-less = { path = "./k8s-less", package = "dataplane-k8s-less", features = [] }
left-right-tlcache = { path = "./left-right-tlcache", package = "dataplane-left-right-tlcache", features = [] }
lifecycle = { path = "./lifecycle", package = "dataplane-lifecycle", features = [] }
lpm = { path = "./lpm", package = "dataplane-lpm", features = [] }
mgmt = { path = "./mgmt", package = "dataplane-mgmt", features = [] }
nat = { path = "./nat", package = "dataplane-nat", features = [] }
Expand Down Expand Up @@ -114,7 +116,6 @@ clap = { version = "4.6.1", default-features = true, features = [] }
color-eyre = { version = "0.6.5", default-features = false, features = [] }
colored = { version = "3.1.1", default-features = false, features = [] }
crossbeam-utils = { version = "0.8.21", default-features = false, features = [] }
ctrlc = { version = "3.5.2", default-features = false, features = [] }
dashmap = { version = "6.2.1", default-features = false, features = [] }
derive_builder = { version = "0.20.2", default-features = false, features = [] }
dotenvy = { version = "0.15.7", default-features = false, features = [] }
Expand All @@ -138,7 +139,7 @@ kube = { version = "3.1.0", default-features = false, features = [] }
kube-core = { version = "3.1.0", default-features = false, features = [] }
#left-right = { version = "0.11.7", default-features = false, features = [] }
left-right = { git = "https://github.com/githedgehog/left-right.git", branch = "fredi/fix-writehandle-drop", default-features = false, features = [] }
libc = { version = "1.0.0-alpha.3", default-features = false, features = [] }
libc = { version = "0.2.186", default-features = false, features = [] }
linkme = { version = "0.3.36", default-features = false, features = [] }
log = { version = "0.4.30", default-features = false, features = [] } # TODO: try to remove this
loom = { version = "0.7.2", default-features = false, features = [] }
Expand All @@ -148,7 +149,7 @@ metrics-exporter-prometheus = { version = "0.18.3", default-features = false, fe
miette = { version = "7.6.0", default-features = false, features = [] }
mio = { version = "1.2.1", default-features = false, features = [] }
multi_index_map = { version = "0.15.1", default-features = false, features = [] }
n-vm = { git = "https://github.com/githedgehog/testn.git", tag = "v0.0.9", default-features = false, features = [], package = "n-vm" }
n-vm = { git = "https://github.com/githedgehog/testn.git", tag = "v0.0.10", default-features = false, features = [], package = "n-vm" }
netdev = { version = "0.43.0", default-features = false, features = [] }
netgauze-bgp-pkt = { version = "0.12.1", features = [] }
netgauze-bmp-pkt = { version = "0.12.1", features = [] }
Expand All @@ -172,7 +173,7 @@ rapidhash = { version = "4.4.1", default-features = false, features = [] }
reedline = { version = "0.48.0", default-features = false, features = [] }
rkyv = { version = "0.8.16", default-features = false, features = [] }
roaring = { version = "0.11.4", default-features = false, features = [] }
rtnetlink = { git = "https://github.com/githedgehog/rtnetlink.git", branch = "hh/tc-actions3", default-features = false, features = [] }
rtnetlink = { git = "https://github.com/githedgehog/rtnetlink.git", branch = "hh/tc-actions4", default-features = false, features = [] }
rustls = { version = "0.23.40", default-features = false, features = [] }
schemars = { version = "1", default-features = false, features = [] }
serde = { version = "1.0.228", default-features = false, features = [] }
Expand Down
5 changes: 2 additions & 3 deletions dataplane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ axum = { workspace = true, features = ["http1", "tokio"] }
axum-server = { workspace = true }
concurrency = { workspace = true }
config = { workspace = true }
ctrlc = { workspace = true, features = ["termination"] }
dyn-iter = { workspace = true }
flow-entry = { workspace = true }
flow-filter = { workspace = true }
futures = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
id = { workspace = true }
lifecycle = { workspace = true }
linkme = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
mgmt = { workspace = true }
mio = { workspace = true, features = ["os-ext", "net"] }
nat = { workspace = true }
net = { workspace = true, features = ["test_buffer"] }
nix = { workspace = true, features = ["socket", "hostname"] }
Expand All @@ -46,7 +45,7 @@ rtnetlink = { workspace = true, features = ["default", "tokio"] }
serde = { workspace = true, features = ["derive"] }
stats = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
Comment thread
daniel-noland marked this conversation as resolved.
tracectl = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, default-features = true }
Expand Down
106 changes: 54 additions & 52 deletions dataplane/src/drivers/kernel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,95 +18,97 @@ mod worker;

use concurrency::sync::Arc;
use concurrency::thread;
#[allow(unused_imports)] // used under loom/shuttle backends
use concurrency::thread::BuilderExt;
use lifecycle::Subsystem;
use net::buffer::test_buffer::TestBuffer;
use pipeline::DynPipeline;
use tracectl::trace_target;
#[allow(unused)]
use tracing::{debug, error, info, trace, warn};

use super::DriverError;
use super::tokio_util::run_in_local_tokio_runtime;
use kif::{Kif, bring_kifs_up};
use worker::Worker;

trace_target!("kernel-driver", LevelFilter::INFO, &["driver"]);

/// Main structure representing the kernel driver.
/// This driver:
/// * receives raw frames via `AF_PACKET`, parses to `Packet<TestBuffer>`
/// * selects a worker by symmetric flow hash
/// * workers run independent pipelines and send processed packets back
/// * dispatcher serializes & transmits on the chosen outgoing interface
/// AF_PACKET-based kernel driver. Spawns N workers with symmetric-hash
/// fanout and per-worker pipelines.
pub struct DriverKernel;

#[allow(clippy::cast_possible_truncation)]
impl DriverKernel {
/// Spawn `workers` processing threads, each with its own pipeline instance.
///
/// Returns:
/// - `Arc<Vec<Sender<Packet<TestBuffer>>>>` one sender per worker (dispatcher -> worker)
/// - `Receiver<Packet<TestBuffer>>` a single queue for processed packets (worker -> dispatcher)
fn spawn_workers(
/// Spawn `num_workers` worker threads into `scope`, each with its own
/// pipeline. Bails on the first spawn failure; workers that did spawn
/// drain via the scope join.
fn spawn_workers_scoped<'scope>(
scope: &'scope thread::Scope<'scope, '_>,
workers_subsystem: &Subsystem,
num_workers: usize,
setup_pipeline: &Arc<dyn Send + Sync + Fn() -> DynPipeline<TestBuffer>>,
interfaces: &[Kif],
) -> Vec<thread::JoinHandle<Result<(), std::io::Error>>> {
) -> Result<Vec<thread::ScopedJoinHandle<'scope, Result<(), std::io::Error>>>, std::io::Error>
{
info!("Spawning {num_workers} workers");
let mut workers = Vec::new();
for wid in 0..num_workers {
let builder = thread::Builder::new().name(format!("dp-worker-{wid}"));
let mut worker = Worker::new(wid, num_workers, setup_pipeline);
match worker.start(builder, interfaces) {
Ok(handle) => workers.push(handle),
Err(e) => {
error!("Failed to start worker {wid}: {e}");
}
}
}
workers
(0..num_workers)
.map(|wid| {
let builder = thread::Builder::new().name(format!("dp-worker-{wid}"));
Worker::new(wid, num_workers, setup_pipeline, workers_subsystem.clone())
.start(scope, builder, interfaces)
})
.collect()
}

/// Starts the kernel driver, spawns worker threads, and runs the dispatcher loop.
///
/// - `args`: kernel driver CLI parameters (e.g., `--interface` list)
/// - `workers`: number of worker threads / pipelines
/// - `setup_pipeline`: factory returning a **fresh** `DynPipeline<TestBuffer>` per worker
/// Spawn worker threads + supervisor into `scope`. The scope joins
/// all driver threads on closure return.
///
/// # Errors
/// Returns [`DriverError`] in case the driver fails to start successfully.
pub fn start(
stop_tx: std::sync::mpsc::Sender<i32>,
/// Returns [`DriverError`] on interface setup or thread spawn failure.
pub fn start<'scope>(
scope: &'scope thread::Scope<'scope, '_>,
workers_subsystem: &Subsystem,
args: impl IntoIterator<Item = impl AsRef<str> + Clone>,
num_workers: usize,
setup_pipeline: &Arc<dyn Send + Sync + Fn() -> DynPipeline<TestBuffer>>,
) -> Result<(), DriverError> {
// Calling `block_on` on a current_thread runtime from inside another
// tokio runtime panics; catch nesting in debug.
debug_assert!(
tokio::runtime::Handle::try_current().is_err(),
"DriverKernel::start must not be invoked from within a tokio runtime context"
);

info!("Collecting interfaces from config");
let interfaces = kif::get_interfaces(args)?;

// ensure that the kernel interfaces for rx/tx are up
run_in_local_tokio_runtime(async || bring_kifs_up(interfaces.as_slice()).await)?;
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(bring_kifs_up(interfaces.as_slice()))?;

// Spawn workers
let worker_handles =
Self::spawn_workers(num_workers, setup_pipeline, interfaces.as_slice());
let worker_handles = Self::spawn_workers_scoped(
scope,
workers_subsystem,
num_workers,
setup_pipeline,
interfaces.as_slice(),
)?;

let control_builder = thread::Builder::new().name("kernel-driver-controller".to_string());
control_builder.spawn(move || {
// The supervisor just joins-and-logs; worker fatal reporting is
// handled by the `ExitGuard` inside each worker thread.
let supervisor_builder =
thread::Builder::new().name("kernel-driver-supervisor".to_string());
supervisor_builder.spawn_scoped(scope, move || {
for (id, handle) in worker_handles.into_iter().enumerate() {
info!("Waiting for workers to finish");
info!("Waiting for worker {id} to finish");
match handle.join() {
Ok(result) => match result {
Ok(()) => info!("Worker {id} exited successfully"),
Err(e) => error!("Worker {id} exited with error: {e}"),
},
Err(e) => error!("Unable to spawn worker {id} error: {e:?}"),
Ok(Ok(())) => info!("Worker {id} exited successfully"),
Ok(Err(e)) => error!("Worker {id} exited with error: {e}"),
Err(panic_payload) => error!("Worker {id} panicked: {panic_payload:?}"),
}
}

// Exiting with error as it's not expected for all workers to finish
error!("All workers finished unexpectedly");
#[allow(clippy::expect_used)]
stop_tx.send(1).expect("Failed to send stop signal");
info!("All workers joined");
})?;

Ok(())
Expand Down
Loading
Loading