feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396
feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396metapox wants to merge 1 commit into
Conversation
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
WalkthroughThis PR adds support for dynamic track priority updates in the MOQ subscriber-publisher system. The 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/lite/src/lite/subscriber.ts`:
- Around line 193-209: The priorityWatch loop in the priority watcher (the async
IIFE named priorityWatch) must be rewritten to avoid listener leaks and missed
updates: before attaching a listener via
request.track.state.priority.changed(resolve) first check the current priority
snapshot (e.g., read the priority getter/value) so an update that happened
already is handled immediately; if you attach a listener ensure its
dispose/unsubscribe is always called in every path (use try/finally around the
awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 14817bcb-af2f-4924-8e95-029b6bca2793
📒 Files selected for processing (3)
js/lite/src/lite/publisher.tsjs/lite/src/lite/subscriber.tsjs/lite/src/track.ts
| const priorityWatch = (async () => { | ||
| for (;;) { | ||
| let dispose!: () => void; | ||
| const priority = await new Promise<number | undefined>((resolve) => { | ||
| dispose = request.track.state.priority.changed(resolve); | ||
| }); | ||
| if (priority === undefined) { | ||
| dispose(); | ||
| continue; | ||
| } | ||
| const update = new SubscribeUpdate({ priority }); | ||
| await update.encode(stream.writer, this.version); | ||
| console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`); | ||
| } | ||
| })(); | ||
|
|
||
| await Promise.race([stream.reader.closed, request.track.closed, priorityWatch]); |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In the latest @moq/signals API, does Signal.changed(callback) remain subscribed until the returned dispose function is called?
💡 Result:
Yes. In the latest @moq/signals API, calling Signal.changed(callback) returns a “dispose” function, and the subscription stays active until that dispose function is called. This is consistent with the library’s standard subscribe pattern: adapters/hooks in the repo use a subscription callback that returns an unsubscribe/dispose function to React’s useSyncExternalStore subscription mechanism (i.e., subscription lifetime is tied to the returned disposer) [1]. The docs site for @moq/signals also emphasizes the explicit subscription mechanism used by effects, rather than implicit global listeners, aligning with disposer-controlled teardown [2] [3].
Citations:
- 1: https://github.com/moq-dev/moq/blob/main/js/signals/src/react.ts
- 2: https://www.jsdelivr.com/package/npm/@moq/signals
- 3: https://doc.moq.dev/js/
🏁 Script executed:
# First, find the file to confirm it exists
find . -name "subscriber.ts" -path "*/lite/*" | head -20Repository: moq-dev/moq
Length of output: 121
🏁 Script executed:
# Examine the file around lines 193-209 to verify the issue
sed -n '180,220p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 1877
🏁 Script executed:
# Check for Signal.changed usage elsewhere in the file to understand the pattern
rg "Signal.changed\|changed\(" ./js/lite/src/lite/subscriber.ts -B 2 -A 2Repository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Also check the StateSignal or Signal type definition to understand its behavior
fd -e ts -e tsx -path "*signals*" | head -10Repository: moq-dev/moq
Length of output: 224
🏁 Script executed:
# Search for Signal or state.priority definitions to understand the changed() behavior
rg "state\.priority|\.changed\(" ./js/lite/src/lite/subscriber.ts -B 1 -A 1Repository: moq-dev/moq
Length of output: 202
🏁 Script executed:
# Find the Signal type definition in the `@moq/signals` library
find . -name "*.ts" -o -name "*.tsx" | xargs grep -l "changed.*\(.*resolve" | head -5Repository: moq-dev/moq
Length of output: 79
🏁 Script executed:
# Check what happens when priorityWatch promise resolves - verify if dispose is called
sed -n '30,35p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 197
🏁 Script executed:
# Search for peek() method usage to understand the pattern for checking current state
rg "\.peek\(" ./js/lite/src/ | head -5Repository: moq-dev/moq
Length of output: 438
🏁 Script executed:
# Verify the exact control flow - check what happens after the Promise.race on line 30
sed -n '190,240p' ./js/lite/src/lite/subscriber.ts | cat -nRepository: moq-dev/moq
Length of output: 2035
🏁 Script executed:
# Search for how updatePriority is called to understand if early updates are possible
rg "updatePriority" ./js/lite/src/lite/ -B 2 -A 2Repository: moq-dev/moq
Length of output: 431
Fix priority watcher lifecycle to prevent listener buildup and dropped updates.
The current implementation has two critical issues:
-
Listener accumulation: On line 197, each loop iteration registers a new listener via
request.track.state.priority.changed(resolve), but disposal only happens on theundefinedpath (line 200). Whenpriority !== undefined, the listener is never disposed, causing accumulation with each update. -
Dropped early updates: If
updatePriority()is called before the listener starts on line 197, that change is missed. The listener only captures changes after registration. -
Resource leak on close: When
Promise.race()resolves from stream or track closure (line 209), thepriorityWatchloop never exits, leaving the final listener active indefinitely.
Proposed fix
// Watch for priority changes and send SUBSCRIBE_UPDATE.
+ const sendPriorityUpdate = async (priority: number) => {
+ const update = new SubscribeUpdate({ priority });
+ await update.encode(stream.writer, this.version);
+ console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
+ };
+
+ // Flush any priority set before the watcher starts.
+ const pendingPriority = request.track.state.priority.peek();
+ if (pendingPriority !== undefined && pendingPriority !== request.priority) {
+ await sendPriorityUpdate(pendingPriority);
+ }
+
const priorityWatch = (async () => {
+ const stop = Symbol("stop");
for (;;) {
let dispose!: () => void;
- const priority = await new Promise<number | undefined>((resolve) => {
+ const priority = await Promise.race([
+ new Promise<number | undefined>((resolve) => {
+ dispose = request.track.state.priority.changed(resolve);
+ }),
+ Promise.race([stream.reader.closed, request.track.closed]).then(() => stop),
+ ]);
+ dispose();
+
+ if (priority === stop) break;
+ if (priority === undefined) continue;
+ await sendPriorityUpdate(priority);
- dispose = request.track.state.priority.changed(resolve);
- });
- if (priority === undefined) {
- dispose();
- continue;
- }
- const update = new SubscribeUpdate({ priority });
- await update.encode(stream.writer, this.version);
- console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`;
}
})();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@js/lite/src/lite/subscriber.ts` around lines 193 - 209, The priorityWatch
loop in the priority watcher (the async IIFE named priorityWatch) must be
rewritten to avoid listener leaks and missed updates: before attaching a
listener via request.track.state.priority.changed(resolve) first check the
current priority snapshot (e.g., read the priority getter/value) so an update
that happened already is handled immediately; if you attach a listener ensure
its dispose/unsubscribe is always called in every path (use try/finally around
the awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
8f672bf to
83d79b4
Compare
…isher - Track: add priority Signal and updatePriority() method - Subscriber: watch for priority changes after SubscribeOk, send SubscribeUpdate - Publisher: apply received SubscribeUpdate priority to track (was TODO)
83d79b4 to
a08dcb3
Compare
Closes #1363
Problem
The JS subscriber has no API to change priority after subscribing. The only option is to close and re-subscribe, which causes the relay to replay cached groups.
Changes
updatePriority(priority)method that updates an internal SignalSubscribeUpdatewhen it changesSubscribeUpdatepriority to the track (implements existing TODO)Usage
Context
I'm building moq-multicam, a multi-camera streaming system. Camera switching needs instant priority changes without re-subscribe to avoid replaying stale groups from the relay cache.