Skip to content

fix(lite): update in-flight group priorities on SUBSCRIBE_UPDATE#1397

Open
metapox wants to merge 1 commit into
moq-dev:mainfrom
metapox:fix/priority-queue-update
Open

fix(lite): update in-flight group priorities on SUBSCRIBE_UPDATE#1397
metapox wants to merge 1 commit into
moq-dev:mainfrom
metapox:fix/priority-queue-update

Conversation

@metapox
Copy link
Copy Markdown

@metapox metapox commented May 10, 2026

Closes #1370

Problem

When subscription priority changes via SUBSCRIBE_UPDATE, only new groups get the updated priority. In-flight groups continue at the old priority, causing stale scheduling under bandwidth constraints.

Changes

  • priority.rs: Add subscription_id to PriorityItem. Add update_subscription(id, new_priority) to re-sort existing items. Widen quinn priority range (index * 64) for effective differentiation.
  • publisher.rs: Add run_subscribe_updates() that listens for SubscribeUpdate messages and calls priority.update_subscription() to update in-flight groups immediately.

Testing

Verified with bandwidth throttling (tc 2000kbps): focus camera maintains quality while background cameras degrade. Priority switch is instant for both new and in-flight groups.

Context

Built for moq-multicam where camera switching under bandwidth constraints requires immediate priority propagation to all groups, not just future ones.

Ref: draft-ietf-moq-transport-13 Section 6.1

PriorityQueue did not update existing items when subscription priority
changed via SUBSCRIBE_UPDATE. Only new groups got the updated priority,
causing stale scheduling under bandwidth constraints.

Changes:
- Add subscription_id to PriorityItem and insert()
- Add PriorityQueue::update_subscription() to re-sort existing items
- Call update_subscription() in run_track on SUBSCRIBE_UPDATE
- Widen quinn priority range (index * 64) for effective differentiation
- Monitor priority changes during write_all via tokio::select!

Ref: moq-dev#1370
Spec: draft-ietf-moq-transport-13 Section 6.1
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 10, 2026

Review Change Stack

Walkthrough

This pull request introduces subscription-aware priority management to the media stream serving system. The PriorityQueue now tracks subscription_id alongside existing priority metadata. A new update_subscription API allows runtime priority changes for all items belonging to a subscription. The publisher concurrently decodes incoming SubscribeUpdate messages and applies them to the queue. Track serving now includes subscribe_id in priority insertions, and QUIC stream priorities are dynamically mapped and updated during chunk transmission, including mid-write adjustments when priority changes occur.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 72.41% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main objective: fixing in-flight group priority updates when SUBSCRIBE_UPDATE messages arrive, which is the primary change across both modified files.
Description check ✅ Passed The description clearly explains the problem, changes made to both files, testing performed, and references the linked issue #1370 and relevant RFC section.
Linked Issues check ✅ Passed The PR successfully implements all coding requirements from issue #1370: adds subscription_id tracking to PriorityItem, implements update_subscription() API, integrates SUBSCRIBE_UPDATE handling in publisher.rs, and widens quinn priority mapping (index * 64).
Out of Scope Changes check ✅ Passed All changes are directly scoped to issue #1370 objectives: priority.rs updates for subscription tracking and reordering, publisher.rs integration of SUBSCRIBE_UPDATE handling, and quinn priority differentiation via index * 64 mapping.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
rs/moq-lite/src/lite/priority.rs (1)

411-427: ⚡ Quick win

Call tokio::time::pause() at the start of these async tests.

All three async tests use tokio::time::sleep() without pausing time, causing them to depend on real wall-clock duration and making them slow and prone to CI timing variance. Call tokio::time::pause() at the start of each test (or use #[tokio::test(start_paused = true)]) to make timers advance instantly and deterministically.

Applies to lines 411–427, 528–549, and 593–612.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/priority.rs` around lines 411 - 427, The async tests
that call tokio::time::sleep() (including
test_watch_notification_on_overflow_promotion and the other two tests at the
ranges 528–549 and 593–612) rely on real wall-clock timers; add
tokio::time::pause() as the first statement in each test (or switch the test
attribute to #[tokio::test(start_paused = true)]) so timers advance
deterministically and the tokio::time::sleep() calls complete instantly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 300-303: run_subscribe_updates only reorders existing queued
handles but the code at lines around the provided diff still inserts new groups
using the original track.priority, so updated subscription priority never
affects future inserts; to fix, make the updated priority visible to producers
and consumers by sharing the mutable priority with run_track (e.g., pass a
shared/atomic/Arc-wrapped priority or a reference to subscribe) or by resolving
priority inside PriorityQueue::insert (look up current subscribe.priority by
subscribe_id) so that both run_track and insert use the same up-to-date value;
update call sites (including run_track(session, track, subscribe,
priority.clone(), version) and run_subscribe_updates(&mut stream.reader,
&priority, subscribe_id)) and PriorityQueue::insert to read the shared current
priority rather than the stale track.priority.
- Around line 374-377: The priority mapping uses idx.saturating_mul(64) which
collapses many inputs to 255; update the closure quinn_pri and its use (where
initial_priority is read and passed to stream.set_priority) to perform a
monotonic scaling across the full 0..=255 range instead of a fixed multiply so
each distinct u8 maps to a distinct or properly spaced u8 value (e.g., compute
using wider intermediate integer math and scale proportionally: cast idx to a
larger integer, scale to 0..255, clamp to u8, then call stream.set_priority with
that result).

---

Nitpick comments:
In `@rs/moq-lite/src/lite/priority.rs`:
- Around line 411-427: The async tests that call tokio::time::sleep() (including
test_watch_notification_on_overflow_promotion and the other two tests at the
ranges 528–549 and 593–612) rely on real wall-clock timers; add
tokio::time::pause() as the first statement in each test (or switch the test
attribute to #[tokio::test(start_paused = true)]) so timers advance
deterministically and the tokio::time::sleep() calls complete instantly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 75a5a7f2-7b64-4f8c-89d3-82eb6e680d5a

📥 Commits

Reviewing files that changed from the base of the PR and between 6adf3c3 and ac4d7de.

📒 Files selected for processing (2)
  • rs/moq-lite/src/lite/priority.rs
  • rs/moq-lite/src/lite/publisher.rs

Comment on lines +300 to +303
let subscribe_id = subscribe.id;
tokio::select! {
res = Self::run_track(session, track, subscribe, priority, version) => res?,
res = stream.reader.closed() => res?,
res = Self::run_track(session, track, subscribe, priority.clone(), version) => res?,
res = Self::run_subscribe_updates(&mut stream.reader, &priority, subscribe_id) => res?,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

New groups still use the pre-update priority.

run_subscribe_updates() only reorders handles that are already in the queue. Line 359 still inserts later groups with the original track.priority, so once the current in-flight groups drain, scheduling falls back to the stale value. The updated subscription priority needs to be shared with run_track, or resolved inside PriorityQueue::insert, so future groups stay in sync too.

Also applies to: 358-360

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 300 - 303,
run_subscribe_updates only reorders existing queued handles but the code at
lines around the provided diff still inserts new groups using the original
track.priority, so updated subscription priority never affects future inserts;
to fix, make the updated priority visible to producers and consumers by sharing
the mutable priority with run_track (e.g., pass a shared/atomic/Arc-wrapped
priority or a reference to subscribe) or by resolving priority inside
PriorityQueue::insert (look up current subscribe.priority by subscribe_id) so
that both run_track and insert use the same up-to-date value; update call sites
(including run_track(session, track, subscribe, priority.clone(), version) and
run_subscribe_updates(&mut stream.reader, &priority, subscribe_id)) and
PriorityQueue::insert to read the shared current priority rather than the stale
track.priority.

Comment on lines +374 to +377
let initial_priority = priority.current();
// Widen the priority range so quinn can effectively differentiate streams.
let quinn_pri = |idx: u8| -> u8 { idx.saturating_mul(64) };
stream.set_priority(quinn_pri(initial_priority));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the priority setter signature and every Rust call site.
rg -n -C3 '\bfn\s+set_priority\b|\bset_priority\s*\(' --type rust

# Inspect the exact mapping added in this PR.
rg -n -C2 'saturating_mul\(64\)|quinn_pri' rs/moq-lite/src/lite/publisher.rs

Repository: moq-dev/moq

Length of output: 5119


🏁 Script executed:

# Find the Writer struct definition and its stream field type
rg -n 'struct Writer' --type rust -A 10 rs/moq-lite/src/coding/writer.rs

# Search for quinn stream usage and type hints
rg -n 'quinn\s*::|use\s+quinn' --type rust rs/moq-lite/src/

# Look for the actual stream wrapper or trait definition
rg -n 'pub trait.*Stream|impl.*Stream.*set_priority' --type rust

Repository: moq-dev/moq

Length of output: 402


🏁 Script executed:

# Find the web_transport_trait crate location and SendStream trait definition
fd -e rs 'web_transport' | head -20

# Search for SendStream trait and its set_priority method
rg -n 'trait SendStream|fn set_priority' --type rust | grep -A 2 SendStream

# Check what web_transport_trait is (might be local or external)
rg -n 'web_transport_trait' --type rust rs/moq-lite/Cargo.toml rs/moq-lite/src/lib.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Find Cargo.toml and check dependencies
find . -name Cargo.toml -type f | head -10

# Search for web_transport_trait more broadly
rg -n 'web_transport_trait' --type toml

# Check if it's a local workspace member
find . -type d -name '*web_transport*'

Repository: moq-dev/moq

Length of output: 285


🏁 Script executed:

# Check moq-lite's Cargo.toml for dependencies
cat rs/moq-lite/Cargo.toml

# Search for SendStream trait definition anywhere in repo
rg -n 'trait SendStream' --type rust

# Look for web_transport imports
rg -n 'use.*web_transport' --type rust rs/moq-lite/src/

Repository: moq-dev/moq

Length of output: 1143


🏁 Script executed:

# Find web-transport-trait crate location
find . -type f -name Cargo.toml -path '*web-transport*'

# List the workspace structure to find web-transport-trait
ls -la rs/ | grep -i web

# Find the trait definition
rg -n 'trait SendStream' --type rust

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check root Cargo.toml for workspace definition
cat Cargo.toml | head -50

# Search for web-transport-trait in root Cargo.toml
rg -n 'web-transport-trait' Cargo.toml

# Look at all available workspace members
ls -la rs/

Repository: moq-dev/moq

Length of output: 2467


🏁 Script executed:

# web-transport-trait is external crate 0.3.4
# Let's look at what the actual stream implementations do in the IETF adapter
cat rs/moq-lite/src/ietf/adapter.rs | head -100

# Look specifically at the Real and Virtual stream definitions
rg -n 'Self::Real|Self::Virtual' rs/moq-lite/src/ietf/adapter.rs | head -20

# Check if there's documentation about what set_priority does
rg -n 'set_priority|priority' rs/moq-lite/src/ietf/adapter.rs | head -30

Repository: moq-dev/moq

Length of output: 3430


🌐 Web query:

web-transport-trait 0.3.4 SendStream set_priority signature

💡 Result:

In web-transport-trait 0.3.4, the SendStream::set_priority method has the signature: fn set_priority(&mut self, order: u8) [1]. It takes a mutable reference to the stream and an 8-bit priority value (order: u8) [1].

Citations:


🏁 Script executed:

# Check the priority type - what is priority.current()?
rg -n 'priority\.current\(\)|struct.*riority|enum.*riority' --type rust rs/moq-lite/src/lite/publisher.rs -B 5 -A 2

# Look for priority definition
rg -n 'let priority.*=|priority:' --type rust rs/moq-lite/src/lite/publisher.rs | head -20

Repository: moq-dev/moq

Length of output: 877


🏁 Script executed:

# Find PriorityQueue and PriorityHandle definitions
rg -n 'struct PriorityQueue|struct PriorityHandle|impl PriorityQueue|impl PriorityHandle' --type rust

# Look for their current() method
rg -n 'fn current\(' --type rust -A 2

Repository: moq-dev/moq

Length of output: 479


Fix the priority mapping to avoid u8 saturation collapsing the range.

idx.saturating_mul(64) on u8 produces only five distinct values: 0, 64, 128, 192, and 255. This means indices 4..=255 all map to 255, completely defeating the stated goal of "widening the priority range so quinn can effectively differentiate streams." Since set_priority accepts only u8, a different monotonic remap is needed—for example, scaling the u8 value across the full 0–255 range rather than multiplying by a constant.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 374 - 377, The priority
mapping uses idx.saturating_mul(64) which collapses many inputs to 255; update
the closure quinn_pri and its use (where initial_priority is read and passed to
stream.set_priority) to perform a monotonic scaling across the full 0..=255
range instead of a fixed multiply so each distinct u8 maps to a distinct or
properly spaced u8 value (e.g., compute using wider intermediate integer math
and scale proportionally: cast idx to a larger integer, scale to 0..255, clamp
to u8, then call stream.set_priority with that result).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

fix(lite): PriorityQueue does not update in-flight groups on SUBSCRIBE_UPDATE

1 participant