Skip to content

Replace monolithic catalog with generic section-based registry#1149

Open
kixelated wants to merge 1 commit intodevfrom
generic-catalog
Open

Replace monolithic catalog with generic section-based registry#1149
kixelated wants to merge 1 commit intodevfrom
generic-catalog

Conversation

@kixelated
Copy link
Collaborator

Summary

  • Replaces the hardcoded Catalog/Root type with a generic Section<T> registry where sections are identified by name + schema pairs
  • Adds CatalogWriter for producing catalogs and CatalogReader with per-section change notifications (conducer on Rust, signals on JS)
  • Removes application-specific sections (chat, user, preview, location, capabilities) from @moq/hang — these now belong in the application layer
  • Predefined VIDEO and AUDIO sections are available but not auto-registered

Breaking changes

  • hang::Catalog struct removed — use CatalogWriter / CatalogReader with Section<T>
  • Catalog.Root, Catalog.encode(), Catalog.decode(), Catalog.fetch() removed from JS
  • CatalogConsumer::next() replaced with reader() + run() pattern
  • moq_mux::CatalogProducer now uses set()/flush() instead of lock() guard pattern
  • App-specific schemas (Chat, User, Preview, Location, Capabilities) moved to publish/watch packages

Test plan

  • just fix — no formatting issues
  • just check — all linting and type checks pass
  • cargo test — all 523 Rust tests pass, 0 failures
  • MSF catalog conversion tests pass with new (Option<&Video>, Option<&Audio>) API
  • moq-ffi catalog_update_on_new_track test passes

🤖 Generated with Claude Code

The hang catalog previously hardcoded application-specific sections
(chat, user, preview, location, capabilities). This replaces the
monolithic Catalog/Root type with a generic Section<T> registry where
sections are identified by name + schema pairs, enabling custom
applications to define their own sections.

Key changes:
- Add Section<T> type pairing a name with a typed schema (Rust: serde, JS: zod)
- Add CatalogWriter for producing catalogs with typed sections
- Add CatalogReader with per-section change notifications (conducer/signals)
- Predefined VIDEO/AUDIO sections (not auto-registered)
- Remove app-specific sections from @moq/hang (chat, user, preview, etc.)
- Migrate moq-mux, moq-cli, libmoq, moq-ffi to section-based API
- Migrate js/publish and js/watch to use CatalogWriter/CatalogReader

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 22, 2026

Walkthrough

This pull request refactors the catalog system from a monolithic root structure to a section-based architecture. Monolithic catalog types (Catalog, Root, Chat, User, Preview, Location, Capabilities) are removed from js/hang/src/catalog/ and corresponding Rust modules. New generic abstractions—Section<T>, CatalogReader, and CatalogWriter—are introduced. Catalog sections like audio and video are now paired with typed schemas via Section instances. All consumers in js/publish/, js/watch/, and Rust modules (rs/moq-mux/, rs/moq-ffi/, etc.) are updated to retrieve catalog data via section getters rather than a unified catalog object, enabling independent section registration and consumption.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The PR title 'Replace monolithic catalog with generic section-based registry' directly and clearly summarizes the main architectural change described in the changeset.
Description check ✅ Passed The PR description provides relevant context about the catalog refactoring, breaking changes, and test coverage, all related to the changeset modifications.
Docstring Coverage ✅ Passed Docstring coverage is 85.71% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch generic-catalog
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch generic-catalog

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Tip

CodeRabbit can generate a title for your PR based on the changes with custom instructions.

Set the reviews.auto_title_instructions setting to generate a title for your PR based on the changes in the PR with custom instructions.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-mux/src/producer/hev1.rs (1)

349-357: ⚠️ Potential issue | 🟡 Minor

Use take() instead of borrowing in Drop to properly clean up the track.

The current code borrows the track with &self.track instead of taking ownership. This differs from av01.rs which correctly uses self.track.take(). As a result, the Option still holds Some after the block, and the underlying track may not be properly dropped.

🐛 Proposed fix
 impl Drop for Hev1 {
     fn drop(&mut self) {
-        if let Some(track) = &self.track {
+        if let Some(track) = self.track.take() {
             tracing::debug!(name = ?track.info.name, "ending track");
             self.video.remove_track(&track.info);
             let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video);
             self.catalog.flush();
         }
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/hev1.rs` around lines 349 - 357, In Hev1's Drop
implementation, replace borrowing the Option with taking ownership so the inner
Track is actually removed and dropped: use self.track.take() to extract the
track (instead of matching on &self.track), then call tracing::debug! with the
extracted track.info.name, call self.video.remove_track(&track.info), update the
catalog with self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush;
this ensures the Option is set to None and the underlying track can be dropped
(look for the impl Drop for Hev1, the track field, and the calls to
remove_track, catalog.set, and catalog.flush).
js/watch/src/preview.ts (1)

38-49: ⚠️ Potential issue | 🟠 Major

Keep reading preview updates from the track.

This task does a single Zod.read() and then exits. The publisher can emit multiple preview.json frames over the lifetime of the track, so later changes—and end-of-track cleanup—are never observed here.

♻️ Suggested fix
 			effect.spawn(async () => {
 				try {
-					const info = await Zod.read(track, PreviewSchema);
-					if (!info) return;
-
-					this.preview.set(info);
+					for (;;) {
+						const info = await Zod.read(track, PreviewSchema);
+						if (!info) break;
+						this.preview.set(info);
+					}
 				} catch (error) {
 					console.warn("Failed to parse preview JSON:", error);
+				} finally {
+					this.preview.set(undefined);
 				}
 			});
-
-			effect.cleanup(() => this.preview.set(undefined));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/preview.ts` around lines 38 - 49, The current effect.spawn
performs a single Zod.read(track, PreviewSchema) and exits, so subsequent
preview.json frames and end-of-track are missed; change the task launched in
effect.spawn (the block containing Zod.read, PreviewSchema and this.preview.set)
to continuously read frames from the track (e.g., loop or async iterator) and
call this.preview.set(info) for every successful parse, and ensure
effect.cleanup stops that loop and resets this.preview to undefined (use an
AbortController or a cancellation flag inside the spawned task) so later updates
and end-of-track cleanup are observed.
🧹 Nitpick comments (13)
js/publish/src/location/types.ts (1)

1-8: Clarify the s field intent in this public type.

Position.s is ambiguous for API consumers. If the short key is required for payload compatibility, add a doc comment explaining what it represents (for example, scale).

Suggested clarification
+/** Position in 3D space with optional scalar metadata. */
 export type Position = {
 	x?: number;
 	y?: number;
 	z?: number;
+	/** Uniform scale factor. */
 	s?: number;
 };

As per coding guidelines, "Use clear and descriptive variable names that convey intent" and "Document public APIs with clear docstrings or comments".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/publish/src/location/types.ts` around lines 1 - 8, The public type
Position exposes an ambiguous short field s; update the Position type
declaration to document what s stands for (e.g., scale, speed, semantic state)
with a concise doc comment above the Position type and above the s property, and
if payload compatibility requires the short key keep the name but add the
descriptive comment; reference the Position type and its s property (and Peers
which uses Position) so consumers understand the field's intent.
rs/moq-mux/src/producer/test/mod.rs (1)

22-31: Differentiate missing sections from invalid section payloads.

Right now malformed section JSON is silently treated as default, which can hide root-cause regressions in test failures. Keep defaulting for absent sections, but fail when a present section cannot be decoded.

Suggested adjustment
-	let video: Video = state
-		.sections
-		.get("video")
-		.and_then(|v| serde_json::from_value(v.clone()).ok())
-		.unwrap_or_default();
-	let audio: Audio = state
-		.sections
-		.get("audio")
-		.and_then(|v| serde_json::from_value(v.clone()).ok())
-		.unwrap_or_default();
+	let video: Video = state
+		.sections
+		.get("video")
+		.map(|value| serde_json::from_value(value.clone()).expect("invalid video section"))
+		.unwrap_or_default();
+	let audio: Audio = state
+		.sections
+		.get("audio")
+		.map(|value| serde_json::from_value(value.clone()).expect("invalid audio section"))
+		.unwrap_or_default();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/test/mod.rs` around lines 22 - 31, Currently the test
silently defaults when decoding of present sections fails; change the logic so
that when state.sections.get("video") or get("audio") returns None you keep the
default Video/Audio, but when a Value is present you attempt
serde_json::from_value and propagate a decode error (panic or unwrap/expect)
instead of turning it into a default; update the two bindings (Video and Audio)
that use state.sections.get(...).and_then(|v|
serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence
first (e.g., match on get("video")/get("audio")) and call
serde_json::from_value(...).expect("failed to decode video section") so
malformed payloads fail the test while absent sections still default.
rs/libmoq/src/consume.rs (1)

78-101: Manual catalog parsing replaces legacy CatalogConsumer.

The refactored run_catalog manually reads frames from the track and parses JSON sections. The use of unwrap_or_default() for missing sections is appropriate since video/audio sections are optional.

Consider using section name constants instead of the magic strings "video" and "audio" for consistency with the section definitions.

,

♻️ Optional: Use section name constants

If hang::catalog::VIDEO and hang::catalog::AUDIO expose their names, consider:

-				let video: hang::catalog::Video = json
-					.get("video")
+				let video: hang::catalog::Video = json
+					.get(hang::catalog::VIDEO.name)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/libmoq/src/consume.rs` around lines 78 - 101, In run_catalog replace the
magic string lookups for the JSON sections with the catalog section name
constants instead of "video" and "audio": locate the serde_json::Map accessors
where you call .get("video") and .get("audio") and use the appropriate constants
exported by hang::catalog (e.g., hang::catalog::VIDEO / hang::catalog::AUDIO or
their NAME variants) so the code reads the same but relies on the canonical
section identifiers used elsewhere (keep the same serde_json::from_value,
transpose, map_err and unwrap_or_default logic and types hang::catalog::Video
and hang::catalog::Audio).
rs/moq-mux/src/producer/avc3.rs (1)

39-47: Use the VIDEO constant's name field instead of the hardcoded string.

The hardcoded "video" string on line 44 should reference hang::catalog::VIDEO.name to ensure consistency and prevent drift if the section name changes.

♻️ Suggested improvement
 		// Read the current video section from the catalog, if any
 		let video: hang::catalog::Video = {
 			let state = catalog.writer().read();
 			state
 				.sections
-				.get("video")
+				.get(hang::catalog::VIDEO.name)
 				.and_then(|v| serde_json::from_value(v.clone()).ok())
 				.unwrap_or_default()
 		};

Note: The same pattern is used in hev1.rs and av01.rs and should be updated consistently.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/avc3.rs` around lines 39 - 47, Replace the hardcoded
"video" key when reading from catalog.sections with the VIDEO constant's name
field: use hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the
constant's API) in the call inside the block that constructs `video` (the code
using `catalog.writer().read()` and `state.sections.get(...)`), so the lookup
uses the canonical section name; apply the same change to the analogous lookups
in `hev1.rs` and `av01.rs`.
rs/moq-mux/src/producer/av01.rs (2)

39-47: Consider using the VIDEO constant for consistency.

The read path uses the string literal "video" while the write path uses &hang::catalog::VIDEO. For consistency and to avoid potential typos, consider using the constant for both:

 let video: hang::catalog::Video = {
     let state = catalog.writer().read();
     state
         .sections
-        .get("video")
+        .get(hang::catalog::VIDEO.name)
         .and_then(|v| serde_json::from_value(v.clone()).ok())
         .unwrap_or_default()
 };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/av01.rs` around lines 39 - 47, Replace the string
literal "video" in the read path with the catalog constant to match the write
path: when building the local video value in the block that calls
catalog.writer().read(), use hang::catalog::VIDEO (or &hang::catalog::VIDEO as
appropriate) instead of "video" so the get lookup is consistent with the write
side and avoids typos; update the use in the closure that does
.sections.get(...) accordingly.

116-117: Discarding set() result silently.

The let _ = self.catalog.set(...) pattern discards any error from set(). If set() can fail in a meaningful way, consider logging or propagating the error. If it's intentionally infallible in this context, the current pattern is acceptable.

Also applies to: 160-161, 249-250, 439-440

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/av01.rs` around lines 116 - 117, The calls that
currently use "let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video);"
(and the similar occurrences) silently discard the Result; update these to
handle failures by either propagating the error (returning the Result with ?
from the surrounding function) or logging the error via the project's logger
(e.g., processLogger or crate logger) with context including
hang::catalog::VIDEO and self.video; specifically replace the discard at the
sites calling self.catalog.set(...) with either "self.catalog.set(...)?", or a
match/if let Err(e) => log::error!(... , e) so errors are not lost. Ensure the
chosen approach is consistent across the other occurrences referenced by the
reviewer.
rs/moq-mux/src/producer/hev1.rs (1)

41-49: Same consistency suggestion as av01.rs: use VIDEO.name constant.

For consistency with the write path that uses &hang::catalog::VIDEO, consider using the constant's name field for the read path as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/producer/hev1.rs` around lines 41 - 49, Replace the hard-coded
"video" key with the catalog constant to match the write path: change
sections.get("video") to sections.get(hang::catalog::VIDEO.name) in the block
that reads the current video section (the code using catalog.writer().read(),
.sections, and serde_json::from_value). Ensure the lookup uses the VIDEO.name
constant so the read path and write path use the same identifier.
js/watch/src/broadcast.ts (1)

112-123: Status remains "live" even if all sections become undefined.

Once any section has a value, status is set to "live". If all sections subsequently become undefined (e.g., empty catalog update), the status won't revert to "loading". If the intent is "once live, stay live until cleanup," this is fine. Otherwise, consider adding an else branch:

 if (video || audio || chat || user || preview || location) {
     this.status.set("live");
+} else {
+    this.status.set("loading");
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` around lines 112 - 123, The effect.run callback
currently sets this.status.set("live") whenever any of the tracked signals
(this.video, this.audio, this.chat, this.user, this.preview, this.location) are
truthy, but never resets the status when all become undefined; update the
effect.run in broadcast.ts to add an else branch that sets this.status back to
"loading" (or another appropriate non-live state) when none of the signals are
present, so after checking const video = inner.get(this.video); const audio =
inner.get(this.audio); const chat = inner.get(this.chat); const user =
inner.get(this.user); const preview = inner.get(this.preview); const location =
inner.get(this.location); you explicitly call this.status.set("loading") when
video||audio||chat||user||preview||location is falsey.
rs/moq-ffi/src/consumer.rs (1)

44-45: Clarify the delta support limitation.

The comment "We don't support deltas yet" explains why the group is taken after reading the first frame. However, this means subsequent frames in the same group are silently ignored. Consider adding a debug log when a group with multiple frames is encountered, to aid future debugging when delta support is added.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-ffi/src/consumer.rs` around lines 44 - 45, When handling a frame from
a group in consumer.rs (the match arm that calls
self.group.as_mut()?.read_frame().await.transpose() and then self.group.take()),
add a debug log before taking the group to record that a group with multiple
frames was encountered and that subsequent frames will be dropped; use the
crate's logging facility (e.g., tracing::debug! or log::debug!) and include
identifying details from the frame (Debug-format the frame or include a frame
id/metadata if available) so future delta support debugging can locate which
group/frame was ignored.
js/watch/src/sections.ts (1)

28-43: Clarify the distinction between PreviewTrackSchema and PreviewSchema.

The PREVIEW section uses PreviewTrackSchema (just a track name), while PreviewSchema defines a richer structure with name, avatar, audio, video, etc.

Is PreviewSchema intended for the content on the preview track rather than the catalog section itself? If so, a brief comment would help clarify this distinction.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/sections.ts` around lines 28 - 43, Add a short clarifying
comment above PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that
this schema represents the preview "track" identifier used by the Section (a
minimal schema containing only the track name), and add a separate comment above
PreviewSchema/Preview type explaining that this is the richer preview content
shape (name, avatar, audio/video/typing/chat/screen flags) used for preview
track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and
PreviewSchema in the comments so future readers understand the distinction and
intended usage.
rs/moq-mux/src/convert/fmp4.rs (1)

61-71: Consider extracting shared catalog reading logic.

The run_with_catalog pattern and the catalog reading in run() are nearly identical between fmp4.rs and hang.rs. While not blocking, extracting the common catalog frame reading and JSON parsing logic into a shared helper could reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 61 - 71, Both run_with_catalog
and run in fmp4.rs duplicate the catalog frame reading and JSON parsing logic
found also in hang.rs; extract that shared behavior into a single helper (e.g.,
read_and_parse_catalog_frames or parse_catalog_from_consumer) that accepts a
&moq_lite::BroadcastConsumer (and any needed mutable CatalogProducer references)
and returns the parsed catalog structure (or Result). Replace the duplicated
blocks in run_with_catalog and run with calls to this helper to centralize
buffering, frame extraction, and serde_json parsing; ensure the helper exposes
any errors upward and preserves the same returned types so callers
(run_with_catalog, run) only perform protocol-specific handling after receiving
the parsed catalog.
rs/moq-mux/src/catalog.rs (1)

49-75: Silent deserialization failures may hide issues.

Lines 64-71 use .and_then(|v| serde_json::from_value(v.clone()).ok()) which silently ignores deserialization errors for video/audio sections. If the JSON structure doesn't match Video/Audio, the MSF catalog will be published without those sections, with no warning.

Consider logging a warning when deserialization fails:

🔧 Suggested improvement
 let video: Option<Video> = state
     .sections
     .get("video")
-    .and_then(|v| serde_json::from_value(v.clone()).ok());
+    .and_then(|v| {
+        serde_json::from_value(v.clone())
+            .map_err(|e| tracing::warn!("Failed to deserialize video section for MSF: {}", e))
+            .ok()
+    });
 let audio: Option<Audio> = state
     .sections
     .get("audio")
-    .and_then(|v| serde_json::from_value(v.clone()).ok());
+    .and_then(|v| {
+        serde_json::from_value(v.clone())
+            .map_err(|e| tracing::warn!("Failed to deserialize audio section for MSF: {}", e))
+            .ok()
+    });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/catalog.rs` around lines 49 - 75, The flush method currently
swallows serde_json deserialization errors for the video/audio sections (using
.and_then(... .ok())), so update the deserialization in flush to explicitly
match the Result from serde_json::from_value for both Video and Audio: if Ok(v)
set video (or audio), if Err(e) log a warning that includes the section name
("video" or "audio"), the serde error e, and the offending JSON value (or a
short preview), then continue; keep dropping state and calling
crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so
publish behavior is unchanged but failures are visible.
rs/hang/src/catalog/reader.rs (1)

66-81: Consider potential lock contention during update.

The update method holds the mutex lock while iterating through all sections and potentially writing to each producer. For catalogs with many sections, this could block other operations like section() registration.

This is likely acceptable for typical use cases where catalogs have a small number of sections, but worth noting if the section count grows significantly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/catalog/reader.rs` around lines 66 - 81, The update method holds
the mutex while iterating and calling producer.write(), which can cause
contention; change update to briefly lock self.inner to clone the necessary
state (clone or collect inner.sections entries as (name.clone(),
producer.clone()) and clone inner.last), then drop the lock and perform
comparisons and producer.write() outside the mutex, and finally re-lock once to
set inner.last = json; update references: update, inner, sections, inner.last,
producer.write(), and ensure producer clones are used so writes happen without
holding the mutex.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/hang/src/catalog/reader.ts`:
- Around line 45-51: When schema.parse(raw) throws in the reader for a catalog
section (inside the try/catch that currently calls schema.parse and signal.set),
clear the section's signal in the catch path so stale data is not retained;
i.e., in the catch that currently logs the invalid payload for name, call the
appropriate method to unset the signal (e.g., set it to undefined or reset the
signal) instead of leaving the previous value. Ensure you still log the error as
before and reference the same symbols (raw, schema.parse, signal.set) when
making the change.

In `@rs/hang/src/catalog/consumer.rs`:
- Around line 49-69: The run() loop currently only calls self.reader.close() on
clean EOF; change it so CatalogReader::close() is always invoked on every exit
path (errors from self.track.recv_group(), self.group.as_mut()?.read_frame(),
and serde_json::from_slice()). Implement a "finally"-style cleanup: wrap the
loop scope or the async select in a block and ensure self.reader.close() is
called after the loop (or install a guard that calls close in its Drop), so
regardless of early returns or ? errors the reader is closed; reference
functions/values: run(), self.track.recv_group(),
self.group.as_mut()?.read_frame(), serde_json::from_slice(), and
self.reader.close().

In `@rs/moq-cli/src/subscribe.rs`:
- Around line 71-79: The select! currently returns after the first section
change, which can drop the other section before exporter.init() / muxer_tracks
see it; change the logic to keep the catalog_consumer task alive until both
sections have produced their initial values. Replace the single select! with a
loop that checks video_section.get() and audio_section.get() (e.g., while
video_section.get()?.is_none() || audio_section.get()?.is_none()) and inside the
loop use tokio::select! to await catalog_consumer.run(),
video_section.changed(), and audio_section.changed(); on catalog_consumer.run()
error bail as before, otherwise loop until both sections are present, then read
video_section.get() and audio_section.get() and call exporter.init()/build
muxer_tracks.

---

Outside diff comments:
In `@js/watch/src/preview.ts`:
- Around line 38-49: The current effect.spawn performs a single Zod.read(track,
PreviewSchema) and exits, so subsequent preview.json frames and end-of-track are
missed; change the task launched in effect.spawn (the block containing Zod.read,
PreviewSchema and this.preview.set) to continuously read frames from the track
(e.g., loop or async iterator) and call this.preview.set(info) for every
successful parse, and ensure effect.cleanup stops that loop and resets
this.preview to undefined (use an AbortController or a cancellation flag inside
the spawned task) so later updates and end-of-track cleanup are observed.

In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 349-357: In Hev1's Drop implementation, replace borrowing the
Option with taking ownership so the inner Track is actually removed and dropped:
use self.track.take() to extract the track (instead of matching on &self.track),
then call tracing::debug! with the extracted track.info.name, call
self.video.remove_track(&track.info), update the catalog with
self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush; this ensures the
Option is set to None and the underlying track can be dropped (look for the impl
Drop for Hev1, the track field, and the calls to remove_track, catalog.set, and
catalog.flush).

---

Nitpick comments:
In `@js/publish/src/location/types.ts`:
- Around line 1-8: The public type Position exposes an ambiguous short field s;
update the Position type declaration to document what s stands for (e.g., scale,
speed, semantic state) with a concise doc comment above the Position type and
above the s property, and if payload compatibility requires the short key keep
the name but add the descriptive comment; reference the Position type and its s
property (and Peers which uses Position) so consumers understand the field's
intent.

In `@js/watch/src/broadcast.ts`:
- Around line 112-123: The effect.run callback currently sets
this.status.set("live") whenever any of the tracked signals (this.video,
this.audio, this.chat, this.user, this.preview, this.location) are truthy, but
never resets the status when all become undefined; update the effect.run in
broadcast.ts to add an else branch that sets this.status back to "loading" (or
another appropriate non-live state) when none of the signals are present, so
after checking const video = inner.get(this.video); const audio =
inner.get(this.audio); const chat = inner.get(this.chat); const user =
inner.get(this.user); const preview = inner.get(this.preview); const location =
inner.get(this.location); you explicitly call this.status.set("loading") when
video||audio||chat||user||preview||location is falsey.

In `@js/watch/src/sections.ts`:
- Around line 28-43: Add a short clarifying comment above
PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that this schema
represents the preview "track" identifier used by the Section (a minimal schema
containing only the track name), and add a separate comment above
PreviewSchema/Preview type explaining that this is the richer preview content
shape (name, avatar, audio/video/typing/chat/screen flags) used for preview
track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and
PreviewSchema in the comments so future readers understand the distinction and
intended usage.

In `@rs/hang/src/catalog/reader.rs`:
- Around line 66-81: The update method holds the mutex while iterating and
calling producer.write(), which can cause contention; change update to briefly
lock self.inner to clone the necessary state (clone or collect inner.sections
entries as (name.clone(), producer.clone()) and clone inner.last), then drop the
lock and perform comparisons and producer.write() outside the mutex, and finally
re-lock once to set inner.last = json; update references: update, inner,
sections, inner.last, producer.write(), and ensure producer clones are used so
writes happen without holding the mutex.

In `@rs/libmoq/src/consume.rs`:
- Around line 78-101: In run_catalog replace the magic string lookups for the
JSON sections with the catalog section name constants instead of "video" and
"audio": locate the serde_json::Map accessors where you call .get("video") and
.get("audio") and use the appropriate constants exported by hang::catalog (e.g.,
hang::catalog::VIDEO / hang::catalog::AUDIO or their NAME variants) so the code
reads the same but relies on the canonical section identifiers used elsewhere
(keep the same serde_json::from_value, transpose, map_err and unwrap_or_default
logic and types hang::catalog::Video and hang::catalog::Audio).

In `@rs/moq-ffi/src/consumer.rs`:
- Around line 44-45: When handling a frame from a group in consumer.rs (the
match arm that calls self.group.as_mut()?.read_frame().await.transpose() and
then self.group.take()), add a debug log before taking the group to record that
a group with multiple frames was encountered and that subsequent frames will be
dropped; use the crate's logging facility (e.g., tracing::debug! or log::debug!)
and include identifying details from the frame (Debug-format the frame or
include a frame id/metadata if available) so future delta support debugging can
locate which group/frame was ignored.

In `@rs/moq-mux/src/catalog.rs`:
- Around line 49-75: The flush method currently swallows serde_json
deserialization errors for the video/audio sections (using .and_then(...
.ok())), so update the deserialization in flush to explicitly match the Result
from serde_json::from_value for both Video and Audio: if Ok(v) set video (or
audio), if Err(e) log a warning that includes the section name ("video" or
"audio"), the serde error e, and the offending JSON value (or a short preview),
then continue; keep dropping state and calling
crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so
publish behavior is unchanged but failures are visible.

In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 61-71: Both run_with_catalog and run in fmp4.rs duplicate the
catalog frame reading and JSON parsing logic found also in hang.rs; extract that
shared behavior into a single helper (e.g., read_and_parse_catalog_frames or
parse_catalog_from_consumer) that accepts a &moq_lite::BroadcastConsumer (and
any needed mutable CatalogProducer references) and returns the parsed catalog
structure (or Result). Replace the duplicated blocks in run_with_catalog and run
with calls to this helper to centralize buffering, frame extraction, and
serde_json parsing; ensure the helper exposes any errors upward and preserves
the same returned types so callers (run_with_catalog, run) only perform
protocol-specific handling after receiving the parsed catalog.

In `@rs/moq-mux/src/producer/av01.rs`:
- Around line 39-47: Replace the string literal "video" in the read path with
the catalog constant to match the write path: when building the local video
value in the block that calls catalog.writer().read(), use hang::catalog::VIDEO
(or &hang::catalog::VIDEO as appropriate) instead of "video" so the get lookup
is consistent with the write side and avoids typos; update the use in the
closure that does .sections.get(...) accordingly.
- Around line 116-117: The calls that currently use "let _ =
self.catalog.set(&hang::catalog::VIDEO, &self.video);" (and the similar
occurrences) silently discard the Result; update these to handle failures by
either propagating the error (returning the Result with ? from the surrounding
function) or logging the error via the project's logger (e.g., processLogger or
crate logger) with context including hang::catalog::VIDEO and self.video;
specifically replace the discard at the sites calling self.catalog.set(...) with
either "self.catalog.set(...)?", or a match/if let Err(e) => log::error!(... ,
e) so errors are not lost. Ensure the chosen approach is consistent across the
other occurrences referenced by the reviewer.

In `@rs/moq-mux/src/producer/avc3.rs`:
- Around line 39-47: Replace the hardcoded "video" key when reading from
catalog.sections with the VIDEO constant's name field: use
hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the constant's
API) in the call inside the block that constructs `video` (the code using
`catalog.writer().read()` and `state.sections.get(...)`), so the lookup uses the
canonical section name; apply the same change to the analogous lookups in
`hev1.rs` and `av01.rs`.

In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 41-49: Replace the hard-coded "video" key with the catalog
constant to match the write path: change sections.get("video") to
sections.get(hang::catalog::VIDEO.name) in the block that reads the current
video section (the code using catalog.writer().read(), .sections, and
serde_json::from_value). Ensure the lookup uses the VIDEO.name constant so the
read path and write path use the same identifier.

In `@rs/moq-mux/src/producer/test/mod.rs`:
- Around line 22-31: Currently the test silently defaults when decoding of
present sections fails; change the logic so that when
state.sections.get("video") or get("audio") returns None you keep the default
Video/Audio, but when a Value is present you attempt serde_json::from_value and
propagate a decode error (panic or unwrap/expect) instead of turning it into a
default; update the two bindings (Video and Audio) that use
state.sections.get(...).and_then(|v|
serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence
first (e.g., match on get("video")/get("audio")) and call
serde_json::from_value(...).expect("failed to decode video section") so
malformed payloads fail the test while absent sections still default.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dead851f-266c-4cad-89c6-b3d6e9490edc

📥 Commits

Reviewing files that changed from the base of the PR and between 64cbba8 and 997e161.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (74)
  • js/hang/src/catalog/audio.ts
  • js/hang/src/catalog/capabilities.ts
  • js/hang/src/catalog/chat.ts
  • js/hang/src/catalog/index.ts
  • js/hang/src/catalog/location.ts
  • js/hang/src/catalog/preview.ts
  • js/hang/src/catalog/reader.ts
  • js/hang/src/catalog/root.ts
  • js/hang/src/catalog/section.ts
  • js/hang/src/catalog/user.ts
  • js/hang/src/catalog/video.ts
  • js/hang/src/catalog/writer.ts
  • js/publish/src/audio/encoder.ts
  • js/publish/src/broadcast.ts
  • js/publish/src/chat/index.ts
  • js/publish/src/chat/message.ts
  • js/publish/src/chat/typing.ts
  • js/publish/src/location/index.ts
  • js/publish/src/location/peers.ts
  • js/publish/src/location/types.ts
  • js/publish/src/location/window.ts
  • js/publish/src/preview.ts
  • js/publish/src/user.ts
  • js/publish/src/video/encoder.ts
  • js/publish/src/video/index.ts
  • js/watch/package.json
  • js/watch/src/audio/source.ts
  • js/watch/src/broadcast.ts
  • js/watch/src/chat/index.ts
  • js/watch/src/chat/message.ts
  • js/watch/src/chat/typing.ts
  • js/watch/src/location/index.ts
  • js/watch/src/location/peers.ts
  • js/watch/src/location/window.ts
  • js/watch/src/preview.ts
  • js/watch/src/sections.ts
  • js/watch/src/user.ts
  • js/watch/src/video/source.ts
  • rs/hang/Cargo.toml
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/hang/src/catalog/chat.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/hang/src/catalog/mod.rs
  • rs/hang/src/catalog/preview.rs
  • rs/hang/src/catalog/reader.rs
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/section.rs
  • rs/hang/src/catalog/user.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/hang/src/catalog/writer.rs
  • rs/hang/src/error.rs
  • rs/hang/src/lib.rs
  • rs/libmoq/Cargo.toml
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-ffi/Cargo.toml
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/media.rs
  • rs/moq-mux/Cargo.toml
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/consumer/fmp4.rs
  • rs/moq-mux/src/convert/fmp4.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-mux/src/convert/test.rs
  • rs/moq-mux/src/msf.rs
  • rs/moq-mux/src/producer/aac.rs
  • rs/moq-mux/src/producer/av01.rs
  • rs/moq-mux/src/producer/avc3.rs
  • rs/moq-mux/src/producer/fmp4.rs
  • rs/moq-mux/src/producer/hev1.rs
  • rs/moq-mux/src/producer/opus.rs
  • rs/moq-mux/src/producer/test/mod.rs
💤 Files with no reviewable changes (10)
  • js/hang/src/catalog/chat.ts
  • js/hang/src/catalog/user.ts
  • rs/hang/src/catalog/user.rs
  • js/hang/src/catalog/preview.ts
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/chat.rs
  • rs/hang/src/catalog/preview.rs
  • js/hang/src/catalog/location.ts
  • js/hang/src/catalog/capabilities.ts
  • js/hang/src/catalog/root.ts

Comment on lines +19 to +25
section<T>(def: Section<T>): Getter<T | undefined> {
const existing = this.#sections.get(def.name);
if (existing) return existing.signal as Getter<T | undefined>;

const signal = new Signal<T | undefined>(undefined);
this.#sections.set(def.name, { schema: def.schema, signal });
return signal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make section() honor the current snapshot and full section identity.

Registrations are reused by def.name alone and new signals always start at undefined. That means a late subscriber misses the current catalog until another frame arrives, and a same-name section with a different schema aliases the first registration via an unchecked cast. Cache the last parsed object and reject schema mismatches before reusing an existing entry.

Comment on lines +45 to +51
if (raw !== undefined) {
try {
const parsed = schema.parse(raw);
signal.set(parsed);
} catch (err) {
console.warn(`invalid catalog section "${name}"`, err);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Invalidate the section when schema validation fails.

A bad payload currently logs and leaves the previous value in place. Because catalog frames are whole snapshots, that exposes stale section state after an invalid update. Clear the signal in the catch path so consumers stop using data that no longer validates.

♻️ Suggested fix
 						if (raw !== undefined) {
 							try {
 								const parsed = schema.parse(raw);
 								signal.set(parsed);
 							} catch (err) {
 								console.warn(`invalid catalog section "${name}"`, err);
+								signal.set(undefined);
 							}
 						} else {
 							signal.set(undefined);
 						}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (raw !== undefined) {
try {
const parsed = schema.parse(raw);
signal.set(parsed);
} catch (err) {
console.warn(`invalid catalog section "${name}"`, err);
}
if (raw !== undefined) {
try {
const parsed = schema.parse(raw);
signal.set(parsed);
} catch (err) {
console.warn(`invalid catalog section "${name}"`, err);
signal.set(undefined);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/hang/src/catalog/reader.ts` around lines 45 - 51, When schema.parse(raw)
throws in the reader for a catalog section (inside the try/catch that currently
calls schema.parse and signal.set), clear the section's signal in the catch path
so stale data is not retained; i.e., in the catch that currently logs the
invalid payload for name, call the appropriate method to unset the signal (e.g.,
set it to undefined or reset the signal) instead of leaving the previous value.
Ensure you still log the error as before and reference the same symbols (raw,
schema.parse, signal.set) when making the change.

Comment on lines +49 to 69
pub async fn run(&mut self) -> Result<()> {
loop {
tokio::select! {
res = self.track.recv_group() => {
match res? {
Some(group) => {
// Use the new group.
self.group = Some(group);
}
// The track has ended, so we should return None.
None => return Ok(None),
None => {
self.reader.close();
return Ok(());
}
}
},
Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
self.group.take(); // We don't support deltas yet
let catalog = Catalog::from_slice(&frame?)?;
return Ok(Some(catalog));

let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&frame?)?;
self.reader.update(json);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Close the CatalogReader on every exit path.

self.reader.close() only runs on clean EOF. Any error from recv_group(), read_frame(), or serde_json::from_slice() returns early and leaves section consumers open with their last value. Close the reader in a finally-style wrapper so waiters always see termination.

♻️ Suggested fix
 	pub async fn run(&mut self) -> Result<()> {
-		loop {
-			tokio::select! {
-				res = self.track.recv_group() => {
-					match res? {
-						Some(group) => {
-							self.group = Some(group);
-						}
-						None => {
-							self.reader.close();
-							return Ok(());
-						}
-					}
-				},
-				Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
-					self.group.take(); // We don't support deltas yet
-
-					let json: serde_json::Map<String, serde_json::Value> =
-						serde_json::from_slice(&frame?)?;
-					self.reader.update(json);
-				}
-			}
-		}
+		let result = async {
+			loop {
+				tokio::select! {
+					res = self.track.recv_group() => {
+						match res? {
+							Some(group) => {
+								self.group = Some(group);
+							}
+							None => return Ok(()),
+						}
+					},
+					Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
+						self.group.take(); // We don't support deltas yet
+
+						let json: serde_json::Map<String, serde_json::Value> =
+							serde_json::from_slice(&frame?)?;
+						self.reader.update(json);
+					}
+				}
+			}
+		}
+		.await;
+
+		self.reader.close();
+		result
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn run(&mut self) -> Result<()> {
loop {
tokio::select! {
res = self.track.recv_group() => {
match res? {
Some(group) => {
// Use the new group.
self.group = Some(group);
}
// The track has ended, so we should return None.
None => return Ok(None),
None => {
self.reader.close();
return Ok(());
}
}
},
Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
self.group.take(); // We don't support deltas yet
let catalog = Catalog::from_slice(&frame?)?;
return Ok(Some(catalog));
let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&frame?)?;
self.reader.update(json);
}
pub async fn run(&mut self) -> Result<()> {
let result = async {
loop {
tokio::select! {
res = self.track.recv_group() => {
match res? {
Some(group) => {
self.group = Some(group);
}
None => return Ok(()),
}
},
Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
self.group.take(); // We don't support deltas yet
let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_slice(&frame?)?;
self.reader.update(json);
}
}
}
}
.await;
self.reader.close();
result
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/hang/src/catalog/consumer.rs` around lines 49 - 69, The run() loop
currently only calls self.reader.close() on clean EOF; change it so
CatalogReader::close() is always invoked on every exit path (errors from
self.track.recv_group(), self.group.as_mut()?.read_frame(), and
serde_json::from_slice()). Implement a "finally"-style cleanup: wrap the loop
scope or the async select in a block and ensure self.reader.close() is called
after the loop (or install a guard that calls close in its Drop), so regardless
of early returns or ? errors the reader is closed; reference functions/values:
run(), self.track.recv_group(), self.group.as_mut()?.read_frame(),
serde_json::from_slice(), and self.reader.close().

Comment on lines +71 to +79
// Run until first catalog update
tokio::select! {
res = catalog_consumer.run() => { res?; anyhow::bail!("catalog closed before first update"); },
_ = video_section.changed() => {},
_ = audio_section.changed() => {},
}

let video = video_section.get()?.unwrap_or_default();
let audio = audio_section.get()?.unwrap_or_default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't lock the muxer to the first single-section update.

This select! stops consuming the catalog as soon as either video_section or audio_section changes. If the publisher advertises video and audio in separate catalog flushes, the later section never makes it into exporter.init() or muxer_tracks, so the output silently drops that media type. Keep the catalog task alive until you've captured the initial layout you want to mux.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-cli/src/subscribe.rs` around lines 71 - 79, The select! currently
returns after the first section change, which can drop the other section before
exporter.init() / muxer_tracks see it; change the logic to keep the
catalog_consumer task alive until both sections have produced their initial
values. Replace the single select! with a loop that checks video_section.get()
and audio_section.get() (e.g., while video_section.get()?.is_none() ||
audio_section.get()?.is_none()) and inside the loop use tokio::select! to await
catalog_consumer.run(), video_section.changed(), and audio_section.changed(); on
catalog_consumer.run() error bail as before, otherwise loop until both sections
are present, then read video_section.get() and audio_section.get() and call
exporter.init()/build muxer_tracks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant