Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions doc/spec/draft-ietf-moq-cmsf-00.txt
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ Internet-Draft CMSF- a CMAF compliant implementation of December 2025
CMSF defines a special instance of an Event Timeline track, termed
the SAP Type timeline track. Its purpose is to convey information
about the distribution of Stream Access Point types and their
associated Earlist Presentation Times.
associated Earliest Presentation Times.

In the catalog, the SAP-type timeline track MUST include a
'packaging' value of 'eventtimeline" and MUST include an 'eventType'
'packaging' value of 'eventtimeline' and MUST include an 'eventType'
value of 'org.ietf.moq.cmsf.sap'.

In the SAP Type timeline JSON payload:
Expand Down Expand Up @@ -291,7 +291,7 @@ Internet-Draft CMSF- a CMAF compliant implementation of December 2025
3.6.2. SAP-type timeline track example

This shows an example of 30-fps HEVC-encoded content, in which each
4s Group beings with SAP-type 2 (i.e., the first picture in the Group
4s Group begins with SAP-type 2 (i.e., the first picture in the Group
is an IDR picture, while there may be one or more pictures in the
Group following the IDR picture in decoding order but preceding it in
output order). After 2 seconds in each Group, there is a SAP-type 3,
Expand Down Expand Up @@ -424,7 +424,57 @@ Internet-Draft CMSF- a CMAF compliant implementation of December 2025

6. Security Considerations

TODO Security
CMSF inherits the security properties of the underlying MoQ
Transport protocol [MoQTransport] and the WebTransport or QUIC
session over which it operates. Implementations MUST ensure that
the transport layer provides confidentiality, integrity, and
authentication (e.g., via TLS 1.3).

Threat Model: An attacker may be on-path or off-path and may
attempt to eavesdrop, inject, modify, or replay CMSF messages.
Attackers may also attempt to disrupt availability through
resource exhaustion.

Confidentiality: Media content carried within CMSF Groups and
Objects is visible to relays and any entity with access to the
QUIC session keys. Applications requiring end-to-end media
confidentiality SHOULD encrypt media payloads above the
transport layer (e.g., using Secure Frame [SFrame]).

Comment on lines +442 to +443
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add or remove unresolved [SFrame] citation.

At Line 442, [SFrame] is cited but no matching reference entry appears in Section 8. This will usually produce an unresolved-reference warning/error in draft tooling.

✏️ Proposed fix (quick safe option)
-      transport layer (e.g., using Secure Frame [SFrame]).
+      transport layer (e.g., using Secure Frame).

If you want to keep the citation, add a proper [SFrame] reference entry in the references section instead.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
transport layer (e.g., using Secure Frame [SFrame]).
transport layer (e.g., using Secure Frame).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@doc/spec/draft-ietf-moq-cmsf-00.txt` around lines 442 - 443, The document
cites [SFrame] but there is no corresponding entry in the references section
(Section 8); either remove the inline citation "[SFrame]" from the phrase "using
Secure Frame [SFrame]" or add a full reference entry for [SFrame] in the
references list (Section 8) with the proper bibliographic details (title,
authors, draft/RFC number, date, and URL) so the citation resolves; update the
text to match the chosen action and ensure the citation token "[SFrame]" exactly
matches the new reference label.

Integrity: CMSF metadata (SAP types, event timelines, timing
information) MUST be integrity-protected by the underlying
transport. Receivers SHOULD validate that CMSF metadata is
consistent with the media content (e.g., declared SAP types
match actual Stream Access Points).

Authentication and Authorization: Publishers and subscribers
SHOULD be authenticated before being allowed to announce or
consume CMSF content. Authorization policies SHOULD restrict
which paths a client may publish to or subscribe from.
Credentials SHOULD have bounded lifetimes, and implementations
SHOULD support revocation mechanisms.

Replay and Impersonation: Implementations MUST rely on the
transport's replay protection (e.g., QUIC's nonce-based
encryption). CMSF itself does not introduce additional replay
vectors beyond those of the underlying MoQ Transport.

Availability: Receivers MUST impose limits on the rate and size
of CMSF Objects and Groups to mitigate resource exhaustion
attacks. Relays SHOULD apply backpressure or rate limiting
to protect downstream consumers.

Privacy: CMSF track names and catalog metadata may reveal
information about the media content or its structure.
Implementations SHOULD consider the privacy implications of
exposing track names, codec parameters, and timing metadata
to untrusted intermediaries.

Operational Guidance: Implementations SHOULD log security-
relevant events (authentication failures, authorization
denials, malformed messages) for monitoring and incident
response. Error messages SHOULD NOT leak internal state or
configuration details to unauthenticated clients.

7. IANA Considerations

Expand Down
70 changes: 65 additions & 5 deletions doc/spec/draft-ietf-moq-msf-00.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1598,10 +1598,10 @@ Internet-Draft MOQT Streaming Format January 2026

7.2. Media Timeline Catalog requirements

A media timeline track MUST carry a 'type' identifier in the Catalog
with a value of "mediatimeline". A media timeline track MUST carry a
'depends' attribute which contains an array of all track names to
which the media timeline track applies. The mime-type of a media
A media timeline track MUST carry a 'packaging' identifier in the
Catalog with a value of "mediatimeline". A media timeline track MUST
carry a 'depends' attribute which contains an array of all track
names to which the media timeline track applies. The mime-type of a media
timeline track MUST be specified as "application/json".

7.3. Media Timeline track updating
Expand Down Expand Up @@ -1778,7 +1778,67 @@ Internet-Draft MOQT Streaming Format January 2026

10. Security Considerations

ToDo
MSF inherits the security properties of the underlying MoQ
Transport protocol [MoQTransport] and its associated transport
security mechanisms (TLS 1.3 over QUIC or WebTransport).

Threat Model: Attackers may be on-path (e.g., compromised relays)
or off-path. They may attempt to eavesdrop on media content,
inject or modify track data, replay previously captured Groups
or Objects, or disrupt service availability through resource
exhaustion or malformed messages.

Confidentiality and Integrity: Media content and catalog metadata
are protected in transit by the underlying QUIC transport
encryption. However, relay nodes can observe and modify
content. Applications requiring end-to-end confidentiality
SHOULD apply payload-level encryption (e.g., Secure Frame
[SFrame]) above the MSF layer.

Authentication and Authorization: Implementations MUST
authenticate publishers and subscribers before allowing them
to announce or consume MSF tracks. Authorization SHOULD be
enforced per-track or per-broadcast path. Credentials MUST
have bounded lifetimes and implementations SHOULD support
revocation.

Replay and Denial of Service: The underlying QUIC transport
provides replay protection through its nonce-based encryption.
Implementations MUST impose limits on catalog size, track
count, Group rate, and Object sizes to prevent resource
exhaustion. Relays SHOULD apply rate limiting and
backpressure to protect downstream consumers from floods of
data or announcements.

Catalog Integrity: The MSF catalog (Section 5) describes the
structure of a broadcast and is critical to correct operation.
A malicious or compromised publisher could advertise misleading
codec parameters, invalid track dependencies, or excessive
track counts. Receivers MUST validate catalog contents and
SHOULD reject catalogs that exceed implementation-defined
complexity limits.

Privacy and Metadata: MSF track names, catalog fields (codec,
bitrate, resolution, temporal properties), and media timeline
metadata may reveal information about content and users.
Implementations SHOULD minimize metadata exposure to untrusted
intermediaries. The 'renderGroup' and temporal ordering
metadata in particular may allow traffic analysis of media
structure even when payloads are encrypted.

Interoperability: Implementations that do not fully validate
MSF catalog fields or track packaging types may be vulnerable
to unexpected behavior when interoperating with other
implementations. Receivers MUST gracefully handle unknown
catalog fields and SHOULD ignore unrecognized packaging types
rather than crashing.

Operational Guidance: Deployments SHOULD log authentication
failures, authorization denials, and malformed catalog or
track data for monitoring. Error responses MUST NOT leak
internal state or configuration to unauthenticated peers.
Implementations SHOULD follow the guidance in [RFC3552] for
security considerations in protocol design.

11. IANA Considerations

Expand Down
2 changes: 1 addition & 1 deletion js/hang/src/catalog/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const ContainerSchema = z
z.object({
kind: z.literal("cmaf"),
// Base64-encoded init segment (ftyp+moov)
initData: z.string().base64(),
initData: z.base64(),
}),
])
.default({ kind: "legacy" });
Expand Down
7 changes: 1 addition & 6 deletions rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ async fn run_session(origin: moq_lite::OriginProducer) -> anyhow::Result<()> {
// Subscribe to a broadcast and read media frames.
async fn run_subscribe(mut consumer: moq_lite::OriginConsumer) -> anyhow::Result<()> {
// Wait for a broadcast to be announced.
let (path, broadcast) = consumer
.announced()
.await
.ok_or_else(|| anyhow::anyhow!("origin closed"))?;

let broadcast = broadcast.ok_or_else(|| anyhow::anyhow!("broadcast unannounced: {path}"))?;
let (path, broadcast) = consumer.announced().await?;

tracing::info!(%path, "broadcast announced");

Expand Down
4 changes: 2 additions & 2 deletions rs/libmoq/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ pub struct moq_announced {
pub path: *const c_char,
pub path_len: usize,

/// Whether the broadcast is active or has ended
/// This MUST toggle between true and false over the lifetime of the broadcast
/// Whether the broadcast is active (always true; unannouncements are no longer sent).
#[deprecated(note = "always true — use BroadcastConsumer::closed() to detect end of broadcast")]
pub active: bool,
Comment on lines +79 to 81
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for any existing C FFI function that exposes broadcast closed functionality
rg -n 'moq_.*closed|closed.*extern.*C' rs/libmoq/src/

Repository: moq-dev/moq

Length of output: 168


🏁 Script executed:

#!/bin/bash
# First, verify the BroadcastConsumer::closed() method exists at the claimed location
echo "=== Checking for BroadcastConsumer closed method ==="
rg -n 'fn closed' rs/libmoq/src/ rs/moq-lite/src/ 2>/dev/null | head -20

# Also search for BroadcastConsumer struct definition and methods
echo -e "\n=== Searching for BroadcastConsumer struct and its methods ==="
rg -A 20 'struct BroadcastConsumer' rs/libmoq/src/ rs/moq-lite/src/ 2>/dev/null | head -50

Repository: moq-dev/moq

Length of output: 2895


🏁 Script executed:

#!/bin/bash
# Check all moq_consume* functions in the C FFI layer
echo "=== All moq_consume* C FFI exports ==="
rg -n 'moq_consume' rs/libmoq/src/api.rs | head -30

# Search for any callbacks or patterns related to broadcast consumer status
echo -e "\n=== Broadcast-related C FFI exports ==="
rg -n 'BroadcastConsumer|broadcast.*callback' rs/libmoq/src/api.rs | head -30

Repository: moq-dev/moq

Length of output: 1880


Missing C FFI binding for BroadcastConsumer::closed().

The deprecation note at line 80 directs C consumers to use BroadcastConsumer::closed() to detect end of broadcast, but this method is not exposed through the C FFI layer. C consumers currently have no way to detect when a broadcast has closed, leaving them without a migration path before the active field is fully removed.

Add a C FFI binding (e.g., moq_consume_closed()) to expose broadcast closure detection to C consumers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/api.rs` around lines 79 - 81, The C FFI currently exposes the
deprecated pub active field but not BroadcastConsumer::closed(), so add a C
binding named moq_consume_closed (or similar) that takes the C consumer handle
(e.g., moq_broadcast_consumer_t* / raw pointer used by existing moq_consume_*
functions), checks for null/invalid pointer, calls the Rust
BroadcastConsumer::closed() method, and returns a C-friendly boolean (e.g., int
or bool type used by your other FFI functions); mirror the error/null handling
conventions used by existing bindings (like other moq_consume_* functions) and
update the C header declaration so C consumers can call moq_consume_closed().

}

Expand Down
10 changes: 7 additions & 3 deletions rs/libmoq/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Origin {
}

async fn run_announced(task_id: Id, mut consumer: moq_lite::OriginConsumer) -> Result<(), Error> {
while let Some((path, broadcast)) = consumer.announced().await {
while let Ok((path, _broadcast)) = consumer.announced().await {
let mut state = State::lock();

// Stop if the callback was revoked by close.
Expand All @@ -74,7 +74,7 @@ impl Origin {
};
let callback = entry.callback;

let announced_id = state.origin.announced.insert((path.to_string(), broadcast.is_some()))?;
let announced_id = state.origin.announced.insert((path.to_string(), true))?;
drop(state);

// The lock is dropped before the callback is invoked.
Expand All @@ -84,6 +84,7 @@ impl Origin {
Ok(())
}

#[allow(deprecated)]
pub fn announced_info(&self, announced: Id, dst: &mut moq_announced) -> Result<(), Error> {
let announced = self.announced.get(announced).ok_or(Error::AnnouncementNotFound)?;
*dst = moq_announced {
Expand All @@ -106,7 +107,10 @@ impl Origin {

pub fn consume<P: moq_lite::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_lite::BroadcastConsumer, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
origin.consume().consume_broadcast(path).ok_or(Error::BroadcastNotFound)
origin
.consume()
.try_consume_broadcast(path)
.ok_or(Error::BroadcastNotFound)
}

pub fn publish<P: moq_lite::AsPath>(
Expand Down
12 changes: 9 additions & 3 deletions rs/libmoq/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ fn unknown_format() {
}

#[test]
#[allow(deprecated)]
fn local_announce() {
let origin = id(moq_origin_create());

Expand Down Expand Up @@ -259,6 +260,7 @@ fn local_announce() {
}

#[test]
#[allow(deprecated)]
fn announced_deactivation() {
let origin = id(moq_origin_create());
let cb = Callback::new();
Expand All @@ -280,11 +282,15 @@ fn announced_deactivation() {
assert_eq!(unsafe { moq_origin_announced_info(announced_id, &mut info) }, 0);
assert!(info.active);

// Closing the broadcast no longer triggers an unannounce callback.
// C consumers should watch BroadcastConsumer::closed() to detect end of broadcast.
assert_eq!(moq_publish_close(broadcast), 0);

let deactivated_id = id(cb.recv());
assert_eq!(unsafe { moq_origin_announced_info(deactivated_id, &mut info) }, 0);
assert!(!info.active, "broadcast should be inactive after publisher closes");
// No deactivation callback is expected.
assert!(
cb.try_recv(Duration::from_millis(100)).is_none(),
"should not receive deactivation callback"
);

assert_eq!(moq_origin_announced_close(announced_task), 0);
assert_eq!(moq_origin_close(origin), 0);
Expand Down
10 changes: 4 additions & 6 deletions rs/moq-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,15 +180,13 @@ async fn wait_broadcast(
name: &str,
) -> anyhow::Result<moq_lite::BroadcastConsumer> {
loop {
let (path, announced) = consumer
let (path, broadcast) = consumer
.announced()
.await
.ok_or_else(|| anyhow::anyhow!("origin closed"))?;
.map_err(|_| anyhow::anyhow!("origin closed"))?;

if let Some(broadcast) = announced {
if path.as_ref() == name {
return Ok(broadcast);
}
if path.as_ref() == name {
return Ok(broadcast);
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions rs/moq-cli/src/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ impl Subscribe {

// Run the converter concurrently — it blocks until all tracks finish,
// so we must read from the output broadcast in parallel.
tokio::select! {
res = converter.run() => res?,
res = mux_fmp4(catalog_track, cmaf_consumer, max_latency) => res?,
}
tokio::try_join!(converter.run(), mux_fmp4(catalog_track, cmaf_consumer, max_latency))?;

Ok(())
}
Expand Down
22 changes: 9 additions & 13 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,27 @@ async fn main() -> anyhow::Result<()> {
Command::Subscribe => {
let session = client.with_consume(origin.clone()).connect(config.url).await?;

// NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead,
// However that won't work with IETF MoQ and the current OriginConsumer API the moment.
// So instead we do the cooler thing and loop while the broadcast is announced.
// We use announced() to watch for the broadcast to come online/reconnect,
// rather than a one-shot consume_broadcast() call.

tracing::info!(broadcast = %config.broadcast, "waiting for broadcast to be online");

let path: moq_lite::Path<'_> = config.broadcast.into();
let mut origin = origin
.consume_only(&[path])
.consume()
.with_filter(&[path])
.context("not allowed to consume broadcast")?;

// The current subscriber if any, dropped after each announce.
let mut clock: Option<clock::Subscriber> = None;

loop {
tokio::select! {
Some(announce) = origin.announced() => match announce {
(path, Some(broadcast)) => {
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let track = broadcast.subscribe_track(&track)?;
clock = Some(clock::Subscriber::new(track));
}
(path, None) => {
tracing::warn!(broadcast = %path, "broadcast is offline, waiting...");
}
res = origin.announced() => {
let (path, broadcast) = res.context("origin closed")?;
tracing::info!(broadcast = %path, "broadcast is online, subscribing to track");
let subscribed_track = broadcast.subscribe_track(&track)?;
clock = Some(clock::Subscriber::new(subscribed_track));
},
res = session.closed() => return res.context("session closed"),
// NOTE: This drops clock when a new announce arrives, canceling it.
Expand Down
30 changes: 8 additions & 22 deletions rs/moq-ffi/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,18 @@ struct Announced {

impl Announced {
async fn next(&mut self) -> Result<Option<Arc<MoqAnnouncement>>, MoqError> {
loop {
match self.inner.announced().await {
Some((path, Some(broadcast))) => {
return Ok(Some(Arc::new(MoqAnnouncement {
path: path.to_string(),
broadcast: Arc::new(MoqBroadcastConsumer::new(broadcast)),
})));
}
// TODO moq-lite will change to not emit None (unannounce) events here.
Some((_path, None)) => continue,
None => return Ok(None),
}
match self.inner.announced().await {
Ok((path, broadcast)) => Ok(Some(Arc::new(MoqAnnouncement {
path: path.to_string(),
broadcast: Arc::new(MoqBroadcastConsumer::new(broadcast)),
}))),
Err(_) => Ok(None),
}
}

async fn available(&mut self) -> Result<Arc<MoqBroadcastConsumer>, MoqError> {
loop {
match self.inner.announced().await {
Some((_path, Some(broadcast))) => {
return Ok(Arc::new(MoqBroadcastConsumer::new(broadcast)));
}
// TODO moq-lite will change to not emit None (unannounce) events here.
Some((_path, None)) => continue,
None => return Err(MoqError::Closed),
}
}
let (_path, broadcast) = self.inner.announced().await.map_err(|_| MoqError::Closed)?;
Ok(Arc::new(MoqBroadcastConsumer::new(broadcast)))
}
}

Expand Down
Loading
Loading