Skip to content

moq-lite: rewrite Origin as a poll-driven, conducer-based model#1358

Open
kixelated wants to merge 4 commits into
mainfrom
origin-rewrite
Open

moq-lite: rewrite Origin as a poll-driven, conducer-based model#1358
kixelated wants to merge 4 commits into
mainfrom
origin-rewrite

Conversation

@kixelated
Copy link
Copy Markdown
Collaborator

@kixelated kixelated commented Apr 28, 2026

Summary

Replaces the OriginNode/NotifyNode tree, per-publish web_async::spawn cleanup, and per-consumer mpsc fan-out with a flat HashMap<PathOwned, Entry> behind a Mutex plus per-consumer queues. Consumers register a single conducer::Waiter on both the shared state and each tracked broadcast's poll_closed, so a broadcast closing wakes its consumers directly: no spawned cleanup tasks, no tokio::time::sleep(1ms) in tests, no 127-message tokio-mpsc bug.

Active/Ended semantics, shortest-hop preference, is_clone dedup, and "newer wins ties" are all preserved. Active selection stays in the producer so relay forwarders don't have to reimplement the bookkeeping.

OriginAnnounce is now an enum (Active(path, broadcast) / Ended(path)) instead of a tuple, but the existing method names on main (publish_broadcast, create_broadcast, announced, try_announced, announced_broadcast, scope, with_root) are preserved so call sites only had to change at the match.

Also exposes BroadcastConsumer::poll_closed and is_closed so callers that need to compose close-detection without spawning have a primitive.

Test plan

  • cargo test -p moq-lite --lib model::origin — origin tests pass
  • cargo test --workspace — all unit + integration tests pass
  • cargo build --workspace
  • just check
  • Spot-check relay + cli end-to-end (manual)

🤖 Generated with Claude Code

Replace the OriginNode/NotifyNode tree, per-publish web_async::spawn
cleanup, and per-consumer mpsc fan-out with a flat HashMap behind a
parking-lot-style Mutex plus per-consumer queues. Wakers register on
both the global state and each tracked broadcast's `poll_closed`, so
broadcast closures wake consumers directly — no spawned cleanup tasks
and no more `tokio::time::sleep(1ms)` in tests.

Renames (with one-line migrations across the workspace):
  publish_broadcast    -> publish
  create_broadcast     -> create
  consume_broadcast    -> dropped (use wait_for_broadcast / try_next)
  publish_only         -> scope
  consume_only         -> scope (on OriginConsumer)
  announced            -> next  (returns OriginUpdate enum)
  try_announced        -> try_next
  announced_broadcast  -> wait_for_broadcast

Active/Ended semantics, shortest-hop preference, is_clone dedup, and
"newer wins ties" all preserved. Active selection still lives in the
producer so relay forwarders don't have to reimplement it.

Also exposes BroadcastConsumer::poll_closed and is_closed so callers
that need to compose close-detection without spawning have a primitive.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@kixelated kixelated marked this pull request as ready for review April 30, 2026 17:50
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 30, 2026

Walkthrough

The PR rewrites origin announcement handling from a tuple-based API to an enum-driven update model (Active/Ended). The core origin model is reworked around a centralized, mutex-guarded State with per-consumer queues and scope-based filtering. Origin producer/consumer methods and flows are replaced to emit and consume explicit announce updates; consumer call sites and examples are updated to pattern-match the new variants. A new BroadcastRejected error variant is added.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main change: rewriting Origin as a poll-driven, conducer-based model, which aligns with the comprehensive refactoring across the codebase.
Description check ✅ Passed The description is well-detailed, explaining the architectural changes from tree/spawn-based approach to HashMap/Mutex/conducer-based model, preserving existing semantics, and covering the enum change to OriginAnnounce.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch origin-rewrite
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch origin-rewrite

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
rs/moq-ffi/src/origin.rs (1)

118-123: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

This no longer waits for the exact broadcast path.

with_root(path) subscribes to the whole subtree under path, and Announced::available() returns the first Active update it sees. That means announced_broadcast("foo") can resolve with foo/bar if the nested broadcast arrives first. Please drive this through OriginConsumer::wait_for_broadcast(path) or store the requested path and filter for an exact match.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/origin.rs` around lines 118 - 123, The function
announced_broadcast currently uses self.inner.clone().with_root(path) which
subscribes to the whole subtree and returns the first Active update (via
Announced::available), allowing nested paths (e.g., "foo/bar") to satisfy a
request for "foo". Change announced_broadcast to obtain the exact broadcast
origin for the requested path by calling the consumer method that waits for an
exact match (OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping
with_root, capture and store the requested path and filter Announced::available
events until the update.path == requested_path; then construct the
MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and
return it as before.
rs/moq-lite/src/lite/subscriber.rs (1)

223-232: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle rejected origin publishes before registering the announce locally.

OriginProducer::publish() can return false here, but this path still keeps the entry in producers and spawns run_broadcast. If the publish was rejected, the announce becomes locally “active” even though no origin consumer can ever see it, and a later real announce on the same path will trip the duplicate-path check. Roll back the inserted producer and skip spawning when publish() returns false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 223 - 232, The code
currently inserts the producer and then calls OriginProducer::publish(...); if
publish returns false you must roll back that insertion and avoid spawning
run_broadcast. Change the flow so after creating `dynamic = broadcast.dynamic()`
you call `let published = self.origin.as_mut().unwrap().publish(path.clone(),
broadcast.consume());` and if `published` is false remove the just-inserted
producer from `self.producers` (or undo whatever created the local announce) and
do not call `web_async::spawn(self.clone().run_broadcast(path, dynamic));` —
only spawn run_broadcast when publish returns true. Ensure the rollback targets
the same producer entry created earlier so duplicate-path checks remain correct.
rs/moq-lite/src/ietf/subscriber.rs (1)

415-423: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't treat a rejected publish() as a successful announce.

This migration now calls OriginProducer::publish(...), but the return value is ignored. If the origin rejects the publish, state.broadcasts still records the path and run_broadcast still starts, so later announces on that path can be rejected as duplicates even though nothing was actually published. Please fail or roll back the entry when publish() returns false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 415 - 423, The Vacant-branch
currently ignores OriginProducer::publish(...)’s boolean result so a rejected
publish still inserts a BroadcastState and returns a Broadcast; update the
Entry::Vacant handling (the block that creates Broadcast::new().produce(), calls
origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast)
to check the publish return value and abort/roll back on false: if publish(...)
returns false, do not insert into the map (do not create or record
BroadcastState in state.broadcasts), do not start run_broadcast for that path,
and return an error or a None/appropriate failure value to the caller instead of
the broadcast; ensure you reference OriginProducer::publish, BroadcastState,
Entry::Vacant and run_broadcast when making the change so the insert and
broadcast-start are skipped/undone on rejection.
rs/moq-boy/src/input.rs (1)

72-97: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid emitting ViewerLeft directly on OriginUpdate::Ended.

This can produce duplicate/spurious offline events during path handoff (Ended may be immediately followed by Active), and the spawned command task already emits ViewerLeft when the stream actually ends.

Suggested adjustment
-		let (path, broadcast) = match update {
-			moq_lite::OriginUpdate::Active(p, b) => (p, Some(b)),
-			moq_lite::OriginUpdate::Ended(p) => (p, None),
-		};
+		let (path, broadcast) = match update {
+			moq_lite::OriginUpdate::Active(p, b) => (p, b),
+			moq_lite::OriginUpdate::Ended(p) => {
+				tracing::debug!(viewer_id = %p, "viewer broadcast ended");
+				continue;
+			}
+		};

 		let viewer_id = path.to_string();

-		if let Some(broadcast) = broadcast {
+		{
 			tracing::info!(%viewer_id, "viewer connected");
 			let cmd_tx = cmd_tx.clone();
 			let vid = viewer_id.clone();
 			tokio::spawn(async move {
 				if let Err(e) = handle_viewer_commands(&vid, broadcast, &cmd_tx).await {
 					tracing::warn!(viewer_id = %vid, error = %e, "viewer command error");
 				}
 				tracing::info!(viewer_id = %vid, "viewer disconnected");
 				let _ = cmd_tx.send(Command::ViewerLeft { viewer_id: vid }).await;
 			});
-		} else {
-			tracing::info!(%viewer_id, "viewer went offline");
-			let _ = cmd_tx
-				.send(Command::ViewerLeft {
-					viewer_id: viewer_id.clone(),
-				})
-				.await;
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-boy/src/input.rs` around lines 72 - 97, Don't send Command::ViewerLeft
when matching moq_lite::OriginUpdate::Ended; that causes duplicate/offline
events because the spawned task from the Active case already sends ViewerLeft
when the stream closes. In the match/if block around OriginUpdate::Active/Ended,
keep the existing tokio::spawn + handle_viewer_commands flow for Active
(including the send in the spawned task), but for the Ended branch remove the
cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log
(tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft
is only emitted by the spawned task handling the broadcast and prevents spurious
duplicate events.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 113-126: The current scoped-consumer usage can return a descendant
(e.g., "foo/bar") first and incorrectly fail; after creating the consumer via
origin.consume().scope(&[path.as_path()]) you must drain the consumer (loop over
consumer.try_next()) until you find an OriginUpdate::Active where p.as_path() ==
path and then return Ok(b), or until try_next() returns None in which case
return Err(Error::BroadcastNotFound); keep the same origin.consume(),
scope(&[path.as_path()]), and matching on moq_lite::OriginUpdate::Active(p, b)
but iterate instead of a single try_next() call.
- Around line 135-137: The current wrapper in origin.rs calls
self.active.get_mut(origin) and then origin.publish(path, broadcast) but always
returns Ok(()), which masks rejections signaled by OriginProducer::publish()
returning false; update the code to check the boolean result of
OriginProducer::publish(path, broadcast) and propagate failure by returning an
Err variant (e.g., Err(Error::PublishRejected) or an appropriate existing Error)
when publish returns false, otherwise return Ok(()). Ensure you reference the
same origin variable from self.active.get_mut(origin) and preserve existing
error handling for get_mut (ok_or(Error::OriginNotFound)?).

---

Outside diff comments:
In `@rs/moq-boy/src/input.rs`:
- Around line 72-97: Don't send Command::ViewerLeft when matching
moq_lite::OriginUpdate::Ended; that causes duplicate/offline events because the
spawned task from the Active case already sends ViewerLeft when the stream
closes. In the match/if block around OriginUpdate::Active/Ended, keep the
existing tokio::spawn + handle_viewer_commands flow for Active (including the
send in the spawned task), but for the Ended branch remove the
cmd_tx.send(Command::ViewerLeft { .. }).await and only emit a log
(tracing::info! or tracing::debug!) for viewer offline; this ensures ViewerLeft
is only emitted by the spawned task handling the broadcast and prevents spurious
duplicate events.

In `@rs/moq-ffi/src/origin.rs`:
- Around line 118-123: The function announced_broadcast currently uses
self.inner.clone().with_root(path) which subscribes to the whole subtree and
returns the first Active update (via Announced::available), allowing nested
paths (e.g., "foo/bar") to satisfy a request for "foo". Change
announced_broadcast to obtain the exact broadcast origin for the requested path
by calling the consumer method that waits for an exact match
(OriginConsumer::wait_for_broadcast(path)) or, if you prefer keeping with_root,
capture and store the requested path and filter Announced::available events
until the update.path == requested_path; then construct the
MoqAnnouncedBroadcast using Task::new(Announced { inner: exact_origin }) and
return it as before.

In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 415-423: The Vacant-branch currently ignores
OriginProducer::publish(...)’s boolean result so a rejected publish still
inserts a BroadcastState and returns a Broadcast; update the Entry::Vacant
handling (the block that creates Broadcast::new().produce(), calls
origin.publish(path.clone(), ...), inserts BroadcastState and returns broadcast)
to check the publish return value and abort/roll back on false: if publish(...)
returns false, do not insert into the map (do not create or record
BroadcastState in state.broadcasts), do not start run_broadcast for that path,
and return an error or a None/appropriate failure value to the caller instead of
the broadcast; ensure you reference OriginProducer::publish, BroadcastState,
Entry::Vacant and run_broadcast when making the change so the insert and
broadcast-start are skipped/undone on rejection.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 223-232: The code currently inserts the producer and then calls
OriginProducer::publish(...); if publish returns false you must roll back that
insertion and avoid spawning run_broadcast. Change the flow so after creating
`dynamic = broadcast.dynamic()` you call `let published =
self.origin.as_mut().unwrap().publish(path.clone(), broadcast.consume());` and
if `published` is false remove the just-inserted producer from `self.producers`
(or undo whatever created the local announce) and do not call
`web_async::spawn(self.clone().run_broadcast(path, dynamic));` — only spawn
run_broadcast when publish returns true. Ensure the rollback targets the same
producer entry created earlier so duplicate-path checks remain correct.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3a30bf84-46cf-4da8-a44d-256ee6e5983f

📥 Commits

Reviewing files that changed from the base of the PR and between 68b0795 and 773c0cc.

📒 Files selected for processing (23)
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/libmoq/src/origin.rs
  • rs/moq-boy/src/input.rs
  • rs/moq-boy/src/main.rs
  • rs/moq-cli/src/client.rs
  • rs/moq-cli/src/server.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/origin.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/source/imp.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/web.rs

Comment thread rs/libmoq/src/origin.rs
Comment thread rs/libmoq/src/origin.rs
kixelated added a commit that referenced this pull request May 5, 2026
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts:
#	rs/libmoq/src/origin.rs
#	rs/moq-clock/src/main.rs
#	rs/moq-lite/src/ietf/publisher.rs
#	rs/moq-lite/src/lite/publisher.rs
#	rs/moq-lite/src/model/broadcast.rs
#	rs/moq-lite/src/model/origin.rs
#	rs/moq-relay/src/cluster.rs
#	rs/moq-relay/src/web.rs
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-relay/src/web.rs (1)

465-474: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Snapshot can include ended or duplicate paths.

The drain only inspects Active updates, so if the queue contains any of:

  • Active(p, _) followed by Ended(p)p is reported as currently announced even though it just ended;
  • Active(p, b1) followed by Ended(p) and Active(p, b2) (the documented sequence when a path is replaced) — p appears twice in the response.

Per OriginUpdate's docs (rs/moq-lite/src/model/origin.rs:214-231), Active for a replacement is preceded by Ended for the same path, so duplicates are reachable in normal operation, not just edge cases.

Tracking both variants in a small set keeps the HTTP /announced snapshot consistent without changing its semantics.

🛠️ Suggested fix
-	let mut broadcasts = Vec::new();
-
-	while let Some(update) = origin.try_next() {
-		if let moq_lite::OriginUpdate::Active(suffix, _) = update {
-			broadcasts.push(suffix);
-		}
-	}
-
-	Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n"))
+	let mut broadcasts = std::collections::BTreeSet::new();
+
+	while let Some(update) = origin.try_next() {
+		match update {
+			moq_lite::OriginUpdate::Active(suffix, _) => {
+				broadcasts.insert(suffix);
+			}
+			moq_lite::OriginUpdate::Ended(suffix) => {
+				broadcasts.remove(&suffix);
+			}
+		}
+	}
+
+	Ok(broadcasts.iter().map(|p| p.to_string()).collect::<Vec<_>>().join("\n"))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-relay/src/web.rs` around lines 465 - 474, The current drain loop only
pushes OriginUpdate::Active suffixes into broadcasts, allowing ended or
duplicate paths to appear; change the logic to maintain a small set (e.g.,
HashSet<String>) instead of Vec: when origin.try_next() yields
OriginUpdate::Active(suffix, _) insert suffix.into() into the set, and when it
yields OriginUpdate::Ended(suffix) remove that suffix from the set; after the
loop, collect the set into a Vec (optionally sort for stable output) and join
with "\n" instead of using broadcasts.iter(), ensuring the snapshot reflects
currently announced (non-ended, non-duplicated) paths — update references around
origin.try_next(), OriginUpdate::Active, OriginUpdate::Ended, and the broadcasts
variable accordingly.
rs/moq-lite/src/ietf/publisher.rs (1)

446-528: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Synchronous PublishNamespace handshake serializes the entire announce loop.

Each OriginUpdate::Active opens a bidi stream and inline-awaits the peer's response (stream.reader.decode().await? / read_exact(...).await? on lines 479–481). Until that single response arrives, no other update — including subsequent Actives and any Ended — can be drained from self.origin.next(). A slow or unresponsive peer therefore stalls all announce traffic on this session, and an Ended for a never-acked namespace can sit in the queue indefinitely. Any decoding error or unexpected message (line 497, 501) also fails the entire session.

Consider spawning a per-namespace task that owns the stream + handshake + cleanup, and have run_announce dispatch updates to it (e.g., via web_async::spawn and a small registry keyed by PathOwned). This would decouple peer responsiveness from the update draining loop and improve overall liveness under slow/adversarial peers.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/ietf/publisher.rs` around lines 446 - 528, The announce loop
in run_announce blocks on the synchronous per-namespace handshake (calls like
stream.reader.decode().await and read_exact(...) in the OriginUpdate::Active
branch), which serializes the entire loop and lets any error abort the session;
refactor so run_announce only dispatches updates and does not await the peer
response: spawn a per-namespace task (e.g., via web_async::spawn) that takes
ownership of the Stream, performs the PublishNamespace handshake and local
decoding (previously done around stream.reader.decode()/read_exact()), inserts
itself into a small registry keyed by PathOwned (previously namespace_streams),
and handles cleanup on OriginUpdate::Ended by receiving a signal or
lookup+close; ensure the per-namespace task catches and logs decoding/IO errors
instead of returning Err from run_announce so a bad/slow peer won’t stall or
kill the whole session.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-boy/src/main.rs`:
- Line 222: The call to publish_origin.publish(&broadcast_path,
broadcast.consume()) currently ignores its boolean return; change it to assert
success using anyhow::ensure! so rejected publishes produce an error like
elsewhere (e.g., rs/moq-gst/src/sink/imp.rs). Specifically, after calling
publish_origin.publish(&broadcast_path, broadcast.consume()), use
anyhow::ensure!(result, "failed to publish broadcast for path {}: rejected by
OriginProducer", broadcast_path) (or equivalent) so a rejected publish returns
an error instead of silently proceeding; reference the publish_origin,
broadcast_path, and broadcast symbols when locating the code to modify.

In `@rs/moq-cli/src/client.rs`:
- Line 9: The call to origin.publish(&name, publish.consume()) currently ignores
the bool return that signals rejection; update the code to check the returned
bool and fail fast on rejection (e.g., use anyhow::ensure!(origin.publish(&name,
publish.consume()), "failed to publish {}: rejected by origin", name)) so that a
rejected publish does not let reconnect.closed() continue with no broadcast
registered; mirror the ensure pattern used around publish handling in the other
module to propagate the error instead of discarding the result.

In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 202-230: The Active branch may skip sending Announce::Active when
hops.push(self_origin).is_err(), but the Ended branch still sends
Announce::Ended and causes the subscriber to error; fix by tracking suppressed
paths: create a HashSet (e.g., suppressed_paths) in the same scope as the
announce loop, and when hops.push(self_origin).is_err() insert the full path (or
the same key used by subscriber) into suppressed_paths and continue; in the
OriginUpdate::Ended handler, check suppressed_paths.remove(&path) first and if
it was present, skip encoding/sending the Announce::Ended, otherwise proceed to
encode/send as before (ensuring you use the same path form used for
suppression/lookup).

---

Outside diff comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 446-528: The announce loop in run_announce blocks on the
synchronous per-namespace handshake (calls like stream.reader.decode().await and
read_exact(...) in the OriginUpdate::Active branch), which serializes the entire
loop and lets any error abort the session; refactor so run_announce only
dispatches updates and does not await the peer response: spawn a per-namespace
task (e.g., via web_async::spawn) that takes ownership of the Stream, performs
the PublishNamespace handshake and local decoding (previously done around
stream.reader.decode()/read_exact()), inserts itself into a small registry keyed
by PathOwned (previously namespace_streams), and handles cleanup on
OriginUpdate::Ended by receiving a signal or lookup+close; ensure the
per-namespace task catches and logs decoding/IO errors instead of returning Err
from run_announce so a bad/slow peer won’t stall or kill the whole session.

In `@rs/moq-relay/src/web.rs`:
- Around line 465-474: The current drain loop only pushes OriginUpdate::Active
suffixes into broadcasts, allowing ended or duplicate paths to appear; change
the logic to maintain a small set (e.g., HashSet<String>) instead of Vec: when
origin.try_next() yields OriginUpdate::Active(suffix, _) insert suffix.into()
into the set, and when it yields OriginUpdate::Ended(suffix) remove that suffix
from the set; after the loop, collect the set into a Vec (optionally sort for
stable output) and join with "\n" instead of using broadcasts.iter(), ensuring
the snapshot reflects currently announced (non-ended, non-duplicated) paths —
update references around origin.try_next(), OriginUpdate::Active,
OriginUpdate::Ended, and the broadcasts variable accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: fe8ca23a-ae99-432f-8389-e2f3f2f976e0

📥 Commits

Reviewing files that changed from the base of the PR and between 773c0cc and bbbae90.

📒 Files selected for processing (14)
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/origin.rs
  • rs/moq-boy/src/main.rs
  • rs/moq-cli/src/client.rs
  • rs/moq-cli/src/main.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-gst/src/source/imp.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-relay/src/web.rs

Comment thread rs/moq-boy/src/main.rs Outdated
Comment thread rs/moq-cli/src/client.rs Outdated
Comment thread rs/moq-lite/src/lite/publisher.rs Outdated
… etc.)

Reverts the method/type renames the rewrite introduced and restores the
names already in use on main, since those are clearer and avoid churn at
every existing call site.

| PR rewrite name        | Restored to main name  |
| ---------------------- | ---------------------- |
| publish                | publish_broadcast      |
| create                 | create_broadcast       |
| next (async)           | announced              |
| try_next               | try_announced          |
| wait_for_broadcast     | announced_broadcast    |
| OriginUpdate           | OriginAnnounce         |

The behavior (poll-driven conducer model, Active/Ended enum, is_closed
and poll_closed primitives) is unchanged.
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
rs/moq-lite/src/lite/publisher.rs (1)

204-230: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

MAX_HOPS suppression still asymmetric — Active dropped but Ended still emitted.

When hops.push(self_origin).is_err() at line 211, the Active is suppressed (line 216 continue). However, the matching OriginAnnounce::Ended(path) arm (lines 221-230) unconditionally encodes Announce::Ended. The peer in lite/subscriber.rs does producers.remove(&path).ok_or(Error::NotFound)? for an Ended it has no record of, which propagates Error::NotFound and aborts the announce stream.

Track suppressed paths so the corresponding Ended is also suppressed (see prior review for sketch).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 204 - 230, When
hops.push(self_origin).is_err() suppresses an Active announce in the
OriginAnnounce handling, also record that suffix/path in a local suppressed set
so the matching OriginAnnounce::Ended handling checks and skips encoding
lite::Announce::Ended for those suppressed paths; specifically, add a HashSet
(e.g., suppressed_paths) in the publisher scope, insert the suffix/path when the
Active branch hits the hops.push(...) error, and in the OriginAnnounce::Ended
branch check suppressed_paths first — if present, remove it and continue (skip
encoding), otherwise proceed to create and encode lite::Announce::Ended as
before; this keeps behavior symmetric with subscriber logic (which expects Ended
only for known producers removed via producers.remove).
🧹 Nitpick comments (3)
rs/moq-lite/src/model/origin.rs (2)

387-401: 💤 Low value

Per-consumer pending queues are unbounded.

distribute does an unbounded push_back per consumer with no high-water mark. This is an intentional trade-off vs the previous mpsc design (per the PR description, avoids the tokio 127-message bug), and announcement events are typically low-rate, so this is fine in practice. Worth a brief comment near ConsumerQueue documenting the choice so future contributors don't reintroduce a bound that breaks the no-blocking guarantee for publish.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/model/origin.rs` around lines 387 - 401, The per-consumer
pending queues are intentionally unbounded to avoid blocking/pushing back in
publish (see distribute) — add a concise comment on the ConsumerQueue type
(and/or its pending field) stating that pending is intentionally unbounded to
preserve the no-blocking guarantee for publish, explaining the trade-off vs
bounded channels (mentioning the tokio 127-message mpsc issue) and that this is
deliberate to avoid reintroducing a capacity that could block; reference
ConsumerQueue and distribute in the comment so future contributors understand
the rationale.

688-743: 💤 Low value

Phase 4/5 waiter registration may accumulate across iterations.

When close_detected triggers a continue back to Phase 1, the waiter was already registered both globally (Phase 4) and on one or more poll_closed slots (Phase 5). Re-registration in the next iteration will append duplicate entries rather than replace them, since conducer::Waiter::register appends new Weak<Waker> slots instead of deduplicating by waker identity. This results in redundant wake-ups on the same waiter—still correct, but wasteful of space in the WaiterList.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/model/origin.rs` around lines 688 - 743, The waiter can get
registered twice across loops because Phase 4 registers globally before Phase 5
checks per-active closures; move the global registration so it happens after the
loop that calls entry.active.poll_closed (i.e., perform the per-active
poll_closed checks first, and only call waiter.register(&mut state.waiters) if
no close_detected), ensuring poll_announced doesn't append duplicate waiter
entries when it continues the loop; adjust the code in poll_announced around the
waiter.register(&mut state.waiters) and the for (path, entry) ..
entry.active.poll_closed(...) section accordingly.
rs/moq-native/tests/broadcast.rs (1)

81-91: 💤 Low value

LGTM — consistent enum-match migration across all three transports.

The Active branch verifies the announced path and yields bc; the Ended arm is a fail-fast assertion, which is appropriate for a smoke test that publishes once and expects exactly one Active update.

Minor nit (skip if not interesting): the same 7-line match is now copy-pasted three times. Extracting a small helper like await_active(announcements, "test") would centralize the assertion message and keep future API drift to one site. Not worth blocking on.

Also applies to: 425-435, 536-546

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-native/tests/broadcast.rs` around lines 81 - 91, Extract the repeated
7-line match into a small helper (e.g., await_active) that takes the
announcements stream/handle and an expected path string (and optionally TIMEOUT)
and performs the tokio::time::timeout(...).await.expect(...).expect(...),
matches on moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts
path == expected and returns bc; then replace the three inline match blocks with
calls to await_active(announcements, "test") (or the appropriate expected path)
to centralize the assertion and reduce duplication.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Around line 112-130: Update the workspace MSRV to at least Rust 1.88 in the
root Cargo.toml so the let-chain syntax used in the consume function compiles
under the declared toolchain, and modify the path handling in
rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a Path<'_> (the
local variable named path), remove the redundant path.as_path() call and pass
the path directly to origin.consume().scope (e.g. use &[path] or &[path.clone()]
as needed to satisfy borrowing/lifetime requirements) so scope receives a slice
of Path without the extra conversion.

In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 511-523: The match on Version should treat the newest-draft
behavior as the default so future drafts inherit it: change the arm that
explicitly lists Version::Draft17 to a wildcard default and keep the older
drafts explicitly listed; i.e., have Version::Draft14 | Version::Draft15 |
Version::Draft16 => send the ietf::PublishNamespaceDone (using
stream.writer.encode_message with suffix.as_path() and request_id), and replace
Version::Draft17 => {} with _ => {} so Draft17+ default to the no-op behavior.

---

Duplicate comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 204-230: When hops.push(self_origin).is_err() suppresses an Active
announce in the OriginAnnounce handling, also record that suffix/path in a local
suppressed set so the matching OriginAnnounce::Ended handling checks and skips
encoding lite::Announce::Ended for those suppressed paths; specifically, add a
HashSet (e.g., suppressed_paths) in the publisher scope, insert the suffix/path
when the Active branch hits the hops.push(...) error, and in the
OriginAnnounce::Ended branch check suppressed_paths first — if present, remove
it and continue (skip encoding), otherwise proceed to create and encode
lite::Announce::Ended as before; this keeps behavior symmetric with subscriber
logic (which expects Ended only for known producers removed via
producers.remove).

---

Nitpick comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 387-401: The per-consumer pending queues are intentionally
unbounded to avoid blocking/pushing back in publish (see distribute) — add a
concise comment on the ConsumerQueue type (and/or its pending field) stating
that pending is intentionally unbounded to preserve the no-blocking guarantee
for publish, explaining the trade-off vs bounded channels (mentioning the tokio
127-message mpsc issue) and that this is deliberate to avoid reintroducing a
capacity that could block; reference ConsumerQueue and distribute in the comment
so future contributors understand the rationale.
- Around line 688-743: The waiter can get registered twice across loops because
Phase 4 registers globally before Phase 5 checks per-active closures; move the
global registration so it happens after the loop that calls
entry.active.poll_closed (i.e., perform the per-active poll_closed checks first,
and only call waiter.register(&mut state.waiters) if no close_detected),
ensuring poll_announced doesn't append duplicate waiter entries when it
continues the loop; adjust the code in poll_announced around the
waiter.register(&mut state.waiters) and the for (path, entry) ..
entry.active.poll_closed(...) section accordingly.

In `@rs/moq-native/tests/broadcast.rs`:
- Around line 81-91: Extract the repeated 7-line match into a small helper
(e.g., await_active) that takes the announcements stream/handle and an expected
path string (and optionally TIMEOUT) and performs the
tokio::time::timeout(...).await.expect(...).expect(...), matches on
moq_lite::OriginAnnounce::{Active(path, bc) | Ended(path)}, asserts path ==
expected and returns bc; then replace the three inline match blocks with calls
to await_active(announcements, "test") (or the appropriate expected path) to
centralize the assertion and reduce duplication.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d989bb11-ca4c-490f-a6bb-f12532ea0511

📥 Commits

Reviewing files that changed from the base of the PR and between bbbae90 and 1496c6e.

📒 Files selected for processing (12)
  • rs/hang/examples/subscribe.rs
  • rs/libmoq/src/origin.rs
  • rs/moq-boy/src/input.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/origin.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/web.rs

Comment thread rs/libmoq/src/origin.rs
Comment on lines 112 to 130
pub fn consume<P: moq_lite::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_lite::BroadcastConsumer, Error> {
use moq_lite::AsPath;
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
// for gossip instead of racing it.
// Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume().
#[allow(deprecated)]
origin.get_broadcast(path).ok_or(Error::BroadcastNotFound)
// TODO: expose an async variant so FFI callers can wait for gossip instead of racing it.
// Scope is prefix-based, so drain the replay queue until we find an exact path match.
let path = path.as_path();
let mut consumer = origin
.consume()
.scope(&[path.as_path()])
.ok_or(Error::BroadcastNotFound)?;
while let Some(update) = consumer.try_announced() {
if let moq_lite::OriginAnnounce::Active(p, b) = update
&& p.as_path() == path
{
return Ok(b);
}
}
Err(Error::BroadcastNotFound)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 💤 Low value

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm every crate using let-chains is on edition 2024 and that workspace MSRV is >= 1.88.
fd -t f -e toml | xargs rg -nP '^\s*(edition|rust-version)\s*='

Repository: moq-dev/moq

Length of output: 763


🏁 Script executed:

#!/bin/bash
# Check if moq-gst depends on libmoq
grep -E "libmoq|moq_lite" rs/moq-gst/Cargo.toml

# Also get details on the moq_lite::AsPath trait to verify the redundancy claim
fd -t f -name "*.rs" -path "*/moq_lite/*" | xargs rg -l "trait AsPath" | head -1 | xargs cat -n

Repository: moq-dev/moq

Length of output: 34780


🏁 Script executed:

#!/bin/bash
# Check moq-gst Cargo.toml for dependencies on libmoq or moq_lite
cat rs/moq-gst/Cargo.toml | grep -E "libmoq|moq.lite"

Repository: moq-dev/moq

Length of output: 87


Update workspace MSRV from 1.85 to 1.88 or higher to support the let-chain syntax in lines 123–125.

Let chains were stabilized in Rust 1.88 and require edition 2024. This code is on the correct edition, but the workspace declares rust-version = "1.85" in the root Cargo.toml, which is incompatible. Any attempt to build with Rust 1.85–1.87 will fail with a let-chain feature error. Update the workspace MSRV to at least 1.88.

Additionally, path.as_path() on line 120 is redundant — path is already a Path<'_> after line 117, so use &[path] instead (or &[path.clone()] if needed).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/libmoq/src/origin.rs` around lines 112 - 130, Update the workspace MSRV to
at least Rust 1.88 in the root Cargo.toml so the let-chain syntax used in the
consume function compiles under the declared toolchain, and modify the path
handling in rs/libmoq/src/origin.rs: after calling path.as_path() to obtain a
Path<'_> (the local variable named path), remove the redundant path.as_path()
call and pass the path directly to origin.consume().scope (e.g. use &[path] or
&[path.clone()] as needed to satisfy borrowing/lifetime requirements) so scope
receives a slice of Path without the extra conversion.

Comment thread rs/moq-lite/src/ietf/publisher.rs Outdated
- lite/publisher.rs: when an Active is dropped because the hop chain hits
  MAX_HOPS, also drop the matching Ended. The subscriber treats Ended for
  an unknown path as Error::NotFound, so the asymmetric pair was a real
  protocol error.
- ietf/publisher.rs: replace `Version::Draft17 => {}` arms with `_ => {}`
  so future drafts inherit v17+ behavior by default (per CLAUDE.md).
- libmoq/origin.rs: split the let-chain into nested if so the code stays
  within the workspace MSRV (1.85; let-chains require 1.88).
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-lite/src/ietf/publisher.rs (1)

597-610: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Initial drain silently drops Ended, leaking stale namespaces to the peer.

try_announced() returns updates in queue order, so a path that went Active→Ended before this drain runs will yield both events back-to-back. The if let here handles only Active, so the code emits Namespace(A) and then drops Ended(A) on the floor. Since the original Ended(A) was already consumed from the consumer queue, the streaming loop below will never re-emit it, and the peer is left with a permanently stale namespace until some other event happens to A.

The streaming loop (lines 619-630) handles both variants symmetrically; the init drain should too.

🔧 Suggested fix
 				// Send initial NAMESPACE messages for currently active namespaces
 				while let Some(update) = origin.try_announced() {
-					if let crate::OriginAnnounce::Active(path, _) = update {
-						let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path");
-						tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
-						stream.writer.encode(&ietf::Namespace::ID).await?;
-						stream
-							.writer
-							.encode(&ietf::Namespace {
-								suffix: suffix.to_owned(),
-							})
-							.await?;
-					}
+					match update {
+						crate::OriginAnnounce::Active(path, _) => {
+							let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+							tracing::debug!(broadcast = %origin.absolute(&path), "namespace");
+							stream.writer.encode(&ietf::Namespace::ID).await?;
+							stream.writer.encode(&ietf::Namespace { suffix }).await?;
+						}
+						crate::OriginAnnounce::Ended(path) => {
+							let suffix = path.strip_prefix(&prefix).expect("origin returned invalid path").to_owned();
+							tracing::debug!(broadcast = %origin.absolute(&path), "namespace_done");
+							stream.writer.encode(&ietf::NamespaceDone::ID).await?;
+							stream.writer.encode(&ietf::NamespaceDone { suffix }).await?;
+						}
+					}
 				}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/ietf/publisher.rs` around lines 597 - 610, The initial drain
currently only handles OriginAnnounce::Active and silently drops
OriginAnnounce::Ended, leaking stale namespaces; update the while let loop over
origin.try_announced() to match both OriginAnnounce::Active(path, _) and
OriginAnnounce::Ended(path, _) and for the Ended case emit the same pair of
messages (stream.writer.encode(&ietf::Namespace::ID) and
stream.writer.encode(&ietf::Namespace { suffix: suffix.to_owned() }).await?) as
the Active branch so the peer receives the Ended notification just like the
streaming loop does; use the same suffix extraction via
path.strip_prefix(&prefix).expect(...) and the same tracing/logging symbol
(origin.absolute(&path)) to keep behavior consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/libmoq/src/origin.rs`:
- Line 82: The insert into state.origin.announced (announced_id =
state.origin.announced.insert((path.to_string(), active))?) creates unbounded
growth because entries are never removed; fix by ensuring every announced_id is
removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.

---

Outside diff comments:
In `@rs/moq-lite/src/ietf/publisher.rs`:
- Around line 597-610: The initial drain currently only handles
OriginAnnounce::Active and silently drops OriginAnnounce::Ended, leaking stale
namespaces; update the while let loop over origin.try_announced() to match both
OriginAnnounce::Active(path, _) and OriginAnnounce::Ended(path, _) and for the
Ended case emit the same pair of messages
(stream.writer.encode(&ietf::Namespace::ID) and
stream.writer.encode(&ietf::Namespace { suffix: suffix.to_owned() }).await?) as
the Active branch so the peer receives the Ended notification just like the
streaming loop does; use the same suffix extraction via
path.strip_prefix(&prefix).expect(...) and the same tracing/logging symbol
(origin.absolute(&path)) to keep behavior consistent.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c364712f-96f3-40b8-bff2-c9ee4388048e

📥 Commits

Reviewing files that changed from the base of the PR and between 1496c6e and 2fe191b.

📒 Files selected for processing (3)
  • rs/libmoq/src/origin.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/lite/publisher.rs

Comment thread rs/libmoq/src/origin.rs
let callback = entry.callback;

let announced_id = state.origin.announced.insert((path.to_string(), broadcast.is_some()))?;
let announced_id = state.origin.announced.insert((path.to_string(), active))?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Unbounded growth risk in origin.announced storage.

Each announce update inserts a new (String, bool) entry, but this module exposes no corresponding removal path for consumed announcement IDs. On long-lived/high-churn origins, this can grow indefinitely and eventually exhaust memory/IDs.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/libmoq/src/origin.rs` at line 82, The insert into state.origin.announced
(announced_id = state.origin.announced.insert((path.to_string(), active))?)
creates unbounded growth because entries are never removed; fix by ensuring
every announced_id is removed or evicted after being consumed—either call
state.origin.announced.remove(announced_id) (or equivalent) in the consumer path
where the announcement is processed, or replace the backing storage with a
bounded/evicting structure (e.g., an LRU or time-windowed cache) and add pruning
logic; locate uses of announced_id and the consumer functions that read
announcements and add a removal/prune step to reclaim entries.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant