Skip to content

Add stats aggregation and publishing for moq-lite sessions#1389

Open
kixelated wants to merge 2 commits into
mainfrom
claude/plan-relay-stats-NPK7p
Open

Add stats aggregation and publishing for moq-lite sessions#1389
kixelated wants to merge 2 commits into
mainfrom
claude/plan-relay-stats-NPK7p

Conversation

@kixelated
Copy link
Copy Markdown
Collaborator

Summary

This PR introduces a comprehensive stats aggregation system for moq-lite sessions that tracks and publishes per-broadcast and per-prefix-level metrics. The system publishes statistics as .stats/<level>/<name> broadcasts on a configurable origin, with separate counters for publisher (egress) and subscriber (ingress) roles.

Key Changes

  • New Stats module (rs/moq-lite/src/stats.rs): Core stats aggregator that:

    • Tracks cumulative counters for both publisher and subscriber roles (broadcasts, subscriptions, bytes, frames, groups)
    • Aggregates metrics across configurable path-prefix levels
    • Spawns per-level snapshot tasks that publish JSON-formatted stats every second
    • Automatically manages task lifecycle (spawns on first subscription, exits when idle)
    • Prevents feedback loops by treating hidden paths (segments starting with .) as no-ops
  • Path visibility filtering: Added Path::is_hidden() method to identify infrastructure paths (e.g., .stats/...) and updated OriginConsumer to:

    • Filter hidden paths from announced() / try_announced() methods
    • Provide separate announced_hidden() / try_announced_hidden() methods for infrastructure broadcasts
    • Support announced_broadcast() for both visible and hidden paths
  • Session integration: Updated Publisher and Subscriber in lite protocol to:

    • Accept optional Stats handle during initialization
    • Create per-broadcast stats guards that track lifecycle events
    • Bump counters for frames, bytes, and groups as data flows through tracks
    • Properly manage guard lifecycle to record open/close events
  • Relay configuration (rs/moq-relay/src/stats.rs): New StatsConfig struct with:

    • name: Identifier for stats broadcasts (disabled when unset)
    • levels: Configurable path-prefix bucketing depth
  • Cluster and connection updates: Integrated stats aggregator into relay's cluster and connection handling to attach stats to all sessions

  • API surface: Exposed stats through Client and Server builders with with_stats() methods

Notable Implementation Details

  • Stats use atomic counters with Relaxed ordering for minimal synchronization overhead
  • Snapshot task uses tokio::time::interval with Delay behavior to handle missed ticks gracefully
  • JSON serialization is hand-rolled to keep serde optional in moq-lite
  • Level keys are computed as path prefixes (0 to N-1 segments, excluding the full path itself)
  • RAII guards (PublisherStats, SubscriberStats, PublisherTrack, SubscriberTrack) ensure proper counter lifecycle management
  • Hidden path detection prevents stats infrastructure from generating its own stats traffic

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP

Add a self-contained Stats aggregator in moq-lite that publishes
.stats/<level>/<name> broadcasts on a caller-provided OriginProducer,
plus the relay wiring to attach one per session.

Each stats broadcast carries two tracks (publisher and subscriber) with
disjoint cumulative counters (broadcasts, subscriptions, bytes, frames,
groups). A relay populates both; one-sided clients populate one. Per-level
snapshot tasks spawn on first subscribe and exit when both roles are idle,
so idle deployments do zero scheduled work. Counter fanout to multiple
levels is per-event via Arc<[Arc<Level>]>; no per-flush walks.

Also adds Path::is_hidden(), filters hidden paths from
OriginConsumer::announced(), and adds announced_hidden() / try_announced_hidden()
so .stats/* broadcasts don't leak into ordinary discovery while still being
addressable. The stats module returns empty handles for hidden paths so
forwarding stats traffic doesn't feed back into the aggregator.

Relay gains a [stats] config section (name, levels). The cluster constructs
the Stats handle in Cluster::new and threads it through with_stats() on
incoming connections, websocket sessions, and outbound cluster dials.

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 7, 2026

Review Change Stack

Walkthrough

This PR introduces comprehensive session-level statistics aggregation across moq-lite and moq-relay. The changes establish a hierarchical stats infrastructure: paths are tagged as hidden (infrastructure-style with . segments), origin announcements are filtered to separate hidden and visible broadcasts, and a new Stats module provides atomic counters, RAII guards, and periodic snapshot emission per broadcast and role. Both Client and Server gain with_stats() builders that thread stats through session negotiation, and Publisher/Subscriber track per-path and per-track metrics. The relay layer extends this with configuration-driven stats initialization, wiring stats into cluster sessions and WebSocket connections. Hidden paths receive empty stat handles to prevent feedback loops.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title directly and clearly describes the main change: adding stats aggregation and publishing for moq-lite sessions, which is the primary focus of the entire changeset.
Description check ✅ Passed The PR description provides comprehensive and well-organized details about the stats system, covering key changes, implementation details, and relay integration that align with the actual changeset.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/plan-relay-stats-NPK7p
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/plan-relay-stats-NPK7p

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (2)
rs/moq-lite/src/client.rs (1)

61-80: 💤 Low value

Missing TODO comment for the IETF Draft-17 stats gap.

rs/moq-lite/src/server.rs has // TODO: ietf code path does not yet record stats. on the analogous Draft-17 early-return path, but client.rs has no equivalent note. Aligning them makes the known limitation consistently visible in both entry points.

🔧 Proposed addition
 			tracing::debug!(version = ?v, "connected");
+			// TODO: ietf code path does not yet record stats.
 			return Ok(Session::new(session, v, None));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/client.rs` around lines 61 - 80, Add the same TODO comment
about missing stats for the IETF Draft-17 code path to the early-return branch
in client.rs where ALPN_17 is matched; specifically, update the block that
selects Version::Ietf(ietf::Version::Draft17), calls ietf::start(...,
ietf::Version::Draft17) and returns Ok(Session::new(...)) to include a short "//
TODO: ietf code path does not yet record stats." note so the limitation is
visible alongside the analogous comment in server.rs.
rs/moq-lite/src/stats.rs (1)

123-128: entries grows monotonically — consider documenting the memory profile.

StatsInner.entries is never pruned: every unique broadcast prefix that passes through broadcast() creates a Level that lives for the lifetime of the Stats handle. For a long-running relay seeing high broadcast-path cardinality this will accumulate indefinitely. The memory per Level is small (a few AtomicU64s + strings), but it is worth calling out in the struct or Stats::new doc so operators can size the levels config appropriately.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 123 - 128, StatsInner.entries (the
Lock<HashMap<PathOwned, Arc<Level>>>) grows monotonically because broadcast()
inserts a Level per unique prefix and never prunes them; add a short doc comment
on StatsInner or the public Stats::new describing this memory behavior, the
expected per-Level footprint, and advising operators to size the levels config
appropriately for long‑running relays with high broadcast-path cardinality so
they understand the potential unbounded growth. Mention the relevant symbols:
StatsInner, entries, Level, broadcast(), and Stats::new.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 281-284: recv_subscribe currently creates a temporary
PublisherStats via stats.as_ref().map(|s|
s.broadcast(&absolute).publisher().track(&track)) which increments then
immediately decrements the broadcast counters; instead create and retain the
PublisherStats guard alongside the PublisherTrack so it isn't dropped
immediately. Locate recv_subscribe and change the mapping to first call
stats.as_ref().map(|s| s.broadcast(&absolute).publisher()) to obtain a
PublisherStats guard, then call guard.track(&track) and store the guard together
with the PublisherTrack (e.g., return or insert a tuple/struct (PublisherTrack,
PublisherStats) or push the guard into the existing stats_guards collection used
by run_announce) so the guard's lifetime matches the real announce guard rather
than being a temporary. Ensure symbols: recv_subscribe, PublisherStats,
publisher(), track(), run_announce, stats_guards are updated accordingly.

In `@rs/moq-lite/src/lite/subscriber.rs`:
- Around line 290-304: The temporary SubscriberStats created in run_subscribe by
calling stats.broadcast(&path).subscriber() is dropped immediately (incrementing
broadcasts_closed) and thus creates spurious open/close events; fix by retaining
the guard for the broadcast's lifetime (like run_announce_prefix does) — obtain
the guard from self.stats.as_ref().map(|s| s.broadcast(&path).subscriber()),
store it in the existing self.stats_guards container (or attach it to the
TrackEntry) so it is not dropped after creating track_stats, and then use the
guard when deriving track_stats (e.g., clone or reference the guard when calling
.track(&track.name)) so the broadcasts counters reflect the true lifetime.

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 785-793: Update the doc for try_announced_hidden to include the
same caveat as try_announced: state that returning None means "no update
available" (not that the consumer has closed), and add a pointer sentence like
"Use Self::try_announced to receive visible paths instead" so callers know the
counterpart method for non-hidden paths; reference the try_announced_hidden and
try_announced symbols when editing the documentation.

In `@rs/moq-lite/src/stats.rs`:
- Around line 538-546: The function level_keys currently treats levels == 0 the
same as levels == 1 (returning root), which is surprising; update level_keys so
that when levels == 0 it returns an empty Vec (no stats) while keeping the
existing broadcast.is_empty() check returning vec![PathOwned::default()] for
empty broadcasts; modify the early conditional in level_keys (and any related
logic using broadcast/PathOwned) accordingly and add a unit test asserting
levels == 0 yields an empty Vec to prevent regression.

In `@rs/moq-relay/src/config.rs`:
- Around line 43-46: Add a post-load validation in Config::load() that checks if
stats.name is set (non-empty/Some) and stats.levels == 0, and if so return an
error (or otherwise fail early) so enabled stats cannot have zero buckets;
locate the check around the deserialization result in Config::load(), reference
the StatsConfig fields stats.name and stats.levels, and ensure the guard
produces a clear error message indicating that stats.levels must be >= 1 when
stats.name is configured.

---

Nitpick comments:
In `@rs/moq-lite/src/client.rs`:
- Around line 61-80: Add the same TODO comment about missing stats for the IETF
Draft-17 code path to the early-return branch in client.rs where ALPN_17 is
matched; specifically, update the block that selects
Version::Ietf(ietf::Version::Draft17), calls ietf::start(...,
ietf::Version::Draft17) and returns Ok(Session::new(...)) to include a short "//
TODO: ietf code path does not yet record stats." note so the limitation is
visible alongside the analogous comment in server.rs.

In `@rs/moq-lite/src/stats.rs`:
- Around line 123-128: StatsInner.entries (the Lock<HashMap<PathOwned,
Arc<Level>>>) grows monotonically because broadcast() inserts a Level per unique
prefix and never prunes them; add a short doc comment on StatsInner or the
public Stats::new describing this memory behavior, the expected per-Level
footprint, and advising operators to size the levels config appropriately for
long‑running relays with high broadcast-path cardinality so they understand the
potential unbounded growth. Mention the relevant symbols: StatsInner, entries,
Level, broadcast(), and Stats::new.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 06d888d0-654f-4a85-ab59-30ba24894540

📥 Commits

Reviewing files that changed from the base of the PR and between bdda6bd and 39832ec.

📒 Files selected for processing (18)
  • rs/moq-lite/src/client.rs
  • rs/moq-lite/src/lib.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/session.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/path.rs
  • rs/moq-lite/src/server.rs
  • rs/moq-lite/src/stats.rs
  • rs/moq-native/src/client.rs
  • rs/moq-native/src/server.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/config.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/lib.rs
  • rs/moq-relay/src/main.rs
  • rs/moq-relay/src/stats.rs
  • rs/moq-relay/src/websocket.rs

Comment thread rs/moq-lite/src/lite/publisher.rs
Comment on lines 290 to +304
async fn run_subscribe(&mut self, id: u64, path: PathOwned, broadcast: BroadcastDynamic, mut track: TrackProducer) {
self.subscribes.lock().insert(id, track.clone());
// Subscriber-side track stats; counters bump as frames/bytes/groups arrive.
// Drop on subscription end records `subscriber.subscriptions_closed`.
let track_stats = self
.stats
.as_ref()
.map(|stats| Arc::new(stats.broadcast(&path).subscriber().track(&track.name)));

self.subscribes.lock().insert(
id,
TrackEntry {
producer: track.clone(),
stats: track_stats.clone(),
},
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

# Find the stats module and understand the structure
find . -type f -name "*.rs" | grep -E "(stats|subscriber)" | head -20

Repository: moq-dev/moq

Length of output: 243


🏁 Script executed:

# Look at the subscriber.rs file to understand the context
cd rs/moq-lite/src/lite && wc -l subscriber.rs

Repository: moq-dev/moq

Length of output: 73


🏁 Script executed:

# Get the exact location of SubscriberStats and PublisherStats types
rg "struct SubscriberStats|struct PublisherStats" --type rs -A 3

Repository: moq-dev/moq

Length of output: 82


🏁 Script executed:

# Find where track() method is defined for SubscriberStats
rg "fn track\(" --type rs -B 2 -A 8 | grep -A 10 "SubscriberStats"

Repository: moq-dev/moq

Length of output: 82


🏁 Script executed:

# Look at the related comment in run_announce_prefix mentioned in the review
sed -n '134,150p' rs/moq-lite/src/lite/subscriber.rs

Repository: moq-dev/moq

Length of output: 857


🏁 Script executed:

# Read the exact lines in question (290-304)
sed -n '290,304p' rs/moq-lite/src/lite/subscriber.rs

Repository: moq-dev/moq

Length of output: 585


🏁 Script executed:

# Find SubscriberStats definition
rg "struct SubscriberStats" -A 10

Repository: moq-dev/moq

Length of output: 671


🏁 Script executed:

# Find the track() method implementation
rg "impl.*SubscriberStats" -A 50 | head -100

Repository: moq-dev/moq

Length of output: 3219


🏁 Script executed:

# Look at publisher.rs to see the similar issue mentioned
find . -name "publisher.rs" | xargs cat | grep -A 30 "recv_subscribe"

Repository: moq-dev/moq

Length of output: 2029


🏁 Script executed:

# Find the broadcast() method and understand what it returns
rg "fn broadcast\(" -A 15 | head -50

Repository: moq-dev/moq

Length of output: 1851


🏁 Script executed:

# Check SubscriberStats definition more completely, including Drop implementation
sed -n '/^pub struct SubscriberStats/,/^impl Drop for SubscriberStats/p' rs/moq-lite/src/stats.rs | head -40

Repository: moq-dev/moq

Length of output: 490


🏁 Script executed:

# Look at the full Drop impl for SubscriberStats
sed -n '/^impl Drop for SubscriberStats/,/^}/p' rs/moq-lite/src/stats.rs

Repository: moq-dev/moq

Length of output: 225


🏁 Script executed:

# Check what stats.broadcast() returns - find BroadcastStats definition
rg "pub struct BroadcastStats" -A 10

Repository: moq-dev/moq

Length of output: 619


🏁 Script executed:

# Find the subscriber() method on BroadcastStats
rg "fn subscriber\(" -A 10

Repository: moq-dev/moq

Length of output: 1463


🏁 Script executed:

# Look for any broadcasts_opened increments
rg "broadcasts_opened" -B 3 -A 3

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check if there's a Drop impl for SubscriberTrack
rg "impl Drop for SubscriberTrack" -A 10

Repository: moq-dev/moq

Length of output: 597


The temporary SubscriberStats guard creates spurious broadcast counter events.

In run_subscribe (lines 293–296), stats.broadcast(&path).subscriber() returns a SubscriberStats guard that drops immediately after the .track() call. The SubscriberStats::Drop impl increments broadcasts_closed, while its creation increments broadcasts. This means every subscription creates a phantom open-and-close pair for the broadcast counter, polluting the metric.

Compare this to run_announce_prefix (lines 146–149), where the SubscriberStats guard is properly stored in stats_guards and kept alive for the broadcast's true lifetime. The comment there explicitly states: "Dropping the guard records subscriber.broadcasts_closed."

With one broadcast and N subscriptions, the broadcasts counter will read 1 + N instead of 1, and broadcasts_closed will fire N times while the broadcast is still active.

Store the temporary guard or refactor to avoid the temporary SubscriberStats creation.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 290 - 304, The temporary
SubscriberStats created in run_subscribe by calling
stats.broadcast(&path).subscriber() is dropped immediately (incrementing
broadcasts_closed) and thus creates spurious open/close events; fix by retaining
the guard for the broadcast's lifetime (like run_announce_prefix does) — obtain
the guard from self.stats.as_ref().map(|s| s.broadcast(&path).subscriber()),
store it in the existing self.stats_guards container (or attach it to the
TrackEntry) so it is not dropped after creating track_stats, and then use the
guard when deriving track_stats (e.g., clone or reference the guard when calling
.track(&track.name)) so the broadcasts counters reflect the true lifetime.

Comment on lines +785 to 793
/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
loop {
let next = self.updates.try_recv().ok()?;
if next.0.is_hidden() {
return Some(next);
}
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

try_announced_hidden is missing the try_announced doc caveats.

The try_announced doc explicitly states that None means "no update available", not "consumer closed". try_announced_hidden omits this, and also lacks the counterpart pointer ("Use Self::try_announced to receive visible paths instead").

📝 Suggested doc update
-	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	/// Like [`Self::try_announced`] but returns only paths where any segment starts with `.`.
+	///
+	/// Returns `None` if there is no hidden update available; NOT because the consumer is closed.
+	/// You have to use `is_closed` to check if the consumer is closed.
+	///
+	/// Silently consumes and discards any visible-path updates encountered while scanning.
+	/// Use [`Self::try_announced`] to receive visible paths instead.
+	/// Do not mix calls to both variants on the same consumer instance.
 	pub fn try_announced_hidden(&mut self) -> Option<OriginAnnounce> {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/model/origin.rs` around lines 785 - 793, Update the doc for
try_announced_hidden to include the same caveat as try_announced: state that
returning None means "no update available" (not that the consumer has closed),
and add a pointer sentence like "Use Self::try_announced to receive visible
paths instead" so callers know the counterpart method for non-hidden paths;
reference the try_announced_hidden and try_announced symbols when editing the
documentation.

Comment thread rs/moq-lite/src/stats.rs
Comment on lines +538 to +546
fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
if levels == 0 || broadcast.is_empty() {
return vec![PathOwned::default()];
}

let segs: Vec<&str> = broadcast.as_str().split('/').collect();
let max = (levels as usize).min(segs.len());
(0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

levels == 0 silently falls through to root-level aggregation.

The levels == 0 branch returns vec![PathOwned::default()] (root), identical to levels == 1. A relay operator setting levels = 0 expecting "no stats" would still get a root-level stats broadcast. No test covers this case. Either document the equivalence or treat 0 as truly empty:

🐛 Proposed fix
 fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
-	if levels == 0 || broadcast.is_empty() {
-		return vec![PathOwned::default()];
-	}
+	if broadcast.is_empty() {
+		return if levels == 0 { vec![] } else { vec![PathOwned::default()] };
+	}
+	if levels == 0 {
+		return vec![];
+	}

Alternatively, keep the current behavior and document it:

 /// * `levels` controls how many path-prefix levels stats are bucketed into. A value
-///   of `1` produces only the root bucket.
+///   of `0` or `1` both produce only the root bucket.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
if levels == 0 || broadcast.is_empty() {
return vec![PathOwned::default()];
}
let segs: Vec<&str> = broadcast.as_str().split('/').collect();
let max = (levels as usize).min(segs.len());
(0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
}
fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
if broadcast.is_empty() {
return if levels == 0 { vec![] } else { vec![PathOwned::default()] };
}
if levels == 0 {
return vec![];
}
let segs: Vec<&str> = broadcast.as_str().split('/').collect();
let max = (levels as usize).min(segs.len());
(0..max).map(|i| PathOwned::from(segs[..i].join("/"))).collect()
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 538 - 546, The function level_keys
currently treats levels == 0 the same as levels == 1 (returning root), which is
surprising; update level_keys so that when levels == 0 it returns an empty Vec
(no stats) while keeping the existing broadcast.is_empty() check returning
vec![PathOwned::default()] for empty broadcasts; modify the early conditional in
level_keys (and any related logic using broadcast/PathOwned) accordingly and add
a unit test asserting levels == 0 yields an empty Vec to prevent regression.

Comment on lines +43 to +46
/// Stats publishing configuration. Disabled when `stats.name` is unset.
#[command(flatten)]
#[serde(default)]
pub stats: StatsConfig,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate stats.levels when stats are enabled.

Line 46 accepts deserialized values without enforcing a minimum. If stats.name is set and stats.levels == 0, stats can appear enabled but produce no level buckets. Please add a post-load guard in Config::load().

Suggested guard in Config::load()
 pub fn load() -> anyhow::Result<Self> {
     // Parse just the CLI arguments initially.
     let mut config = Config::parse();

     // If a file is provided, load it and merge the CLI arguments.
     if let Some(file) = config.file {
         config = toml::from_str(&std::fs::read_to_string(file)?)?;
         config.update_from(std::env::args());
     }
+
+    if config.stats.name.is_some() {
+        anyhow::ensure!(config.stats.levels > 0, "stats.levels must be >= 1 when stats are enabled");
+    }

     config.log.init();
     tracing::trace!(?config, "final config");

     Ok(config)
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-relay/src/config.rs` around lines 43 - 46, Add a post-load validation
in Config::load() that checks if stats.name is set (non-empty/Some) and
stats.levels == 0, and if so return an error (or otherwise fail early) so
enabled stats cannot have zero buckets; locate the check around the
deserialization result in Config::load(), reference the StatsConfig fields
stats.name and stats.levels, and ensure the guard produces a clear error message
indicating that stats.levels must be >= 1 when stats.name is configured.

Stats broadcasts now carry four tracks per level instead of two:
publisher and publisher_internal for egress, subscriber and
subscriber_internal for ingress. Each pair has its own atomic counter
set so a billing service can rate-differentiate between intra-cluster
and customer traffic.

A `Stats` handle carries a tier flag set via `Stats::external` /
`Stats::internal` / `Stats::tier(bool)`. All four tiers share level
state and a single per-level snapshot task; the task ticks only while
at least one of the four roles has an active subscription.

The relay decides the tier per session: mTLS-authenticated peers
(connection.rs and websocket.rs) and outbound cluster dials all flag
internal; everything else stays external.

https://claude.ai/code/session_01JeAPEjPTtCnAqS3hbBtqHP
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
rs/moq-lite/src/stats.rs (1)

174-177: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Clarify or fix levels = 0 semantics.

Right now level_keys(..., 0) still returns the root bucket, so a config of levels = 0 continues to publish .stats/<name> for non-empty paths even though the public docs only describe 1 as the root-only case. Either make 0 return no keys, or document 0 == 1 and add a regression test so operators do not read 0 as "disabled".

Suggested change
 fn level_keys(broadcast: &Path, levels: u32) -> Vec<PathOwned> {
-	if levels == 0 || broadcast.is_empty() {
+	if broadcast.is_empty() {
 		return vec![PathOwned::default()];
 	}
+	if levels == 0 {
+		return Vec::new();
+	}
 	#[test]
 	fn level_keys_basic() {
 		let key = |s: &str, n: u32| {
 			level_keys(&Path::new(s), n)
 				.into_iter()
 				.map(|p| p.as_str().to_string())
 				.collect::<Vec<_>>()
 		};
 
+		assert!(key("demo/bbb", 0).is_empty());
 		assert_eq!(key("demo/bbb", 1), vec![""]);

As per coding guidelines, "Comments must reflect the current state of the code, not its history. Don't write historical context; describe what the code does today or delete the comment."

Also applies to: 676-679

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 174 - 177, The comment and behavior
are inconsistent for levels = 0; change the level_keys function so that when
levels == 0 it returns no keys (i.e., an empty vec/iterator) instead of
producing the root bucket, update the doc comment above level_keys (and the
similar block at 676-679) to state explicitly that 0 yields no keys, and add a
regression test (e.g., test_level_keys_zero_returns_none) verifying that
level_keys(path, 0) -> empty and level_keys("", 0) behaves as expected to
prevent treating 0 as “disabled” accidentally.
🧹 Nitpick comments (1)
rs/moq-lite/src/stats.rs (1)

576-591: ⚡ Quick win

Use one ts_ms per tick for all four role frames.

This loop emits four frames for one logical snapshot, but each write_snapshot() call computes its own timestamp. That makes exact joins across publisher, publisher_internal, subscriber, and subscriber_internal harder downstream for no real benefit.

Suggested change
 	loop {
 		tick.tick().await;
@@
+		let ts_ms = now_ms();
+
 		// Always emit a snapshot for every track. Idle roles see their counters
 		// held steady; that itself is informative for a billing service.
-		write_snapshot(&mut pub_ext, "publisher", &level, level.publisher_external.snapshot());
+		write_snapshot(
+			&mut pub_ext,
+			"publisher",
+			&level,
+			ts_ms,
+			level.publisher_external.snapshot(),
+		);
 		write_snapshot(
 			&mut pub_int,
 			"publisher_internal",
 			&level,
+			ts_ms,
 			level.publisher_internal.snapshot(),
 		);
-		write_snapshot(&mut sub_ext, "subscriber", &level, level.subscriber_external.snapshot());
+		write_snapshot(
+			&mut sub_ext,
+			"subscriber",
+			&level,
+			ts_ms,
+			level.subscriber_external.snapshot(),
+		);
 		write_snapshot(
 			&mut sub_int,
 			"subscriber_internal",
 			&level,
+			ts_ms,
 			level.subscriber_internal.snapshot(),
 		);
 	}
 }
@@
-fn write_snapshot(track: &mut crate::TrackProducer, role: &str, level: &Level, snap: RoleSnapshot) {
+fn write_snapshot(
+	track: &mut crate::TrackProducer,
+	role: &str,
+	level: &Level,
+	ts_ms: u64,
+	snap: RoleSnapshot,
+) {
@@
-		now_ms(),
+		ts_ms,

Also applies to: 603-617

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-lite/src/stats.rs` around lines 576 - 591, The four write_snapshot()
calls compute their own timestamps causing mismatched ts_ms across role frames;
compute a single timestamp once (e.g., let ts_ms = now_ms() or call the existing
timestamp helper) before the four frames and pass that same ts_ms into each
write_snapshot invocation (and repeat the same change for the later block around
the other four calls at 603-617), ensuring the snapshot values from
level.publisher_external, level.publisher_internal, level.subscriber_external,
and level.subscriber_internal all share the same ts_ms for a single logical
tick.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 174-177: The comment and behavior are inconsistent for levels = 0;
change the level_keys function so that when levels == 0 it returns no keys
(i.e., an empty vec/iterator) instead of producing the root bucket, update the
doc comment above level_keys (and the similar block at 676-679) to state
explicitly that 0 yields no keys, and add a regression test (e.g.,
test_level_keys_zero_returns_none) verifying that level_keys(path, 0) -> empty
and level_keys("", 0) behaves as expected to prevent treating 0 as “disabled”
accidentally.

---

Nitpick comments:
In `@rs/moq-lite/src/stats.rs`:
- Around line 576-591: The four write_snapshot() calls compute their own
timestamps causing mismatched ts_ms across role frames; compute a single
timestamp once (e.g., let ts_ms = now_ms() or call the existing timestamp
helper) before the four frames and pass that same ts_ms into each write_snapshot
invocation (and repeat the same change for the later block around the other four
calls at 603-617), ensuring the snapshot values from level.publisher_external,
level.publisher_internal, level.subscriber_external, and
level.subscriber_internal all share the same ts_ms for a single logical tick.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c2c57b10-1417-4b55-9eca-a3cc6420ccd2

📥 Commits

Reviewing files that changed from the base of the PR and between 39832ec and 9a47c6a.

📒 Files selected for processing (4)
  • rs/moq-lite/src/stats.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/websocket.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/websocket.rs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants