Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/moq-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Cli {

#[cfg(feature = "iroh")]
#[command(flatten)]
iroh: moq_native::IrohEndpointConfig,
iroh: moq_native::iroh::EndpointConfig,

#[command(subcommand)]
command: Command,
Expand Down
59 changes: 46 additions & 13 deletions rs/moq-native/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use url::Url;
/// TLS configuration for the client.
#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)]
#[serde(default, deny_unknown_fields)]
#[group(id = "client-tls")]
#[non_exhaustive]
pub struct ClientTls {
/// Use the TLS root at this path, encoded as PEM.
Expand Down Expand Up @@ -37,6 +38,7 @@ pub struct ClientTls {
/// Configuration for the MoQ client.
#[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[group(id = "client")]
#[non_exhaustive]
pub struct ClientConfig {
/// Listen for UDP packets on the given address.
Expand Down Expand Up @@ -80,7 +82,7 @@ pub struct ClientConfig {
#[cfg(feature = "websocket")]
#[command(flatten)]
#[serde(default)]
pub websocket: super::ClientWebSocket,
pub websocket: super::ws::ClientConfig,
}

impl ClientConfig {
Expand All @@ -96,6 +98,36 @@ impl ClientConfig {
moq_lite::Versions::from(self.version.clone())
}
}

pub fn with_bind(mut self, bind: net::SocketAddr) -> Self {
self.bind = bind;
self
}

pub fn with_backend(mut self, backend: QuicBackend) -> Self {
self.backend = Some(backend);
self
}

pub fn with_max_streams(mut self, max_streams: u64) -> Self {
self.max_streams = Some(max_streams);
self
}

pub fn with_version(mut self, version: moq_lite::Version) -> Self {
self.version.push(version);
self
}

pub fn with_tls_disable_verify(mut self, disable: bool) -> Self {
self.tls.disable_verify = Some(disable);
self
}

pub fn with_tls_root(mut self, root: PathBuf) -> Self {
self.tls.root.push(root);
self
}
}

impl Default for ClientConfig {
Expand All @@ -107,7 +139,7 @@ impl Default for ClientConfig {
version: Vec::new(),
tls: ClientTls::default(),
#[cfg(feature = "websocket")]
websocket: super::ClientWebSocket::default(),
websocket: super::ws::ClientConfig::default(),
}
}
}
Expand All @@ -120,14 +152,14 @@ pub struct Client {
moq: moq_lite::Client,
versions: moq_lite::Versions,
#[cfg(feature = "websocket")]
websocket: super::ClientWebSocket,
websocket: super::ws::ClientConfig,
tls: rustls::ClientConfig,
#[cfg(feature = "noq")]
noq: Option<crate::noq::NoqClient>,
noq: Option<crate::noq::Client>,
#[cfg(feature = "quinn")]
quinn: Option<crate::quinn::QuinnClient>,
quinn: Option<crate::quinn::Client>,
#[cfg(feature = "quiche")]
quiche: Option<crate::quiche::QuicheClient>,
quiche: Option<crate::quiche::Client>,
#[cfg(feature = "iroh")]
iroh: Option<web_transport_iroh::iroh::Endpoint>,
#[cfg(feature = "iroh")]
Expand Down Expand Up @@ -211,20 +243,20 @@ impl Client {
#[cfg(feature = "noq")]
#[allow(unreachable_patterns)]
let noq = match backend {
QuicBackend::Noq => Some(crate::noq::NoqClient::new(&config)?),
QuicBackend::Noq => Some(crate::noq::Client::new(&config)?),
_ => None,
};

#[cfg(feature = "quinn")]
#[allow(unreachable_patterns)]
let quinn = match backend {
QuicBackend::Quinn => Some(crate::quinn::QuinnClient::new(&config)?),
QuicBackend::Quinn => Some(crate::quinn::Client::new(&config)?),
_ => None,
};

#[cfg(feature = "quiche")]
let quiche = match backend {
QuicBackend::Quiche => Some(crate::quiche::QuicheClient::new(&config)?),
QuicBackend::Quiche => Some(crate::quiche::Client::new(&config)?),
_ => None,
};

Expand Down Expand Up @@ -291,7 +323,8 @@ impl Client {
#[cfg(feature = "iroh")]
if url.scheme() == "iroh" {
let endpoint = self.iroh.as_ref().context("Iroh support is not enabled")?;
let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied()).await?;
let alpns = self.versions.alpns();
let session = crate::iroh::connect(endpoint, url, self.iroh_addrs.iter().copied(), &alpns).await?;
let session = self.moq.connect(session).await?;
return Ok(session);
}
Expand All @@ -311,7 +344,7 @@ impl Client {
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
let ws_handle = crate::ws::race_handle(&self.websocket, &self.tls, url, &alpns);

return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Expand Down Expand Up @@ -342,7 +375,7 @@ impl Client {
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
let ws_handle = crate::ws::race_handle(&self.websocket, &self.tls, url, &alpns);

return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Expand Down Expand Up @@ -372,7 +405,7 @@ impl Client {
#[cfg(feature = "websocket")]
{
let alpns = self.versions.alpns();
let ws_handle = crate::websocket::race_handle(&self.websocket, &self.tls, url, &alpns);
let ws_handle = crate::ws::race_handle(&self.websocket, &self.tls, url, &alpns);

return Ok(tokio::select! {
Ok(quic) = quic_handle => self.moq.connect(quic).await?,
Expand Down
94 changes: 61 additions & 33 deletions rs/moq-native/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use web_transport_iroh::{
// NOTE: web-transport-iroh should re-export proto like web-transport-quinn does.
use web_transport_proto::{ConnectRequest, ConnectResponse};

pub use iroh::Endpoint as IrohEndpoint;
pub use iroh::Endpoint;

#[derive(clap::Args, Clone, Debug, Default, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields, default)]
#[group(id = "iroh")]
#[non_exhaustive]
pub struct IrohEndpointConfig {
pub struct EndpointConfig {
/// Whether to enable iroh support.
#[arg(
id = "iroh-enabled",
Expand Down Expand Up @@ -43,8 +44,8 @@ pub struct IrohEndpointConfig {
pub bind_v6: Option<net::SocketAddrV6>,
}

impl IrohEndpointConfig {
pub async fn bind(self) -> anyhow::Result<Option<IrohEndpoint>> {
impl EndpointConfig {
pub async fn bind(self) -> anyhow::Result<Option<Endpoint>> {
if !self.enabled.unwrap_or(false) {
return Ok(None);
}
Expand All @@ -54,15 +55,21 @@ impl IrohEndpointConfig {
secret
} else if let Some(path) = self.secret {
let path = PathBuf::from(path);
if !path.exists() {
// Generate a new random secret and write it to the file.
let secret = SecretKey::generate(&mut rand::rng());
tokio::fs::write(path, hex::encode(secret.to_bytes())).await?;
secret
} else {
// Otherwise, read the secret from a file.
let key_str = tokio::fs::read_to_string(&path).await?;
SecretKey::from_str(&key_str)?
// Generate a new random secret and attempt to write it atomically.
// If the file already exists (AlreadyExists), read the existing secret instead.
// This avoids a TOCTOU race between exists() and create_new().
let secret = SecretKey::generate(&mut rand::rng());
let data = hex::encode(secret.to_bytes());
match write_secret_file(&path, data.as_bytes()).await {
Ok(()) => secret,
Err(e)
if e.downcast_ref::<std::io::Error>()
.is_some_and(|io| io.kind() == std::io::ErrorKind::AlreadyExists) =>
{
let key_str = tokio::fs::read_to_string(&path).await?;
SecretKey::from_str(&key_str)?
Comment on lines +63 to +70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The AlreadyExists fallback can still read an incomplete key file.

Another process can hit this branch after the winner has created the path but before it has finished writing the hex payload, so the one-shot read_to_string() can still see empty/truncated contents and fail SecretKey::from_str(). Please retry until a valid key is readable, or publish via a temp file + atomic rename, so concurrent startup doesn’t still fail intermittently.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-native/src/iroh.rs` around lines 63 - 70, The AlreadyExists branch is
racy because a concurrent reader can see a truncated/empty file; fix by either
making write_secret_file perform atomic publish (write to a temp file in the
same directory and then tokio::fs::rename to the final path) or by changing the
AlreadyExists fallback to retry reading until SecretKey::from_str succeeds
(loop: tokio::fs::read_to_string(&path) -> attempt SecretKey::from_str, sleep
short backoff on parse/empty result, and give up with a clear error after a
timeout/attempt limit). Update the code that calls write_secret_file and the
match arm using tokio::fs::read_to_string and SecretKey::from_str accordingly so
readers only parse fully-written files.

}
Err(e) => return Err(e),
}
} else {
// Otherwise, generate a new random secret.
Expand All @@ -73,7 +80,7 @@ impl IrohEndpointConfig {
let mut alpns: Vec<Vec<u8>> = moq_lite::ALPNS.iter().map(|alpn| alpn.as_bytes().to_vec()).collect();
alpns.push(web_transport_iroh::ALPN_H3.as_bytes().to_vec());

let mut builder = IrohEndpoint::builder().secret_key(secret_key).alpns(alpns);
let mut builder = Endpoint::builder().secret_key(secret_key).alpns(alpns);
if let Some(addr) = self.bind_v4 {
builder = builder.bind_addr(addr)?;
}
Expand All @@ -88,17 +95,19 @@ impl IrohEndpointConfig {
}
}

pub enum IrohRequest {
pub enum Request {
Quic {
request: web_transport_iroh::QuicRequest,
alpns: Vec<&'static str>,
},
WebTransport {
request: Box<web_transport_iroh::H3Request>,
alpns: Vec<&'static str>,
},
}

impl IrohRequest {
pub async fn accept(conn: iroh::endpoint::Incoming) -> anyhow::Result<Self> {
impl Request {
pub async fn accept(conn: iroh::endpoint::Incoming, alpns: Vec<&'static str>) -> anyhow::Result<Self> {
let conn = conn.accept()?.await?;
let alpn = String::from_utf8(conn.alpn().to_vec()).context("failed to decode ALPN")?;
tracing::Span::current().record("id", conn.stable_id());
Expand All @@ -110,10 +119,12 @@ impl IrohRequest {
.context("failed to receive WebTransport request")?;
Ok(Self::WebTransport {
request: Box::new(request),
alpns,
})
}
alpn if moq_lite::ALPNS.contains(&alpn) => Ok(Self::Quic {
alpn if alpns.contains(&alpn) => Ok(Self::Quic {
request: web_transport_iroh::QuicRequest::accept(conn),
alpns,
}),
_ => Err(anyhow::anyhow!("unsupported ALPN: {alpn}")),
}
Expand All @@ -122,10 +133,10 @@ impl IrohRequest {
/// Accept the session.
pub async fn ok(self) -> Result<web_transport_iroh::Session, web_transport_iroh::ServerError> {
match self {
IrohRequest::Quic { request } => Ok(request.ok()),
IrohRequest::WebTransport { request } => {
Request::Quic { request, .. } => Ok(request.ok()),
Request::WebTransport { request, alpns } => {
let mut response = ConnectResponse::OK;
if let Some(protocol) = request.protocols.first() {
if let Some(protocol) = request.protocols.iter().find(|p| alpns.contains(&p.as_str())) {
response = response.with_protocol(protocol);
}
request.respond(response).await
Expand All @@ -136,26 +147,27 @@ impl IrohRequest {
/// Reject the session.
pub async fn close(self, status: http::StatusCode) -> Result<(), web_transport_iroh::ServerError> {
match self {
IrohRequest::Quic { request } => {
Request::Quic { request, .. } => {
request.close(status);
Ok(())
}
IrohRequest::WebTransport { request, .. } => request.reject(status).await,
Request::WebTransport { request, .. } => request.reject(status).await,
}
}

pub fn url(&self) -> Option<&Url> {
match self {
IrohRequest::Quic { .. } => None,
IrohRequest::WebTransport { request } => Some(&request.url),
Request::Quic { .. } => None,
Request::WebTransport { request, .. } => Some(&request.url),
}
}
}

pub(crate) async fn connect(
endpoint: &IrohEndpoint,
endpoint: &Endpoint,
url: Url,
addrs: impl IntoIterator<Item = std::net::SocketAddr>,
alpns: &[&str],
) -> anyhow::Result<web_transport_iroh::Session> {
let host = url.host().context("Invalid URL: missing host")?.to_string();
let endpoint_id: iroh::EndpointId = host.parse().context("Invalid URL: host is not an iroh endpoint id")?;
Expand All @@ -168,11 +180,9 @@ pub(crate) async fn connect(

// We need to use this API to provide multiple ALPNs.
// H3 is last because it requires WebTransport framing which not all H3 endpoints support.
let alpn = moq_lite::ALPNS[0].as_bytes();
let mut additional: Vec<Vec<u8>> = moq_lite::ALPNS[1..]
.iter()
.map(|alpn| alpn.as_bytes().to_vec())
.collect();
anyhow::ensure!(!alpns.is_empty(), "no ALPNs configured");
let alpn = alpns[0].as_bytes();
let mut additional: Vec<Vec<u8>> = alpns[1..].iter().map(|alpn| alpn.as_bytes().to_vec()).collect();
additional.push(b"h3".to_vec());
let opts = iroh::endpoint::ConnectOptions::new().with_additional_alpns(additional);

Expand All @@ -186,13 +196,13 @@ pub(crate) async fn connect(
let url = url_set_scheme(url, "https")?;

let mut request = ConnectRequest::new(url);
for alpn in moq_lite::ALPNS {
for alpn in alpns {
request = request.with_protocol(alpn.to_string());
}

web_transport_iroh::Session::connect_h3(conn, request).await?
}
alpn if moq_lite::ALPNS.contains(&alpn) => {
alpn if alpns.contains(&alpn) => {
let conn = connecting.await?;
web_transport_iroh::Session::raw(conn)
}
Expand All @@ -217,3 +227,21 @@ fn url_set_scheme(url: Url, scheme: &str) -> anyhow::Result<Url> {
.parse()?;
Ok(url)
}

/// Write secret key data to a file with owner-only permissions (0o600 on Unix).
///
/// Uses `create_new(true)` so the call fails with `AlreadyExists` if the file
/// already exists, which callers can use to handle races atomically.
async fn write_secret_file(path: &std::path::Path, data: &[u8]) -> anyhow::Result<()> {
use tokio::io::AsyncWriteExt;

let mut opts = tokio::fs::OpenOptions::new();
opts.write(true).create_new(true);

#[cfg(unix)]
opts.mode(0o600);

let mut file = opts.open(path).await?;
file.write_all(data).await.context("failed to write secret key")?;
Ok(())
}
Loading
Loading