Add stats aggregation and publishing for moq-lite sessions #1389
kixelated wants to merge 2 commits into
Conversation
Add a self-contained Stats aggregator in moq-lite that publishes .stats/<level>/<name> broadcasts on a caller-provided OriginProducer, plus the relay wiring to attach one per session. Each stats broadcast carries two tracks (publisher and subscriber) with disjoint cumulative counters (broadcasts, subscriptions, bytes, frames, groups). A relay populates both; one-sided clients populate one.

Per-level snapshot tasks spawn on first subscribe and exit when both roles are idle, so idle deployments do zero scheduled work. Counter fanout to multiple levels is per-event via Arc<[Arc<Level>]>; no per-flush walks.

Also adds Path::is_hidden(), filters hidden paths from OriginConsumer::announced(), and adds announced_hidden() / try_announced_hidden() so .stats/* broadcasts don't leak into ordinary discovery while still being addressable. The stats module returns empty handles for hidden paths so forwarding stats traffic doesn't feed back into the aggregator.

Relay gains a [stats] config section (name, levels). The cluster constructs the Stats handle in Cluster::new and threads it through with_stats() on incoming connections, websocket sessions, and outbound cluster dials.

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
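The hidden-path rule described above can be sketched as a standalone function; `is_hidden` here is an illustrative stand-in for the new `Path::is_hidden()`, assuming the rule is "any `/`-separated segment starts with `.`" (as stated in the `try_announced_hidden` doc later in this review):

```rust
// Standalone sketch (not the crate's implementation) of the hidden-path rule:
// a path is hidden when any '/'-separated segment starts with '.'.
fn is_hidden(path: &str) -> bool {
    path.split('/').any(|segment| segment.starts_with('.'))
}

fn main() {
    assert!(is_hidden(".stats/room/relay-1")); // stats broadcasts are hidden
    assert!(is_hidden("room/.stats/relay-1")); // any hidden segment hides the path
    assert!(!is_hidden("room/camera")); // ordinary broadcasts stay visible
}
```

Because the check is per-segment rather than per-prefix, forwarded stats traffic stays hidden no matter where it is re-rooted in the path hierarchy.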
Walkthrough
This PR introduces comprehensive session-level statistics aggregation across moq-lite and moq-relay. The changes establish a hierarchical stats infrastructure: paths are tagged as hidden (infrastructure-style, with …

🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Actionable comments posted: 5
🧹 Nitpick comments (2)
rs/moq-lite/src/client.rs (1)

61-80: 💤 Low value: Missing TODO comment for the IETF Draft-17 stats gap.

rs/moq-lite/src/server.rs has `// TODO: ietf code path does not yet record stats.` on the analogous Draft-17 early-return path, but client.rs has no equivalent note. Aligning them makes the known limitation consistently visible in both entry points.

🔧 Proposed addition

```diff
 tracing::debug!(version = ?v, "connected");
+// TODO: ietf code path does not yet record stats.
 return Ok(Session::new(session, v, None));
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/client.rs` around lines 61 - 80, Add the same TODO comment about missing stats for the IETF Draft-17 code path to the early-return branch in client.rs where ALPN_17 is matched; specifically, update the block that selects Version::Ietf(ietf::Version::Draft17), calls ietf::start(..., ietf::Version::Draft17) and returns Ok(Session::new(...)) to include a short "// TODO: ietf code path does not yet record stats." note so the limitation is visible alongside the analogous comment in server.rs.

rs/moq-lite/src/stats.rs (1)

123-128: `entries` grows monotonically — consider documenting the memory profile.

`StatsInner.entries` is never pruned: every unique broadcast prefix that passes through `broadcast()` creates a `Level` that lives for the lifetime of the `Stats` handle. For a long-running relay seeing high broadcast-path cardinality this will accumulate indefinitely. The memory per `Level` is small (a few `AtomicU64`s + strings), but it is worth calling out in the struct or `Stats::new` doc so operators can size the `levels` config appropriately.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/stats.rs` around lines 123 - 128, StatsInner.entries (the Lock<HashMap<PathOwned, Arc<Level>>>) grows monotonically because broadcast() inserts a Level per unique prefix and never prunes them; add a short doc comment on StatsInner or the public Stats::new describing this memory behavior, the expected per-Level footprint, and advising operators to size the levels config appropriately for long-running relays with high broadcast-path cardinality so they understand the potential unbounded growth. Mention the relevant symbols: StatsInner, entries, Level, broadcast(), and Stats::new.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 281-284: recv_subscribe currently creates a temporary
PublisherStats via stats.as_ref().map(|s|
s.broadcast(&absolute).publisher().track(&track)) which increments then
immediately decrements the broadcast counters; instead create and retain the
PublisherStats guard alongside the PublisherTrack so it isn't dropped
immediately. Locate recv_subscribe and change the mapping to first call
stats.as_ref().map(|s| s.broadcast(&absolute).publisher()) to obtain a
PublisherStats guard, then call guard.track(&track) and store the guard together
with the PublisherTrack (e.g., return or insert a tuple/struct (PublisherTrack,
PublisherStats) or push the guard into the existing stats_guards collection used
by run_announce) so the guard's lifetime matches the real announce guard rather
than being a temporary. Ensure symbols: recv_subscribe, PublisherStats,
publisher(), track(), run_announce, stats_guards are updated accordingly.
In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 290-304: The temporary SubscriberStats created in run_subscribe by
calling stats.broadcast(&path).subscriber() is dropped immediately (incrementing
broadcasts_closed) and thus creates spurious open/close events; fix by retaining
the guard for the broadcast's lifetime (like run_announce_prefix does) — obtain
the guard from self.stats.as_ref().map(|s| s.broadcast(&path).subscriber()),
store it in the existing self.stats_guards container (or attach it to the
TrackEntry) so it is not dropped after creating track_stats, and then use the
guard when deriving track_stats (e.g., clone or reference the guard when calling
.track(&track.name)) so the broadcasts counters reflect the true lifetime.
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 785-793: Update the doc for try_announced_hidden to include the
same caveat as try_announced: state that returning None means "no update
available" (not that the consumer has closed), and add a pointer sentence like
"Use Self::try_announced to receive visible paths instead" so callers know the
counterpart method for non-hidden paths; reference the try_announced_hidden and
try_announced symbols when editing the documentation.
In `@rs/moq-lite/src/stats.rs`:
- Around line 538-546: The function level_keys currently treats levels == 0 the
same as levels == 1 (returning root), which is surprising; update level_keys so
that when levels == 0 it returns an empty Vec (no stats) while keeping the
existing broadcast.is_empty() check returning vec![PathOwned::default()] for
empty broadcasts; modify the early conditional in level_keys (and any related
logic using broadcast/PathOwned) accordingly and add a unit test asserting
levels == 0 yields an empty Vec to prevent regression.
In `@rs/moq-relay/src/config.rs`:
- Around line 43-46: Add a post-load validation in Config::load() that checks if
stats.name is set (non-empty/Some) and stats.levels == 0, and if so return an
error (or otherwise fail early) so enabled stats cannot have zero buckets;
locate the check around the deserialization result in Config::load(), reference
the StatsConfig fields stats.name and stats.levels, and ensure the guard
produces a clear error message indicating that stats.levels must be >= 1 when
stats.name is configured.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 06d888d0-654f-4a85-ab59-30ba24894540
📒 Files selected for processing (18)
- rs/moq-lite/src/client.rs
- rs/moq-lite/src/lib.rs
- rs/moq-lite/src/lite/publisher.rs
- rs/moq-lite/src/lite/session.rs
- rs/moq-lite/src/lite/subscriber.rs
- rs/moq-lite/src/model/origin.rs
- rs/moq-lite/src/path.rs
- rs/moq-lite/src/server.rs
- rs/moq-lite/src/stats.rs
- rs/moq-native/src/client.rs
- rs/moq-native/src/server.rs
- rs/moq-relay/src/cluster.rs
- rs/moq-relay/src/config.rs
- rs/moq-relay/src/connection.rs
- rs/moq-relay/src/lib.rs
- rs/moq-relay/src/main.rs
- rs/moq-relay/src/stats.rs
- rs/moq-relay/src/websocket.rs
```rust
async fn run_subscribe(&mut self, id: u64, path: PathOwned, broadcast: BroadcastDynamic, mut track: TrackProducer) {
	self.subscribes.lock().insert(id, track.clone());
	// Subscriber-side track stats; counters bump as frames/bytes/groups arrive.
	// Drop on subscription end records `subscriber.subscriptions_closed`.
	let track_stats = self
		.stats
		.as_ref()
		.map(|stats| Arc::new(stats.broadcast(&path).subscriber().track(&track.name)));

	self.subscribes.lock().insert(
		id,
		TrackEntry {
			producer: track.clone(),
			stats: track_stats.clone(),
		},
	);
```
The temporary SubscriberStats guard creates spurious broadcast counter events.
In run_subscribe (lines 293–296), stats.broadcast(&path).subscriber() returns a SubscriberStats guard that drops immediately after the .track() call. The SubscriberStats::Drop impl increments broadcasts_closed, while its creation increments broadcasts. This means every subscription creates a phantom open-and-close pair for the broadcast counter, polluting the metric.
Compare this to run_announce_prefix (lines 146–149), where the SubscriberStats guard is properly stored in stats_guards and kept alive for the broadcast's true lifetime. The comment there explicitly states: "Dropping the guard records subscriber.broadcasts_closed."
With one broadcast and N subscriptions, the broadcasts counter will read 1 + N instead of 1, and broadcasts_closed will fire N times while the broadcast is still active.
Store the temporary guard or refactor to avoid the temporary SubscriberStats creation.
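The open/close imbalance can be reproduced with a minimal model of the guard pattern. `SubscriberGuard`, `Counters`, and the field names here are illustrative stand-ins, not the crate's actual types; the point is only that a temporary RAII guard records a phantom open/close pair:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counters {
    broadcasts: AtomicU64,
    broadcasts_closed: AtomicU64,
}

// Creating the guard bumps `broadcasts`; dropping it bumps `broadcasts_closed`.
struct SubscriberGuard(Arc<Counters>);

impl SubscriberGuard {
    fn new(c: Arc<Counters>) -> Self {
        c.broadcasts.fetch_add(1, Ordering::Relaxed);
        SubscriberGuard(c)
    }
    fn track(&self, _name: &str) {} // stand-in for .track(&track.name)
}

impl Drop for SubscriberGuard {
    fn drop(&mut self) {
        self.0.broadcasts_closed.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    let c = Arc::new(Counters::default());

    // Buggy pattern: the guard is a temporary, so it opens and closes immediately.
    SubscriberGuard::new(c.clone()).track("video");
    assert_eq!(c.broadcasts_closed.load(Ordering::Relaxed), 1); // spurious close

    // Fixed pattern: retain the guard for the subscription's lifetime.
    let guard = SubscriberGuard::new(c.clone());
    guard.track("video");
    assert_eq!(c.broadcasts.load(Ordering::Relaxed), 2);
    assert_eq!(c.broadcasts_closed.load(Ordering::Relaxed), 1); // still open
    drop(guard);
    assert_eq!(c.broadcasts_closed.load(Ordering::Relaxed), 2);
}
```

Storing the guard in `stats_guards` (or on the `TrackEntry`) is what ties its `Drop` to the subscription's real end, matching the `run_announce_prefix` pattern.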
```rust
/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
	loop {
		let next = self.updates.try_recv().ok()?;
		if next.0.is_hidden() {
			return Some(next);
		}
	}
}
```
try_announced_hidden is missing the try_announced doc caveats.
The try_announced doc explicitly states that None means "no update available", not "consumer closed". try_announced_hidden omits this, and also lacks the counterpart pointer ("Use Self::try_announced to receive visible paths instead").
📝 Suggested doc update

```diff
-	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	///
+	/// Returns `None` if there is no hidden update available; NOT because the consumer is closed.
+	/// You have to use `is_closed` to check if the consumer is closed.
+	///
+	/// Silently consumes and discards any visible-path updates encountered while scanning.
+	/// Use [`Self::try_announced`] to receive visible paths instead.
+	/// Do not mix calls to both variants on the same consumer instance.
 	pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
```
```rust
fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
	if levels == 0 || broadcast.is_empty() {
		return vec![PathOwned::default()];
	}

	let segs: Vec<&str> = broadcast.as_str().split('/').collect();
	let max = (levels as usize).min(segs.len());
	(0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
}
```
levels == 0 silently falls through to root-level aggregation.
The levels == 0 branch returns vec![PathOwned::default()] (root), identical to levels == 1. A relay operator setting levels = 0 expecting "no stats" would still get a root-level stats broadcast. No test covers this case. Either document the equivalence or treat 0 as truly empty:
🐛 Proposed fix

```diff
 fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
-	if levels == 0 || broadcast.is_empty() {
-		return vec![PathOwned::default()];
-	}
+	if broadcast.is_empty() {
+		return if levels == 0 { vec![] } else { vec![PathOwned::default()] };
+	}
+	if levels == 0 {
+		return vec![];
+	}
```

Alternatively, keep the current behavior and document it:

```diff
 /// * `levels` controls how many path-prefix levels stats are bucketed into. A value
-/// of `1` produces only the root bucket.
+/// of `0` or `1` both produce only the root bucket.
```
+/// of `0` or `1` both produce only the root bucket.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> { | |
| if levels == 0 || broadcast.is_empty() { | |
| return vec![PathOwned::default()]; | |
| } | |
| let segs: Vec<&str> = broadcast.as_str().split('/').collect(); | |
| let max = (levels as usize).min(segs.len()); | |
| (0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect() | |
| } | |
| fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> { | |
| if broadcast.is_empty() { | |
| return if levels == 0 { vec![] } else { vec![PathOwned::default()] }; | |
| } | |
| if levels == 0 { | |
| return vec![]; | |
| } | |
| let segs: Vec<&str> = broadcast.as_str().split('/').collect(); | |
| let max = (levels as usize).min(segs.len()); | |
| (0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect() | |
| } |
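The proposed semantics can be checked with a self-contained version of the function, using plain `String` keys as a stand-in for `PathOwned`:

```rust
// levels == 0 -> no buckets; empty path -> root bucket only (unless disabled);
// otherwise one bucket per prefix depth, capped at `levels`.
fn level_keys(broadcast: &str, levels: u32) -> Vec<String> {
    if broadcast.is_empty() {
        return if levels == 0 { vec![] } else { vec![String::new()] };
    }
    if levels == 0 {
        return vec![];
    }
    let segs: Vec<&str> = broadcast.split('/').collect();
    let max = (levels as usize).min(segs.len());
    (0..max).map(|i| segs[..i].join("/")).collect()
}

fn main() {
    assert!(level_keys("demo/bbb", 0).is_empty()); // 0 now means "no stats"
    assert!(level_keys("", 0).is_empty());
    assert_eq!(level_keys("demo/bbb", 1), vec!["".to_string()]); // root only
    assert_eq!(level_keys("demo/bbb", 2), vec!["".to_string(), "demo".to_string()]);
}
```

Note that prefix `i` excludes the broadcast's final segment, so `levels` larger than the segment count simply caps at the full prefix chain.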
```rust
/// Stats publishing configuration. Disabled when `stats.name` is unset.
#[command(flatten)]
#[serde(default)]
pub stats: StatsConfig,
```
Validate stats.levels when stats are enabled.
Line 46 accepts deserialized values without enforcing a minimum. If stats.name is set and stats.levels == 0, stats can appear enabled but produce no level buckets. Please add a post-load guard in Config::load().
Suggested guard in Config::load()

```diff
 pub fn load() -> anyhow::Result<Self> {
 	// Parse just the CLI arguments initially.
 	let mut config = Config::parse();

 	// If a file is provided, load it and merge the CLI arguments.
 	if let Some(file) = config.file {
 		config = toml::from_str(&std::fs::read_to_string(file)?)?;
 		config.update_from(std::env::args());
 	}
+
+	if config.stats.name.is_some() {
+		anyhow::ensure!(config.stats.levels > 0, "stats.levels must be >= 1 when stats are enabled");
+	}

 	config.log.init();
 	tracing::trace!(?config, "final config");

 	Ok(config)
 }
```
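Stripped of the relay's `anyhow`/`clap` plumbing, the guard reduces to a small predicate; the struct and error type here are simplified stand-ins for the real `StatsConfig`:

```rust
// Simplified model of the post-load check: enabled stats imply >= 1 level bucket.
struct StatsConfig {
    name: Option<String>,
    levels: u32,
}

fn validate_stats(cfg: &StatsConfig) -> Result<(), String> {
    if cfg.name.is_some() && cfg.levels == 0 {
        return Err("stats.levels must be >= 1 when stats.name is configured".to_string());
    }
    Ok(())
}

fn main() {
    // Disabled stats: levels is irrelevant.
    assert!(validate_stats(&StatsConfig { name: None, levels: 0 }).is_ok());
    // Enabled stats with zero buckets: rejected.
    assert!(validate_stats(&StatsConfig { name: Some("relay".into()), levels: 0 }).is_err());
    // Enabled stats with at least one bucket: accepted.
    assert!(validate_stats(&StatsConfig { name: Some("relay".into()), levels: 2 }).is_ok());
}
```

Failing at load time keeps the invariant out of the hot path: downstream code can assume an enabled `Stats` always has at least one level.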
Stats broadcasts now carry four tracks per level instead of two: publisher and publisher_internal for egress, subscriber and subscriber_internal for ingress. Each pair has its own atomic counter set so a billing service can rate-differentiate between intra-cluster and customer traffic. A `Stats` handle carries a tier flag set via `Stats::external` / `Stats::internal` / `Stats::tier(bool)`. All four tiers share level state and a single per-level snapshot task; the task ticks only while at least one of the four roles has an active subscription. The relay decides the tier per session: mTLS-authenticated peers (connection.rs and websocket.rs) and outbound cluster dials all flag internal; everything else stays external. https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
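The four-way split described above can be modeled roughly as follows. The `Level` field names mirror the review's diff; `RoleCounters` and the selector method are assumed shapes, not the crate's actual API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct RoleCounters {
    bytes: AtomicU64,
}

// One counter set per (role, tier), all sharing the same per-level state.
#[derive(Default)]
struct Level {
    publisher_external: RoleCounters,
    publisher_internal: RoleCounters,
    subscriber_external: RoleCounters,
    subscriber_internal: RoleCounters,
}

impl Level {
    // The session's tier flag selects the counter set once; every subsequent
    // event bumps the chosen tier with no extra branching per event.
    fn publisher(&self, internal: bool) -> &RoleCounters {
        if internal { &self.publisher_internal } else { &self.publisher_external }
    }
}

fn main() {
    let level = Level::default();
    level.publisher(true).bytes.fetch_add(100, Ordering::Relaxed); // cluster peer
    level.publisher(false).bytes.fetch_add(40, Ordering::Relaxed); // customer session
    assert_eq!(level.publisher_internal.bytes.load(Ordering::Relaxed), 100);
    assert_eq!(level.publisher_external.bytes.load(Ordering::Relaxed), 40);
}
```

Because the tiers never share a counter, a billing service can subtract nothing: customer traffic is exactly the `*_external` tracks.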
♻️ Duplicate comments (1)
rs/moq-lite/src/stats.rs (1)
174-177: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win: Clarify or fix `levels = 0` semantics.

Right now `level_keys(..., 0)` still returns the root bucket, so a config of `levels = 0` continues to publish `.stats/<name>` for non-empty paths even though the public docs only describe `1` as the root-only case. Either make `0` return no keys, or document `0 == 1` and add a regression test so operators do not read `0` as "disabled".

Suggested change

```diff
 fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
-	if levels == 0 || broadcast.is_empty() {
+	if broadcast.is_empty() {
 		return vec![PathOwned::default()];
 	}
+	if levels == 0 {
+		return Vec::new();
+	}
```

```diff
 #[test]
 fn level_keys_basic() {
 	let key = |s: &str, n: u32| {
 		level_keys(&Path::new(s), n)
 			.into_iter()
 			.map(|p| p.as_str().to_string())
 			.collect::<Vec<_>>()
 	};
+	assert!(key("demo/bbb", 0).is_empty());
 	assert_eq!(key("demo/bbb", 1), vec![""]);
```

As per coding guidelines, "Comments must reflect the current state of the code, not its history. Don't write historical context; describe what the code does today or delete the comment."

Also applies to: 676-679
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/stats.rs` around lines 174 - 177, The comment and behavior are inconsistent for levels = 0; change the level_keys function so that when levels == 0 it returns no keys (i.e., an empty vec/iterator) instead of producing the root bucket, update the doc comment above level_keys (and the similar block at 676-679) to state explicitly that 0 yields no keys, and add a regression test (e.g., test_level_keys_zero_returns_none) verifying that level_keys(path, 0) -> empty and level_keys("", 0) behaves as expected to prevent treating 0 as “disabled” accidentally.
🧹 Nitpick comments (1)
rs/moq-lite/src/stats.rs (1)
576-591: ⚡ Quick win: Use one `ts_ms` per tick for all four role frames.

This loop emits four frames for one logical snapshot, but each `write_snapshot()` call computes its own timestamp. That makes exact joins across `publisher`, `publisher_internal`, `subscriber`, and `subscriber_internal` harder downstream for no real benefit.

Suggested change

```diff
 loop {
 	tick.tick().await;
@@
+	let ts_ms = now_ms();
+
 	// Always emit a snapshot for every track. Idle roles see their counters
 	// held steady; that itself is informative for a billing service.
-	write_snapshot(&mut pub_ext, "publisher", &level, level.publisher_external.snapshot());
+	write_snapshot(
+		&mut pub_ext,
+		"publisher",
+		&level,
+		ts_ms,
+		level.publisher_external.snapshot(),
+	);
 	write_snapshot(
 		&mut pub_int,
 		"publisher_internal",
 		&level,
+		ts_ms,
 		level.publisher_internal.snapshot(),
 	);
-	write_snapshot(&mut sub_ext, "subscriber", &level, level.subscriber_external.snapshot());
+	write_snapshot(
+		&mut sub_ext,
+		"subscriber",
+		&level,
+		ts_ms,
+		level.subscriber_external.snapshot(),
+	);
 	write_snapshot(
 		&mut sub_int,
 		"subscriber_internal",
 		&level,
+		ts_ms,
 		level.subscriber_internal.snapshot(),
 	);
 }
@@
-fn write_snapshot(track: &mut crate::TrackProducer, role: &str, level: &Level, snap: RoleSnapshot) {
+fn write_snapshot(
+	track: &mut crate::TrackProducer,
+	role: &str,
+	level: &Level,
+	ts_ms: u64,
+	snap: RoleSnapshot,
+) {
@@
-	now_ms(),
+	ts_ms,
```

Also applies to: 603-617
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-lite/src/stats.rs` around lines 576 - 591, The four write_snapshot() calls compute their own timestamps causing mismatched ts_ms across role frames; compute a single timestamp once (e.g., let ts_ms = now_ms() or call the existing timestamp helper) before the four frames and pass that same ts_ms into each write_snapshot invocation (and repeat the same change for the later block around the other four calls at 603-617), ensuring the snapshot values from level.publisher_external, level.publisher_internal, level.subscriber_external, and level.subscriber_internal all share the same ts_ms for a single logical tick.
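The fix amounts to hoisting the timestamp out of the per-role calls. A minimal standalone version, where `now_ms` and `write_snapshot` are simplified stand-ins for the crate's helpers:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Simplified stand-in: the real write_snapshot serializes a frame to a track.
fn write_snapshot(role: &str, ts_ms: u64) -> (String, u64) {
    (role.to_string(), ts_ms)
}

fn main() {
    // One timestamp per tick, shared by all four role frames, so downstream
    // consumers can join the frames on an exact-equality key.
    let ts_ms = now_ms();
    let roles = ["publisher", "publisher_internal", "subscriber", "subscriber_internal"];
    let frames: Vec<_> = roles.into_iter().map(|role| write_snapshot(role, ts_ms)).collect();
    assert!(frames.iter().all(|(_, t)| *t == ts_ms));
    assert_eq!(frames.len(), 4);
}
```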
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c2c57b10-1417-4b55-9eca-a3cc6420ccd2
📒 Files selected for processing (4)
- rs/moq-lite/src/stats.rs
- rs/moq-relay/src/cluster.rs
- rs/moq-relay/src/connection.rs
- rs/moq-relay/src/websocket.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- rs/moq-relay/src/cluster.rs
- rs/moq-relay/src/websocket.rs
Summary
This PR introduces a comprehensive stats aggregation system for moq-lite sessions that tracks and publishes per-broadcast and per-prefix-level metrics. The system publishes statistics as `.stats/<level>/<name>` broadcasts on a configurable origin, with separate counters for publisher (egress) and subscriber (ingress) roles.

Key Changes
New `Stats` module (rs/moq-lite/src/stats.rs): Core stats aggregator that treats hidden paths (any segment starting with `.`) as no-ops, so forwarding stats traffic does not feed back into the aggregator.

Path visibility filtering: Added `Path::is_hidden()` method to identify infrastructure paths (e.g., `.stats/...`) and updated `OriginConsumer` to:
- filter hidden paths from the `announced()` / `try_announced()` methods
- add `announced_hidden()` / `try_announced_hidden()` methods for infrastructure broadcasts
- keep `announced_broadcast()` working for both visible and hidden paths

Session integration: Updated `Publisher` and `Subscriber` in the lite protocol to accept an optional `Stats` handle during initialization.

Relay configuration (rs/moq-relay/src/stats.rs): New `StatsConfig` struct with:
- `name`: Identifier for stats broadcasts (disabled when unset)
- `levels`: Configurable path-prefix bucketing depth

Cluster and connection updates: Integrated the stats aggregator into the relay's cluster and connection handling to attach stats to all sessions.
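Given the `name` and `levels` fields, an enabling relay config might look like the following; this is a sketch, and the defaults and comments are assumptions rather than documented behavior:

```toml
# Hypothetical relay config fragment enabling stats publishing.
[stats]
name = "relay-west-1"  # identifier used in .stats/<level>/<name>; omit to disable stats
levels = 2             # bucket stats at the root and one path segment deep
```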
API surface: Exposed stats through the `Client` and `Server` builders with `with_stats()` methods.

Notable Implementation Details
- Counters use `Relaxed` ordering for minimal synchronization overhead
- Snapshot tasks use `tokio::time::interval` with `Delay` missed-tick behavior to handle missed ticks gracefully
- Guard types (`PublisherStats`, `SubscriberStats`, `PublisherTrack`, `SubscriberTrack`) ensure proper counter lifecycle management

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP