Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions flow-entry/src/flow_table/nf_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use pipeline::NetworkFunction;

use crate::flow_table::FlowTable;
use net::FlowKey;
use net::flow_key;

use tracectl::trace_target;
trace_target!("flow-lookup", LevelFilter::INFO, &["pipeline"]);
Expand Down Expand Up @@ -40,7 +39,7 @@ impl<Buf: PacketBufferMut> NetworkFunction<Buf> for FlowLookup {
input.filter_map(move |mut packet| {
let nfi = &self.name;
if !packet.is_done() && packet.meta().is_overlay() && packet.meta().dst_vpcd.is_none() {
if let Ok(flow_key) = FlowKey::try_from(flow_key::Uni(&packet)) {
if let Ok(flow_key) = FlowKey::try_from(&packet) {
if let Some(flow_info) = self.flow_table.lookup(&flow_key) {
debug!("{nfi}: Tagging packet with flow info for flow key {flow_key}",);
packet.meta_mut().flow_info = Some(flow_info);
Expand Down Expand Up @@ -101,7 +100,7 @@ mod test {
packet.meta_mut().set_overlay(true);

// Insert matching flow entry
let flow_key = FlowKey::try_from(net::flow_key::Uni(&packet)).unwrap();
let flow_key = FlowKey::try_from(&packet).unwrap();
let flow_info = FlowInfo::new(flow_key, Instant::now() + Duration::from_secs(10));
flow_table.insert(flow_info).unwrap();

Expand Down Expand Up @@ -130,7 +129,7 @@ mod test {
input: Input,
) -> impl Iterator<Item = Packet<Buf>> + 'a {
input.filter_map(move |packet| {
let flow_key = FlowKey::try_from(net::flow_key::Uni(&packet)).unwrap();
let flow_key = FlowKey::try_from(&packet).unwrap();
let flow_info = FlowInfo::new(flow_key, Instant::now() + self.timeout);
self.flow_table
.insert(flow_info)
Expand Down Expand Up @@ -193,8 +192,8 @@ mod test {
packet_2.meta_mut().set_overlay(true);

// build keys for the packets
let key_1 = FlowKey::try_from(net::flow_key::Uni(&packet_1)).unwrap();
let key_2 = FlowKey::try_from(net::flow_key::Uni(&packet_2)).unwrap();
let key_1 = FlowKey::try_from(&packet_1).unwrap();
let key_2 = FlowKey::try_from(&packet_2).unwrap();

// create a pair of related flow entries; flow_2 will get a longer timeout
let expires_at = tokio::time::Instant::now().into_std() + Duration::from_secs(2);
Expand Down Expand Up @@ -251,8 +250,8 @@ mod test {
let mut packet_2 = build_test_udp_ipv4_packet("192.168.1.1", "20.0.0.1", 500, 80);
packet_1.meta_mut().set_overlay(true);
packet_2.meta_mut().set_overlay(true);
let key_1 = FlowKey::try_from(net::flow_key::Uni(&packet_1)).unwrap();
let key_2 = FlowKey::try_from(net::flow_key::Uni(&packet_2)).unwrap();
let key_1 = FlowKey::try_from(&packet_1).unwrap();
let key_2 = FlowKey::try_from(&packet_2).unwrap();
let input = vec![packet_1, packet_2];
let out: Vec<_> = pipeline.process(input.into_iter()).collect();
let packet_1 = &out[0];
Expand Down
42 changes: 21 additions & 21 deletions flow-entry/src/flow_table/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ mod tests {
use net::tcp::TcpPort;
use net::vxlan::Vni;

use net::{FlowKey, FlowKeyData, IpProtoKey, TcpProtoKey};
use net::{FlowKey, IpProtoKey, TcpProtoKey};

#[concurrency_mode(std)]
mod std_tests {
Expand All @@ -439,15 +439,15 @@ mod tests {
let five_seconds_from_now = now + five_seconds;

let flow_table = FlowTable::default();
let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(1).unwrap())),
"1.2.3.4".parse::<IpAddr>().unwrap(),
"4.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1025).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);

let flow_info = FlowInfo::new(flow_key, five_seconds_from_now);

Expand All @@ -469,15 +469,15 @@ mod tests {
let one_second = Duration::from_secs(1);

let flow_table = FlowTable::default();
let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(42).unwrap())),
"10.0.0.1".parse::<IpAddr>().unwrap(),
"10.0.0.2".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1234).unwrap(),
dst_port: TcpPort::new_checked(5678).unwrap(),
}),
));
);

let flow_info = FlowInfo::new(flow_key, now + two_seconds);
flow_table.insert(flow_info).unwrap();
Expand All @@ -501,15 +501,15 @@ mod tests {
let second_expiry_time = now + Duration::from_secs(10);

let flow_table = FlowTable::default();
let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(1).unwrap())),
"1.2.3.4".parse::<IpAddr>().unwrap(),
"4.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1025).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);

// Insert first entry.
let first_arc = Arc::new(FlowInfo::new(flow_key, first_expiry_time));
Expand Down Expand Up @@ -586,15 +586,15 @@ mod tests {

let mut flow_keys = vec![];
for src_port in 1..=NUM_FLOWS {
let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(1).unwrap())),
"1.2.3.4".parse::<IpAddr>().unwrap(),
"4.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(src_port).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);
let flow_info = FlowInfo::new(flow_key, deadline);
flow_table.insert(flow_info).unwrap();
flow_keys.push(flow_key);
Expand Down Expand Up @@ -627,15 +627,15 @@ mod tests {
let now = Instant::now();
let deadline = now + Duration::from_secs(2);

let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(1).unwrap())),
"1.2.3.4".parse::<IpAddr>().unwrap(),
"4.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);
let flow_info = FlowInfo::new(flow_key, deadline);
flow_table.insert(flow_info).unwrap();

Expand Down Expand Up @@ -663,27 +663,27 @@ mod tests {
for i in 1u16..=2 {
let src_port = TcpPort::new_checked(1000 + i).unwrap();
let dst_port = TcpPort::new_checked(80).unwrap();
let flow_key = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key = FlowKey::new(
Some(src_vpcd),
src_ip,
dst_ip,
IpProtoKey::Tcp(TcpProtoKey { src_port, dst_port }),
));
);
flow_table
.insert(FlowInfo::new(flow_key, far_future))
.expect("insert under capacity should succeed");
}

// One more insert must fail with CapacityExceeded.
let overflow_key = FlowKey::Unidirectional(FlowKeyData::new(
let overflow_key = FlowKey::new(
Some(src_vpcd),
src_ip,
dst_ip,
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(9999).unwrap(),
dst_port: TcpPort::new_checked(80).unwrap(),
}),
));
);
assert!(matches!(
flow_table.insert(FlowInfo::new(overflow_key, far_future)),
Err(FlowTableError::CapacityExceeded)
Expand All @@ -709,7 +709,7 @@ mod tests {
let two_seconds = Duration::from_secs(2);
let flow_keys: Vec<_> = (0u16..2u16)
.map(|i| {
FlowKey::Unidirectional(FlowKeyData::new(
FlowKey::new(
Some(VpcDiscriminant::VNI(
Vni::new_checked(u32::from(i) + 1).unwrap(),
)),
Expand All @@ -719,7 +719,7 @@ mod tests {
src_port: TcpPort::new_checked(1000 + i).unwrap(),
dst_port: TcpPort::new_checked(2000 + i).unwrap(),
}),
))
)
})
.collect();

Expand Down Expand Up @@ -825,25 +825,25 @@ mod tests {
let flow_table = Arc::new(FlowTable::default());

let five_seconds_from_now = Instant::now() + Duration::from_secs(5);
let flow_key1 = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key1 = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(1).unwrap())),
"1.2.3.4".parse::<IpAddr>().unwrap(),
"4.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1025).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);

let flow_key2 = FlowKey::Unidirectional(FlowKeyData::new(
let flow_key2 = FlowKey::new(
Some(VpcDiscriminant::VNI(Vni::new_checked(10).unwrap())),
"10.2.3.4".parse::<IpAddr>().unwrap(),
"40.5.6.7".parse::<IpAddr>().unwrap(),
IpProtoKey::Tcp(TcpProtoKey {
src_port: TcpPort::new_checked(1025).unwrap(),
dst_port: TcpPort::new_checked(2048).unwrap(),
}),
));
);

let flow_table_clone1 = flow_table.clone();
let flow_table_clone2 = flow_table.clone();
Expand Down
2 changes: 1 addition & 1 deletion flow-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl FlowFilter {
return;
}

let Ok(flow_key) = FlowKey::try_from(net::flow_key::Uni(&*packet)) else {
let Ok(flow_key) = FlowKey::try_from(&*packet) else {
return;
};

Expand Down
3 changes: 1 addition & 2 deletions flow-filter/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use config::external::overlay::vpcpeering::{VpcExpose, VpcManifest, VpcPeering,
use lpm::prefix::{L4Protocol, PortRange, Prefix, PrefixWithOptionalPorts};
use net::FlowKey;
use net::buffer::{PacketBufferMut, TestBuffer};
use net::flow_key::Uni;
use net::flows::{FlowInfo, FlowStatus};
use net::headers::{Net, TryHeadersMut, TryIpMut};
use net::ip::NextHeader;
Expand Down Expand Up @@ -191,7 +190,7 @@ fn fake_flow_session<Buf: PacketBufferMut>(
set_port_fw_state: bool,
) {
// build flow key
let flow_key = FlowKey::try_from(Uni(&*packet)).unwrap();
let flow_key = FlowKey::try_from(&*packet).unwrap();

// Create flow_info with dst_vpcd and NAT info and attach it to the packet
let flow_info = FlowInfo::new(flow_key, Instant::now() + Duration::from_secs(60));
Expand Down
21 changes: 20 additions & 1 deletion nat/src/icmp_handler/icmp_error_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use net::headers::{
use net::icmp_any::TruncatedIcmpAny;
use net::icmp_any::{IcmpAnyChecksumErrorPlaceholder, IcmpAnyChecksumPayload};
use net::ipv4::Ipv4;
use net::packet::Packet;
use net::packet::{DoneReason, Packet};
use std::net::IpAddr;
use std::num::NonZero;

Expand All @@ -41,6 +41,25 @@ pub enum IcmpErrorMsgError {
NoTranslationPossible,
}

impl From<&IcmpErrorMsgError> for DoneReason {
fn from(error: &IcmpErrorMsgError) -> Self {
match error {
IcmpErrorMsgError::NoIpHeader => DoneReason::NotIp,
IcmpErrorMsgError::InvalidPort(_) => DoneReason::Malformed,
IcmpErrorMsgError::NotUnicast(_) => DoneReason::NatFailure,
IcmpErrorMsgError::InvalidIpVersion | IcmpErrorMsgError::NoTranslationPossible => {
DoneReason::InternalFailure
}
IcmpErrorMsgError::BadChecksumIcmp(_) | IcmpErrorMsgError::BadChecksumInnerIpv4(_) => {
DoneReason::InvalidChecksum
}
IcmpErrorMsgError::NoEmbeddedHeaders | IcmpErrorMsgError::NoInnerIpHeader => {
DoneReason::Filtered
}
}
}
}

// # Return
//
// * `Ok(())` if checksums are valid and we can translate the inner packet
Expand Down
13 changes: 5 additions & 8 deletions nat/src/portfw/flow_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#![allow(clippy::single_match_else)]

use net::buffer::PacketBufferMut;
use net::flow_key::Uni;
use net::flows::{ExtractRef, FlowStatus};
use net::ip::UnicastIpAddr;
use net::packet::{Packet, VpcDiscriminant};
Expand Down Expand Up @@ -107,7 +106,7 @@ pub(crate) fn build_portfw_flow_keys<Buf: PacketBufferMut>(
dst_vpcd: VpcDiscriminant, // destination VPC to forward to
) -> Result<(FlowKey, FlowKey), ()> {
// Extract flow key for the current packet
let current_flow_key = FlowKey::try_from(Uni(&*packet)).map_err(|_| ())?;
let current_flow_key = FlowKey::try_from(&*packet).map_err(|_| ())?;

// Retrieve initial flow key for the current packet (before any other NAT translation); if
// we don't have the information, we didn't populate it because we don't need it and fall
Expand All @@ -120,14 +119,12 @@ pub(crate) fn build_portfw_flow_keys<Buf: PacketBufferMut>(
.unwrap_or(current_flow_key);

// Build the key for the reverse path
let proto = current_flow_key.data().proto();
let src_port = current_flow_key.data().src_port().ok_or(())?;
let proto = current_flow_key.proto();
let src_port = current_flow_key.src_port().ok_or(())?;

let mut key_forward_dnated = current_flow_key;
key_forward_dnated.data_mut().set_dst_ip(new_dst_ip.inner());
key_forward_dnated
.data_mut()
.set_ip_proto_key(IpProtoKey::from((proto, src_port, new_dst_port)));
key_forward_dnated.set_dst_ip(new_dst_ip.inner());
key_forward_dnated.set_ip_proto_key(IpProtoKey::from((proto, src_port, new_dst_port)));
let key_reverse = key_forward_dnated.reverse(Some(dst_vpcd));

Ok((initial_flow_key, key_reverse))
Expand Down
18 changes: 18 additions & 0 deletions nat/src/stateful/allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use crate::port::NatPortError;
use net::ip::NextHeader;
use net::packet::DoneReason;
use std::fmt::{Debug, Display};
use std::time::Duration;

Expand Down Expand Up @@ -34,6 +35,23 @@ pub enum AllocatorError {
Denied,
}

impl From<&AllocatorError> for DoneReason {
fn from(error: &AllocatorError) -> Self {
match error {
AllocatorError::UnsupportedProtocol(_) => DoneReason::NatUnsupportedProto,
AllocatorError::NoFreeIp
| AllocatorError::NoPortBlock
| AllocatorError::NoFreePort(_) => DoneReason::NatOutOfResources,
AllocatorError::PortAllocationFailed(_)
| AllocatorError::PortReservationFailed(_)
| AllocatorError::MissingDiscriminant
| AllocatorError::UnsupportedDiscriminant => DoneReason::NatFailure,
AllocatorError::InternalIssue(_) => DoneReason::InternalFailure,
AllocatorError::Denied => DoneReason::Filtered,
}
}
}

/// `AllocationResult` is a struct to represent the result of an allocation.
/// It contains the allocated IP address and port to masquerade a packet,
/// and the time for the allocation (flow timeout).
Expand Down
6 changes: 3 additions & 3 deletions nat/src/stateful/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ fn re_reserve_ip_and_port(
port: NatPort,
) -> Result<(), ()> {
let flow_key = flow_info.flowkey();
let proto = flow_key.data().proto();
let proto = flow_key.proto();
let dst_vpcd = flow_info.get_dst_vpcd().unwrap_or_else(|| unreachable!());
let src_ip = *flow_key.data().src_ip();
let src_ip = *flow_key.src_ip();
let port_u16 = port.as_u16();
debug!("Attempting to reserve {ip} {port_u16} {proto}...");

Expand Down Expand Up @@ -104,7 +104,7 @@ pub(crate) fn check_masquerading_flow(
return;
};
let dst_vpcd = flow_info.get_dst_vpcd().unwrap_or_else(|| unreachable!());
let src_vpcd = flow_key.data().src_vpcd().unwrap_or_else(|| unreachable!());
let src_vpcd = flow_key.src_vpcd().unwrap_or_else(|| unreachable!());

debug!("Checking flow {}", flow_info.logfmt());

Expand Down
Loading
Loading