Skip to content

Add dual MSF + Hang catalog support in @moq/watch#1147

Open
kixelated wants to merge 16 commits intodevfrom
msf-playback
Open

Add dual MSF + Hang catalog support in @moq/watch#1147
kixelated wants to merge 16 commits intodevfrom
msf-playback

Conversation

@kixelated
Copy link
Collaborator

@kixelated kixelated commented Mar 21, 2026

Summary

  • Create @moq/msf package with Zod schemas for MSF catalog format (matching rs/moq-msf types)
  • Update @moq/watch broadcast to subscribe to both catalog.json (hang) and catalog (MSF) in parallel
  • Hang gets a 100ms headstart; first catalog to arrive wins, loser track is closed
  • Add MSF→Hang conversion (toHang()) so the player works with MSF-only publishers
  • Includes Rust-side subscription API changes: UnknownBroadcast/UnknownTrack error split, TrackSubscriber redesign, and moq_mux consumer updates

Test plan

  • just check passes
  • Connect to relay publishing both formats — hang catalog should win due to headstart
  • Connect to MSF-only source — player constructs hang catalog from MSF correctly
  • Test catalog: ["hang"] and catalog: ["msf"] for single-format mode

🤖 Generated with Claude Code

kixelated and others added 14 commits March 18, 2026 14:40
- Add `ordered` and `max_latency` fields to `Track` struct
- Create `Subscription` struct for per-consumer preferences
- Create `TrackSubscription` type with RAII lifecycle (auto-removes on drop)
- Add `TrackProducer::poll_max` for aggregate subscription change detection
- Add `TrackConsumer::subscribe()` to register subscriptions
- Add `GroupProducer::is_aborted()` for duplicate group handling
- Allow `create_group` to replace aborted groups (for subscription restart)
- Replace `BroadcastDynamic::requested_track() -> TrackProducer` with
  `requested_track() -> TrackRequest` (handler creates producer, responds)
- Make `BroadcastConsumer::subscribe_track` async with request/response
- Add `BroadcastDynamic::insert_track` for handler-side track insertion
- Add dedup for pending track requests (multiple subscribers share reply)
- Update all callers across the workspace

Known issues:
- libmoq FFI tests deadlock (likely BroadcastConsumer upgrade in async context)
- moq-mux tests may deadlock similarly

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Track is now just `{ name: String }` — no priority/ordered/max_latency
- Subscription gains `start: Option<u64>` and `end: Option<u64>` fields
- Aggregation: start = min (None wins), end = max (None wins)
- No cap/ceiling from producer — aggregation is purely across subscribers
- TrackProducer::subscription() is async, blocks until aggregate changes
- TrackProducer stores prev_subscription internally (no prev arg needed)
- Remove TrackConsumer::start_at() — use Subscription.start instead
- Remove subscription tests (will rewrite in follow-up)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Revert broadcast.rs to sync: consume_track() looks up or creates
  tracks synchronously, requested_track() returns TrackProducer directly
- Remove TrackRequest and oneshot channel machinery
- Replace TrackSubscription with TrackSubscriber using conducer-based
  subscriptions for two-level poll aggregation
- TrackConsumer::subscribe() is async, blocks until first group exists
- TrackSubscriber::recv_group() respects start/end range
- Fix create_group aborted replacement: tombstone old entry, push new
- Update all callers to use consume_track (sync) or subscribe_track
- Fix moq-ffi to remove RUNTIME.block_on() now that consume_track is sync

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- libmoq: move consume_track + consumer construction before tokio::spawn
  so errors surface synchronously
- lite/ietf subscriber: take TrackProducer by value instead of &mut

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
None now means "no preference" rather than "latest wins all."
Callers wanting "latest" should start with None, then update()
with a concrete value once latest() is known.

Also: replace expect() with ? in consume_track, remove dead comment.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove recv_group/poll_recv_group from TrackConsumer (deprecated stubs remain)
- Remove index field from TrackConsumer
- Add poll_recv_group, recv_group, closed, poll_closed to TrackSubscriber
- Rename OrderedConsumer → OrderedSubscriber (takes TrackSubscriber)
- CatalogConsumer now takes TrackSubscriber
- Update all protocol publishers to subscribe() before iterating groups
- Update moq-ffi: subscribe_catalog/subscribe_media are now async
- Update libmoq: move subscribe into spawned tasks
- Update moq-mux: CatalogProducer::consume() is now async
- Fix hang tests: write first group before subscribe to avoid deadlock
- Fix hang tests: await tokio::time::sleep in paused-time tests
- Add unit tests for subscribe() blocking on first group, finish, abort

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Deprecate the generic NotFound error in favor of more specific
variants to make debugging easier. Each maps to its own wire code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The cleanup task was firing immediately (unused() resolved with zero
consumers), removing tracks from the lookup before any subscriber
could find them. This matches main's behavior: stale entries are
cleaned up lazily by consume_track when it finds closed weak refs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Take dev's moq_mux::consumer types (OrderedConsumer, LegacyConsumer)
- Update them to use TrackSubscriber instead of TrackConsumer
- Take dev's deprecated stubs in hang::container::consumer
- Fix moq-cli subscribe to use consume_track + subscribe
- Fix moq-ffi to use LegacyConsumer with TrackSubscriber

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Create @moq/msf package with Zod schemas for MSF catalog format, and
update the watch broadcast to subscribe to both catalog formats in
parallel. Hang gets a 100ms headstart; first catalog to arrive wins.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 21, 2026

Walkthrough

This pull request adds a new JavaScript package @moq/msf with Zod schemas and encode/decode utilities for MSF catalogs, integrates MSF support into the js/watch broadcast flow, and updates workspace manifests/tsconfig. Concurrently, the Rust codebase is refactored: the consumer API changes from synchronous subscribe_track() to a two-step consume_track() plus async subscribe() returning TrackSubscriber; a new Subscription type and TrackSubscriber are introduced; Track no longer exposes priority; and Error gains UnknownBroadcast and UnknownTrack variants.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The PR title clearly and concisely describes the main change: adding dual MSF + Hang catalog support to the @moq/watch package, which is the primary focus of this changeset.
Description check ✅ Passed The PR description is well-structured and directly related to the changeset, covering the creation of @moq/msf package, dual catalog subscription in @moq/watch, MSF→Hang conversion, and the Rust-side subscription API changes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch msf-playback
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch msf-playback

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-mux/src/convert/fmp4.rs (1)

37-50: ⚠️ Potential issue | 🟠 Major

Avoid keeping CatalogGuard alive across the awaited track setup.

CatalogGuard publishes on drop, but here it stays live across every subscribe_track(...).await and gets mutated before later create_track(...) calls can fail. That means one slow/missing source track stalls the first catalog publish, and a mid-loop error can still drop/publish a partial catalog that points at tracks you never finished wiring up.

Also applies to: 80-106, 119-123

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 37 - 50, The CatalogGuard
(output_catalog.lock() -> guard) is held across await points and possible
failures (subscribe_track.await and broadcast.create_track), causing premature
or partial catalog publishes; instead, call subscribe_track(...).await and
attempt broadcast.create_track(...) first without holding the guard, and only
after create_track succeeds acquire output_catalog.lock(), insert the rendition
into catalog.video.renditions (and any other catalog mutations), then drop the
guard immediately so the lock is not held across awaits or long ops. Update the
loop around subscribe_track, create_track, and guard usage in the code that
touches output_catalog, subscribe_track, and broadcast.create_track so the guard
is only alive for the minimal mutation window.
🧹 Nitpick comments (6)
js/watch/src/msf.ts (2)

43-44: Consider extracting default audio constants.

The default sampleRate of 48000 and numberOfChannels of 2 are reasonable but could be named constants for clarity.

♻️ Optional: Named constants
+const DEFAULT_SAMPLE_RATE = 48000;
+const DEFAULT_CHANNEL_COUNT = 2;
+
 function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined {
 	if (!track.codec) return undefined;

 	return {
 		codec: track.codec,
 		container: toContainer(track),
 		description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined,
-		sampleRate: u53(track.samplerate ?? 48000),
-		numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : 2),
+		sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE),
+		numberOfChannels: u53(track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_CHANNEL_COUNT),
 		bitrate: track.bitrate != null ? u53(track.bitrate) : undefined,
 	};
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` around lines 43 - 44, Extract the literal defaults into
named constants (e.g., DEFAULT_SAMPLE_RATE = 48000 and
DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the inline literals in the
sampleRate and numberOfChannels expressions so the code becomes
u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.

6-13: Consider handling invalid base64 gracefully.

atob() throws a DOMException if the input is not valid base64. If an MSF catalog contains malformed initData, this will cause an uncaught exception without helpful context.

♻️ Suggested defensive wrapper
 function base64ToHex(b64: string): string {
-	const raw = atob(b64);
+	let raw: string;
+	try {
+		raw = atob(b64);
+	} catch {
+		console.warn("invalid base64 initData, skipping conversion");
+		return "";
+	}
 	let hex = "";
 	for (let i = 0; i < raw.length; i++) {
 		hex += raw.charCodeAt(i).toString(16).padStart(2, "0");
 	}
 	return hex;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` around lines 6 - 13, The base64ToHex function currently
calls atob(b64) which will throw a DOMException for invalid base64; wrap the
atob call in a try/catch inside base64ToHex, handle the error by either
returning a safe sentinel (e.g., null or empty string) or rethrowing a clearer
Error with context (include the original exception and the offending b64 value),
and ensure callers of base64ToHex handle the sentinel/exception. Update
base64ToHex to validate or catch atob errors and document the chosen failure
behavior so callers (where initData is parsed) can react appropriately.
js/watch/src/broadcast.ts (2)

146-148: Redundant nullish coalescing.

update is already Catalog.Root | undefined, so update ?? undefined is equivalent to just update.

-								return update ?? undefined;
+								return update;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` around lines 146 - 148, The return statement
unnecessarily uses nullish coalescing on a value already typed as Catalog.Root |
undefined; replace the redundant "return update ?? undefined;" with a plain
"return update;" (locate the occurrence where the variable update is returned
after the array close in broadcast.ts) so the behavior and types remain
unchanged while removing the superfluous operator.

117-117: Consider extracting the headstart delay as a named constant.

The 100ms delay is a tuning parameter that may need adjustment. A named constant would clarify intent and make it easier to find/change.

♻️ Optional: Named constant
+const HANG_HEADSTART_MS = 100;
+
 // In `#runCatalog`:
 				const msfFetch = msfTrack
-					? new Promise((r) => setTimeout(r, 100))
+					? new Promise((r) => setTimeout(r, HANG_HEADSTART_MS))
 							.then(() => Msf.fetch(msfTrack))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/broadcast.ts` at line 117, Extract the magic number 100ms used
in the headstart delay into a named top-level constant (e.g., HEADSTART_DELAY_MS
or BROADCAST_HEADSTART_MS) in js/watch/src/broadcast.ts and replace the inline
literal in the ternary new Promise((r) => setTimeout(r, 100)) with that
constant; declare it near the top of the module (and export it if other modules
may tune it) so the tuning parameter is clearly named and easy to change.
rs/moq-mux/src/catalog.rs (1)

57-60: Make the consumed catalog format explicit.

CatalogProducer now publishes both catalog.json and catalog, but consume() still hardcodes the Hang path. Renaming this to something like consume_hang() or taking a format parameter would make the public API less surprising.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/catalog.rs` around lines 57 - 60, The public method consume()
currently always subscribes to the Hang path but CatalogProducer now publishes
both catalog.json and catalog, so update the API to make format explicit: either
rename consume() to consume_hang() and leave behavior unchanged, or add a
parameter (e.g., format: CatalogFormat enum with variants Hang and Json) to
consume() and branch on it to subscribe to the appropriate track (e.g., use
self.hang_track.consume() for Hang and the JSON track for Json). Update
hang::CatalogConsumer construction or add a new consumer type if needed, and
adjust all call sites and tests to use the new name or pass the format argument;
also update public docs/comments for the changed function.
rs/moq-lite/src/ietf/subscriber.rs (1)

521-522: Minor: Redundant variable rename.

The parameter is renamed to broadcast_path and immediately aliased back to broadcast. Consider keeping the original parameter name broadcast if the rest of the function uses that name.

Suggested simplification
-	async fn run_subscribe(&mut self, broadcast_path: Path<'_>, mut track: TrackProducer) {
-		let broadcast = broadcast_path;
+	async fn run_subscribe(&mut self, broadcast: Path<'_>, mut track: TrackProducer) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/ietf/subscriber.rs` around lines 521 - 522, The function
run_subscribe has a redundant alias: parameter broadcast_path is immediately
assigned to broadcast; update the function signature async fn run_subscribe(&mut
self, broadcast: Path<'_>, mut track: TrackProducer) and remove the temporary
let broadcast = broadcast_path; so all uses refer directly to the broadcast
parameter (or alternatively replace subsequent uses with broadcast_path if you
prefer that name) — adjust the signature and remove the redundant binding in
run_subscribe to eliminate the unnecessary rename.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/watch/src/broadcast.ts`:
- Around line 111-126: The current Promise.race can return undefined if
hangFetch resolves to undefined before msfFetch wakes up; change the logic so
only successful fetches compete: wrap hangFetch and msfFetch so they reject when
they would yield undefined (or use Promise.any with those wrapped promises) and
keep effect.cancel in the cancel slot; update the array built from hangFetch,
msfFetch and effect.cancel accordingly and then await the winner (e.g., await
Promise.any(wrappedCandidates)) so a quick failed hangFetch no longer causes an
early return and MSF can be selected when hang returns nothing.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 192-201: The Subscribe message is hardcoded with priority: 0 in
run_subscribe, preventing per-track priority; update the subscriber API to
accept and propagate a priority and set msg.priority accordingly: either add a
priority field to Track or TrackProducer (e.g., Track::priority or
TrackProducer::priority) and read it when building lite::Subscribe, or add a
priority parameter to run_subscribe so callers can pass the desired priority;
ensure the created Subscribe (in run_subscribe) uses that priority value instead
of 0 so the publisher can honor subscribe.priority when ordering deliveries.

---

Outside diff comments:
In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 37-50: The CatalogGuard (output_catalog.lock() -> guard) is held
across await points and possible failures (subscribe_track.await and
broadcast.create_track), causing premature or partial catalog publishes;
instead, call subscribe_track(...).await and attempt broadcast.create_track(...)
first without holding the guard, and only after create_track succeeds acquire
output_catalog.lock(), insert the rendition into catalog.video.renditions (and
any other catalog mutations), then drop the guard immediately so the lock is not
held across awaits or long ops. Update the loop around subscribe_track,
create_track, and guard usage in the code that touches output_catalog,
subscribe_track, and broadcast.create_track so the guard is only alive for the
minimal mutation window.

---

Nitpick comments:
In `@js/watch/src/broadcast.ts`:
- Around line 146-148: The return statement unnecessarily uses nullish
coalescing on a value already typed as Catalog.Root | undefined; replace the
redundant "return update ?? undefined;" with a plain "return update;" (locate
the occurrence where the variable update is returned after the array close in
broadcast.ts) so the behavior and types remain unchanged while removing the
superfluous operator.
- Line 117: Extract the magic number 100ms used in the headstart delay into a
named top-level constant (e.g., HEADSTART_DELAY_MS or BROADCAST_HEADSTART_MS) in
js/watch/src/broadcast.ts and replace the inline literal in the ternary new
Promise((r) => setTimeout(r, 100)) with that constant; declare it near the top
of the module (and export it if other modules may tune it) so the tuning
parameter is clearly named and easy to change.

In `@js/watch/src/msf.ts`:
- Around line 43-44: Extract the literal defaults into named constants (e.g.,
DEFAULT_SAMPLE_RATE = 48000 and DEFAULT_NUMBER_OF_CHANNELS = 2) and replace the
inline literals in the sampleRate and numberOfChannels expressions so the code
becomes u53(track.samplerate ?? DEFAULT_SAMPLE_RATE) and
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS; declare
the constants near the top of js/watch/src/msf.ts so they are reusable and
self-documenting.
- Around line 6-13: The base64ToHex function currently calls atob(b64) which
will throw a DOMException for invalid base64; wrap the atob call in a try/catch
inside base64ToHex, handle the error by either returning a safe sentinel (e.g.,
null or empty string) or rethrowing a clearer Error with context (include the
original exception and the offending b64 value), and ensure callers of
base64ToHex handle the sentinel/exception. Update base64ToHex to validate or
catch atob errors and document the chosen failure behavior so callers (where
initData is parsed) can react appropriately.

In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 521-522: The function run_subscribe has a redundant alias:
parameter broadcast_path is immediately assigned to broadcast; update the
function signature async fn run_subscribe(&mut self, broadcast: Path<'_>, mut
track: TrackProducer) and remove the temporary let broadcast = broadcast_path;
so all uses refer directly to the broadcast parameter (or alternatively replace
subsequent uses with broadcast_path if you prefer that name) — adjust the
signature and remove the redundant binding in run_subscribe to eliminate the
unnecessary rename.

In `@rs/moq-mux/src/catalog.rs`:
- Around line 57-60: The public method consume() currently always subscribes to
the Hang path but CatalogProducer now publishes both catalog.json and catalog,
so update the API to make format explicit: either rename consume() to
consume_hang() and leave behavior unchanged, or add a parameter (e.g., format:
CatalogFormat enum with variants Hang and Json) to consume() and branch on it to
subscribe to the appropriate track (e.g., use self.hang_track.consume() for Hang
and the JSON track for Json). Update hang::CatalogConsumer construction or add a
new consumer type if needed, and adjust all call sites and tests to use the new
name or pass the format argument; also update public docs/comments for the
changed function.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 10bb7c3e-9894-4069-ad11-b10ffd7b82a2

📥 Commits

Reviewing files that changed from the base of the PR and between 2626a25 and 73fd42b.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • js/msf/package.json
  • js/msf/src/catalog.ts
  • js/msf/src/index.ts
  • js/msf/tsconfig.json
  • js/watch/package.json
  • js/watch/src/broadcast.ts
  • js/watch/src/msf.ts
  • package.json
  • rs/hang/examples/subscribe.rs
  • rs/hang/examples/video.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/hang/src/catalog/consumer.rs
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/libmoq/src/consume.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-clock/src/clock.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/consumer.rs
  • rs/moq-ffi/src/test.rs
  • rs/moq-lite/src/error.rs
  • rs/moq-lite/src/ietf/adapter.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/group.rs
  • rs/moq-lite/src/model/track.rs
  • rs/moq-mux/src/catalog.rs
  • rs/moq-mux/src/consumer/ordered.rs
  • rs/moq-mux/src/convert/fmp4.rs
  • rs/moq-mux/src/convert/hang.rs
  • rs/moq-mux/src/convert/test.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-relay/src/web.rs

kixelated and others added 2 commits March 23, 2026 13:52
…cross awaits

- broadcast.ts: Use Promise.any so a failed hang fetch no longer short-circuits MSF
- lite/subscriber.rs: Read subscription params from consumer instead of hardcoding priority: 0
- fmp4.rs: Restructure to hold CatalogGuard only during catalog mutation, not across awaits
- Minor: extract magic numbers, harden base64ToHex, remove redundant alias/nullish coalescing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
rs/moq-mux/src/convert/fmp4.rs (1)

142-180: Consider extracting duplicated spawn logic.

The video and audio task spawning blocks (lines 143-161 and 162-180) are nearly identical, differing only in the is_video boolean passed to convert_legacy_to_cmaf. A helper could reduce duplication.

♻️ Optional: extract common spawn logic
fn spawn_track_task(
    tasks: &mut tokio::task::JoinSet<()>,
    name: String,
    input: moq_lite::TrackSubscriber,
    output: moq_lite::TrackProducer,
    convert: Option<(Vec<u8>, u64)>,
    is_video: bool,
) {
    match convert {
        None => {
            tasks.spawn(async move {
                if let Err(e) = passthrough_track(input, output).await {
                    tracing::error!(%e, track = %name, "passthrough_track failed");
                }
            });
        }
        Some((_, timescale)) => {
            tasks.spawn(async move {
                if let Err(e) = convert_legacy_to_cmaf(input, output, timescale, is_video).await {
                    tracing::error!(%e, track = %name, "convert_legacy_to_cmaf failed");
                }
            });
        }
    }
}

Then call:

for task in video_tasks {
    spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, true);
}
for task in audio_tasks {
    spawn_track_task(&mut tasks, task.name, task.input, task.output, task.convert, false);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-mux/src/convert/fmp4.rs` around lines 142 - 180, The video/audio spawn
blocks duplicate logic; extract a helper (e.g., spawn_track_task) that accepts
&mut tokio::task::JoinSet<()>, name: String, input: moq_lite::TrackSubscriber,
output: moq_lite::TrackProducer, convert: Option<(Vec<u8>, u64)>, and is_video:
bool, and moves the match that calls passthrough_track or convert_legacy_to_cmaf
and logs errors; then replace the for loops over video_tasks and audio_tasks to
call spawn_track_task(&mut tasks, task.name, task.input, task.output,
task.convert, true/false) respectively to remove duplication while keeping
existing error logging via tracing::error!.
js/watch/src/msf.ts (1)

8-8: Prefer documenting rationale, not just behavior, in helper comment.

Consider briefly stating why invalid base64 is tolerated (e.g., keep catalog conversion resilient per-track).

As per coding guidelines, "Write comments that explain the 'why', not just the 'what'".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` at line 8, The helper comment that currently states
"Convert base64 string to hex string, returning undefined on invalid input."
should be expanded to include the rationale for tolerating invalid base64:
explain that the base64→hex helper (the function that converts base64 strings to
hex and returns undefined on invalid input) deliberately accepts and returns
undefined for bad input to keep per-track catalog conversion resilient and avoid
failing the whole conversion pipeline; update the comment to briefly state this
"why" and keep the existing behavioral note (returning undefined) so future
readers understand both behavior and intent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@js/watch/src/msf.ts`:
- Around line 43-53: In toAudioConfig, guard parsing of track.channelConfig
before calling u53: parse channelConfig with Number.parseInt only when it is a
valid numeric string (e.g., test with /^\s*\d+\s*$/ or Number.isFinite after
parse), fall back to DEFAULT_NUMBER_OF_CHANNELS when parse yields NaN or invalid
values, and then pass the safe integer to u53; update the computation that
currently calls u53(track.channelConfig ? Number.parseInt(track.channelConfig,
10) : DEFAULT_NUMBER_OF_CHANNELS) to validate the parse result first so u53
never receives NaN or malformed input.

In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 258-270: The code currently constructs a default Subscription
(Subscription::default()) which ignores the subscriber's requested
start_group/end_group from the incoming Subscribe message, so replace the
default with a Subscription built from those fields: when handling subscribe
(the incoming Subscribe struct), map its start_group and end_group into
Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.

---

Nitpick comments:
In `@js/watch/src/msf.ts`:
- Line 8: The helper comment that currently states "Convert base64 string to hex
string, returning undefined on invalid input." should be expanded to include the
rationale for tolerating invalid base64: explain that the base64→hex helper (the
function that converts base64 strings to hex and returns undefined on invalid
input) deliberately accepts and returns undefined for bad input to keep
per-track catalog conversion resilient and avoid failing the whole conversion
pipeline; update the comment to briefly state this "why" and keep the existing
behavioral note (returning undefined) so future readers understand both behavior
and intent.

In `@rs/moq-mux/src/convert/fmp4.rs`:
- Around line 142-180: The video/audio spawn blocks duplicate logic; extract a
helper (e.g., spawn_track_task) that accepts &mut tokio::task::JoinSet<()>,
name: String, input: moq_lite::TrackSubscriber, output: moq_lite::TrackProducer,
convert: Option<(Vec<u8>, u64)>, and is_video: bool, and moves the match that
calls passthrough_track or convert_legacy_to_cmaf and logs errors; then replace
the for loops over video_tasks and audio_tasks to call spawn_track_task(&mut
tasks, task.name, task.input, task.output, task.convert, true/false)
respectively to remove duplication while keeping existing error logging via
tracing::error!.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e144d820-8975-4097-ba6a-47f515ad7a3d

📥 Commits

Reviewing files that changed from the base of the PR and between 73fd42b and 107045c.

📒 Files selected for processing (8)
  • js/watch/src/broadcast.ts
  • js/watch/src/msf.ts
  • rs/moq-lite/src/ietf/adapter.rs
  • rs/moq-lite/src/ietf/publisher.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-mux/src/convert/fmp4.rs
✅ Files skipped from review due to trivial changes (1)
  • js/watch/src/broadcast.ts
🚧 Files skipped from review as they are similar to previous changes (4)
  • rs/moq-lite/src/ietf/adapter.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/ietf/publisher.rs

Comment on lines +43 to +53
function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined {
if (!track.codec) return undefined;

return {
codec: track.codec,
container: toContainer(track),
description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined,
sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE),
numberOfChannels: u53(
track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS,
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Inspect u53 implementation/validation behavior.
rg -n --type=ts -C3 '\bexport\s+(const|function)\s+u53\b|\bu53\s*\(' js/hang

# 2) Inspect MSF track schema + channelConfig type and current parser usage.
rg -n --type=ts -C3 'channelConfig|samplerate|toAudioConfig|Number\.parseInt' js/msf js/watch

Repository: moq-dev/moq

Length of output: 2184


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Get full u53 implementation and schema
cat -n js/hang/src/catalog/integers.ts

# 2) Check what u53Schema is defined as
rg -n --type=ts -C5 'u53Schema' js/hang/src/catalog/integers.ts

Repository: moq-dev/moq

Length of output: 1616


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# 1) Check how toAudioConfig failures are handled
rg -n --type=ts -B5 -A10 'toAudioConfig' js/watch/src/msf.ts

# 2) Check if there are any tests for toAudioConfig with edge cases
fd -t ts 'test|spec' js/watch | head -20

# 3) Look for any existing error handling or validation of channelConfig
rg -n --type=ts 'channelConfig' js/ -B2 -A2

Repository: moq-dev/moq

Length of output: 1345


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find test files
find js/watch -name '*test*' -o -name '*spec*' 2>/dev/null | head -20

# Check MSF data source and schema
rg -n --type=ts 'Msf\.Track|type Track' js/msf -B2 -A5

# Check if there's validation on incoming MSF data
rg -n --type=ts 'parse.*msf|validate.*msf|msf.*schema' js/watch -i

Repository: moq-dev/moq

Length of output: 447


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get the full TrackSchema definition
rg -n --type=ts -B1 'export const TrackSchema' js/msf/src/catalog.ts -A20

Repository: moq-dev/moq

Length of output: 720


Guard channel parsing before passing into u53.

Number.parseInt(track.channelConfig, 10) can produce NaN or partial values (e.g., "abc"NaN, "12abc"12). When NaN is passed to u53(), Zod validation fails and throws, crashing the entire catalog conversion with no error handling in the caller.

🛠️ Proposed fix
 function toAudioConfig(track: Msf.Track): Catalog.AudioConfig | undefined {
 	if (!track.codec) return undefined;
+	const parsedChannels =
+		track.channelConfig != null ? Number.parseInt(track.channelConfig, 10) : Number.NaN;
+	const channelCount =
+		Number.isInteger(parsedChannels) && parsedChannels > 0
+			? parsedChannels
+			: DEFAULT_NUMBER_OF_CHANNELS;
 
 	return {
 		codec: track.codec,
 		container: toContainer(track),
 		description: track.packaging !== "cmaf" && track.initData ? base64ToHex(track.initData) : undefined,
 		sampleRate: u53(track.samplerate ?? DEFAULT_SAMPLE_RATE),
-		numberOfChannels: u53(
-			track.channelConfig ? Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS,
-		),
+		numberOfChannels: u53(channelCount),
 		bitrate: track.bitrate != null ? u53(track.bitrate) : undefined,
 	};
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/watch/src/msf.ts` around lines 43 - 53, In toAudioConfig, guard parsing of
track.channelConfig before calling u53: parse channelConfig with Number.parseInt
only when it is a valid numeric string (e.g., test with /^\s*\d+\s*$/ or
Number.isFinite after parse), fall back to DEFAULT_NUMBER_OF_CHANNELS when parse
yields NaN or invalid values, and then pass the safe integer to u53; update the
computation that currently calls u53(track.channelConfig ?
Number.parseInt(track.channelConfig, 10) : DEFAULT_NUMBER_OF_CHANNELS) to
validate the parse result first so u53 never receives NaN or malformed input.

Comment on lines +258 to 270
let track = Track::new(subscribe.track.to_string());

// TODO wait until track.info() to get the *real* priority
let broadcast = consumer.ok_or(Error::UnknownBroadcast)?;
let track = broadcast.consume_track(&track)?;
let subscriber = track.subscribe(Subscription::default()).await?;

let info = lite::SubscribeOk {
priority: track.info.priority,
priority: 0,
ordered: false,
max_latency: std::time::Duration::ZERO,
max_latency: Duration::ZERO,
start_group: None,
end_group: None,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Subscriber's requested start_group/end_group are ignored.

The Subscribe message (per context snippet 4) contains start_group: Option<u64> and end_group: Option<u64> that subscribers use to request specific group ranges. Using Subscription::default() here ignores these fields entirely, since Subscription::default() sets start: None and end: None (meaning "no preference").

This breaks the protocol contract—subscribers requesting playback from a specific group will instead receive groups from wherever the publisher decides.

🔧 Proposed fix to honor subscriber's requested range
-		let subscriber = track.subscribe(Subscription::default()).await?;
+		let subscriber = track
+			.subscribe(moq_lite::Subscription {
+				priority: subscribe.priority,
+				ordered: subscribe.ordered,
+				max_latency: subscribe.max_latency,
+				start: subscribe.start_group,
+				end: subscribe.end_group,
+			})
+			.await?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let track = Track::new(subscribe.track.to_string());
// TODO wait until track.info() to get the *real* priority
let broadcast = consumer.ok_or(Error::UnknownBroadcast)?;
let track = broadcast.consume_track(&track)?;
let subscriber = track.subscribe(Subscription::default()).await?;
let info = lite::SubscribeOk {
priority: track.info.priority,
priority: 0,
ordered: false,
max_latency: std::time::Duration::ZERO,
max_latency: Duration::ZERO,
start_group: None,
end_group: None,
};
let track = Track::new(subscribe.track.to_string());
let broadcast = consumer.ok_or(Error::UnknownBroadcast)?;
let track = broadcast.consume_track(&track)?;
let subscriber = track
.subscribe(moq_lite::Subscription {
priority: subscribe.priority,
ordered: subscribe.ordered,
max_latency: subscribe.max_latency,
start: subscribe.start_group,
end: subscribe.end_group,
})
.await?;
let info = lite::SubscribeOk {
priority: 0,
ordered: false,
max_latency: Duration::ZERO,
start_group: None,
end_group: None,
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 258 - 270, The code currently
constructs a default Subscription (Subscription::default()) which ignores the
subscriber's requested start_group/end_group from the incoming Subscribe
message, so replace the default with a Subscription built from those fields:
when handling subscribe (the incoming Subscribe struct), map its start_group and
end_group into Subscription's start and end fields (e.g., Subscription { start:
subscribe.start_group.map(|g| g.into()), end: subscribe.end_group.map(|g|
g.into()), ..Default::default() }) and pass that into track.subscribe(...)
instead of Subscription::default(); keep the rest of the flow (Track::new,
consumer.ok_or, broadcast.consume_track, lite::SubscribeOk) unchanged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant