Add dual MSF + Hang catalog support in @moq/watch#1147
Add dual MSF + Hang catalog support in @moq/watch#1147
Conversation
- Add `ordered` and `max_latency` fields to `Track` struct - Create `Subscription` struct for per-consumer preferences - Create `TrackSubscription` type with RAII lifecycle (auto-removes on drop) - Add `TrackProducer::poll_max` for aggregate subscription change detection - Add `TrackConsumer::subscribe()` to register subscriptions - Add `GroupProducer::is_aborted()` for duplicate group handling - Allow `create_group` to replace aborted groups (for subscription restart) - Replace `BroadcastDynamic::requested_track() -> TrackProducer` with `requested_track() -> TrackRequest` (handler creates producer, responds) - Make `BroadcastConsumer::subscribe_track` async with request/response - Add `BroadcastDynamic::insert_track` for handler-side track insertion - Add dedup for pending track requests (multiple subscribers share reply) - Update all callers across the workspace Known issues: - libmoq FFI tests deadlock (likely BroadcastConsumer upgrade in async context) - moq-mux tests may deadlock similarly Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Track is now just `{ name: String }` — no priority/ordered/max_latency
- Subscription gains `start: Option<u64>` and `end: Option<u64>` fields
- Aggregation: start = min (None wins), end = max (None wins)
- No cap/ceiling from producer — aggregation is purely across subscribers
- TrackProducer::subscription() is async, blocks until aggregate changes
- TrackProducer stores prev_subscription internally (no prev arg needed)
- Remove TrackConsumer::start_at() — use Subscription.start instead
- Remove subscription tests (will rewrite in follow-up)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Revert broadcast.rs to sync: consume_track() looks up or creates tracks synchronously, requested_track() returns TrackProducer directly - Remove TrackRequest and oneshot channel machinery - Replace TrackSubscription with TrackSubscriber using conducer-based subscriptions for two-level poll aggregation - TrackConsumer::subscribe() is async, blocks until first group exists - TrackSubscriber::recv_group() respects start/end range - Fix create_group aborted replacement: tombstone old entry, push new - Update all callers to use consume_track (sync) or subscribe_track - Fix moq-ffi to remove RUNTIME.block_on() now that consume_track is sync Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- libmoq: move consume_track + consumer construction before tokio::spawn so errors surface synchronously - lite/ietf subscriber: take TrackProducer by value instead of &mut Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
None now means "no preference" rather than "latest wins all." Callers wanting "latest" should start with None, then update() with a concrete value once latest() is known. Also: replace expect() with ? in consume_track, remove dead comment. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove recv_group/poll_recv_group from TrackConsumer (deprecated stubs remain) - Remove index field from TrackConsumer - Add poll_recv_group, recv_group, closed, poll_closed to TrackSubscriber - Rename OrderedConsumer → OrderedSubscriber (takes TrackSubscriber) - CatalogConsumer now takes TrackSubscriber - Update all protocol publishers to subscribe() before iterating groups - Update moq-ffi: subscribe_catalog/subscribe_media are now async - Update libmoq: move subscribe into spawned tasks - Update moq-mux: CatalogProducer::consume() is now async - Fix hang tests: write first group before subscribe to avoid deadlock - Fix hang tests: await tokio::time::sleep in paused-time tests - Add unit tests for subscribe() blocking on first group, finish, abort Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Deprecate the generic NotFound error in favor of more specific variants to make debugging easier. Each maps to its own wire code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The cleanup task was firing immediately (unused() resolved with zero consumers), removing tracks from the lookup before any subscriber could find them. This matches main's behavior: stale entries are cleaned up lazily by consume_track when it finds closed weak refs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Take dev's moq_mux::consumer types (OrderedConsumer, LegacyConsumer) - Update them to use TrackSubscriber instead of TrackConsumer - Take dev's deprecated stubs in hang::container::consumer - Fix moq-cli subscribe to use consume_track + subscribe - Fix moq-ffi to use LegacyConsumer with TrackSubscriber Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create @moq/msf package with Zod schemas for MSF catalog format, and update the watch broadcast to subscribe to both catalog formats in parallel. Hang gets a 100ms headstart; first catalog to arrive wins. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WalkthroughThis pull request adds a new JavaScript package 🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-mux/src/convert/fmp4.rs (1)
37-50:⚠️ Potential issue | 🟠 MajorAvoid keeping
CatalogGuardalive across the awaited track setup.
CatalogGuardpublishes on drop, but here it stays live across everysubscribe_track(...).awaitand gets mutated before latercreate_track(...)calls can fail. That means one slow/missing source track stalls the first catalog publish, and a mid-loop error can still drop/publish a partial catalog that points at tracks you never finished wiring up.Also applies to: 80-106, 119-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/convert/fmp4.rs` around lines 37 - 50, The CatalogGuard (output_catalog.lock() -> guard) is held across await points and possible failures (subscribe_track.await and broadcast.create_track), causing premature or partial catalog publishes; instead, call subscribe_track(...).await and attempt broadcast.create_track(...) first without holding the guard, and only after create_track succeeds acquire output_catalog.lock(), insert the rendition into catalog.video.renditions (and any other catalog mutations), then drop the guard immediately so the lock is not held across awaits or long ops. Update the loop around subscribe_track, create_track, and guard usage in the code that touches output_catalog, subscribe_track, and broadcast.create_track so the guard is only alive for the minimal mutation window.
🧹 Nitpick comments (6)
js/watch/src/msf.ts (2)
43-44: Consider extracting default audio constants.The default
sampleRateof 48000 andnumberOfChannelsof 2 are reasonable but could be named constants for clarity.♻️ Optional: Named constants
+const DEFAULT_SAMPLE_RATE = 48000; +const DEFAULT_CHANNEL_COUNT = 2; + function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined { if (!track.codec) return undefined; return { codec: track.codec, container: toContainer(track), description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined, - sampleRate: u53(track.samplerate ?? 48000), - numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : 2), + sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE), + numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_CHANNEL_COUNT), bitrate: track.bitrate != null ? u53(track.bitrate) : undefined, }; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` around lines 43 - 44, Extract the literal defaults into named constants (e.g., DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the inline literals in the sampleRate and numberOfChannels expressions so the code becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare the constants near the top of js/watch/src/msf.ts so they are reusable and self-documenting.
6-13: Consider handling invalid base64 gracefully.
atob()throws aDOMExceptionif the input is not valid base64. If an MSF catalog contains malformedinitData, this will cause an uncaught exception without helpful context.♻️ Suggested defensive wrapper
function base64ToHex(b64: string): string { - const raw = atob(b64); + let raw: string; + try { + raw = atob(b64); + } catch { + console.warn("invalid base64 initData, skipping conversion"); + return ""; + } let hex = ""; for (let i = 0; i < raw.length; i++) { hex += raw.charCodeAt(i).toString(16).padStart(2, "0"); } return hex; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` around lines 6 - 13, The base64ToHex function currently calls atob(b64) which will throw a DOMException for invalid base64; wrap the atob call in a try/catch inside base64ToHex, handle the error by either returning a safe sentinel (e.g., null or empty string) or rethrowing a clearer Error with context (include the original exception and the offending b64 value), and ensure callers of base64ToHex handle the sentinel/exception. Update base64ToHex to validate or catch atob errors and document the chosen failure behavior so callers (where initData is parsed) can react appropriately.js/watch/src/broadcast.ts (2)
146-148: Redundant nullish coalescing.
updateis alreadyCatalog.Root | undefined, soupdate ?? undefinedis equivalent to justupdate.- return update ?? undefined; + return update;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/broadcast.ts` around lines 146 - 148, The return statement unnecessarily uses nullish coalescing on a value already typed as Catalog.Root | undefined; replace the redundant "return update ?? undefined;" with a plain "return update;" (locate the occurrence where the variable update is returned after the array close in broadcast.ts) so the behavior and types remain unchanged while removing the superfluous operator.
117-117: Consider extracting the headstart delay as a named constant.The 100ms delay is a tuning parameter that may need adjustment. A named constant would clarify intent and make it easier to find/change.
♻️ Optional: Named constant
+const HANG_HEADSTART_MS = 100; + // In `#runCatalog`: const msfFetch = msfTrack - ? new Promise((r) => setTimeout(r, 100)) + ? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS)) .then(() => Msf.fetch(msfTrack))🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/broadcast.ts` at line 117, Extract the magic number 100ms used in the headstart delay into a named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in js/watch/src/broadcast.ts and replace the inline literal in the ternary new Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top of the module (and export it if other modules may tune it) so the tuning parameter is clearly named and easy to change.rs/moq-mux/src/catalog.rs (1)
57-60: Make the consumed catalog format explicit.
CatalogProducernow publishes bothcatalog.jsonandcatalog, butconsume()still hardcodes the Hang path. Renaming this to something likeconsume_hang()or taking a format parameter would make the public API less surprising.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/catalog.rs` around lines 57 - 60, The public method consume() currently always subscribes to the Hang path but CatalogProducer now publishes both catalog.json and catalog, so update the API to make format explicit: either rename consume() to consume_hang() and leave behavior unchanged, or add a parameter (e.g., format: CatalogFormat enum with variants Hang and Json) to consume() and branch on it to subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang and the JSON track for Json). Update hang::CatalogConsumer construction or add a new consumer type if needed, and adjust all call sites and tests to use the new name or pass the format argument; also update public docs/comments for the changed function.rs/moq-lite/src/ietf/subscriber.rs (1)
521-522: Minor: Redundant variable rename.The parameter is renamed to
broadcast_pathand immediately aliased back tobroadcast. Consider keeping the original parameter namebroadcastif the rest of the function uses that name.Suggested simplification
- async fn run_subscribe(&mut self, broadcast_path: Path<'_>, mut track: TrackProducer) { - let broadcast = broadcast_path; + async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 521 - 522, The function run_subscribe has a redundant alias: parameter broadcast_path is immediately assigned to broadcast; update the function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) and remove the temporary let broadcast = broadcast_path; so all uses refer directly to the broadcast parameter (or alternatively replace subsequent uses with broadcast_path if you prefer that name) — adjust the signature and remove the redundant binding in run_subscribe to eliminate the unnecessary rename.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/watch/src/broadcast.ts`:
- Around line 111-126: The current Promise.race can return undefined if
hangFetch resolves to undefined before msfFetch wakes up; change the logic so
only successful fetches compete: wrap hangFetch and msfFetch so they reject when
they would yield undefined (or use Promise.any with those wrapped promises) and
keep effect.cancel in the cancel slot; update the array built from hangFetch,
msfFetch and effect.cancel accordingly and then await the winner (e.g., await
Promise.any(wrappedCandidates)) so a quick failed hangFetch no longer causes an
early return and MSF can be selected when hang returns nothing.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 192-201: The Subscribe message is hardcoded with priority: 0 in
run_subscribe, preventing per-track priority; update the subscriber API to
accept and propagate a priority and set msg.priority accordingly: either add a
priority field to Track or TrackProducer (e.g., Track::priority or
TrackProducer::priority) and read it when building lite::Subscribe, or add a
priority parameter to run_subscribe so callers can pass the desired priority;
ensure the created Subscribe (in run_subscribe) uses that priority value instead
of 0 so the publisher can honor subscribe.priority when ordering deliveries.
---
Outside diff comments:
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 37-50: The CatalogGuard (output_catalog.lock() -> guard) is held
across await points and possible failures (subscribe_track.await and
broadcast.create_track), causing premature or partial catalog publishes;
instead, call subscribe_track(...).await and attempt broadcast.create_track(...)
first without holding the guard, and only after create_track succeeds acquire
output_catalog.lock(), insert the rendition into catalog.video.renditions (and
any other catalog mutations), then drop the guard immediately so the lock is not
held across awaits or long ops. Update the loop around subscribe_track,
create_track, and guard usage in the code that touches output_catalog,
subscribe_track, and broadcast.create_track so the guard is only alive for the
minimal mutation window.
---
Nitpick comments:
In `@js/watch/src/broadcast.ts`:
- Around line 146-148: The return statement unnecessarily uses nullish
coalescing on a value already typed as Catalog.Root | undefined; replace the
redundant "return update ?? undefined;" with a plain "return update;" (locate
the occurrence where the variable update is returned after the array close in
broadcast.ts) so the behavior and types remain unchanged while removing the
superfluous operator.
- Line 117: Extract the magic number 100ms used in the headstart delay into a
named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in
js/watch/src/broadcast.ts and replace the inline literal in the ternary new
Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top
of the module (and export it if other modules may tune it) so the tuning
parameter is clearly named and easy to change.
In `@js/watch/src/msf.ts`:
- Around line 43-44: Extract the literal defaults into named constants (e.g.,
DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the
inline literals in the sampleRate and numberOfChannels expressions so the code
becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.
- Around line 6-13: The base64ToHex function currently calls atob(b64) which
will throw a DOMException for invalid base64; wrap the atob call in a try/catch
inside base64ToHex, handle the error by either returning a safe sentinel (e.g.,
null or empty string) or rethrowing a clearer Error with context (include the
original exception and the offending b64 value), and ensure callers of
base64ToHex handle the sentinel/exception. Update base64ToHex to validate or
catch atob errors and document the chosen failure behavior so callers (where
initData is parsed) can react appropriately.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 521-522: The function run_subscribe has a redundant alias:
parameter broadcast_path is immediately assigned to broadcast; update the
function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut
track: TrackProducer) and remove the temporary let broadcast = broadcast_path;
so all uses refer directly to the broadcast parameter (or alternatively replace
subsequent uses with broadcast_path if you prefer that name) — adjust the
signature and remove the redundant binding in run_subscribe to eliminate the
unnecessary rename.
In `@rs/moq-mux/src/catalog.rs`:
- Around line 57-60: The public method consume() currently always subscribes to
the Hang path but CatalogProducer now publishes both catalog.json and catalog,
so update the API to make format explicit: either rename consume() to
consume_hang() and leave behavior unchanged, or add a parameter (e.g., format:
CatalogFormat enum with variants Hang and Json) to consume() and branch on it to
subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang
and the JSON track for Json). Update hang::CatalogConsumer construction or add a
new consumer type if needed, and adjust all call sites and tests to use the new
name or pass the format argument; also update public docs/comments for the
changed function.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 10bb7c3e-9894-4069-ad11-b10ffd7b82a2
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (38)
js/msf/package.jsonjs/msf/src/catalog.tsjs/msf/src/index.tsjs/msf/tsconfig.jsonjs/watch/package.jsonjs/watch/src/broadcast.tsjs/watch/src/msf.tspackage.jsonrs/hang/examples/subscribe.rsrs/hang/examples/video.rsrs/hang/src/catalog/audio/mod.rsrs/hang/src/catalog/consumer.rsrs/hang/src/catalog/root.rsrs/hang/src/catalog/video/mod.rsrs/libmoq/src/consume.rsrs/moq-cli/src/subscribe.rsrs/moq-clock/src/clock.rsrs/moq-clock/src/main.rsrs/moq-ffi/src/consumer.rsrs/moq-ffi/src/test.rsrs/moq-lite/src/error.rsrs/moq-lite/src/ietf/adapter.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-lite/src/model/broadcast.rsrs/moq-lite/src/model/group.rsrs/moq-lite/src/model/track.rsrs/moq-mux/src/catalog.rsrs/moq-mux/src/consumer/ordered.rsrs/moq-mux/src/convert/fmp4.rsrs/moq-mux/src/convert/hang.rsrs/moq-mux/src/convert/test.rsrs/moq-native/examples/chat.rsrs/moq-native/tests/backend.rsrs/moq-native/tests/broadcast.rsrs/moq-relay/src/web.rs
…cross awaits - broadcast.ts: Use Promise.any so a failed hang fetch no longer short-circuits MSF - lite/subscriber.rs: Read subscription params from consumer instead of hardcoding priority: 0 - fmp4.rs: Restructure to hold CatalogGuard only during catalog mutation, not across awaits - Minor: extract magic numbers, harden base64ToHex, remove redundant alias/nullish coalescing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
rs/moq-mux/src/convert/fmp4.rs (1)
142-180: Consider extracting duplicated spawn logic.The video and audio task spawning blocks (lines 143-161 and 162-180) are nearly identical, differing only in the
is_videoboolean passed toconvert_legacy_to_cmaf. A helper could reduce duplication.♻️ Optional: extract common spawn logic
fn spawn_track_task( tasks: &mut tokio::task::JoinSet<()>, name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer, convert: Option<(Vec<u8>, u64)>, is_video: bool, ) { match convert { None => { tasks.spawn(async move { if let Err(e) = passthrough_track(input, output).await { tracing::error!(%e, track = %name, "passthrough_track failed"); } }); } Some((_, timescale)) => { tasks.spawn(async move { if let Err(e) = convert_legacy_to_cmaf(input, output, timescale, is_video).await { tracing::error!(%e, track = %name, "convert_legacy_to_cmaf failed"); } }); } } }Then call:
for task in video_tasks { spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, true); } for task in audio_tasks { spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, false); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@rs/moq-mux/src/convert/fmp4.rs` around lines 142 - 180, The video/audio spawn blocks duplicate logic; extract a helper (e.g., spawn_track_task) that accepts &mut tokio::task::JoinSet<()>, name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer, convert: Option<(Vec<u8>, u64)>, and is_video: bool, and moves the match that calls passthrough_track or convert_legacy_to_cmaf and logs errors; then replace the for loops over video_tasks and audio_tasks to call spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, true/false) respectively to remove duplication while keeping existing error logging via tracing::error!.js/watch/src/msf.ts (1)
8-8: Prefer documenting rationale, not just behavior, in helper comment.Consider briefly stating why invalid base64 is tolerated (e.g., keep catalog conversion resilient per-track).
As per coding guidelines, "Write comments that explain the 'why', not just the 'what'".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@js/watch/src/msf.ts` at line 8, The helper comment that currently states "Convert base64 string to hex string, returning undefined on invalid input." should be expanded to include the rationale for tolerating invalid base64: explain that the base64→hex helper (the function that converts base64 strings to hex and returns undefined on invalid input) deliberately accepts and returns undefined for bad input to keep per-track catalog conversion resilient and avoid failing the whole conversion pipeline; update the comment to briefly state this "why" and keep the existing behavioral note (returning undefined) so future readers understand both behavior and intent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@js/watch/src/msf.ts`:
- Around line 43-53: In toAudioConfig, guard parsing of track.channelConfig
before calling u53: parse channelConfig with Number.parseInt only when it is a
valid numeric string (e.g., test with /^\s*\d+\s*$/ or Number.isFinite after
parse), fall back to DEFAULT_NUMBER_OF_CHANNELS when parse yields NaN or invalid
values, and then pass the safe integer to u53; update the computation that
currently calls u53(track.channelConfig ? Number.parseInt(track.channelConfig,
10) : DEFAULT_NUMBER_OF_CHANNELS) to validate the parse result first so u53
never receives NaN or malformed input.
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 258-270: The code currently constructs a default Subscription
(Subscription::default()) which ignores the subscriber's requested
start_group/end_group from the incoming Subscribe message, so replace the
default with a Subscription built from those fields: when handling subscribe
(the incoming Subscribe struct), map its start_group and end_group into
Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.
---
Nitpick comments:
In `@js/watch/src/msf.ts`:
- Line 8: The helper comment that currently states "Convert base64 string to hex
string, returning undefined on invalid input." should be expanded to include the
rationale for tolerating invalid base64: explain that the base64→hex helper (the
function that converts base64 strings to hex and returns undefined on invalid
input) deliberately accepts and returns undefined for bad input to keep
per-track catalog conversion resilient and avoid failing the whole conversion
pipeline; update the comment to briefly state this "why" and keep the existing
behavioral note (returning undefined) so future readers understand both behavior
and intent.
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 142-180: The video/audio spawn blocks duplicate logic; extract a
helper (e.g., spawn_track_task) that accepts &mut tokio::task::JoinSet<()>,
name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer,
convert: Option<(Vec<u8>, u64)>, and is_video: bool, and moves the match that
calls passthrough_track or convert_legacy_to_cmaf and logs errors; then replace
the for loops over video_tasks and audio_tasks to call spawn_track_task(&mut
tasks, task.name, task.input, task.output, task.convert, true/false)
respectively to remove duplication while keeping existing error logging via
tracing::error!.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e144d820-8975-4097-ba6a-47f515ad7a3d
📒 Files selected for processing (8)
js/watch/src/broadcast.tsjs/watch/src/msf.tsrs/moq-lite/src/ietf/adapter.rsrs/moq-lite/src/ietf/publisher.rsrs/moq-lite/src/ietf/subscriber.rsrs/moq-lite/src/lite/publisher.rsrs/moq-lite/src/lite/subscriber.rsrs/moq-mux/src/convert/fmp4.rs
✅ Files skipped from review due to trivial changes (1)
- js/watch/src/broadcast.ts
🚧 Files skipped from review as they are similar to previous changes (4)
- rs/moq-lite/src/ietf/adapter.rs
- rs/moq-lite/src/ietf/subscriber.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/ietf/publisher.rs
| function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined { | ||
| if (!track.codec) return undefined; | ||
|
|
||
| return { | ||
| codec: track.codec, | ||
| container: toContainer(track), | ||
| description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined, | ||
| sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE), | ||
| numberOfChannels: u53( | ||
| track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS, | ||
| ), |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect u53 implementation/validation behavior.
rg -n --type=ts -C3 '\bexport\s+(const|function)\s+u53\b|\bu53\s*\(' js/hang
# 2) Inspect MSF track schema + channelConfig type and current parser usage.
rg -n --type=ts -C3 'channelConfig|samplerate|toAudioConfig|Number\.parseInt' js/msf js/watchRepository: moq-dev/moq
Length of output: 2184
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Get full u53 implementation and schema
cat -n js/hang/src/catalog/integers.ts
# 2) Check what u53Schema is defined as
rg -n --type=ts -C5 'u53Schema' js/hang/src/catalog/integers.tsRepository: moq-dev/moq
Length of output: 1616
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Check how toAudioConfig failures are handled
rg -n --type=ts -B5 -A10 'toAudioConfig' js/watch/src/msf.ts
# 2) Check if there are any tests for toAudioConfig with edge cases
fd -t ts 'test|spec' js/watch | head -20
# 3) Look for any existing error handling or validation of channelConfig
rg -n --type=ts 'channelConfig' js/ -B2 -A2Repository: moq-dev/moq
Length of output: 1345
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find test files
find js/watch -name '*test*' -o -name '*spec*' 2>/dev/null | head -20
# Check MSF data source and schema
rg -n --type=ts 'Msf\.Track|type Track' js/msf -B2 -A5
# Check if there's validation on incoming MSF data
rg -n --type=ts 'parse.*msf|validate.*msf|msf.*schema' js/watch -iRepository: moq-dev/moq
Length of output: 447
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get the full TrackSchema definition
rg -n --type=ts -B1 'export const TrackSchema' js/msf/src/catalog.ts -A20Repository: moq-dev/moq
Length of output: 720
Guard channel parsing before passing into u53.
Number.parseInt(track.channelConfig, 10) can produce NaN or partial values (e.g., "abc" → NaN, "12abc" → 12). When NaN is passed to u53(), Zod validation fails and throws, crashing the entire catalog conversion with no error handling in the caller.
🛠️ Proposed fix
function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined {
if (!track.codec) return undefined;
+ const parsedChannels =
+ track.channelConfig != null ? Number.parseInt(track.channelConfig, 10) : Number.NaN;
+ const channelCount =
+ Number.isInteger(parsedChannels) && parsedChannels > 0
+ ? parsedChannels
+ : DEFAULT_NUMBER_OF_CHANNELS;
return {
codec: track.codec,
container: toContainer(track),
description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined,
sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE),
- numberOfChannels: u53(
- track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS,
- ),
+ numberOfChannels: u53(channelCount),
bitrate: track.bitrate != null ? u53(track.bitrate) : undefined,
};
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@js/watch/src/msf.ts` around lines 43 - 53, In toAudioConfig, guard parsing of
track.channelConfig before calling u53: parse channelConfig with Number.parseInt
only when it is a valid numeric string (e.g., test with /^\s*\d+\s*$/ or
Number.isFinite after parse), fall back to DEFAULT_NUMBER_OF_CHANNELS when parse
yields NaN or invalid values, and then pass the safe integer to u53; update the
computation that currently calls u53(track.channelConfig ?
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS) to
validate the parse result first so u53 never receives NaN or malformed input.
| let track = Track::new(subscribe.track.to_string()); | ||
|
|
||
| // TODO wait until track.info() to get the *real* priority | ||
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | ||
| let track = broadcast.consume_track(&track)?; | ||
| let subscriber = track.subscribe(Subscription::default()).await?; | ||
|
|
||
| let info = lite::SubscribeOk { | ||
| priority: track.info.priority, | ||
| priority: 0, | ||
| ordered: false, | ||
| max_latency: std::time::Duration::ZERO, | ||
| max_latency: Duration::ZERO, | ||
| start_group: None, | ||
| end_group: None, | ||
| }; |
There was a problem hiding this comment.
Subscriber's requested start_group/end_group are ignored.
The Subscribe message (per context snippet 4) contains start_group: Option<u64> and end_group: Option<u64> that subscribers use to request specific group ranges. Using Subscription::default() here ignores these fields entirely, since Subscription::default() sets start: None and end: None (meaning "no preference").
This breaks the protocol contract—subscribers requesting playback from a specific group will instead receive groups from wherever the publisher decides.
🔧 Proposed fix to honor subscriber's requested range
- let subscriber = track.subscribe(Subscription::default()).await?;
+ let subscriber = track
+ .subscribe(moq_lite::Subscription {
+ priority: subscribe.priority,
+ ordered: subscribe.ordered,
+ max_latency: subscribe.max_latency,
+ start: subscribe.start_group,
+ end: subscribe.end_group,
+ })
+ .await?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let track = Track::new(subscribe.track.to_string()); | |
| // TODO wait until track.info() to get the *real* priority | |
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | |
| let track = broadcast.consume_track(&track)?; | |
| let subscriber = track.subscribe(Subscription::default()).await?; | |
| let info = lite::SubscribeOk { | |
| priority: track.info.priority, | |
| priority: 0, | |
| ordered: false, | |
| max_latency: std::time::Duration::ZERO, | |
| max_latency: Duration::ZERO, | |
| start_group: None, | |
| end_group: None, | |
| }; | |
| let track = Track::new(subscribe.track.to_string()); | |
| let broadcast = consumer.ok_or(Error::UnknownBroadcast)?; | |
| let track = broadcast.consume_track(&track)?; | |
| let subscriber = track | |
| .subscribe(moq_lite::Subscription { | |
| priority: subscribe.priority, | |
| ordered: subscribe.ordered, | |
| max_latency: subscribe.max_latency, | |
| start: subscribe.start_group, | |
| end: subscribe.end_group, | |
| }) | |
| .await?; | |
| let info = lite::SubscribeOk { | |
| priority: 0, | |
| ordered: false, | |
| max_latency: Duration::ZERO, | |
| start_group: None, | |
| end_group: None, | |
| }; |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@rs/moq-lite/src/lite/publisher.rs` around lines 258 - 270, The code currently
constructs a default Subscription (Subscription::default()) which ignores the
subscriber's requested start_group/end_group from the incoming Subscribe
message, so replace the default with a Subscription built from those fields:
when handling subscribe (the incoming Subscribe struct), map its start_group and
end_group into Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.
Summary
@moq/msfpackage with Zod schemas for MSF catalog format (matchingrs/moq-msftypes)@moq/watchbroadcast to subscribe to bothcatalog.json(hang) andcatalog(MSF) in paralleltoHang()) so the player works with MSF-only publishersUnknownBroadcast/UnknownTrackerror split,TrackSubscriberredesign, andmoq_muxconsumer updatesTest plan
just checkpassescatalog: ["hang"]andcatalog: ["msf"]for single-format mode🤖 Generated with Claude Code