Skip to content

feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396

Open
metapox wants to merge 1 commit into
moq-dev:mainfrom
metapox:feat/subscribe-update-js
Open

feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396
metapox wants to merge 1 commit into
moq-dev:mainfrom
metapox:feat/subscribe-update-js

Conversation

@metapox
Copy link
Copy Markdown

@metapox metapox commented May 10, 2026

Closes #1363

Problem

The JS subscriber has no API to change priority after subscribing. The only option is to close and re-subscribe, which causes the relay to replay cached groups.

Changes

  • track.ts: Add updatePriority(priority) method that updates an internal Signal
  • subscriber.ts: After SubscribeOk, watch the priority Signal and send SubscribeUpdate when it changes
  • publisher.ts: Apply received SubscribeUpdate priority to the track (implements existing TODO)

Usage

const track = broadcast.subscribe("video", 200);
// Later, change priority without re-subscribing:
track.updatePriority(0);

Context

I'm building moq-multicam, a multi-camera streaming system. Camera switching needs instant priority changes without re-subscribe to avoid replaying stale groups from the relay cache.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 10, 2026

Review Change Stack
No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 19acf53e-b11a-46ee-9bce-bf5aeae1613b

📥 Commits

Reviewing files that changed from the base of the PR and between 83d79b4 and a08dcb3.

📒 Files selected for processing (3)
  • js/lite/src/lite/publisher.ts
  • js/lite/src/lite/subscriber.ts
  • js/lite/src/track.ts
🚧 Files skipped from review as they are similar to previous changes (3)
  • js/lite/src/lite/publisher.ts
  • js/lite/src/track.ts
  • js/lite/src/lite/subscriber.ts

Walkthrough

This PR adds support for dynamic track priority updates in the MOQ subscriber-publisher system. The TrackState data model gains a priority signal, and the Track class exposes a new updatePriority(priority: number) method. The subscriber monitors this priority signal, sends SubscribeUpdate protocol messages when priority changes occur, and includes the watcher in subscription shutdown logic. The publisher receives these SubscribeUpdate messages, logs them, and applies priority updates to the track.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately and specifically describes the main feature added: implementing SUBSCRIBE_UPDATE API support for both JS subscriber and publisher components.
Description check ✅ Passed The description clearly explains the problem, lists the three specific changes across files, provides a usage example, and includes context about the multi-camera streaming use case.
Linked Issues check ✅ Passed The pull request fully addresses all three coding objectives from issue #1363: adding priority Signal and updatePriority() method to Track, implementing priority change monitoring in subscriber, and applying SubscribeUpdate in publisher.
Out of Scope Changes check ✅ Passed All changes are directly aligned with issue #1363 objectives; no unrelated modifications to other components or unnecessary refactoring were introduced.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@js/lite/src/lite/subscriber.ts`:
- Around line 193-209: The priorityWatch loop in the priority watcher (the async
IIFE named priorityWatch) must be rewritten to avoid listener leaks and missed
updates: before attaching a listener via
request.track.state.priority.changed(resolve) first check the current priority
snapshot (e.g., read the priority getter/value) so an update that happened
already is handled immediately; if you attach a listener ensure its
dispose/unsubscribe is always called in every path (use try/finally around the
awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 14817bcb-af2f-4924-8e95-029b6bca2793

📥 Commits

Reviewing files that changed from the base of the PR and between 6adf3c3 and 8f672bf.

📒 Files selected for processing (3)
  • js/lite/src/lite/publisher.ts
  • js/lite/src/lite/subscriber.ts
  • js/lite/src/track.ts

Comment on lines +193 to +209
const priorityWatch = (async () => {
for (;;) {
let dispose!: () => void;
const priority = await new Promise<number | undefined>((resolve) => {
dispose = request.track.state.priority.changed(resolve);
});
if (priority === undefined) {
dispose();
continue;
}
const update = new SubscribeUpdate({ priority });
await update.encode(stream.writer, this.version);
console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
}
})();

await Promise.race([stream.reader.closed, request.track.closed, priorityWatch]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In the latest @moq/signals API, does Signal.changed(callback) remain subscribed until the returned dispose function is called?

💡 Result:

Yes. In the latest @moq/signals API, calling Signal.changed(callback) returns a “dispose” function, and the subscription stays active until that dispose function is called. This is consistent with the library’s standard subscribe pattern: adapters/hooks in the repo use a subscription callback that returns an unsubscribe/dispose function to React’s useSyncExternalStore subscription mechanism (i.e., subscription lifetime is tied to the returned disposer) [1]. The docs site for @moq/signals also emphasizes the explicit subscription mechanism used by effects, rather than implicit global listeners, aligning with disposer-controlled teardown [2] [3].

Citations:


🏁 Script executed:

# First, find the file to confirm it exists
find . -name "subscriber.ts" -path "*/lite/*" | head -20

Repository: moq-dev/moq

Length of output: 121


🏁 Script executed:

# Examine the file around lines 193-209 to verify the issue
sed -n '180,220p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 1877


🏁 Script executed:

# Check for Signal.changed usage elsewhere in the file to understand the pattern
rg "Signal.changed\|changed\(" ./js/lite/src/lite/subscriber.ts -B 2 -A 2

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Also check the StateSignal or Signal type definition to understand its behavior
fd -e ts -e tsx -path "*signals*" | head -10

Repository: moq-dev/moq

Length of output: 224


🏁 Script executed:

# Search for Signal or state.priority definitions to understand the changed() behavior
rg "state\.priority|\.changed\(" ./js/lite/src/lite/subscriber.ts -B 1 -A 1

Repository: moq-dev/moq

Length of output: 202


🏁 Script executed:

# Find the Signal type definition in the `@moq/signals` library
find . -name "*.ts" -o -name "*.tsx" | xargs grep -l "changed.*\(.*resolve" | head -5

Repository: moq-dev/moq

Length of output: 79


🏁 Script executed:

# Check what happens when priorityWatch promise resolves - verify if dispose is called
sed -n '30,35p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 197


🏁 Script executed:

# Search for peek() method usage to understand the pattern for checking current state
rg "\.peek\(" ./js/lite/src/ | head -5

Repository: moq-dev/moq

Length of output: 438


🏁 Script executed:

# Verify the exact control flow - check what happens after the Promise.race on line 30
sed -n '190,240p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 2035


🏁 Script executed:

# Search for how updatePriority is called to understand if early updates are possible
rg "updatePriority" ./js/lite/src/lite/ -B 2 -A 2

Repository: moq-dev/moq

Length of output: 431


Fix priority watcher lifecycle to prevent listener buildup and dropped updates.

The current implementation has two critical issues:

  1. Listener accumulation: On line 197, each loop iteration registers a new listener via request.track.state.priority.changed(resolve), but disposal only happens on the undefined path (line 200). When priority !== undefined, the listener is never disposed, causing accumulation with each update.

  2. Dropped early updates: If updatePriority() is called before the listener starts on line 197, that change is missed. The listener only captures changes after registration.

  3. Resource leak on close: When Promise.race() resolves from stream or track closure (line 209), the priorityWatch loop never exits, leaving the final listener active indefinitely.

Proposed fix
 			// Watch for priority changes and send SUBSCRIBE_UPDATE.
+			const sendPriorityUpdate = async (priority: number) => {
+				const update = new SubscribeUpdate({ priority });
+				await update.encode(stream.writer, this.version);
+				console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
+			};
+
+			// Flush any priority set before the watcher starts.
+			const pendingPriority = request.track.state.priority.peek();
+			if (pendingPriority !== undefined && pendingPriority !== request.priority) {
+				await sendPriorityUpdate(pendingPriority);
+			}
+
 			const priorityWatch = (async () => {
+				const stop = Symbol("stop");
 				for (;;) {
 					let dispose!: () => void;
-					const priority = await new Promise<number | undefined>((resolve) => {
+					const priority = await Promise.race([
+						new Promise<number | undefined>((resolve) => {
+							dispose = request.track.state.priority.changed(resolve);
+						}),
+						Promise.race([stream.reader.closed, request.track.closed]).then(() => stop),
+					]);
+					dispose();
+
+					if (priority === stop) break;
+					if (priority === undefined) continue;
+					await sendPriorityUpdate(priority);
-						dispose = request.track.state.priority.changed(resolve);
-					});
-					if (priority === undefined) {
-						dispose();
-						continue;
-					}
-					const update = new SubscribeUpdate({ priority });
-					await update.encode(stream.writer, this.version);
-					console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`;
 				}
 			})();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@js/lite/src/lite/subscriber.ts` around lines 193 - 209, The priorityWatch
loop in the priority watcher (the async IIFE named priorityWatch) must be
rewritten to avoid listener leaks and missed updates: before attaching a
listener via request.track.state.priority.changed(resolve) first check the
current priority snapshot (e.g., read the priority getter/value) so an update
that happened already is handled immediately; if you attach a listener ensure
its dispose/unsubscribe is always called in every path (use try/finally around
the awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.

@metapox metapox force-pushed the feat/subscribe-update-js branch from 8f672bf to 83d79b4 Compare May 11, 2026 08:20
…isher

- Track: add priority Signal and updatePriority() method
- Subscriber: watch for priority changes after SubscribeOk, send SubscribeUpdate
- Publisher: apply received SubscribeUpdate priority to track (was TODO)
@metapox metapox force-pushed the feat/subscribe-update-js branch from 83d79b4 to a08dcb3 Compare May 11, 2026 08:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(lite): JS Subscriber lacks SUBSCRIBE_UPDATE support for dynamic priority changes

1 participant