fix(lite): update in-flight group priorities on SUBSCRIBE_UPDATE#1397
fix(lite): update in-flight group priorities on SUBSCRIBE_UPDATE#1397metapox wants to merge 1 commit into
Conversation
PriorityQueue did not update existing items when subscription priority changed via SUBSCRIBE_UPDATE. Only new groups got the updated priority, causing stale scheduling under bandwidth constraints. Changes: - Add subscription_id to PriorityItem and insert() - Add PriorityQueue::update_subscription() to re-sort existing items - Call update_subscription() in run_track on SUBSCRIBE_UPDATE - Widen quinn priority range (index * 64) for effective differentiation - Monitor priority changes during write_all via tokio::select! Ref: moq-dev#1370 Spec: draft-ietf-moq-transport-13 Section 6.1
WalkthroughThis pull request introduces subscription-aware priority management to the media stream serving system. The 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
✨ Simplify code
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
rs/moq-lite/src/lite/priority.rs (1)
411-427: ⚡ Quick winCall
tokio::time::pause()at the start of these async tests.All three async tests use
tokio::time::sleep()without pausing time, causing them to depend on real wall-clock duration and making them slow and prone to CI timing variance. Calltokio::time::pause()at the start of each test (or use#[tokio::test(start_paused = true)]) to make timers advance instantly and deterministically.Applies to lines 411–427, 528–549, and 593–612.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/lite/priority.rs` around lines 411 - 427, The async tests that call tokio::time::sleep() (including test_watch_notification_on_overflow_promotion and the other two tests at the ranges 528–549 and 593–612) rely on real wall-clock timers; add tokio::time::pause() as the first statement in each test (or switch the test attribute to #[tokio::test(start_paused = true)]) so timers advance deterministically and the tokio::time::sleep() calls complete instantly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 300-303: run_subscribe_updates only reorders existing queued
handles but the code at lines around the provided diff still inserts new groups
using the original track.priority, so updated subscription priority never
affects future inserts; to fix, make the updated priority visible to producers
and consumers by sharing the mutable priority with run_track (e.g., pass a
shared/atomic/Arc-wrapped priority or a reference to subscribe) or by resolving
priority inside PriorityQueue::insert (look up current subscribe.priority by
subscribe_id) so that both run_track and insert use the same up-to-date value;
update call sites (including run_track(session, track, subscribe,
priority.clone(), version) and run_subscribe_updates(&mut stream.reader,
&priority, subscribe_id)) and PriorityQueue::insert to read the shared current
priority rather than the stale track.priority.
- Around line 374-377: The priority mapping uses idx.saturating_mul(64) which
collapses many inputs to 255; update the closure quinn_pri and its use (where
initial_priority is read and passed to stream.set_priority) to perform a
monotonic scaling across the full 0..=255 range instead of a fixed multiply so
each distinct u8 maps to a distinct or properly spaced u8 value (e.g., compute
using wider intermediate integer math and scale proportionally: cast idx to a
larger integer, scale to 0..255, clamp to u8, then call stream.set_priority with
that result).
---
Nitpick comments:
In `@rs/moq-lite/src/lite/priority.rs`:
- Around line 411-427: The async tests that call tokio::time::sleep() (including
test_watch_notification_on_overflow_promotion and the other two tests at the
ranges 528–549 and 593–612) rely on real wall-clock timers; add
tokio::time::pause() as the first statement in each test (or switch the test
attribute to #[tokio::test(start_paused = true)]) so timers advance
deterministically and the tokio::time::sleep() calls complete instantly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 75a5a7f2-7b64-4f8c-89d3-82eb6e680d5a
📒 Files selected for processing (2)
rs/moq-lite/src/lite/priority.rsrs/moq-lite/src/lite/publisher.rs
| let subscribe_id = subscribe.id; | ||
| tokio::select! { | ||
| res = Self::run_track(session, track, subscribe, priority, version) => res?, | ||
| res = stream.reader.closed() => res?, | ||
| res = Self::run_track(session, track, subscribe, priority.clone(), version) => res?, | ||
| res = Self::run_subscribe_updates(&mut stream.reader, &priority, subscribe_id) => res?, |
There was a problem hiding this comment.
New groups still use the pre-update priority.
run_subscribe_updates() only reorders handles that are already in the queue. Line 359 still inserts later groups with the original track.priority, so once the current in-flight groups drain, scheduling falls back to the stale value. The updated subscription priority needs to be shared with run_track, or resolved inside PriorityQueue::insert, so future groups stay in sync too.
Also applies to: 358-360
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-lite/src/lite/publisher.rs` around lines 300 - 303,
run_subscribe_updates only reorders existing queued handles but the code at
lines around the provided diff still inserts new groups using the original
track.priority, so updated subscription priority never affects future inserts;
to fix, make the updated priority visible to producers and consumers by sharing
the mutable priority with run_track (e.g., pass a shared/atomic/Arc-wrapped
priority or a reference to subscribe) or by resolving priority inside
PriorityQueue::insert (look up current subscribe.priority by subscribe_id) so
that both run_track and insert use the same up-to-date value; update call sites
(including run_track(session, track, subscribe, priority.clone(), version) and
run_subscribe_updates(&mut stream.reader, &priority, subscribe_id)) and
PriorityQueue::insert to read the shared current priority rather than the stale
track.priority.
| let initial_priority = priority.current(); | ||
| // Widen the priority range so quinn can effectively differentiate streams. | ||
| let quinn_pri = |idx: u8| -> u8 { idx.saturating_mul(64) }; | ||
| stream.set_priority(quinn_pri(initial_priority)); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the priority setter signature and every Rust call site.
rg -n -C3 '\bfn\s+set_priority\b|\bset_priority\s*\(' --type rust
# Inspect the exact mapping added in this PR.
rg -n -C2 'saturating_mul\(64\)|quinn_pri' rs/moq-lite/src/lite/publisher.rsRepository: moq-dev/moq
Length of output: 5119
🏁 Script executed:
# Find the Writer struct definition and its stream field type
rg -n 'struct Writer' --type rust -A 10 rs/moq-lite/src/coding/writer.rs
# Search for quinn stream usage and type hints
rg -n 'quinn\s*::|use\s+quinn' --type rust rs/moq-lite/src/
# Look for the actual stream wrapper or trait definition
rg -n 'pub trait.*Stream|impl.*Stream.*set_priority' --type rustRepository: moq-dev/moq
Length of output: 402
🏁 Script executed:
# Find the web_transport_trait crate location and SendStream trait definition
fd -e rs 'web_transport' | head -20
# Search for SendStream trait and its set_priority method
rg -n 'trait SendStream|fn set_priority' --type rust | grep -A 2 SendStream
# Check what web_transport_trait is (might be local or external)
rg -n 'web_transport_trait' --type rust rs/moq-lite/Cargo.toml rs/moq-lite/src/lib.rsRepository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Find Cargo.toml and check dependencies
find . -name Cargo.toml -type f | head -10
# Search for web_transport_trait more broadly
rg -n 'web_transport_trait' --type toml
# Check if it's a local workspace member
find . -type d -name '*web_transport*'Repository: moq-dev/moq
Length of output: 285
🏁 Script executed:
# Check moq-lite's Cargo.toml for dependencies
cat rs/moq-lite/Cargo.toml
# Search for SendStream trait definition anywhere in repo
rg -n 'trait SendStream' --type rust
# Look for web_transport imports
rg -n 'use.*web_transport' --type rust rs/moq-lite/src/Repository: moq-dev/moq
Length of output: 1143
🏁 Script executed:
# Find web-transport-trait crate location
find . -type f -name Cargo.toml -path '*web-transport*'
# List the workspace structure to find web-transport-trait
ls -la rs/ | grep -i web
# Find the trait definition
rg -n 'trait SendStream' --type rustRepository: moq-dev/moq
Length of output: 37
🏁 Script executed:
# Check root Cargo.toml for workspace definition
cat Cargo.toml | head -50
# Search for web-transport-trait in root Cargo.toml
rg -n 'web-transport-trait' Cargo.toml
# Look at all available workspace members
ls -la rs/Repository: moq-dev/moq
Length of output: 2467
🏁 Script executed:
# web-transport-trait is external crate 0.3.4
# Let's look at what the actual stream implementations do in the IETF adapter
cat rs/moq-lite/src/ietf/adapter.rs | head -100
# Look specifically at the Real and Virtual stream definitions
rg -n 'Self::Real|Self::Virtual' rs/moq-lite/src/ietf/adapter.rs | head -20
# Check if there's documentation about what set_priority does
rg -n 'set_priority|priority' rs/moq-lite/src/ietf/adapter.rs | head -30Repository: moq-dev/moq
Length of output: 3430
🌐 Web query:
web-transport-trait 0.3.4 SendStream set_priority signature
💡 Result:
In web-transport-trait 0.3.4, the SendStream::set_priority method has the signature: fn set_priority(&mut self, order: u8) [1]. It takes a mutable reference to the stream and an 8-bit priority value (order: u8) [1].
Citations:
🏁 Script executed:
# Check the priority type - what is priority.current()?
rg -n 'priority\.current\(\)|struct.*riority|enum.*riority' --type rust rs/moq-lite/src/lite/publisher.rs -B 5 -A 2
# Look for priority definition
rg -n 'let priority.*=|priority:' --type rust rs/moq-lite/src/lite/publisher.rs | head -20Repository: moq-dev/moq
Length of output: 877
🏁 Script executed:
# Find PriorityQueue and PriorityHandle definitions
rg -n 'struct PriorityQueue|struct PriorityHandle|impl PriorityQueue|impl PriorityHandle' --type rust
# Look for their current() method
rg -n 'fn current\(' --type rust -A 2Repository: moq-dev/moq
Length of output: 479
Fix the priority mapping to avoid u8 saturation collapsing the range.
idx.saturating_mul(64) on u8 produces only five distinct values: 0, 64, 128, 192, and 255. This means indices 4..=255 all map to 255, completely defeating the stated goal of "widening the priority range so quinn can effectively differentiate streams." Since set_priority accepts only u8, a different monotonic remap is needed—for example, scaling the u8 value across the full 0–255 range rather than multiplying by a constant.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-lite/src/lite/publisher.rs` around lines 374 - 377, The priority
mapping uses idx.saturating_mul(64) which collapses many inputs to 255; update
the closure quinn_pri and its use (where initial_priority is read and passed to
stream.set_priority) to perform a monotonic scaling across the full 0..=255
range instead of a fixed multiply so each distinct u8 maps to a distinct or
properly spaced u8 value (e.g., compute using wider intermediate integer math
and scale proportionally: cast idx to a larger integer, scale to 0..255, clamp
to u8, then call stream.set_priority with that result).
Closes #1370
Problem
When subscription priority changes via SUBSCRIBE_UPDATE, only new groups get the updated priority. In-flight groups continue at the old priority, causing stale scheduling under bandwidth constraints.
Changes
subscription_idto PriorityItem. Addupdate_subscription(id, new_priority)to re-sort existing items. Widen quinn priority range (index * 64) for effective differentiation.run_subscribe_updates()that listens for SubscribeUpdate messages and callspriority.update_subscription()to update in-flight groups immediately.Testing
Verified with bandwidth throttling (tc 2000kbps): focus camera maintains quality while background cameras degrade. Priority switch is instant for both new and in-flight groups.
Context
Built for moq-multicam where camera switching under bandwidth constraints requires immediate priority propagation to all groups, not just future ones.
Ref: draft-ietf-moq-transport-13 Section 6.1