Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ name = "bench_ua"
path = "src/bin/bench_ua.rs"

[dependencies]
arc-swap = "1.7.1"
async-trait = "0.1.89"
futures = "0.3.31"
rsip = { version = "0.4.0" }
Expand Down
2 changes: 1 addition & 1 deletion src/dialog/dialog_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ impl DialogLayer {
let addr = self
.endpoint
.transport_layer
.get_addrs()
.get_contact_addrs()
.first()
.ok_or(crate::Error::EndpointError("not sipaddrs".to_string()))?
.clone();
Expand Down
2 changes: 2 additions & 0 deletions src/dialog/tests/test_dialog_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub async fn create_test_endpoint() -> crate::Result<crate::transaction::endpoin
crate::transport::udp::UdpInner {
conn: tokio_socket,
addr: crate::transport::SipAddr::from(local_addr),
learned_public_addr: arc_swap::ArcSwapOption::empty(),
auto_learn_public_addr: false,
},
None,
Some(token.child_token()),
Expand Down
8 changes: 8 additions & 0 deletions src/transaction/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ impl EndpointInner {
self.transport_layer.get_addrs()
}

pub fn get_contact_addrs(&self) -> Vec<SipAddr> {
self.transport_layer.get_contact_addrs()
}

pub fn get_record_route(&self) -> Result<rsip::typed::RecordRoute> {
let first_addr = self
.transport_layer
Expand Down Expand Up @@ -758,4 +762,8 @@ impl Endpoint {
pub fn get_addrs(&self) -> Vec<SipAddr> {
self.inner.transport_layer.get_addrs()
}

pub fn get_contact_addrs(&self) -> Vec<SipAddr> {
self.inner.transport_layer.get_contact_addrs()
}
}
7 changes: 7 additions & 0 deletions src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ impl SipConnection {
SipConnection::WebSocketListener(transport) => transport.get_addr(),
}
}
pub fn get_contact_addr(&self) -> SipAddr {
if let SipConnection::Udp(transpport) = self {
transpport.get_contact_addr()
} else {
self.get_addr().to_owned()
}
}
pub async fn send(&self, msg: rsip::SipMessage, destination: Option<&SipAddr>) -> Result<()> {
match self {
SipConnection::Channel(transport) => transport.send(msg).await,
Expand Down
163 changes: 161 additions & 2 deletions src/transport/tests/test_udp.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
transport::{
connection::{KEEPALIVE_REQUEST, KEEPALIVE_RESPONSE},
udp::UdpConnection,
TransportEvent,
udp::{UdpConnection, UdpInner},
SipAddr, TransportEvent,
},
Result,
};
Expand Down Expand Up @@ -85,3 +85,162 @@ async fn test_udp_recv_sip_message() -> Result<()> {
};
Ok(())
}

#[tokio::test]
async fn test_udp_learns_public_addr_from_response_when_external_not_configured() -> Result<()> {
let peer = UdpConnection::create_connection_with_auto_learn_public_addr(
"127.0.0.1:0".parse()?,
None,
None,
true,
)
.await?;
let remote = UdpConnection::create_connection("127.0.0.1:0".parse()?, None, None).await?;
let (tx, mut rx) = unbounded_channel();

let remote_public_port = 62000u16;
let response = format!(
"SIP/2.0 100 Trying\r\n\
Via: SIP/2.0/UDP 10.0.0.10:5060;branch=z9hG4bK1;rport={};received=198.51.100.10\r\n\
From: <sip:alice@example.com>;tag=1\r\n\
To: <sip:bob@example.com>\r\n\
Call-ID: abc\r\n\
CSeq: 1 INVITE\r\n\
Content-Length: 0\r\n\r\n",
remote_public_port
);

let peer_addr = peer.get_addr().to_owned();
tokio::spawn(async move {
sleep(Duration::from_millis(20)).await;
remote
.send_raw(response.as_bytes(), &peer_addr)
.await
.expect("send_raw");
});

select! {
_ = peer.serve_loop(tx) => {
assert!(false, "peer serve_loop exited");
}
event = rx.recv() => {
match event {
Some(TransportEvent::Incoming(msg, _, _)) => {
assert!(msg.is_response());
assert_eq!(
peer.get_contact_addr().to_string(),
format!("UDP 198.51.100.10:{}", remote_public_port)
);
}
_ => assert!(false, "unexpected event"),
}
}
_ = sleep(Duration::from_millis(500)) => {
assert!(false, "timeout waiting");
}
};

Ok(())
}

#[tokio::test]
async fn test_udp_contact_prefers_configured_external_addr() -> Result<()> {
let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
let local_addr = socket.local_addr()?;
let peer = UdpConnection::attach_with_auto_learn_public_addr(
UdpInner {
conn: socket,
addr: SipAddr::from(local_addr),
learned_public_addr: arc_swap::ArcSwapOption::empty(),
auto_learn_public_addr: false,
},
Some("203.0.113.10:5060".parse()?),
None,
true,
)
.await;
let remote = UdpConnection::create_connection("127.0.0.1:0".parse()?, None, None).await?;
let (tx, mut rx) = unbounded_channel();

let response = "SIP/2.0 100 Trying\r\n\
Via: SIP/2.0/UDP 10.0.0.10:5060;branch=z9hG4bK1;rport=62000;received=198.51.100.10\r\n\
From: <sip:alice@example.com>;tag=1\r\n\
To: <sip:bob@example.com>\r\n\
Call-ID: abc\r\n\
CSeq: 1 INVITE\r\n\
Content-Length: 0\r\n\r\n";

let peer_local_addr = SipAddr::from(local_addr);
tokio::spawn(async move {
sleep(Duration::from_millis(20)).await;
remote
.send_raw(response.as_bytes(), &peer_local_addr)
.await
.expect("send_raw");
});

select! {
_ = peer.serve_loop(tx) => {
assert!(false, "peer serve_loop exited");
}
event = rx.recv() => {
match event {
Some(TransportEvent::Incoming(msg, _, _)) => {
assert!(msg.is_response());
assert_eq!(peer.get_contact_addr().to_string(), "UDP 203.0.113.10:5060");
}
_ => assert!(false, "unexpected event"),
}
}
_ = sleep(Duration::from_millis(500)) => {
assert!(false, "timeout waiting");
}
};

Ok(())
}

#[tokio::test]
async fn test_udp_does_not_learn_public_addr_by_default() -> Result<()> {
let peer = UdpConnection::create_connection("127.0.0.1:0".parse()?, None, None).await?;
let remote = UdpConnection::create_connection("127.0.0.1:0".parse()?, None, None).await?;
let (tx, mut rx) = unbounded_channel();

let local_contact_before = peer.get_contact_addr();
let response = "SIP/2.0 100 Trying\r\n\
Via: SIP/2.0/UDP 10.0.0.10:5060;branch=z9hG4bK1;rport=62000;received=198.51.100.10\r\n\
From: <sip:alice@example.com>;tag=1\r\n\
To: <sip:bob@example.com>\r\n\
Call-ID: abc\r\n\
CSeq: 1 INVITE\r\n\
Content-Length: 0\r\n\r\n";

let peer_addr = peer.get_addr().to_owned();
tokio::spawn(async move {
sleep(Duration::from_millis(20)).await;
remote
.send_raw(response.as_bytes(), &peer_addr)
.await
.expect("send_raw");
});

select! {
_ = peer.serve_loop(tx) => {
assert!(false, "peer serve_loop exited");
}
event = rx.recv() => {
match event {
Some(TransportEvent::Incoming(msg, _, _)) => {
assert!(msg.is_response());
assert_eq!(peer.get_contact_addr(), local_contact_before);
}
_ => assert!(false, "unexpected event"),
}
}
_ = sleep(Duration::from_millis(500)) => {
assert!(false, "timeout waiting");
}
};

Ok(())
}
57 changes: 56 additions & 1 deletion src/transport/transport_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ impl TransportLayer {
}
}

pub fn get_contact_addrs(&self) -> Vec<SipAddr> {
match self.inner.listens.read() {
Ok(listens) => listens.iter().map(|t| t.get_contact_addr()).collect(),
Err(e) => {
warn!(error = ?e, "Failed to read listens");
Vec::new()
}
}
}

/// Set an async whitelist callback invoked on incoming packets/connections.
pub fn set_whitelist<T>(&self, whitelist: T)
where
Expand Down Expand Up @@ -501,10 +511,15 @@ impl Drop for TransportLayer {
mod tests {
use crate::resolver::SipResolver;
use crate::{
transport::{udp::UdpConnection, SipAddr},
transport::{
udp::{UdpConnection, UdpInner},
SipAddr,
},
Result,
};
use arc_swap::ArcSwapOption;
use rsip::Transport;
use std::sync::Arc;

#[tokio::test]
async fn test_lookup() -> Result<()> {
Expand Down Expand Up @@ -627,4 +642,44 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_contact_addrs_do_not_change_listener_addrs() -> Result<()> {
let tl = super::TransportLayer::new(tokio_util::sync::CancellationToken::new());
let socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
let local_addr = socket.local_addr()?;
let local_sip_addr = SipAddr {
r#type: Some(rsip::transport::Transport::Udp),
addr: local_addr.into(),
};

let learned_public_addr = ArcSwapOption::empty();
learned_public_addr.store(Some(Arc::new(
"198.51.100.10:62000".parse::<std::net::SocketAddr>()?,
)));

let udp_conn = UdpConnection::attach(
UdpInner {
conn: socket,
addr: local_sip_addr.clone(),
learned_public_addr,
auto_learn_public_addr: false,
},
None,
Some(tl.inner.cancel_token.child_token()),
)
.await;

tl.add_transport(udp_conn.into());

let addrs = tl.get_addrs();
assert_eq!(addrs.len(), 1);
assert_eq!(addrs[0], local_sip_addr);

let contact_addrs = tl.get_contact_addrs();
assert_eq!(contact_addrs.len(), 1);
assert_eq!(contact_addrs[0].to_string(), "UDP 198.51.100.10:62000");

Ok(())
}
}
Loading