Replace monolithic catalog with generic section-based registry#1149
Replace monolithic catalog with generic section-based registry#1149
Conversation
The hang catalog previously hardcoded application-specific sections (chat, user, preview, location, capabilities). This replaces the monolithic Catalog/Root type with a generic Section<T> registry where sections are identified by name + schema pairs, enabling custom applications to define their own sections. Key changes: - Add Section<T> type pairing a name with a typed schema (Rust: serde, JS: zod) - Add CatalogWriter for producing catalogs with typed sections - Add CatalogReader with per-section change notifications (conducer/signals) - Predefined VIDEO/AUDIO sections (not auto-registered) - Remove app-specific sections from @moq/hang (chat, user, preview, etc.) - Migrate moq-mux, moq-cli, libmoq, moq-ffi to section-based API - Migrate js/publish and js/watch to use CatalogWriter/CatalogReader Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WalkthroughThis pull request refactors the catalog system from a monolithic root structure to a section-based architecture. Monolithic catalog types ( 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment Tip CodeRabbit can generate a title for your PR based on the changes with custom instructions.Set the |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-mux/src/producer/hev1.rs (1)
349-357:⚠️ Potential issue | 🟡 MinorUse
take()instead of borrowing inDropto properly clean up the track.The current code borrows the track with
&self.trackinstead of taking ownership. This differs fromav01.rswhich correctly usesself.track.take(). As a result, theOptionstill holdsSomeafter the block, and the underlying track may not be properly dropped.🐛 Proposed fix
impl Drop for Hev1 { fn drop(&mut self) { - if let Some(track) = &self.track { + if let Some(track) = self.track.take() { tracing::debug!(name = ?track.info.name, "ending track"); self.video.remove_track(&track.info); let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video); self.catalog.flush(); } } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/hev1.rs` around lines 349 - 357, In Hev1's Drop implementation, replace borrowing the Option with taking ownership so the inner Track is actually removed and dropped: use self.track.take() to extract the track (instead of matching on &self.track), then call tracing::debug! with the extracted track.info.name, call self.video.remove_track(&track.info), update the catalog with self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush; this ensures the Option is set to None and the underlying track can be dropped (look for the impl Drop for Hev1, the track field, and the calls to remove_track, catalog.set, and catalog.flush).js/watch/src/preview.ts (1)
38-49:⚠️ Potential issue | 🟠 MajorKeep reading preview updates from the track.
This task does a single
Zod.read()and then exits. The publisher can emit multiplepreview.jsonframes over the lifetime of the track, so later changes—and end-of-track cleanup—are never observed here.♻️ Suggested fix
effect.spawn(async () => { try { - const info = await Zod.read(track, PreviewSchema); - if (!info) return; - - this.preview.set(info); + for (;;) { + const info = await Zod.read(track, PreviewSchema); + if (!info) break; + this.preview.set(info); + } } catch (error) { console.warn("Failed to parse preview JSON:", error); + } finally { + this.preview.set(undefined); } }); - - effect.cleanup(() => this.preview.set(undefined));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/preview.ts` around lines 38 - 49, The current effect.spawn performs a single Zod.read(track, PreviewSchema) and exits, so subsequent preview.json frames and end-of-track are missed; change the task launched in effect.spawn (the block containing Zod.read, PreviewSchema and this.preview.set) to continuously read frames from the track (e.g., loop or async iterator) and call this.preview.set(info) for every successful parse, and ensure effect.cleanup stops that loop and resets this.preview to undefined (use an AbortController or a cancellation flag inside the spawned task) so later updates and end-of-track cleanup are observed.
🧹 Nitpick comments (13)
js/publish/src/location/types.ts (1)
1-8: Clarify thesfield intent in this public type.
Position.sis ambiguous for API consumers. If the short key is required for payload compatibility, add a doc comment explaining what it represents (for example, scale).Suggested clarification
+/** Position in 3D space with optional scalar metadata. */ export type Position = { x?: number; y?: number; z?: number; + /** Uniform scale factor. */ s?: number; };As per coding guidelines, "Use clear and descriptive variable names that convey intent" and "Document public APIs with clear docstrings or comments".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/publish/src/location/types.ts` around lines 1 - 8, The public type Position exposes an ambiguous short field s; update the Position type declaration to document what s stands for (e.g., scale, speed, semantic state) with a concise doc comment above the Position type and above the s property, and if payload compatibility requires the short key keep the name but add the descriptive comment; reference the Position type and its s property (and Peers which uses Position) so consumers understand the field's intent.rs/moq-mux/src/producer/test/mod.rs (1)
22-31: Differentiate missing sections from invalid section payloads.Right now malformed section JSON is silently treated as default, which can hide root-cause regressions in test failures. Keep defaulting for absent sections, but fail when a present section cannot be decoded.
Suggested adjustment
- let video: Video = state - .sections - .get("video") - .and_then(|v| serde_json::from_value(v.clone()).ok()) - .unwrap_or_default(); - let audio: Audio = state - .sections - .get("audio") - .and_then(|v| serde_json::from_value(v.clone()).ok()) - .unwrap_or_default(); + let video: Video = state + .sections + .get("video") + .map(|value| serde_json::from_value(value.clone()).expect("invalid video section")) + .unwrap_or_default(); + let audio: Audio = state + .sections + .get("audio") + .map(|value| serde_json::from_value(value.clone()).expect("invalid audio section")) + .unwrap_or_default();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/test/mod.rs` around lines 22 - 31, Currently the test silently defaults when decoding of present sections fails; change the logic so that when state.sections.get("video") or get("audio") returns None you keep the default Video/Audio, but when a Value is present you attempt serde_json::from_value and propagate a decode error (panic or unwrap/expect) instead of turning it into a default; update the two bindings (Video and Audio) that use state.sections.get(...).and_then(|v| serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence first (e.g., match on get("video")/get("audio")) and call serde_json::from_value(...).expect("failed to decode video section") so malformed payloads fail the test while absent sections still default.rs/libmoq/src/consume.rs (1)
78-101: Manual catalog parsing replaces legacy CatalogConsumer.The refactored
run_catalogmanually reads frames from the track and parses JSON sections. The use ofunwrap_or_default()for missing sections is appropriate since video/audio sections are optional.Consider using section name constants instead of the magic strings
"video"and"audio"for consistency with the section definitions.,
♻️ Optional: Use section name constants
If
hang::catalog::VIDEOandhang::catalog::AUDIOexpose their names, consider:- let video: hang::catalog::Video = json - .get("video") + let video: hang::catalog::Video = json + .get(hang::catalog::VIDEO.name)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/libmoq/src/consume.rs` around lines 78 - 101, In run_catalog replace the magic string lookups for the JSON sections with the catalog section name constants instead of "video" and "audio": locate the serde_json::Map accessors where you call .get("video") and .get("audio") and use the appropriate constants exported by hang::catalog (e.g., hang::catalog::VIDEO / hang::catalog::AUDIO or their NAME variants) so the code reads the same but relies on the canonical section identifiers used elsewhere (keep the same serde_json::from_value, transpose, map_err and unwrap_or_default logic and types hang::catalog::Video and hang::catalog::Audio).rs/moq-mux/src/producer/avc3.rs (1)
39-47: Use theVIDEOconstant'snamefield instead of the hardcoded string.The hardcoded
"video"string on line 44 should referencehang::catalog::VIDEO.nameto ensure consistency and prevent drift if the section name changes.♻️ Suggested improvement
// Read the current video section from the catalog, if any let video: hang::catalog::Video = { let state = catalog.writer().read(); state .sections - .get("video") + .get(hang::catalog::VIDEO.name) .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default() };Note: The same pattern is used in
hev1.rsandav01.rsand should be updated consistently.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/avc3.rs` around lines 39 - 47, Replace the hardcoded "video" key when reading from catalog.sections with the VIDEO constant's name field: use hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the constant's API) in the call inside the block that constructs `video` (the code using `catalog.writer().read()` and `state.sections.get(...)`), so the lookup uses the canonical section name; apply the same change to the analogous lookups in `hev1.rs` and `av01.rs`.rs/moq-mux/src/producer/av01.rs (2)
39-47: Consider using theVIDEOconstant for consistency.The read path uses the string literal
"video"while the write path uses&hang::catalog::VIDEO. For consistency and to avoid potential typos, consider using the constant for both:let video: hang::catalog::Video = { let state = catalog.writer().read(); state .sections - .get("video") + .get(hang::catalog::VIDEO.name) .and_then(|v| serde_json::from_value(v.clone()).ok()) .unwrap_or_default() };🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/av01.rs` around lines 39 - 47, Replace the string literal "video" in the read path with the catalog constant to match the write path: when building the local video value in the block that calls catalog.writer().read(), use hang::catalog::VIDEO (or &hang::catalog::VIDEO as appropriate) instead of "video" so the get lookup is consistent with the write side and avoids typos; update the use in the closure that does .sections.get(...) accordingly.
116-117: Discardingset()result silently.The
let _ = self.catalog.set(...)pattern discards any error fromset(). Ifset()can fail in a meaningful way, consider logging or propagating the error. If it's intentionally infallible in this context, the current pattern is acceptable.Also applies to: 160-161, 249-250, 439-440
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/av01.rs` around lines 116 - 117, The calls that currently use "let _ = self.catalog.set(&hang::catalog::VIDEO, &self.video);" (and the similar occurrences) silently discard the Result; update these to handle failures by either propagating the error (returning the Result with ? from the surrounding function) or logging the error via the project's logger (e.g., processLogger or crate logger) with context including hang::catalog::VIDEO and self.video; specifically replace the discard at the sites calling self.catalog.set(...) with either "self.catalog.set(...)?", or a match/if let Err(e) => log::error!(... , e) so errors are not lost. Ensure the chosen approach is consistent across the other occurrences referenced by the reviewer.rs/moq-mux/src/producer/hev1.rs (1)
41-49: Same consistency suggestion as av01.rs: useVIDEO.nameconstant.For consistency with the write path that uses
&hang::catalog::VIDEO, consider using the constant's name field for the read path as well.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/producer/hev1.rs` around lines 41 - 49, Replace the hard-coded "video" key with the catalog constant to match the write path: change sections.get("video") to sections.get(hang::catalog::VIDEO.name) in the block that reads the current video section (the code using catalog.writer().read(), .sections, and serde_json::from_value). Ensure the lookup uses the VIDEO.name constant so the read path and write path use the same identifier.js/watch/src/broadcast.ts (1)
112-123: Status remains "live" even if all sections become undefined.Once any section has a value, status is set to
"live". If all sections subsequently become undefined (e.g., empty catalog update), the status won't revert to"loading". If the intent is "once live, stay live until cleanup," this is fine. Otherwise, consider adding anelsebranch:if (video || audio || chat || user || preview || location) { this.status.set("live"); +} else { + this.status.set("loading"); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/broadcast.ts` around lines 112 - 123, The effect.run callback currently sets this.status.set("live") whenever any of the tracked signals (this.video, this.audio, this.chat, this.user, this.preview, this.location) are truthy, but never resets the status when all become undefined; update the effect.run in broadcast.ts to add an else branch that sets this.status back to "loading" (or another appropriate non-live state) when none of the signals are present, so after checking const video = inner.get(this.video); const audio = inner.get(this.audio); const chat = inner.get(this.chat); const user = inner.get(this.user); const preview = inner.get(this.preview); const location = inner.get(this.location); you explicitly call this.status.set("loading") when video||audio||chat||user||preview||location is falsey.rs/moq-ffi/src/consumer.rs (1)
44-45: Clarify the delta support limitation.The comment "We don't support deltas yet" explains why the group is taken after reading the first frame. However, this means subsequent frames in the same group are silently ignored. Consider adding a debug log when a group with multiple frames is encountered, to aid future debugging when delta support is added.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-ffi/src/consumer.rs` around lines 44 - 45, When handling a frame from a group in consumer.rs (the match arm that calls self.group.as_mut()?.read_frame().await.transpose() and then self.group.take()), add a debug log before taking the group to record that a group with multiple frames was encountered and that subsequent frames will be dropped; use the crate's logging facility (e.g., tracing::debug! or log::debug!) and include identifying details from the frame (Debug-format the frame or include a frame id/metadata if available) so future delta support debugging can locate which group/frame was ignored.js/watch/src/sections.ts (1)
28-43: Clarify the distinction betweenPreviewTrackSchemaandPreviewSchema.The
PREVIEWsection usesPreviewTrackSchema(just a track name), whilePreviewSchemadefines a richer structure with name, avatar, audio, video, etc.Is
PreviewSchemaintended for the content on the preview track rather than the catalog section itself? If so, a brief comment would help clarify this distinction.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/sections.ts` around lines 28 - 43, Add a short clarifying comment above PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that this schema represents the preview "track" identifier used by the Section (a minimal schema containing only the track name), and add a separate comment above PreviewSchema/Preview type explaining that this is the richer preview content shape (name, avatar, audio/video/typing/chat/screen flags) used for preview track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and PreviewSchema in the comments so future readers understand the distinction and intended usage.rs/moq-mux/src/convert/fmp4.rs (1)
61-71: Consider extracting shared catalog reading logic.The
run_with_catalogpattern and the catalog reading inrun()are nearly identical betweenfmp4.rsandhang.rs. While not blocking, extracting the common catalog frame reading and JSON parsing logic into a shared helper could reduce duplication.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/convert/fmp4.rs` around lines 61 - 71, Both run_with_catalog and run in fmp4.rs duplicate the catalog frame reading and JSON parsing logic found also in hang.rs; extract that shared behavior into a single helper (e.g., read_and_parse_catalog_frames or parse_catalog_from_consumer) that accepts a &moq_lite::BroadcastConsumer (and any needed mutable CatalogProducer references) and returns the parsed catalog structure (or Result). Replace the duplicated blocks in run_with_catalog and run with calls to this helper to centralize buffering, frame extraction, and serde_json parsing; ensure the helper exposes any errors upward and preserves the same returned types so callers (run_with_catalog, run) only perform protocol-specific handling after receiving the parsed catalog.rs/moq-mux/src/catalog.rs (1)
49-75: Silent deserialization failures may hide issues.Lines 64-71 use
.and_then(|v| serde_json::from_value(v.clone()).ok())which silently ignores deserialization errors for video/audio sections. If the JSON structure doesn't matchVideo/Audio, the MSF catalog will be published without those sections, with no warning.Consider logging a warning when deserialization fails:
🔧 Suggested improvement
let video: Option<Video> = state .sections .get("video") - .and_then(|v| serde_json::from_value(v.clone()).ok()); + .and_then(|v| { + serde_json::from_value(v.clone()) + .map_err(|e| tracing::warn!("Failed to deserialize video section for MSF: {}", e)) + .ok() + }); let audio: Option<Audio> = state .sections .get("audio") - .and_then(|v| serde_json::from_value(v.clone()).ok()); + .and_then(|v| { + serde_json::from_value(v.clone()) + .map_err(|e| tracing::warn!("Failed to deserialize audio section for MSF: {}", e)) + .ok() + });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/catalog.rs` around lines 49 - 75, The flush method currently swallows serde_json deserialization errors for the video/audio sections (using .and_then(... .ok())), so update the deserialization in flush to explicitly match the Result from serde_json::from_value for both Video and Audio: if Ok(v) set video (or audio), if Err(e) log a warning that includes the section name ("video" or "audio"), the serde error e, and the offending JSON value (or a short preview), then continue; keep dropping state and calling crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so publish behavior is unchanged but failures are visible.rs/hang/src/catalog/reader.rs (1)
66-81: Consider potential lock contention during update.The
updatemethod holds the mutex lock while iterating through all sections and potentially writing to each producer. For catalogs with many sections, this could block other operations likesection()registration.This is likely acceptable for typical use cases where catalogs have a small number of sections, but worth noting if the section count grows significantly.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/hang/src/catalog/reader.rs` around lines 66 - 81, The update method holds the mutex while iterating and calling producer.write(), which can cause contention; change update to briefly lock self.inner to clone the necessary state (clone or collect inner.sections entries as (name.clone(), producer.clone()) and clone inner.last), then drop the lock and perform comparisons and producer.write() outside the mutex, and finally re-lock once to set inner.last = json; update references: update, inner, sections, inner.last, producer.write(), and ensure producer clones are used so writes happen without holding the mutex.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/hang/src/catalog/reader.ts`:
- Around line 45-51: When schema.parse(raw) throws in the reader for a catalog
section (inside the try/catch that currently calls schema.parse and signal.set),
clear the section's signal in the catch path so stale data is not retained;
i.e., in the catch that currently logs the invalid payload for name, call the
appropriate method to unset the signal (e.g., set it to undefined or reset the
signal) instead of leaving the previous value. Ensure you still log the error as
before and reference the same symbols (raw, schema.parse, signal.set) when
making the change.
In `@rs/hang/src/catalog/consumer.rs`:
- Around line 49-69: The run() loop currently only calls self.reader.close() on
clean EOF; change it so CatalogReader::close() is always invoked on every exit
path (errors from self.track.recv_group(), self.group.as_mut()?.read_frame(),
and serde_json::from_slice()). Implement a "finally"-style cleanup: wrap the
loop scope or the async select in a block and ensure self.reader.close() is
called after the loop (or install a guard that calls close in its Drop), so
regardless of early returns or ? errors the reader is closed; reference
functions/values: run(), self.track.recv_group(),
self.group.as_mut()?.read_frame(), serde_json::from_slice(), and
self.reader.close().
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 71-79: The select! currently returns after the first section
change, which can drop the other section before exporter.init() / muxer_tracks
see it; change the logic to keep the catalog_consumer task alive until both
sections have produced their initial values. Replace the single select! with a
loop that checks video_section.get() and audio_section.get() (e.g., while
video_section.get()?.is_none() || audio_section.get()?.is_none()) and inside the
loop use tokio::select! to await catalog_consumer.run(),
video_section.changed(), and audio_section.changed(); on catalog_consumer.run()
error bail as before, otherwise loop until both sections are present, then read
video_section.get() and audio_section.get() and call exporter.init()/build
muxer_tracks.
---
Outside diff comments:
In `@js/watch/src/preview.ts`:
- Around line 38-49: The current effect.spawn performs a single Zod.read(track,
PreviewSchema) and exits, so subsequent preview.json frames and end-of-track are
missed; change the task launched in effect.spawn (the block containing Zod.read,
PreviewSchema and this.preview.set) to continuously read frames from the track
(e.g., loop or async iterator) and call this.preview.set(info) for every
successful parse, and ensure effect.cleanup stops that loop and resets
this.preview to undefined (use an AbortController or a cancellation flag inside
the spawned task) so later updates and end-of-track cleanup are observed.
In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 349-357: In Hev1's Drop implementation, replace borrowing the
Option with taking ownership so the inner Track is actually removed and dropped:
use self.track.take() to extract the track (instead of matching on &self.track),
then call tracing::debug! with the extracted track.info.name, call
self.video.remove_track(&track.info), update the catalog with
self.catalog.set(&hang::catalog::VIDEO, &self.video) and flush; this ensures the
Option is set to None and the underlying track can be dropped (look for the impl
Drop for Hev1, the track field, and the calls to remove_track, catalog.set, and
catalog.flush).
---
Nitpick comments:
In `@js/publish/src/location/types.ts`:
- Around line 1-8: The public type Position exposes an ambiguous short field s;
update the Position type declaration to document what s stands for (e.g., scale,
speed, semantic state) with a concise doc comment above the Position type and
above the s property, and if payload compatibility requires the short key keep
the name but add the descriptive comment; reference the Position type and its s
property (and Peers which uses Position) so consumers understand the field's
intent.
In `@js/watch/src/broadcast.ts`:
- Around line 112-123: The effect.run callback currently sets
this.status.set("live") whenever any of the tracked signals (this.video,
this.audio, this.chat, this.user, this.preview, this.location) are truthy, but
never resets the status when all become undefined; update the effect.run in
broadcast.ts to add an else branch that sets this.status back to "loading" (or
another appropriate non-live state) when none of the signals are present, so
after checking const video = inner.get(this.video); const audio =
inner.get(this.audio); const chat = inner.get(this.chat); const user =
inner.get(this.user); const preview = inner.get(this.preview); const location =
inner.get(this.location); you explicitly call this.status.set("loading") when
video||audio||chat||user||preview||location is falsey.
In `@js/watch/src/sections.ts`:
- Around line 28-43: Add a short clarifying comment above
PreviewTrackSchema/PreviewTrack type and PREVIEW explaining that this schema
represents the preview "track" identifier used by the Section (a minimal schema
containing only the track name), and add a separate comment above
PreviewSchema/Preview type explaining that this is the richer preview content
shape (name, avatar, audio/video/typing/chat/screen flags) used for preview
track payloads or catalog content; reference PreviewTrackSchema, PREVIEW, and
PreviewSchema in the comments so future readers understand the distinction and
intended usage.
In `@rs/hang/src/catalog/reader.rs`:
- Around line 66-81: The update method holds the mutex while iterating and
calling producer.write(), which can cause contention; change update to briefly
lock self.inner to clone the necessary state (clone or collect inner.sections
entries as (name.clone(), producer.clone()) and clone inner.last), then drop the
lock and perform comparisons and producer.write() outside the mutex, and finally
re-lock once to set inner.last = json; update references: update, inner,
sections, inner.last, producer.write(), and ensure producer clones are used so
writes happen without holding the mutex.
In `@rs/libmoq/src/consume.rs`:
- Around line 78-101: In run_catalog replace the magic string lookups for the
JSON sections with the catalog section name constants instead of "video" and
"audio": locate the serde_json::Map accessors where you call .get("video") and
.get("audio") and use the appropriate constants exported by hang::catalog (e.g.,
hang::catalog::VIDEO / hang::catalog::AUDIO or their NAME variants) so the code
reads the same but relies on the canonical section identifiers used elsewhere
(keep the same serde_json::from_value, transpose, map_err and unwrap_or_default
logic and types hang::catalog::Video and hang::catalog::Audio).
In `@rs/moq-ffi/src/consumer.rs`:
- Around line 44-45: When handling a frame from a group in consumer.rs (the
match arm that calls self.group.as_mut()?.read_frame().await.transpose() and
then self.group.take()), add a debug log before taking the group to record that
a group with multiple frames was encountered and that subsequent frames will be
dropped; use the crate's logging facility (e.g., tracing::debug! or log::debug!)
and include identifying details from the frame (Debug-format the frame or
include a frame id/metadata if available) so future delta support debugging can
locate which group/frame was ignored.
In `@rs/moq-mux/src/catalog.rs`:
- Around line 49-75: The flush method currently swallows serde_json
deserialization errors for the video/audio sections (using .and_then(...
.ok())), so update the deserialization in flush to explicitly match the Result
from serde_json::from_value for both Video and Audio: if Ok(v) set video (or
audio), if Err(e) log a warning that includes the section name ("video" or
"audio"), the serde error e, and the offending JSON value (or a short preview),
then continue; keep dropping state and calling
crate::msf::publish(video.as_ref(), audio.as_ref(), &mut self.msf_track) so
publish behavior is unchanged but failures are visible.
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 61-71: Both run_with_catalog and run in fmp4.rs duplicate the
catalog frame reading and JSON parsing logic found also in hang.rs; extract that
shared behavior into a single helper (e.g., read_and_parse_catalog_frames or
parse_catalog_from_consumer) that accepts a &moq_lite::BroadcastConsumer (and
any needed mutable CatalogProducer references) and returns the parsed catalog
structure (or Result). Replace the duplicated blocks in run_with_catalog and run
with calls to this helper to centralize buffering, frame extraction, and
serde_json parsing; ensure the helper exposes any errors upward and preserves
the same returned types so callers (run_with_catalog, run) only perform
protocol-specific handling after receiving the parsed catalog.
In `@rs/moq-mux/src/producer/av01.rs`:
- Around line 39-47: Replace the string literal "video" in the read path with
the catalog constant to match the write path: when building the local video
value in the block that calls catalog.writer().read(), use hang::catalog::VIDEO
(or &hang::catalog::VIDEO as appropriate) instead of "video" so the get lookup
is consistent with the write side and avoids typos; update the use in the
closure that does .sections.get(...) accordingly.
- Around line 116-117: The calls that currently use "let _ =
self.catalog.set(&hang::catalog::VIDEO, &self.video);" (and the similar
occurrences) silently discard the Result; update these to handle failures by
either propagating the error (returning the Result with ? from the surrounding
function) or logging the error via the project's logger (e.g., processLogger or
crate logger) with context including hang::catalog::VIDEO and self.video;
specifically replace the discard at the sites calling self.catalog.set(...) with
either "self.catalog.set(...)?", or a match/if let Err(e) => log::error!(... ,
e) so errors are not lost. Ensure the chosen approach is consistent across the
other occurrences referenced by the reviewer.
In `@rs/moq-mux/src/producer/avc3.rs`:
- Around line 39-47: Replace the hardcoded "video" key when reading from
catalog.sections with the VIDEO constant's name field: use
hang::catalog::VIDEO.name() or hang::catalog::VIDEO.name (match the constant's
API) in the call inside the block that constructs `video` (the code using
`catalog.writer().read()` and `state.sections.get(...)`), so the lookup uses the
canonical section name; apply the same change to the analogous lookups in
`hev1.rs` and `av01.rs`.
In `@rs/moq-mux/src/producer/hev1.rs`:
- Around line 41-49: Replace the hard-coded "video" key with the catalog
constant to match the write path: change sections.get("video") to
sections.get(hang::catalog::VIDEO.name) in the block that reads the current
video section (the code using catalog.writer().read(), .sections, and
serde_json::from_value). Ensure the lookup uses the VIDEO.name constant so the
read path and write path use the same identifier.
In `@rs/moq-mux/src/producer/test/mod.rs`:
- Around line 22-31: Currently the test silently defaults when decoding of
present sections fails; change the logic so that when
state.sections.get("video") or get("audio") returns None you keep the default
Video/Audio, but when a Value is present you attempt serde_json::from_value and
propagate a decode error (panic or unwrap/expect) instead of turning it into a
default; update the two bindings (Video and Audio) that use
state.sections.get(...).and_then(|v|
serde_json::from_value(v.clone()).ok()).unwrap_or_default() to check presence
first (e.g., match on get("video")/get("audio")) and call
serde_json::from_value(...).expect("failed to decode video section") so
malformed payloads fail the test while absent sections still default.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: dead851f-266c-4cad-89c6-b3d6e9490edc
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.lockbun.lockis excluded by!**/*.lock
📒 Files selected for processing (74)
js/hang/src/catalog/audio.tsjs/hang/src/catalog/capabilities.tsjs/hang/src/catalog/chat.tsjs/hang/src/catalog/index.tsjs/hang/src/catalog/location.tsjs/hang/src/catalog/preview.tsjs/hang/src/catalog/reader.tsjs/hang/src/catalog/root.tsjs/hang/src/catalog/section.tsjs/hang/src/catalog/user.tsjs/hang/src/catalog/video.tsjs/hang/src/catalog/writer.tsjs/publish/src/audio/encoder.tsjs/publish/src/broadcast.tsjs/publish/src/chat/index.tsjs/publish/src/chat/message.tsjs/publish/src/chat/typing.tsjs/publish/src/location/index.tsjs/publish/src/location/peers.tsjs/publish/src/location/types.tsjs/publish/src/location/window.tsjs/publish/src/preview.tsjs/publish/src/user.tsjs/publish/src/video/encoder.tsjs/publish/src/video/index.tsjs/watch/package.jsonjs/watch/src/audio/source.tsjs/watch/src/broadcast.tsjs/watch/src/chat/index.tsjs/watch/src/chat/message.tsjs/watch/src/chat/typing.tsjs/watch/src/location/index.tsjs/watch/src/location/peers.tsjs/watch/src/location/window.tsjs/watch/src/preview.tsjs/watch/src/sections.tsjs/watch/src/user.tsjs/watch/src/video/source.tsrs/hang/Cargo.tomlrs/hang/examples/subscribe.rsrs/hang/examples/video.rsrs/hang/src/catalog/audio/mod.rsrs/hang/src/catalog/chat.rsrs/hang/src/catalog/consumer.rsrs/hang/src/catalog/mod.rsrs/hang/src/catalog/preview.rsrs/hang/src/catalog/reader.rsrs/hang/src/catalog/root.rsrs/hang/src/catalog/section.rsrs/hang/src/catalog/user.rsrs/hang/src/catalog/video/mod.rsrs/hang/src/catalog/writer.rsrs/hang/src/error.rsrs/hang/src/lib.rsrs/libmoq/Cargo.tomlrs/libmoq/src/consume.rsrs/moq-cli/src/subscribe.rsrs/moq-ffi/Cargo.tomlrs/moq-ffi/src/consumer.rsrs/moq-ffi/src/media.rsrs/moq-mux/Cargo.tomlrs/moq-mux/src/catalog.rsrs/moq-mux/src/consumer/fmp4.rsrs/moq-mux/src/convert/fmp4.rsrs/moq-mux/src/convert/hang.rsrs/moq-mux/src/convert/test.rsrs/moq-mux/src/msf.rsrs/moq-mux/src/producer/aac.rsrs/moq-mux/src/producer/av01.rsrs/moq-mux/src/producer/avc3.rsrs/moq-mux/src/producer/fmp4.rsrs/moq-mux/src/producer/hev1.rsrs/moq-mux/src/producer/opus.rsrs/moq-mux/src/producer/test/mod.rs
💤 Files with no reviewable changes (10)
- js/hang/src/catalog/chat.ts
- js/hang/src/catalog/user.ts
- rs/hang/src/catalog/user.rs
- js/hang/src/catalog/preview.ts
- rs/hang/src/catalog/root.rs
- rs/hang/src/catalog/chat.rs
- rs/hang/src/catalog/preview.rs
- js/hang/src/catalog/location.ts
- js/hang/src/catalog/capabilities.ts
- js/hang/src/catalog/root.ts
| section<T>(def: Section<T>): Getter<T | undefined> { | ||
| const existing = this.#sections.get(def.name); | ||
| if (existing) return existing.signal as Getter<T | undefined>; | ||
|
|
||
| const signal = new Signal<T | undefined>(undefined); | ||
| this.#sections.set(def.name, { schema: def.schema, signal }); | ||
| return signal; |
There was a problem hiding this comment.
Make section() honor the current snapshot and full section identity.
Registrations are reused by def.name alone and new signals always start at undefined. That means a late subscriber misses the current catalog until another frame arrives, and a same-name section with a different schema aliases the first registration via an unchecked cast. Cache the last parsed object and reject schema mismatches before reusing an existing entry.
| if (raw !== undefined) { | ||
| try { | ||
| const parsed = schema.parse(raw); | ||
| signal.set(parsed); | ||
| } catch (err) { | ||
| console.warn(`invalid catalog section "${name}"`, err); | ||
| } |
There was a problem hiding this comment.
Invalidate the section when schema validation fails.
A bad payload currently logs and leaves the previous value in place. Because catalog frames are whole snapshots, that exposes stale section state after an invalid update. Clear the signal in the catch path so consumers stop using data that no longer validates.
♻️ Suggested fix
if (raw !== undefined) {
try {
const parsed = schema.parse(raw);
signal.set(parsed);
} catch (err) {
console.warn(`invalid catalog section "${name}"`, err);
+ signal.set(undefined);
}
} else {
signal.set(undefined);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (raw !== undefined) { | |
| try { | |
| const parsed = schema.parse(raw); | |
| signal.set(parsed); | |
| } catch (err) { | |
| console.warn(`invalid catalog section "${name}"`, err); | |
| } | |
| if (raw !== undefined) { | |
| try { | |
| const parsed = schema.parse(raw); | |
| signal.set(parsed); | |
| } catch (err) { | |
| console.warn(`invalid catalog section "${name}"`, err); | |
| signal.set(undefined); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@js/hang/src/catalog/reader.ts` around lines 45 - 51, When schema.parse(raw)
throws in the reader for a catalog section (inside the try/catch that currently
calls schema.parse and signal.set), clear the section's signal in the catch path
so stale data is not retained; i.e., in the catch that currently logs the
invalid payload for name, call the appropriate method to unset the signal (e.g.,
set it to undefined or reset the signal) instead of leaving the previous value.
Ensure you still log the error as before and reference the same symbols (raw,
schema.parse, signal.set) when making the change.
| pub async fn run(&mut self) -> Result<()> { | ||
| loop { | ||
| tokio::select! { | ||
| res = self.track.recv_group() => { | ||
| match res? { | ||
| Some(group) => { | ||
| // Use the new group. | ||
| self.group = Some(group); | ||
| } | ||
| // The track has ended, so we should return None. | ||
| None => return Ok(None), | ||
| None => { | ||
| self.reader.close(); | ||
| return Ok(()); | ||
| } | ||
| } | ||
| }, | ||
| Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => { | ||
| self.group.take(); // We don't support deltas yet | ||
| let catalog = Catalog::from_slice(&frame?)?; | ||
| return Ok(Some(catalog)); | ||
|
|
||
| let json: serde_json::Map<String, serde_json::Value> = | ||
| serde_json::from_slice(&frame?)?; | ||
| self.reader.update(json); | ||
| } |
There was a problem hiding this comment.
Close the CatalogReader on every exit path.
self.reader.close() only runs on clean EOF. Any error from recv_group(), read_frame(), or serde_json::from_slice() returns early and leaves section consumers open with their last value. Close the reader in a finally-style wrapper so waiters always see termination.
♻️ Suggested fix
pub async fn run(&mut self) -> Result<()> {
- loop {
- tokio::select! {
- res = self.track.recv_group() => {
- match res? {
- Some(group) => {
- self.group = Some(group);
- }
- None => {
- self.reader.close();
- return Ok(());
- }
- }
- },
- Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
- self.group.take(); // We don't support deltas yet
-
- let json: serde_json::Map<String, serde_json::Value> =
- serde_json::from_slice(&frame?)?;
- self.reader.update(json);
- }
- }
- }
+ let result = async {
+ loop {
+ tokio::select! {
+ res = self.track.recv_group() => {
+ match res? {
+ Some(group) => {
+ self.group = Some(group);
+ }
+ None => return Ok(()),
+ }
+ },
+ Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => {
+ self.group.take(); // We don't support deltas yet
+
+ let json: serde_json::Map<String, serde_json::Value> =
+ serde_json::from_slice(&frame?)?;
+ self.reader.update(json);
+ }
+ }
+ }
+ }
+ .await;
+
+ self.reader.close();
+ result
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn run(&mut self) -> Result<()> { | |
| loop { | |
| tokio::select! { | |
| res = self.track.recv_group() => { | |
| match res? { | |
| Some(group) => { | |
| // Use the new group. | |
| self.group = Some(group); | |
| } | |
| // The track has ended, so we should return None. | |
| None => return Ok(None), | |
| None => { | |
| self.reader.close(); | |
| return Ok(()); | |
| } | |
| } | |
| }, | |
| Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => { | |
| self.group.take(); // We don't support deltas yet | |
| let catalog = Catalog::from_slice(&frame?)?; | |
| return Ok(Some(catalog)); | |
| let json: serde_json::Map<String, serde_json::Value> = | |
| serde_json::from_slice(&frame?)?; | |
| self.reader.update(json); | |
| } | |
| pub async fn run(&mut self) -> Result<()> { | |
| let result = async { | |
| loop { | |
| tokio::select! { | |
| res = self.track.recv_group() => { | |
| match res? { | |
| Some(group) => { | |
| self.group = Some(group); | |
| } | |
| None => return Ok(()), | |
| } | |
| }, | |
| Some(frame) = async { self.group.as_mut()?.read_frame().await.transpose() } => { | |
| self.group.take(); // We don't support deltas yet | |
| let json: serde_json::Map<String, serde_json::Value> = | |
| serde_json::from_slice(&frame?)?; | |
| self.reader.update(json); | |
| } | |
| } | |
| } | |
| } | |
| .await; | |
| self.reader.close(); | |
| result | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/hang/src/catalog/consumer.rs` around lines 49 - 69, The run() loop
currently only calls self.reader.close() on clean EOF; change it so
CatalogReader::close() is always invoked on every exit path (errors from
self.track.recv_group(), self.group.as_mut()?.read_frame(), and
serde_json::from_slice()). Implement a "finally"-style cleanup: wrap the loop
scope or the async select in a block and ensure self.reader.close() is called
after the loop (or install a guard that calls close in its Drop), so regardless
of early returns or ? errors the reader is closed; reference functions/values:
run(), self.track.recv_group(), self.group.as_mut()?.read_frame(),
serde_json::from_slice(), and self.reader.close().
| // Run until first catalog update | ||
| tokio::select! { | ||
| res = catalog_consumer.run() => { res?; anyhow::bail!("catalog closed before first update"); }, | ||
| _ = video_section.changed() => {}, | ||
| _ = audio_section.changed() => {}, | ||
| } | ||
|
|
||
| let video = video_section.get()?.unwrap_or_default(); | ||
| let audio = audio_section.get()?.unwrap_or_default(); |
There was a problem hiding this comment.
Don't lock the muxer to the first single-section update.
This select! stops consuming the catalog as soon as either video_section or audio_section changes. If the publisher advertises video and audio in separate catalog flushes, the later section never makes it into exporter.init() or muxer_tracks, so the output silently drops that media type. Keep the catalog task alive until you've captured the initial layout you want to mux.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-cli/src/subscribe.rs` around lines 71 - 79, The select! currently
returns after the first section change, which can drop the other section before
exporter.init() / muxer_tracks see it; change the logic to keep the
catalog_consumer task alive until both sections have produced their initial
values. Replace the single select! with a loop that checks video_section.get()
and audio_section.get() (e.g., while video_section.get()?.is_none() ||
audio_section.get()?.is_none()) and inside the loop use tokio::select! to await
catalog_consumer.run(), video_section.changed(), and audio_section.changed(); on
catalog_consumer.run() error bail as before, otherwise loop until both sections
are present, then read video_section.get() and audio_section.get() and call
exporter.init()/build muxer_tracks.
Summary
Catalog/Roottype with a genericSection<T>registry where sections are identified by name + schema pairsCatalogWriterfor producing catalogs andCatalogReaderwith per-section change notifications (conducer on Rust, signals on JS)@moq/hang— these now belong in the application layerVIDEOandAUDIOsections are available but not auto-registeredBreaking changes
hang::Catalogstruct removed — useCatalogWriter/CatalogReaderwithSection<T>Catalog.Root,Catalog.encode(),Catalog.decode(),Catalog.fetch()removed from JSCatalogConsumer::next()replaced withreader()+run()patternmoq_mux::CatalogProducernow usesset()/flush()instead oflock()guard patternTest plan
just fix— no formatting issuesjust check— all linting and type checks passcargo test— all 523 Rust tests pass, 0 failures(Option<&Video>, Option<&Audio>)API🤖 Generated with Claude Code