Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ criterion = "0.8"
ctor = "0.10"
darling = "0.23"
dyn-clone = "1.0"
foldhash = { version = "0.1", default-features = false }
hashbrown = { version = "0.15", default-features = false, features = ["inline-more"] }
heapless = "0.8"
ingot = "0.1.1"
ipnetwork = { version = "0.21", default-features = false }
Expand Down
4 changes: 4 additions & 0 deletions lib/opte/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ engine = [
"dep:cfg-if",
"dep:crc32fast",
"dep:derror-macro",
"dep:foldhash",
"dep:hashbrown",
"dep:heapless",
"dep:itertools",
"dep:smoltcp",
Expand Down Expand Up @@ -42,6 +44,8 @@ bitflags = { workspace = true , features = ["serde"] }
cfg-if = { workspace = true, optional = true }
crc32fast = { workspace = true, optional = true }
dyn-clone.workspace = true
foldhash = { workspace = true, optional = true }
hashbrown = { workspace = true, optional = true }
heapless = { workspace = true, optional = true }
itertools = { workspace = true, optional = true }
postcard.workspace = true
Expand Down
129 changes: 126 additions & 3 deletions lib/opte/src/engine/flow_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use super::packet::InnerFlowId;
use crate::ddi::time::MILLIS;
use crate::ddi::time::Moment;
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::ffi::CString;
use alloc::string::String;
use alloc::sync::Arc;
Expand All @@ -23,6 +22,8 @@ use core::num::NonZeroU32;
use core::sync::atomic::AtomicBool;
use core::sync::atomic::AtomicU64;
use core::sync::atomic::Ordering;
use foldhash::fast::FixedState;
use hashbrown::HashMap;
#[cfg(all(not(feature = "std"), not(test)))]
use illumos_sys_hdrs::uintptr_t;
use opte_api::OpteError;
Expand All @@ -37,6 +38,9 @@ pub const FLOW_DEF_TTL: Ttl = Ttl::new_seconds(FLOW_DEF_EXPIRE_SECS);

pub const FLOW_TABLE_DEF_MAX_ENTRIES: u32 = 8192;

/// Capacity below which a table's allocation is left alone during reclaim.
const FT_MIN_CAPACITY: usize = 64;

type Result<T> = core::result::Result<T, OpteError>;

/// The Time To Live in milliseconds.
Expand Down Expand Up @@ -78,13 +82,16 @@ impl<S: Dump> ExpiryPolicy<S> for Ttl {

pub type FlowTableDump<T> = Vec<(InnerFlowId, T)>;

/// Per-table flow hasher, seeded from the kernel PRNG in [`hash_seed`].
type FlowHasher = FixedState;

#[derive(Debug)]
pub struct FlowTable<S: Dump> {
port_c: CString,
name_c: CString,
limit: NonZeroU32,
policy: Box<dyn ExpiryPolicy<S>>,
map: BTreeMap<InnerFlowId, Arc<FlowEntry<S>>>,
map: HashMap<InnerFlowId, Arc<FlowEntry<S>>, FlowHasher>,
}

impl<S> FlowTable<S>
Expand Down Expand Up @@ -150,6 +157,8 @@ where
for (flow_id, entry) in &self.map {
flows.push((*flow_id, entry.dump()));
}
// HashMap order is arbitrary; sort for stable dumps.
flows.sort_unstable_by_key(|(flow_id, _)| *flow_id);
flows
}

Expand Down Expand Up @@ -182,9 +191,21 @@ where
true
});

self.reclaim();
expired
}

/// Shrinks the backing allocation toward `2 * len` once a table drains to
/// under a quarter full. Only firing on a deep drain keeps tables near
/// capacity from thrashing and bounds how often the rehash runs under the
/// port lock.
fn reclaim(&mut self) {
let target = self.map.len().saturating_mul(2).max(FT_MIN_CAPACITY);
if self.map.capacity() > target.saturating_mul(2) {
self.map.shrink_to(target);
}
}

/// Get the maximum number of entries this flow table may hold.
pub fn get_limit(&self) -> NonZeroU32 {
self.limit
Expand Down Expand Up @@ -220,7 +241,7 @@ where
name_c: CString::new(name).unwrap(),
limit,
policy,
map: BTreeMap::new(),
map: HashMap::with_hasher(FixedState::with_seed(hash_seed())),
}
}

Expand Down Expand Up @@ -264,6 +285,23 @@ fn flow_expired_probe(
}
}

/// Returns a hasher seed: the kernel PRNG in the kmod, a fixed value under
/// std/test.
fn hash_seed() -> u64 {
cfg_if! {
if #[cfg(all(not(feature = "std"), not(test)))] {
let mut seed = [0u8; 8];
// SAFETY: writes exactly `seed.len()` bytes into a valid buffer.
unsafe {
random_get_pseudo_bytes(seed.as_mut_ptr(), seed.len());
}
u64::from_ne_bytes(seed)
} else {
0
}
}
}

/// A type that can be "dumped" for the purposes of presenting an
/// external view into internal state of the [`FlowEntry<T>`].
pub trait Dump {
Expand Down Expand Up @@ -379,6 +417,11 @@ unsafe extern "C" {
ifid: *const InnerFlowId,
epoch: uintptr_t,
);

fn random_get_pseudo_bytes(
ptr: *mut u8,
len: usize,
) -> illumos_sys_hdrs::c_int;
}

impl Dump for () {
Expand All @@ -398,6 +441,17 @@ mod test {

pub const FT_SIZE: Option<NonZeroU32> = NonZeroU32::new(16);

fn flow_id(dst_port: u16) -> InnerFlowId {
InnerFlowId {
proto: Protocol::TCP.into(),
addrs: AddrPair::V4 {
src: "192.168.2.10".parse().unwrap(),
dst: "76.76.21.21".parse().unwrap(),
},
proto_info: PortInfo { src_port: 37890, dst_port }.into(),
}
}

#[test]
fn flow_expired() {
let flowid = InnerFlowId {
Expand Down Expand Up @@ -442,4 +496,73 @@ mod test {
ft.clear();
assert_eq!(ft.num_flows(), 0);
}

#[test]
fn flow_add_get_remove() {
let flowid = flow_id(443);
let mut ft =
FlowTable::new("port", "flow-crud-test", FT_SIZE.unwrap(), None);

assert!(ft.get(&flowid).is_none());
ft.add(flowid, ()).unwrap();
assert!(ft.get(&flowid).is_some());
assert_eq!(ft.num_flows(), 1);

assert!(ft.remove(&flowid).is_some());
assert!(ft.get(&flowid).is_none());
assert_eq!(ft.num_flows(), 0);
}

#[test]
fn flow_table_enforces_limit() {
let limit = FT_SIZE.unwrap().get();
let mut ft =
FlowTable::new("port", "flow-limit-test", FT_SIZE.unwrap(), None);

for dst_port in 0..limit as u16 {
ft.add(flow_id(dst_port), ()).unwrap();
}
assert_eq!(ft.num_flows(), limit);

let err = ft.add(flow_id(limit as u16), ()).unwrap_err();
assert!(matches!(err, OpteError::MaxCapacity(_)));
assert_eq!(ft.num_flows(), limit);
}

#[test]
fn flow_dump_is_sorted() {
let mut ft =
FlowTable::new("port", "flow-dump-test", FT_SIZE.unwrap(), None);
for dst_port in [5u16, 1, 3, 2, 4] {
ft.add(flow_id(dst_port), ()).unwrap();
}

let dumped: Vec<InnerFlowId> =
ft.dump().into_iter().map(|(flow_id, _)| flow_id).collect();
let mut expected = dumped.clone();
expected.sort_unstable();
assert_eq!(dumped, expected);
assert_eq!(dumped.len(), 5);
}

#[test]
fn flow_table_reclaims_after_drain() {
let limit = NonZeroU32::new(4096).unwrap();
let mut ft = FlowTable::new("port", "flow-reclaim-test", limit, None);

for dst_port in 0..2000u16 {
ft.add(flow_id(dst_port), ()).unwrap();
}
let grown = ft.map.capacity();
assert!(grown >= 2000);

let now = Moment::now() + Duration::new(FLOW_DEF_EXPIRE_SECS + 1, 0);
ft.expire_flows(now, |_| FLOW_ID_DEFAULT);
assert_eq!(ft.num_flows(), 0);

assert!(ft.map.capacity() < grown);

ft.add(flow_id(1), ()).unwrap();
assert!(ft.get(&flow_id(1)).is_some());
}
}