moq-lite: rewrite Origin as a poll-driven, conducer-based model #1358
kixelated wants to merge 4 commits into
Conversation
Replace the OriginNode/NotifyNode tree, per-publish web_async::spawn cleanup, and per-consumer mpsc fan-out with a flat HashMap behind a parking-lot-style Mutex plus per-consumer queues. Wakers register on both the global state and each tracked broadcast's `poll_closed`, so broadcast closures wake consumers directly: no spawned cleanup tasks and no more `tokio::time::sleep(1ms)` in tests.

Renames (with one-line migrations across the workspace):
- publish_broadcast -> publish
- create_broadcast -> create
- consume_broadcast -> dropped (use wait_for_broadcast / try_next)
- publish_only -> scope
- consume_only -> scope (on OriginConsumer)
- announced -> next (returns OriginUpdate enum)
- try_announced -> try_next
- announced_broadcast -> wait_for_broadcast

Active/Ended semantics, shortest-hop preference, is_clone dedup, and "newer wins ties" are all preserved. Active selection still lives in the producer so relay forwarders don't have to reimplement it. Also exposes BroadcastConsumer::poll_closed and is_closed so callers that need to compose close-detection without spawning have a primitive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walkthrough
The PR rewrites origin announcement handling from a tuple-based API to an enum-driven update model (Active/Ended). The core origin model is reworked around a centralized, mutex-guarded State with per-consumer queues and scope-based filtering. Origin producer/consumer methods and flows are replaced to emit and consume explicit announce updates; consumer call sites and examples are updated to pattern-match the new variants. A new BroadcastRejected error variant is added.
🚥 Pre-merge checks: 5 passed
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
rs/moq-ffi/src/origin.rs (1)
118-123: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
This no longer waits for the exact broadcast path.
`with_root(path)` subscribes to the whole subtree under `path`, and `Announced::available()` returns the first `Active` update it sees. That means `announced_broadcast("foo")` can resolve with `foo/bar` if the nested broadcast arrives first. Please drive this through `OriginConsumer::wait_for_broadcast(path)` or store the requested path and filter for an exact match.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-ffi/src/origin.rs` around lines 118-123: The function announced_broadcast currently uses self.inner.clone().with_root(path), which subscribes to the whole subtree and returns the first Active update (via Announced::available), allowing nested paths (e.g., "foo/bar") to satisfy a request for "foo". Change announced_broadcast to obtain the exact broadcast origin for the requested path by calling the consumer method that waits for an exact match (OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping with_root, capture and store the requested path and filter Announced::available events until update.path == requested_path; then construct the MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and return it as before.
rs/moq-lite/src/lite/subscriber.rs (1)
223-232: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Handle rejected origin publishes before registering the announce locally.
`OriginProducer::publish()` can return `false` here, but this path still keeps the entry in `producers` and spawns `run_broadcast`. If the publish was rejected, the announce becomes locally "active" even though no origin consumer can ever see it, and a later real announce on the same path will trip the duplicate-path check. Roll back the inserted producer and skip spawning when `publish()` returns `false`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/lite/subscriber.rs` around lines 223-232: The code currently inserts the producer and then calls OriginProducer::publish(...); if publish returns false you must roll back that insertion and avoid spawning run_broadcast. Change the flow so after creating `dynamic = broadcast.dynamic()` you call `let published = self.origin.as_mut().unwrap().publish(path.clone(), broadcast.consume());` and if `published` is false remove the just-inserted producer from `self.producers` (or undo whatever created the local announce) and do not call `web_async::spawn(self.clone().run_broadcast(path, dynamic));`; only spawn run_broadcast when publish returns true. Ensure the rollback targets the same producer entry created earlier so duplicate-path checks remain correct.
rs/moq-lite/src/ietf/subscriber.rs (1)
415-423: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Don't treat a rejected `publish()` as a successful announce.
This migration now calls `OriginProducer::publish(...)`, but the return value is ignored. If the origin rejects the publish, `state.broadcasts` still records the path and `run_broadcast` still starts, so later announces on that path can be rejected as duplicates even though nothing was actually published. Please fail or roll back the entry when `publish()` returns `false`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 415-423: The Vacant branch currently ignores OriginProducer::publish(...)'s boolean result, so a rejected publish still inserts a BroadcastState and returns a Broadcast; update the Entry::Vacant handling (the block that creates Broadcast::new().produce(), calls origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast) to check the publish return value and abort/roll back on false: if publish(...) returns false, do not insert into the map (do not create or record BroadcastState in state.broadcasts), do not start run_broadcast for that path, and return an error or a None/appropriate failure value to the caller instead of the broadcast; ensure you reference OriginProducer::publish, BroadcastState, Entry::Vacant and run_broadcast when making the change so the insert and broadcast-start are skipped/undone on rejection.
rs/moq-boy/src/input.rs (1)
72-97: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Avoid emitting `ViewerLeft` directly on `OriginUpdate::Ended`.
This can produce duplicate/spurious offline events during path handoff (`Ended` may be immediately followed by `Active`), and the spawned command task already emits `ViewerLeft` when the stream actually ends.
Suggested adjustment
- let (path, broadcast) = match update { - moq_lite::OriginUpdate::Active(p, b) => (p, Some(b)), - moq_lite::OriginUpdate::Ended(p) => (p, None), - }; + let (path, broadcast) = match update { + moq_lite::OriginUpdate::Active(p, b) => (p, b), + moq_lite::OriginUpdate::Ended(p) => { + tracing::debug!(viewer_id = %p, "viewer broadcast ended"); + continue; + } + }; let viewer_id = path.to_string(); - if let Some(broadcast) = broadcast { + { tracing::info!(%viewer_id, "viewer connected"); let cmd_tx = cmd_tx.clone(); let vid = viewer_id.clone(); tokio::spawn(async move { if let Err(e) = handle_viewer_commands(&vid, broadcast, &cmd_tx).await { tracing::warn!(viewer_id = %vid, error = %e, "viewer command error"); } tracing::info!(viewer_id = %vid, "viewer disconnected"); let _ = cmd_tx.send(Command::ViewerLeft { viewer_id: vid }).await; }); - } else { - tracing::info!(%viewer_id, "viewer went offline"); - let _ = cmd_tx - .send(Command::ViewerLeft { - viewer_id: viewer_id.clone(), - }) - .await; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-boy/src/input.rs` around lines 72 - 97, Don't send Command::ViewerLeft when matching moq_lite::OriginUpdate::Ended; that causes duplicate/offline events because the spawned task from the Active case already sends ViewerLeft when the stream closes. In the match/if block around OriginUpdate::Active/Ended, keep the existing tokio::spawn + handle_viewer_commands flow for Active (including the send in the spawned task), but for the Ended branch remove the cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log (tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft is only emitted by the spawned task handling the broadcast and prevents spurious duplicate events.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 113-126: The current scoped-consumer usage can return a descendant
(e.g., "foo/bar") first and incorrectly fail; after creating the consumer via
origin.consume().scope(&[path.as_path()]) you must drain the consumer (loop over
consumer.try_next()) until you find an OriginUpdate::Active where p.as_path() ==
path and then return Ok(b), or until try_next() returns None in which case
return Err(Error::BroadcastNotFound); keep the same origin.consume(),
scope(&[path.as_path()]), and matching on moq_lite::OriginUpdate::Active(p, b)
but iterate instead of a single try_next() call.
- Around line 135-137: The current wrapper in origin.rs calls
self.active.get_mut(origin) and then origin.publish(path, broadcast) but always
returns Ok(()), which masks rejections signaled by OriginProducer::publish()
returning false; update the code to check the boolean result of
OriginProducer::publish(path, broadcast) and propagate failure by returning an
Err variant (e.g., Err(Error::PublishRejected) or an appropriate existing Error)
when publish returns false, otherwise return Ok(()). Ensure you reference the
same origin variable from self.active.get_mut(origin) and preserve existing
error handling for get_mut (ok_or(Error::OriginNotFound)?).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 3a30bf84-46cf-4da8-a44d-256ee6e5983f
📒 Files selected for processing (23)
rs/hang/examples/subscribe.rs, rs/hang/examples/video.rs, rs/libmoq/src/origin.rs, rs/moq-boy/src/input.rs, rs/moq-boy/src/main.rs, rs/moq-cli/src/client.rs, rs/moq-cli/src/server.rs, rs/moq-clock/src/main.rs, rs/moq-ffi/src/origin.rs, rs/moq-gst/src/sink/imp.rs, rs/moq-gst/src/source/imp.rs, rs/moq-lite/src/ietf/publisher.rs, rs/moq-lite/src/ietf/subscriber.rs, rs/moq-lite/src/lite/publisher.rs, rs/moq-lite/src/lite/subscriber.rs, rs/moq-lite/src/model/broadcast.rs, rs/moq-lite/src/model/origin.rs, rs/moq-native/examples/chat.rs, rs/moq-native/tests/backend.rs, rs/moq-native/tests/broadcast.rs, rs/moq-relay/src/cluster.rs, rs/moq-relay/src/connection.rs, rs/moq-relay/src/web.rs
# Conflicts:
#   rs/libmoq/src/origin.rs
#   rs/moq-clock/src/main.rs
#   rs/moq-lite/src/ietf/publisher.rs
#   rs/moq-lite/src/lite/publisher.rs
#   rs/moq-lite/src/model/broadcast.rs
#   rs/moq-lite/src/model/origin.rs
#   rs/moq-relay/src/cluster.rs
#   rs/moq-relay/src/web.rs
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-relay/src/web.rs (1)
465-474: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Snapshot can include ended or duplicate paths.
The drain only inspects `Active` updates, so if the queue contains any of:
- `Active(p, _)` followed by `Ended(p)`: `p` is reported as currently announced even though it just ended;
- `Active(p, b1)` followed by `Ended(p)` and `Active(p, b2)` (the documented sequence when a path is replaced): `p` appears twice in the response.
Per `OriginUpdate`'s docs (rs/moq-lite/src/model/origin.rs:214-231), `Active` for a replacement is preceded by `Ended` for the same path, so duplicates are reachable in normal operation, not just edge cases. Tracking both variants in a small set keeps the HTTP `/announced` snapshot consistent without changing its semantics.
🛠️ Suggested fix
- let mut broadcasts = Vec::new(); - - while let Some(update) = origin.try_next() { - if let moq_lite::OriginUpdate::Active(suffix, _) = update { - broadcasts.push(suffix); - } - } - - Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n")) + let mut broadcasts = std::collections::BTreeSet::new(); + + while let Some(update) = origin.try_next() { + match update { + moq_lite::OriginUpdate::Active(suffix, _) => { + broadcasts.insert(suffix); + } + moq_lite::OriginUpdate::Ended(suffix) => { + broadcasts.remove(&suffix); + } + } + } + + Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n"))🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-relay/src/web.rs` around lines 465-474: The current drain loop only pushes OriginUpdate::Active suffixes into broadcasts, allowing ended or duplicate paths to appear; change the logic to maintain a small set (e.g., HashSet<String>) instead of Vec: when origin.try_next() yields OriginUpdate::Active(suffix, _) insert suffix.into() into the set, and when it yields OriginUpdate::Ended(suffix) remove that suffix from the set; after the loop, collect the set into a Vec (optionally sort for stable output) and join with "\n" instead of using broadcasts.iter(), ensuring the snapshot reflects currently announced (non-ended, non-duplicated) paths; update references around origin.try_next(), OriginUpdate::Active, OriginUpdate::Ended, and the broadcasts variable accordingly.
rs/moq-lite/src/ietf/publisher.rs (1)
446-528: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Synchronous PublishNamespace handshake serializes the entire announce loop.
Each `OriginUpdate::Active` opens a bidi stream and inline-awaits the peer's response (`stream.reader.decode().await?` / `read_exact(...).await?` on lines 479-481). Until that single response arrives, no other update (including subsequent `Active`s and any `Ended`) can be drained from `self.origin.next()`. A slow or unresponsive peer therefore stalls all announce traffic on this session, and an `Ended` for a never-acked namespace can sit in the queue indefinitely. Any decoding error or unexpected message (lines 497, 501) also fails the entire session.
Consider spawning a per-namespace task that owns the stream + handshake + cleanup, and have `run_announce` dispatch updates to it (e.g., via `web_async::spawn` and a small registry keyed by `PathOwned`). This would decouple peer responsiveness from the update draining loop and improve overall liveness under slow/adversarial peers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/ietf/publisher.rs` around lines 446 - 528, The announce loop in run_announce blocks on the synchronous per-namespace handshake (calls like stream.reader.decode().await and read_exact(...) in the OriginUpdate::Active branch), which serializes the entire loop and lets any error abort the session; refactor so run_announce only dispatches updates and does not await the peer response: spawn a per-namespace task (e.g., via web_async::spawn) that takes ownership of the Stream, performs the PublishNamespace handshake and local decoding (previously done around stream.reader.decode()/read_exact()), inserts itself into a small registry keyed by PathOwned (previously namespace_streams), and handles cleanup on OriginUpdate::Ended by receiving a signal or lookup+close; ensure the per-namespace task catches and logs decoding/IO errors instead of returning Err from run_announce so a bad/slow peer won’t stall or kill the whole session.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-boy/src/main.rs`:
- Line 222: The call to publish_origin.publish(&broadcast_path,
broadcast.consume()) currently ignores its boolean return; change it to assert
success using anyhow::ensure! so rejected publishes produce an error like
elsewhere (e.g., rs/moq-gst/src/sink/imp.rs). Specifically, after calling
publish_origin.publish(&broadcast_path, broadcast.consume()), use
anyhow::ensure!(result, "failed to publish broadcast for path {}: rejected by
OriginProducer", broadcast_path) (or equivalent) so a rejected publish returns
an error instead of silently proceeding; reference the publish_origin,
broadcast_path, and broadcast symbols when locating the code to modify.
In `@rs/moq-cli/src/client.rs`:
- Line 9: The call to origin.publish(&name, publish.consume()) currently ignores
the bool return that signals rejection; update the code to check the returned
bool and fail fast on rejection (e.g., use anyhow::ensure!(origin.publish(&name,
publish.consume()), "failed to publish {}: rejected by origin", name)) so that a
rejected publish does not let reconnect.closed() continue with no broadcast
registered; mirror the ensure pattern used around publish handling in the other
module to propagate the error instead of discarding the result.
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 202-230: The Active branch may skip sending Announce::Active when
hops.push(self_origin).is_err(), but the Ended branch still sends
Announce::Ended and causes the subscriber to error; fix by tracking suppressed
paths: create a HashSet (e.g., suppressed_paths) in the same scope as the
announce loop, and when hops.push(self_origin).is_err() insert the full path (or
the same key used by subscriber) into suppressed_paths and continue; in the
OriginUpdate::Ended handler, check suppressed_paths.remove(&path) first and if
it was present, skip encoding/sending the Announce::Ended, otherwise proceed to
encode/send as before (ensuring you use the same path form used for
suppression/lookup).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: fe8ca23a-ae99-432f-8389-e2f3f2f976e0
📒 Files selected for processing (14)
rs/hang/examples/subscribe.rs, rs/hang/examples/video.rs, rs/libmoq/src/error.rs, rs/libmoq/src/origin.rs, rs/moq-boy/src/main.rs, rs/moq-cli/src/client.rs, rs/moq-cli/src/main.rs, rs/moq-gst/src/sink/imp.rs, rs/moq-gst/src/source/imp.rs, rs/moq-lite/src/ietf/publisher.rs, rs/moq-lite/src/ietf/subscriber.rs, rs/moq-lite/src/lite/publisher.rs, rs/moq-lite/src/lite/subscriber.rs, rs/moq-relay/src/web.rs
… etc.) Reverts the method/type renames the rewrite introduced and restores the names already in use on main, since those are clearer and avoid churn at every existing call site.

| PR rewrite name | Restored to main name |
| --- | --- |
| publish | publish_broadcast |
| create | create_broadcast |
| next (async) | announced |
| try_next | try_announced |
| wait_for_broadcast | announced_broadcast |
| OriginUpdate | OriginAnnounce |

The behavior (poll-driven conducer model, Active/Ended enum, is_closed and poll_closed primitives) is unchanged.
Actionable comments posted: 2
♻️ Duplicate comments (1)
rs/moq-lite/src/lite/publisher.rs (1)
204-230: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
MAX_HOPS suppression still asymmetric: Active dropped but Ended still emitted.
When `hops.push(self_origin).is_err()` at line 211, the Active is suppressed (line 216 `continue`). However, the matching `OriginAnnounce::Ended(path)` arm (lines 221-230) unconditionally encodes `Announce::Ended`. The peer in lite/subscriber.rs does `producers.remove(&path).ok_or(Error::NotFound)?` for an Ended it has no record of, which propagates `Error::NotFound` and aborts the announce stream.
Track suppressed paths so the corresponding Ended is also suppressed (see prior review for sketch).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/lite/publisher.rs` around lines 204 - 230, When hops.push(self_origin).is_err() suppresses an Active announce in the OriginAnnounce handling, also record that suffix/path in a local suppressed set so the matching OriginAnnounce::Ended handling checks and skips encoding lite::Announce::Ended for those suppressed paths; specifically, add a HashSet (e.g., suppressed_paths) in the publisher scope, insert the suffix/path when the Active branch hits the hops.push(...) error, and in the OriginAnnounce::Ended branch check suppressed_paths first — if present, remove it and continue (skip encoding), otherwise proceed to create and encode lite::Announce::Ended as before; this keeps behavior symmetric with subscriber logic (which expects Ended only for known producers removed via producers.remove).
🧹 Nitpick comments (3)
rs/moq-lite/src/model/origin.rs (2)
387-401: 💤 Low value
Per-consumer pending queues are unbounded.
`distribute` does an unbounded `push_back` per consumer with no high-water mark. This is an intentional trade-off vs the previous mpsc design (per the PR description, it avoids the tokio 127-message bug), and announcement events are typically low-rate, so this is fine in practice. Worth a brief comment near `ConsumerQueue` documenting the choice so future contributors don't reintroduce a bound that breaks the no-blocking guarantee for `publish`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/model/origin.rs` around lines 387 - 401, The per-consumer pending queues are intentionally unbounded to avoid blocking/pushing back in publish (see distribute) — add a concise comment on the ConsumerQueue type (and/or its pending field) stating that pending is intentionally unbounded to preserve the no-blocking guarantee for publish, explaining the trade-off vs bounded channels (mentioning the tokio 127-message mpsc issue) and that this is deliberate to avoid reintroducing a capacity that could block; reference ConsumerQueue and distribute in the comment so future contributors understand the rationale.
688-743: 💤 Low value
Phase 4/5 waiter registration may accumulate across iterations.
When `close_detected` triggers a `continue` back to Phase 1, the `waiter` was already registered both globally (Phase 4) and on one or more `poll_closed` slots (Phase 5). Re-registration in the next iteration will append duplicate entries rather than replace them, since `conducer::Waiter::register` appends new `Weak<Waker>` slots instead of deduplicating by waker identity. This results in redundant wake-ups on the same waiter: still correct, but wasteful of space in the `WaiterList`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/model/origin.rs` around lines 688-743: The waiter can get registered twice across loops because Phase 4 registers globally before Phase 5 checks per-active closures; move the global registration so it happens after the loop that calls entry.active.poll_closed (i.e., perform the per-active poll_closed checks first, and only call waiter.register(&mut state.waiters) if no close_detected), ensuring poll_announced doesn't append duplicate waiter entries when it continues the loop; adjust the code in poll_announced around the waiter.register(&mut state.waiters) and the for (path, entry) .. entry.active.poll_closed(...) section accordingly.
rs/moq-native/tests/broadcast.rs (1)
81-91: 💤 Low value
LGTM: consistent enum-match migration across all three transports.
The `Active` branch verifies the announced path and yields `bc`; the `Ended` arm is a fail-fast assertion, which is appropriate for a smoke test that publishes once and expects exactly one Active update.
Minor nit (skip if not interesting): the same 7-line match is now copy-pasted three times. Extracting a small helper like `await_active(announcements, "test")` would centralize the assertion message and keep future API drift to one site. Not worth blocking on.
Also applies to: 425-435, 536-546
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-native/tests/broadcast.rs` around lines 81 - 91, Extract the repeated 7-line match into a small helper (e.g., await_active) that takes the announcements stream/handle and an expected path string (and optionally TIMEOUT) and performs the tokio::time::timeout(...).await.expect(...).expect(...), matches on moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts path == expected and returns bc; then replace the three inline match blocks with calls to await_active(announcements, "test") (or the appropriate expected path) to centralize the assertion and reduce duplication.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 112-130: Update the workspace MSRV to at least Rust 1.88 in the
root Cargo.toml so the let-chain syntax used in the consume function compiles
under the declared toolchain, and modify the path handling in
rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a Path<'_> (the
local variable named path), remove the redundant path.as_path() call and pass
the path directly to origin.consume().scope (e.g. use &[path] or &[path.clone()]
as needed to satisfy borrowing/lifetime requirements) so scope receives a slice
of Path without the extra conversion.
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 511-523: The match on Version should treat the newest-draft
behavior as the default so future drafts inherit it: change the arm that
explicitly lists Version::Draft17 to a wildcard default and keep the older
drafts explicitly listed; i.e., have Version::Draft14 | Version::Draft15 |
Version::Draft16 => send the ietf::PublishNamespaceDone (using
stream.writer.encode_message with suffix.as_path() and request_id), and replace
Version::Draft17 => {} with _ => {} so Draft17+ default to the no-op behavior.
(e.g., await_active) that takes the announcements stream/handle and an expected
path string (and optionally TIMEOUT) and performs the
tokio::time::timeout(...).await.expect(...).expect(...), matches on
moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts path ==
expected and returns bc; then replace the three inline match blocks with calls
to await_active(announcements, "test") (or the appropriate expected path) to
centralize the assertion and reduce duplication.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d989bb11-ca4c-490f-a6bb-f12532ea0511
📒 Files selected for processing (12)
rs/hang/examples/subscribe.rs
rs/libmoq/src/origin.rs
rs/moq-boy/src/input.rs
rs/moq-clock/src/main.rs
rs/moq-ffi/src/origin.rs
rs/moq-lite/src/ietf/publisher.rs
rs/moq-lite/src/lite/publisher.rs
rs/moq-lite/src/lite/subscriber.rs
rs/moq-lite/src/model/origin.rs
rs/moq-native/tests/backend.rs
rs/moq-native/tests/broadcast.rs
rs/moq-relay/src/web.rs
```diff
 pub fn consume<P: moq_lite::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_lite::BroadcastConsumer, Error> {
 	use moq_lite::AsPath;
 	let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
-	// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
-	// for gossip instead of racing it.
-	// Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume().
-	#[allow(deprecated)]
-	origin.get_broadcast(path).ok_or(Error::BroadcastNotFound)
+	// TODO: expose an async variant so FFI callers can wait for gossip instead of racing it.
+	// Scope is prefix-based, so drain the replay queue until we find an exact path match.
+	let path = path.as_path();
+	let mut consumer = origin
+		.consume()
+		.scope(&[path.as_path()])
+		.ok_or(Error::BroadcastNotFound)?;
+	while let Some(update) = consumer.try_announced() {
+		if let moq_lite::OriginAnnounce::Active(p, b) = update
+			&& p.as_path() == path
+		{
+			return Ok(b);
+		}
+	}
+	Err(Error::BroadcastNotFound)
 }
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Confirm every crate using let-chains is on edition 2024 and that workspace MSRV is >= 1.88.
fd -t f -e toml | xargs rg -nP '^\s*(edition|rust-version)\s*='
```

Repository: moq-dev/moq
Length of output: 763

🏁 Script executed:

```shell
#!/bin/bash
# Check if moq-gst depends on libmoq
grep -E "libmoq|moq_lite" rs/moq-gst/Cargo.toml
# Also get details on the moq_lite::AsPath trait to verify the redundancy claim
fd -t f -name "*.rs" -path "*/moq_lite/*" | xargs rg -l "trait AsPath" | head -1 | xargs cat -n
```

Repository: moq-dev/moq
Length of output: 34780

🏁 Script executed:

```shell
#!/bin/bash
# Check moq-gst Cargo.toml for dependencies on libmoq or moq_lite
cat rs/moq-gst/Cargo.toml | grep -E "libmoq|moq.lite"
```

Repository: moq-dev/moq
Length of output: 87
Update workspace MSRV from 1.85 to 1.88 or higher to support the let-chain syntax in lines 123–125.
Let chains were stabilized in Rust 1.88 and require edition 2024. This code is on the correct edition, but the workspace declares rust-version = "1.85" in the root Cargo.toml, which is incompatible. Any attempt to build with Rust 1.85–1.87 will fail with a let-chain feature error. Update the workspace MSRV to at least 1.88.
Additionally, path.as_path() on line 120 is redundant — path is already a Path<'_> after line 117, so use &[path] instead (or &[path.clone()] if needed).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/libmoq/src/origin.rs` around lines 112 - 130, Update the workspace MSRV to
at least Rust 1.88 in the root Cargo.toml so the let-chain syntax used in the
consume function compiles under the declared toolchain, and modify the path
handling in rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a
Path<'_> (the local variable named path), remove the redundant path.as_path()
call and pass the path directly to origin.consume().scope (e.g. use &[path] or
&[path.clone()] as needed to satisfy borrowing/lifetime requirements) so scope
receives a slice of Path without the extra conversion.
- lite/publisher.rs: when an Active is dropped because the hop chain hits
MAX_HOPS, also drop the matching Ended. The subscriber treats Ended for
an unknown path as Error::NotFound, so the asymmetric pair was a real
protocol error.
- ietf/publisher.rs: replace `Version::Draft17 => {}` arms with `_ => {}`
so future drafts inherit v17+ behavior by default (per CLAUDE.md).
- libmoq/origin.rs: split the let-chain into nested if so the code stays
within the workspace MSRV (1.85; let-chains require 1.88).
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-lite/src/ietf/publisher.rs (1)
597-610: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Initial drain silently drops `Ended`, leaking stale namespaces to the peer.

`try_announced()` returns updates in queue order, so a path that went Active→Ended before this drain runs will yield both events back-to-back. The `if let` here handles only `Active`, so the code emits `Namespace(A)` and then drops `Ended(A)` on the floor. Since the original `Ended(A)` was already consumed from the consumer queue, the streaming loop below will never re-emit it, and the peer is left with a permanently stale namespace until some other event happens to A.

The streaming loop (lines 619-630) handles both variants symmetrically; the init drain should too.
🔧 Suggested fix
```diff
 // Send initial NAMESPACE messages for currently active namespaces
 while let Some(update) = origin.try_announced() {
-    if let crate::OriginAnnounce::Active(path, _) = update {
-        let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");
-        tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
-        stream.writer.encode(&ietf::Namespace::ID).await?;
-        stream
-            .writer
-            .encode(&ietf::Namespace {
-                suffix: suffix.to_owned(),
-            })
-            .await?;
-    }
+    match update {
+        crate::OriginAnnounce::Active(path, _) => {
+            let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+            tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
+            stream.writer.encode(&ietf::Namespace::ID).await?;
+            stream.writer.encode(&ietf::Namespace { suffix }).await?;
+        }
+        crate::OriginAnnounce::Ended(path) => {
+            let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+            tracing::debug!(broadcast = %origin.absolute(&path), "namespace_done");
+            stream.writer.encode(&ietf::NamespaceDone::ID).await?;
+            stream.writer.encode(&ietf::NamespaceDone { suffix }).await?;
+        }
+    }
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/ietf/publisher.rs` around lines 597 - 610, The initial drain currently only handles OriginAnnounce::Active and silently drops OriginAnnounce::Ended, leaking stale namespaces; update the while let loop over origin.try_announced() to match both OriginAnnounce::Active(path, _) and OriginAnnounce::Ended(path, _) and for the Ended case emit the same pair of messages (stream.writer.encode(&ietf::Namespace::ID) and stream.writer.encode(&ietf::Namespace { suffix: suffix.to_owned() }).await?) as the Active branch so the peer receives the Ended notification just like the streaming loop does; use the same suffix extraction via path.strip_prefix(&prefix).expect(...) and the same tracing/logging symbol (origin.absolute(&path)) to keep behavior consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Line 82: The insert into state.origin.announced (announced_id =
state.origin.announced.insert((path.to_string(), active))?) creates unbounded
growth because entries are never removed; fix by ensuring every announced_id is
removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.
---
Outside diff comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 597-610: The initial drain currently only handles
OriginAnnounce::Active and silently drops OriginAnnounce::Ended, leaking stale
namespaces; update the while let loop over origin.try_announced() to match both
OriginAnnounce::Active(path, _) and OriginAnnounce::Ended(path, _) and for the
Ended case emit the same pair of messages
(stream.writer.encode(&ietf::Namespace::ID) and
stream.writer.encode(&ietf::Namespace { suffix: suffix.to_owned() }).await?) as
the Active branch so the peer receives the Ended notification just like the
streaming loop does; use the same suffix extraction via
path.strip_prefix(&prefix).expect(...) and the same tracing/logging symbol
(origin.absolute(&path)) to keep behavior consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c364712f-96f3-40b8-bff2-c9ee4388048e
📒 Files selected for processing (3)
rs/libmoq/src/origin.rs
rs/moq-lite/src/ietf/publisher.rs
rs/moq-lite/src/lite/publisher.rs
```diff
 let callback = entry.callback;

-let announced_id = state.origin.announced.insert((path.to_string(), broadcast.is_some()))?;
+let announced_id = state.origin.announced.insert((path.to_string(), active))?;
```
Unbounded growth risk in `origin.announced` storage.

Each announce update inserts a new `(String, bool)` entry, but this module exposes no corresponding removal path for consumed announcement IDs. On long-lived/high-churn origins, this can grow indefinitely and eventually exhaust memory/IDs.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/libmoq/src/origin.rs` at line 82, The insert into state.origin.announced
(announced_id = state.origin.announced.insert((path.to_string(), active))?)
creates unbounded growth because entries are never removed; fix by ensuring
every announced_id is removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.
Summary
Replaces the `OriginNode`/`NotifyNode` tree, per-publish `web_async::spawn` cleanup, and per-consumer `mpsc` fan-out with a flat `HashMap<PathOwned, Entry>` behind a `Mutex` plus per-consumer queues. Consumers register a single `conducer::Waiter` on both the shared state and each tracked broadcast's `poll_closed`, so a broadcast closing wakes its consumers directly: no spawned cleanup tasks, no `tokio::time::sleep(1ms)` in tests, no 127-message tokio-mpsc bug.

Active/Ended semantics, shortest-hop preference, `is_clone` dedup, and "newer wins ties" are all preserved. Active selection stays in the producer so relay forwarders don't have to reimplement the bookkeeping. `OriginAnnounce` is now an enum (`Active(path, broadcast)` / `Ended(path)`) instead of a tuple, but the existing method names on main (`publish_broadcast`, `create_broadcast`, `announced`, `try_announced`, `announced_broadcast`, `scope`, `with_root`) are preserved so call sites only had to change at the match.

Also exposes `BroadcastConsumer::poll_closed` and `is_closed` so callers that need to compose close-detection without spawning have a primitive.

Test plan

- `cargo test -p moq-lite --lib model::origin` — origin tests pass
- `cargo test --workspace` — all unit + integration tests pass
- `cargo build --workspace`
- `just check`

🤖 Generated with Claude Code