feat(gate): add state-gate write-conditioning worker #74

Open: docbloxy wants to merge 11 commits into iii-experimental:main from docbloxy:state-gate-v1

docbloxy commented May 15, 2026

feat(gate): add state-gate write-conditioning worker

Summary

Introduces workers/gate — a new Rust worker that sits between callers and iii-state, conditioning writes so that downstream reactive triggers only fire on effective changes.

  • gate::set_if_changed — strict, epsilon (numeric tolerance), or deep structural comparison; skips state::set when value hasn't changed.
  • gate::increment_throttled — accumulates increments within a configurable time window; flushes one composite state::update per window.
  • gate::debounce — last-write-wins within a delay window; only the final settled value commits.
  • gate::accumulate — coalesces concurrent update ops per key; flushes one consolidated write when the in-flight write completes or a batch-size threshold is reached.
  • gate::batch_commit — deduplicates a caller-assembled batch of mutations by (scope, key) and commits the survivors in parallel.
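The comparison modes behind gate::set_if_changed can be illustrated with a minimal sketch. It covers only numeric values in strict and epsilon mode; the names `CompareMode` and `is_effective_change` are hypothetical, and the choice that a difference exactly equal to the tolerance is suppressed is an assumption, not a statement about the code in gate.rs.

```rust
/// How two numeric readings are compared (hypothetical names for this sketch).
#[derive(Clone, Copy, Debug)]
pub enum CompareMode {
    /// Any difference counts as a change.
    Strict,
    /// Differences no larger than the tolerance are ignored (assumed boundary).
    Epsilon(f64),
}

/// Returns true when `incoming` should be written over `current`.
/// A missing current value always counts as a change.
pub fn is_effective_change(current: Option<f64>, incoming: f64, mode: CompareMode) -> bool {
    match (current, mode) {
        (None, _) => true,
        (Some(old), CompareMode::Strict) => old != incoming,
        (Some(old), CompareMode::Epsilon(tol)) => (old - incoming).abs() > tol,
    }
}
```

With a tolerance of 0.01, a stored 0.85 and an incoming 0.855 would be treated as unchanged, while an incoming 0.90 would be written.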

Optional GATE_ATTEMPT_TOPIC environment variable enables a PubSub stream of raw attempt events (function, scope, key, accepted, reason) for callers that need visibility into suppressed writes.
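One way to read the optional variable is sketched below, treating an empty string as unset so that the attempt stream stays disabled and the URL default still applies. `GateConfigSketch` and `config_from_lookup` are hypothetical names for illustration; the variable names and the ws://localhost:49134 default are taken from this PR, and the lookup closure exists only so the logic can be exercised without touching the process environment.

```rust
/// Hypothetical stand-in for the worker's config struct.
#[derive(Debug, PartialEq)]
pub struct GateConfigSketch {
    pub ws_url: String,
    pub attempt_topic: Option<String>,
}

/// Builds the config from any variable lookup, treating empty strings as unset.
pub fn config_from_lookup(lookup: impl Fn(&str) -> Option<String>) -> GateConfigSketch {
    let non_empty = |name: &str| lookup(name).filter(|v| !v.is_empty());
    GateConfigSketch {
        // Fall back to the local engine URL when the variable is missing or empty.
        ws_url: non_empty("III_WS_URL").unwrap_or_else(|| "ws://localhost:49134".to_string()),
        // None means no attempt events are published.
        attempt_topic: non_empty("GATE_ATTEMPT_TOPIC"),
    }
}

/// Reads the real process environment through the same logic.
pub fn config_from_env() -> GateConfigSketch {
    config_from_lookup(|name| std::env::var(name).ok())
}
```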

Verification status (Phase 2 smoke test — 2026-05-13)

| Claim | Status | Evidence |
|---|---|---|
| gate::set_if_changed suppresses redundant state::set calls | Verified | 10 calls, identical value, running engine: 1 write, 9 suppressed. Corroborated by the written field per call, state::get post-test, and smoketest.bin mtime unchanged after call 1. |
| Trigger amplification reduction | Inferred from contract | Triggers fire only when state::set / state::update is called. Gate suppresses the call; no trigger fires as a consequence. Not directly observed: no trigger subscriber was registered during the smoke test. |
| All 5 functions callable on the bus | Verified | Phase 1: live trigger calls to all 5 functions returned correct responses. |

A follow-on integration test with a real state trigger subscriber would directly measure trigger fire count and close the remaining gap.

The problem this solves

iii-state fires triggers unconditionally on every write (verified from engine source: state.rs calls invoke_triggers after every successful adapter write regardless of value change). For high-frequency signal sources, this produces write amplification: 1,000 write attempts against a stable value produce 1,000 trigger dispatches, each O(N) over the registered trigger list.

The gate is an interposition layer. Callers express intent; the gate decides if/when the actual state::set or state::update happens. Triggers downstream fire only on effective changes.
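The interposition idea can be shown with a toy model, offered as a sketch rather than the worker's code. `ToyState` is a hypothetical in-memory stand-in for iii-state that counts a trigger dispatch on every write, and `set_if_changed` here is a simplified strict-equality gate over string values.

```rust
use std::collections::HashMap;

/// Toy stand-in for iii-state: every write dispatches triggers unconditionally.
#[derive(Default)]
pub struct ToyState {
    values: HashMap<(String, String), String>,
    pub trigger_dispatches: u64,
}

impl ToyState {
    pub fn get(&self, scope: &str, key: &str) -> Option<&String> {
        self.values.get(&(scope.to_string(), key.to_string()))
    }

    pub fn set(&mut self, scope: &str, key: &str, value: &str) {
        self.values
            .insert((scope.to_string(), key.to_string()), value.to_string());
        self.trigger_dispatches += 1; // fires whether or not the value changed
    }
}

/// Gate: forwards the write only when the stored value differs.
/// Returns true when a write was issued.
pub fn set_if_changed(state: &mut ToyState, scope: &str, key: &str, value: &str) -> bool {
    if state.get(scope, key).map(String::as_str) == Some(value) {
        return false; // suppressed: no write, so no trigger dispatch
    }
    state.set(scope, key, value);
    true
}

/// Replays `attempts` identical writes through the gate and counts the writes issued.
pub fn replay_identical(state: &mut ToyState, attempts: u32) -> u32 {
    (0..attempts)
        .filter(|_| set_if_changed(state, "sensors", "temp-001", "20.0"))
        .count() as u32
}
```

In this model, 1,000 identical attempts through the gate issue a single write and a single dispatch, whereas 1,000 direct calls to `ToyState::set` dispatch 1,000 times.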

This pattern generalises to any reactive system on iii state with high-frequency signal sources:

  • Agent memory substrates (reinforcement, confidence decay)
  • Telemetry and metrics pipelines (counter accumulation)
  • Real-time collaboration state (last-known-value debouncing)
  • Bulk import flows (batch dedup before hitting the KV layer)

Related upstream observations

iii-sdk double-registers every function on connect. Verified during Phase 1 local testing: the engine emits Function X is already registered. Overwriting. for all SDK-connected workers, including agentos-memory and agentos-gate. Root cause: the SDK's connection loop calls collect_registrations() then flush_queue() twice on initial connect (confirmed in iii-sdk@0.11.6 src/iii.rs:1310). The engine overwrites silently; functional behaviour is unaffected. This is not gate-specific — every Rust SDK worker exhibits it. Filed for upstream awareness; no action needed in this PR.

Design decisions and deviations from original brief

  • Workers live in workers/, not crates/. The design doc specified crates/gate/. The actual workspace places all Rust workers in workers/. This crate is at workers/gate/ to match the established convention.
  • State function IDs are state::* (double-colon), not state.* (dots). Engine source and every existing worker in the workspace confirm state::get, state::set, state::update, state::list, state::delete.
  • GateOp uses "value" for increment delta, not "by". The published SDK type (UpdateOp::Increment { by: i64 }) uses by, but every existing agentos worker sends "value" when constructing state::update operations manually (rate-limiter, memory, etc.). The gate follows the in-use convention; if the engine resolves this ambiguity in a future SDK release, a one-field rename suffices.
  • No OTEL span instrumentation beyond tracing::info!. Workers in this workspace use tracing_subscriber::fmt::init() and tracing::info/error macros. The OTEL export is handled engine-side by iii-observability. Adding custom spans would diverge from the established pattern and would require opentelemetry crate dependencies not present in the workspace.
  • No benchmark crate. No existing worker has one; adding criterion is a workspace-level change outside this worker's scope. The integration test in tests/integration.rs asserts effective-write counts directly. Benchmarking is documented as future work.

Test plan

  • cargo fmt --check -p agentos-gate — formatting
  • cargo clippy -p agentos-gate -- -D warnings — lint
  • cargo test -p agentos-gate — 19 unit tests + 9 integration tests, all passing
  • cargo build --release -p agentos-gate — release build
  • Manual smoke test: 10 calls with identical value against running engine; 1 write, 9 suppressed. Corroborated by written field, state::get, and state file mtime (2026-05-13).

Future work

  • Persistent buffer storage (replace DashMaps with state-backed implementation; internal types designed for this swap).
  • Benchmarking suite (criterion-based, measuring write-amplification reduction).
  • Cross-process gate coordination (currently single-process semantics only).

Filing following the kind invitation from @rohitg00 Rohit_Ghumare in Discord. Happy to address any review comments.

Summary by CodeRabbit

  • New Features

    • Added a "gate" worker to condition state writes: conditional writes with tolerance, throttled increments, debounce (last-write-wins), queued/coalesced updates, and batch deduplication; exposes runtime-configurable endpoints for these behaviors.
  • Examples

    • Added TypeScript and Python demos showing reinforcement, debounce, and throttling scenarios.
  • Documentation

    • Comprehensive README and PR description covering usage, semantics, limits, observability, and roadmap.
  • Tests

    • Integration tests validating comparison, deduplication, and merge behaviors.
  • Chores

    • Workspace updated to include the new gate worker crate; environment-config support added for connection and optional observability.

coderabbitai Bot commented May 15, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

A new workers/gate Rust crate adds a write-conditioning middleware with five operations—set_if_changed, increment_throttled, debounce, accumulate, batch_commit—using DashMap-backed in-memory state to suppress redundant state updates. It includes a Tokio worker entrypoint, docs, TypeScript/Python examples, and integration/unit tests.

Changes

Gate Write-Conditioning Worker

| Layer | File(s) | Summary |
|---|---|---|
| Workspace and Configuration Setup | Cargo.toml, workers/gate/Cargo.toml, workers/gate/src/config.rs | Gate worker added to workspace members; GateConfig::from_env() reads III_WS_URL (defaults to ws://localhost:49134) and optional GATE_ATTEMPT_TOPIC. |
| Data Models and Type Definitions | workers/gate/src/structs.rs | Request structs (SetIfChangedRequest, IncrementThrottledRequest, DebounceRequest, AccumulateRequest, BatchCommitRequest), FlushWhen, GateOp, and internal per-key state structs define wire and in-memory contracts. |
| Gate State and Core Implementations | workers/gate/src/gate.rs | GateState with DashMap storage and helpers; implementations of set_if_changed (strict/epsilon compare), increment_throttled (windowed accumulation), debounce (generation-based last-write-wins), accumulate (queueing + drain loop with merge_ops), and batch_commit (dedupe + concurrent commits). Unit tests cover compare semantics, merging, batch dedup, and serde invariants. |
| Worker Entrypoint and Registration | workers/gate/src/main.rs | Tokio main initializes tracing, loads config, registers the worker and five async functions that forward JSON input to gate handlers, and performs graceful shutdown on ctrl_c. |
| Design and User Documentation | workers/gate/PR_DESCRIPTION.md, workers/gate/README.md | Design doc and README describe gate capabilities, configuration, API shapes/semantics, observability via GATE_ATTEMPT_TOPIC, and semantic constraints (TOCTOU, in-memory non-durability, suppressed writes affecting trigger firing). |
| Example Demonstrations | examples/gate-reinforcement-loop.ts, examples/gate-signal-debounce.py | TypeScript example exercises set_if_changed with epsilon filtering; Python example demonstrates debounce and increment_throttled scenarios with timing/concurrency. |
| Integration and Decision Logic Tests | workers/gate/tests/integration.rs | Self-contained tests validate comparison behavior (strict/epsilon/missing), batch deduplication (last-write-wins per key), and op merging (summed increments, last set per path). |
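The two decision rules named in the last row (last-write-wins dedup per key, and merging that sums increments and keeps the last set per path) can be sketched as follows. This is a simplified model under stated assumptions: `Op` and `Mutation` are hypothetical stand-ins with integer values, not the PR's GateOp or BatchOp, and the merge keeps paths in first-seen order, which may differ from the behavior in gate.rs.

```rust
use std::collections::HashMap;

/// Simplified stand-in for an update op (hypothetical shape for this sketch).
#[derive(Clone, Debug, PartialEq)]
pub enum Op {
    Set { path: String, value: i64 },
    Increment { path: String, value: i64 },
}

/// Collapses ops per path: consecutive increments are summed, and a Set
/// discards whatever was queued before it for that path.
pub fn merge_ops(ops: &[Op]) -> Vec<Op> {
    let mut order: Vec<String> = Vec::new();
    let mut pending: HashMap<String, Vec<Op>> = HashMap::new();
    for op in ops {
        let path = match op {
            Op::Set { path, .. } | Op::Increment { path, .. } => path.clone(),
        };
        if !pending.contains_key(&path) {
            order.push(path.clone()); // remember first-seen order of paths
        }
        let slot = pending.entry(path).or_default();
        match op {
            Op::Set { .. } => {
                slot.clear();
                slot.push(op.clone());
            }
            Op::Increment { value, .. } => {
                if let Some(Op::Increment { value: total, .. }) = slot.last_mut() {
                    *total += *value;
                } else {
                    slot.push(op.clone());
                }
            }
        }
    }
    order
        .into_iter()
        .flat_map(|p| pending.remove(&p).unwrap_or_default())
        .collect()
}

/// One caller-assembled mutation addressed by (scope, key).
#[derive(Clone, Debug, PartialEq)]
pub struct Mutation {
    pub scope: String,
    pub key: String,
    pub value: i64,
}

/// Keeps only the last mutation per (scope, key), in first-seen key order.
pub fn dedup_batch(batch: &[Mutation]) -> Vec<Mutation> {
    let mut order: Vec<(String, String)> = Vec::new();
    let mut latest: HashMap<(String, String), Mutation> = HashMap::new();
    for m in batch {
        let id = (m.scope.clone(), m.key.clone());
        if latest.insert(id.clone(), m.clone()).is_none() {
            order.push(id);
        }
    }
    order.into_iter().filter_map(|id| latest.remove(&id)).collect()
}
```

Under these rules, a Set followed by increments on the same path yields the Set and one summed Increment, while increments followed by a Set yield only the Set.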

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant GateWorker as gate::operation
  participant GateState
  participant StateAPI as state::set/update
  Caller->>GateWorker: input (scope, key, value/op)
  GateWorker->>GateState: check/update in-memory buffers
  alt write condition met
    GateWorker->>StateAPI: trigger state change (state::set/state::update)
  else write suppressed
    GateWorker->>GateWorker: emit attempt event (optional PubSub)
  end
  GateWorker-->>Caller: response (written, reason or result)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I nibble at noisy writes tonight,

Five gentle gates keep changes light,
Debounce and throttle, merge and batch,
Quieted triggers in a tidy patch—
Hops-approved, the state sleeps tight.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 61.54% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: adding a new state-gate write-conditioning worker to the codebase, which is the primary objective across all file changes. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 10

🧹 Nitpick comments (2)
examples/gate-signal-debounce.py (1)

106-106: 💤 Low value

Parameter name shadows Python builtin.

The parameter name input shadows the Python builtin input() function. While this doesn't cause functional issues in this context, consider using data, params, or request for better practice.

📝 Proposed rename
-async def sensor_debounce(input: dict) -> dict:
+async def sensor_debounce(params: dict) -> dict:
     return await run_debounce_scenario(
-        scope=input.get("scope", "sensors"),
-        sensor_id=input.get("sensorId", "temp-001"),
+        scope=params.get("scope", "sensors"),
+        sensor_id=params.get("sensorId", "temp-001"),
     )

Apply the same pattern to counter_throttle.

Also applies to: 117-117

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/gate-signal-debounce.py` at line 106, The function parameter name
`input` shadows Python's builtin input(), so rename the parameter in
sensor_debounce (and likewise in counter_throttle) to a non-conflicting name
such as `data`, `params`, or `request`; update all references inside the
function bodies (e.g., any uses of `input[...]`) to the new name to preserve
behavior and run tests to ensure no other callers rely on the old parameter
name.
examples/gate-reinforcement-loop.ts (1)

65-66: 💤 Low value

Minor comment inaccuracy.

The comment states the confidence "oscillates between 0.85–0.86" but the calculation 0.85 + (i % 2) * 0.005 actually produces values 0.85 and 0.855 (a 0.005 range, not 0.01). The logic is correct for demonstrating epsilon suppression; the comment range is slightly overstated.

📝 Proposed comment correction
-      // Confidence oscillates between 0.85–0.86 — within epsilon, no write.
+      // Confidence oscillates between 0.85–0.855 — within epsilon, no write.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/gate-reinforcement-loop.ts` around lines 65 - 66, Update the
inaccurate inline comment for the confidence calculation: the variable
`confidence` (defined as `const confidence = 0.85 + (i % 2) * 0.005`) oscillates
between 0.85 and 0.855 (a 0.005 difference), not 0.85–0.86; change the comment
to reflect the correct range so it matches the implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@workers/gate/PR_DESCRIPTION.md`:
- Line 104: The attribution line in PR_DESCRIPTION.md contains a duplicated word
"with with"; edit the line "Co-authored with with [Claude
Code](https://claude.com/claude-code)" to remove the extra "with" so it reads
"Co-authored with [Claude Code](https://claude.com/claude-code)".
- Around line 83-84: The doc example in PR_DESCRIPTION.md conflicts with the
smoke-test in tests/integration.rs; replace the ambiguous "10 calls with stable
value → 0 writes" with wording that matches the smoke-test outcome and concept
of suppressed versus effective writes (e.g. "10 calls with stable value → 1
write, 9 suppressed (effective writes: 1)") and/or add a short parenthetical
clarifying that suppressed calls do not count as effective writes so readers
reconcile that with the tests/integration.rs assertion.

In `@workers/gate/README.md`:
- Line 208: The table row for the operations column contains an unescaped pipe
inside the cell (`{ scope, key, value? | op? }`) which breaks the Markdown
table; update the README table cell for `operations` to escape the pipe (e.g.,
replace `|` with `\|` or use the HTML entity `&#124;`) inside the code snippet
so the cell reads something like `{ scope, key, value? \| op? }` and the table
renders correctly.

In `@workers/gate/src/config.rs`:
- Around line 9-11: The code currently treats empty environment variables as
valid values for ws_url and attempt_topic causing "" to be used; change both to
treat empty strings as unset by calling std::env::var(...).ok().filter(|s|
!s.is_empty()) so ws_url becomes std::env::var("III_WS_URL").ok().filter(|s|
!s.is_empty()).unwrap_or_else(|| "ws://localhost:49134".to_string()) and
attempt_topic becomes std::env::var("GATE_ATTEMPT_TOPIC").ok().filter(|s|
!s.is_empty()), ensuring the ws_url fallback is used for empty values and
attempt_topic stays None when empty.

In `@workers/gate/src/gate.rs`:
- Around line 367-379: The code currently drops buffered ops when the
TriggerRequest (via iii.trigger(...).await) fails; capture the result of that
await and if it returns an error, re-insert the drained operations back into the
in-memory buffer instead of ignoring the error. Specifically, change the
fire-and-forget pattern around iii.trigger(TriggerRequest { function_id:
"state::update", payload: json!({ "scope": &scope, "key": &key_str,
"operations": ops_json, }), ... }).await to check the Result, and on Err call
the same mechanism that originally buffered ops (e.g., call accumulate(...) or
push ops_json back onto the pending ops queue using the same internal
identifiers scope and key_str, cloning ops_json as needed) so nothing is lost on
failed flush; only drop the ops after a successful trigger.
- Around line 389-409: merge_ops currently buckets Set/Increment/Decrement into
set_ops, incr_ops, decr_ops and then re-emits them grouped which changes
original ordering for mixed ops on the same path; modify merge_ops to preserve
original op ordering by recording the sequence of path occurrences while
iterating ops (e.g., a Vec<Path> first_seen) and storing per-path lists or flags
(use existing set_ops/incr_ops/decr_ops but also record the first seen index or
order), then when building result iterate paths in that first_seen order and
emit operations for that path in the same relative order they appeared (emit
GateOp::Set, GateOp::Increment, GateOp::Decrement according to their original
sequence for each path), leaving non-collapsible other variants in the original
position via the others collection; refer to GateOp::{Set,Increment,Decrement},
and the local collections set_ops, incr_ops, decr_ops, others to locate where to
change the logic.
- Around line 87-97: The code currently swallows errors from
iii.trigger(TriggerRequest { function_id: "state::get", ... }) by calling
.await.ok().filter(...), treating any trigger failure as a missing key and
causing unnecessary writes; change this to explicitly handle the Result from
iii.trigger(...).await: match or if let on the Result so that Ok(res) is
converted to Some(res) only when !res.is_null() and Err(e) is propagated (or
returned) to the caller instead of being converted to None; update references to
the current variable (currently assigned from iii.trigger(...).await) so only
actual Null payloads become None and trigger errors are no longer masked.
- Around line 441-468: The BatchOp handling currently prefers op.value when both
fields exist and silently treats missing both as false; add an explicit
validation for BatchOp (check op.value and op.op) so exactly one is present: if
both are Some or both are None, return an error (or set the batch item to a
failed result and log) rather than proceeding. Implement the check immediately
before building TriggerRequest in the gate.rs block that uses iii_clone.trigger
(the code paths creating TriggerRequest with function_id "state::set" and
"state::update"); only call iii_clone.trigger for the exclusive cases (value
present => use "state::set", op present => serialize gate_op and use
"state::update"), and surface a clear error for invalid shapes referencing
BatchOp, op.value, and op.op.

In `@workers/gate/src/main.rs`:
- Line 29: Call sites and signatures for gate handlers are inconsistent:
set_if_changed and batch_commit are called with &state while
increment_throttled, debounce, and accumulate use Arc<GateState>. Make the API
consistent by choosing one ownership model; recommended: use Arc<GateState>
everywhere. Update the callers in main.rs (the async move closures invoking
gate::set_if_changed and gate::batch_commit) to pass Arc::clone(&state) (or
state.clone()) instead of &state, and update the gate.rs function signatures for
set_if_changed and batch_commit to accept Arc<GateState> (matching
increment_throttled, debounce, accumulate) so all handlers use the same
Arc-based ownership.

In `@workers/gate/tests/integration.rs`:
- Around line 13-18: Tests are duplicating the decision/merge logic locally
instead of invoking the real helpers, so make the pure decision/merge helper
functions that are currently pub(crate) in gate.rs publicly exportable from a
shared library surface, re-export them in the binary entrypoint, then update the
integration tests to call those real helpers instead of the duplicate thin
wrappers; specifically, change the visibility of the pure decision/merge helpers
(the pub(crate) functions in gate.rs) to be exported from the crate root (lib)
and remove the local duplicate wrappers in the integration test so tests assert
behavior against the real decision/merge functions.

---

Nitpick comments:
In `@examples/gate-reinforcement-loop.ts`:
- Around line 65-66: Update the inaccurate inline comment for the confidence
calculation: the variable `confidence` (defined as `const confidence = 0.85 + (i
% 2) * 0.005`) oscillates between 0.85 and 0.855 (a 0.005 difference), not
0.85–0.86; change the comment to reflect the correct range so it matches the
implementation.

In `@examples/gate-signal-debounce.py`:
- Line 106: The function parameter name `input` shadows Python's builtin
input(), so rename the parameter in sensor_debounce (and likewise in
counter_throttle) to a non-conflicting name such as `data`, `params`, or
`request`; update all references inside the function bodies (e.g., any uses of
`input[...]`) to the new name to preserve behavior and run tests to ensure no
other callers rely on the old parameter name.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 1e4d7e45-94e8-4159-92dc-8a025fcdddd4

📥 Commits

Reviewing files that changed from the base of the PR and between caca2b4 and 49621fc.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • Cargo.toml
  • examples/gate-reinforcement-loop.ts
  • examples/gate-signal-debounce.py
  • workers/gate/Cargo.toml
  • workers/gate/PR_DESCRIPTION.md
  • workers/gate/README.md
  • workers/gate/src/config.rs
  • workers/gate/src/gate.rs
  • workers/gate/src/main.rs
  • workers/gate/src/structs.rs
  • workers/gate/tests/integration.rs

Comment on lines +13 to +18
// Pull in the crate's internals through the lib re-exports. Since this is a
// binary crate (no lib.rs), we inline the minimal logic needed here.
//
// The functions under test are `pub(crate)` in gate.rs; we test the same logic
// via duplicate thin wrappers below so the integration test is self-contained.

@coderabbitai coderabbitai Bot May 15, 2026

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

These integration tests are validating mirrors, not the real gate implementation.

Lines 13–18 confirm that the suite duplicates decision logic locally. That can let regressions in workers/gate/src/gate.rs pass unnoticed if the mirror and production code drift. Please expose the pure decision/merge helpers through a testable library surface (e.g., lib.rs shared by bin + tests), then assert behavior against the real functions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/tests/integration.rs` around lines 13 - 18, Tests are
duplicating the decision/merge logic locally instead of invoking the real
helpers, so make the pure decision/merge helper functions that are currently
pub(crate) in gate.rs publicly exportable from a shared library surface,
re-export them in the binary entrypoint, then update the integration tests to
call those real helpers instead of the duplicate thin wrappers; specifically,
change the visibility of the pure decision/merge helpers (the pub(crate)
functions in gate.rs) to be exported from the crate root (lib) and remove the
local duplicate wrappers in the integration test so tests assert behavior
against the real decision/merge functions.

Author

Fair point that the integration tests reimplement the dedup/merge logic instead of calling the worker's functions directly.

Holding off on the lib.rs extraction for now. The core decision logic (compare_values, merge_ops, batch dedup) is unit-tested against the real implementation in gate.rs, including a regression test for the mixed-op-type ordering case, so the logic these integration tests mirror isn't itself untested; the integration tests sit on top of that, asserting end-to-end write counts. Also, no other worker in workers/ exposes a lib.rs, so adding one here would make gate the only exception to the repo structure.

Happy to do the extraction as a follow-up if the maintainers would prefer the integration tests exercise the real functions; flagging it mainly because it'd be a structural departure from the other workers.

@docbloxy, understood — the mitigating factors are sound:

  1. The pure helpers (compare_values, merge_ops, batch dedup) are already covered by unit tests in gate.rs against the real code, so the mirrors here can't silently diverge without those tests catching a change first.
  2. Keeping workers/gate structurally consistent with the other binary-only workers is a legitimate reason to avoid a one-off lib.rs.

The residual gap is narrow: if someone later refactors the helpers in gate.rs but forgets to update the mirrored wrappers in tests/integration.rs, the integration tests would still pass while validating stale behavior. A comment in the test file pointing readers to the canonical implementations in gate.rs would be enough to make that risk visible.

Would you like me to open a follow-up issue to track the possible extraction (and the trigger-fire-count e2e test noted in the README as future work)?


✏️ Learnings added
Learnt from: docbloxy
Repo: iii-experimental/agentos PR: 74
File: workers/gate/tests/integration.rs:13-18
Timestamp: 2026-05-27T20:12:54.866Z
Learning: In workers/gate (agentos-gate, Rust binary crate), the integration tests in workers/gate/tests/integration.rs duplicate decision/merge logic locally (compare, dedup, merge wrappers) rather than calling the real pub(crate) helpers from gate.rs. This is intentional for now: the core logic (compare_values, merge_ops, batch dedup) is already unit-tested against the real implementation in gate.rs, and no other worker in workers/ exposes a lib.rs, so adding one would be a structural exception. The mirrors are accepted as a known trade-off; extraction to a shared lib surface is deferred as future work.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

Author

@docbloxy docbloxy May 27, 2026

Thanks for engaging on this. Good call on the in-file comment; that's a cheap way to make the mirror's intentionality visible to future readers and flag the drift risk. I'll add it.
Holding off on the follow-up issue for now: the README already notes the trigger-fire-count e2e test as future work, and the lib.rs question is captured here on the PR, so I'd rather not duplicate the tracking. If a maintainer thinks a tracking issue would help once the PR is merged, happy to revisit.

@docbloxy, sounds good — the in-file comment will be enough to make the intentional trade-off visible, and the README already serves as the canonical future-work tracker. No need to duplicate it in an issue. Appreciate the clear reasoning throughout.

(。・ω・。)ノ♡


🧠 Learnings used
Learnt from: docbloxy
Repo: iii-experimental/agentos PR: 74
File: workers/gate/tests/integration.rs:13-18
Timestamp: 2026-05-27T20:12:54.866Z
Learning: In workers/gate integration tests, it may be acceptable to mirror/duplicate the decision/merge wrapper logic from gate.rs (e.g., compare/dedup/merge wrappers) instead of calling the pub(crate) helpers directly. This is an intentional trade-off when the core logic (compare_values/merge_ops/batch dedup) is already covered by unit tests against gate.rs and there is no shared lib.rs surface to reuse. During review, don’t automatically flag such duplication as a refactor requirement unless a shared crate/lib interface is available or planned to remove the need for the mirrors.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
examples/gate-signal-debounce.py (1)

38-58: 💤 Low value

Docstring claims "20 readings in 100 ms" but readings are fired concurrently with no spacing.

The 20 trigger(...) calls are dispatched via asyncio.gather with no await asyncio.sleep(...) between them, so they arrive at the gate essentially simultaneously rather than spread over 100 ms. For a debounce demo this is still functionally correct (the 200 ms quiet window starts after the last reception), but readers following the docstring may expect a paced stream. Consider either pacing the sends or tightening the wording.

♻️ Option A — pace the readings to match the docstring
     readings = [20.0 + random.uniform(-0.5, 0.5) for _ in range(20)]
     tasks = []
     for temp in readings:
         tasks.append(
             trigger(
                 "gate::debounce",
                 {
                     "scope": scope,
                     "key": sensor_id,
                     "value": {"temperature": round(temp, 3), "ts": now_ms()},
                     "delay_ms": 200,
                 },
             )
         )
+        await asyncio.sleep(0.005)  # ~5 ms between readings → ~100 ms total
     results = await asyncio.gather(*tasks)
♻️ Option B — adjust the docstring to reflect actual behavior
-    Simulate a noisy sensor emitting 20 readings in 100 ms. With a 200 ms
-    debounce window only the final reading commits to state, producing exactly
-    1 state::set instead of 20.
+    Simulate a noisy sensor emitting a burst of 20 readings back-to-back.
+    With a 200 ms debounce window only the final reading commits to state,
+    producing exactly 1 state::set instead of 20.
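The debounce behaviour discussed in this comment can be modelled without any async runtime. The following is a minimal sketch, not the worker's implementation: `Attempt` and `debounce_commits` are hypothetical names, attempts are assumed to be sorted by arrival time, and an attempt arriving exactly at the deadline is assumed to start a new window.

```rust
/// One write attempt: arrival time in milliseconds and the value carried.
/// Hypothetical type for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Attempt {
    pub at_ms: u64,
    pub value: f64,
}

/// Returns the `(commit_time_ms, value)` pairs a last-write-wins debounce
/// would produce. Assumes `attempts` is sorted by `at_ms`.
pub fn debounce_commits(attempts: &[Attempt], delay_ms: u64) -> Vec<(u64, f64)> {
    let mut commits = Vec::new();
    let mut pending: Option<&Attempt> = None;
    for attempt in attempts {
        if let Some(prev) = pending {
            // The quiet window elapsed before this attempt arrived,
            // so the previous pending value settled and committed.
            if attempt.at_ms >= prev.at_ms + delay_ms {
                commits.push((prev.at_ms + delay_ms, prev.value));
            }
        }
        // Last write wins: the newest attempt replaces the pending one
        // and restarts the quiet window.
        pending = Some(attempt);
    }
    // Whatever is still pending commits once its own window elapses.
    if let Some(last) = pending {
        commits.push((last.at_ms + delay_ms, last.value));
    }
    commits
}
```

Under these assumptions a simultaneous burst and a paced stream both collapse to one commit; pacing only moves the commit time later, which is why the example stays functionally correct either way.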
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/gate-signal-debounce.py` around lines 38 - 58, The docstring for
run_debounce_scenario claims "20 readings in 100 ms" but the code fires 20
trigger(...) calls concurrently via asyncio.gather with no spacing; either
update the docstring to say the readings are sent concurrently (arrive
essentially simultaneously) or implement pacing by awaiting a short sleep
between sends (e.g., spread the 20 sends across ~100ms) so the timed behavior
matches the description—modify run_debounce_scenario accordingly and keep the
trigger(...) usage and debounce semantics intact.
workers/gate/README.md (3)

106-112: 💤 Low value

Consider showing both immediate cases in the example.

The example at line 108 shows "immediate": false, while the explanation at lines 111-112 focuses on what immediate: true means. For clarity, consider either showing two example return values (one with true, one with false) or ensuring the example and explanation are aligned to the same case.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/README.md` around lines 106 - 112, Update the README example so
the `immediate` behavior is unambiguous: either include two JSON examples (one
with "immediate": true and one with "immediate": false) and label them (e.g.,
"first call — immediate: true" and "subsequent call — immediate: false"), or
change the existing JSON to match the explanatory sentence about `immediate:
true`; ensure the surrounding text references the correct example so readers can
see both cases clearly.

183-186: 💤 Low value

Vary sentence structure for improved readability.

Three consecutive sentences begin with "Multiple," which creates repetitive prose. Consider rewording for flow, e.g., "Increments on the same path merge into one operation with summed values. Decrements follow the same rule. Multiple set operations..."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/README.md` around lines 183 - 186, The three list items starting
with "Multiple" (the lines describing `increment`, `decrement`, and `set`) are
repetitive; reword them to vary sentence structure and improve flow—for example:
"Increments on the same path merge into a single operation with summed values,"
"Decrements follow the same rule," and "Multiple `set` operations collapse to
one `set` using the last value." Update the README entries that currently read
"Multiple `increment` on the same path → one increment with summed value.",
"Multiple `decrement` on the same path → one decrement with summed value.", and
"Multiple `set` on the same path → one set with the last value." to these
clearer phrasings while leaving the `merge`, `append`, and `remove` line
unchanged.

66-74: ⚡ Quick win

Clarify the distinction between strict and deep comparison modes.

Lines 71-72 state that strict and deep are "equivalent for all JSON types," which raises the question: why offer both options if they produce identical behavior? Users may be confused about which mode to choose or wonder if there's a subtle difference not documented here.

Consider either:

  • Explaining the semantic intent difference (e.g., "use strict for primitive comparisons, deep when you want to emphasize nested structure comparison")
  • Documenting if one is provided for API compatibility or future extensibility
  • Simplifying to a single option if they truly are identical

Additionally, line 74's phrasing "Falls back to changed" is unclear. Consider rewording to "Falls back to structural comparison and reports the result as changed or unchanged" to clarify that epsilon mode uses the same comparison logic as strict/deep when values aren't numeric.
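To make the question concrete, here is a small sketch of how the three modes could relate. It is an illustration under stated assumptions, not the code in gate.rs: `Value`, `Compare`, and `is_changed` are hypothetical names, and the epsilon fallback follows the reading suggested above (structural comparison when either side is non-numeric), which may not match the actual implementation.

```rust
/// Minimal JSON-like value for illustration only; the worker's real value
/// type is not shown in this thread.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Bool(bool),
    Number(f64),
    Text(String),
    List(Vec<Value>),
}

/// Hypothetical comparison modes mirroring the names used in the README.
#[derive(Debug, Clone, Copy)]
pub enum Compare {
    Strict,
    Deep,
    Epsilon(f64),
}

/// Returns true when `new` should be treated as a change from `old`.
pub fn is_changed(old: &Value, new: &Value, mode: Compare) -> bool {
    match (mode, old, new) {
        // Numeric pair under epsilon: changed only if the gap exceeds the tolerance.
        (Compare::Epsilon(eps), Value::Number(a), Value::Number(b)) => (a - b).abs() > eps,
        // Everything else, including epsilon on non-numeric values, falls back
        // to structural equality (an assumption of this sketch).
        _ => old != new,
    }
}
```

In this sketch `Strict` and `Deep` take the same branch, which is exactly the situation the comment asks the README to explain.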

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/README.md` around lines 66 - 74, Update the README text to
clarify why both comparison modes exist and to rephrase the epsilon fallback:
explicitly state whether "strict" and "deep" are truly identical (e.g., "strict
and deep currently perform the same structural JSON equality check; both are
provided for semantic clarity or API compatibility/future extension") or
describe their intended semantic difference (e.g., "use strict for
primitive/value equality and deep to emphasize nested structural checks"), and
change the epsilon description to "Falls back to structural comparison and
reports the result as `changed` or `unchanged` when either value is
non-numeric." Ensure you reference the comparison mode names `strict`, `deep`,
`epsilon`, and `changed` in the updated sentence so readers can easily map
behaviors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@workers/gate/src/gate.rs`:
- Around line 184-199: The async calls to iii.trigger(...) in the throttle flush
and debounce commit paths currently ignore errors and can drop accepted writes;
update the handlers around iii.trigger (the TriggerRequest sends in the throttle
flush block that uses state_clone.throttle.remove(&throttle_key) and the
debounce commit path) to check the Result from .await, log/report the failure
with context (scope, key_str, path, total or the batched operations), and
implement a retry or requeue strategy (e.g., push the entry back into
state_clone.throttle or enqueue the operations for later processing) or
propagate an error so the caller can handle it instead of silently discarding
writes. Ensure you reference and preserve TriggerRequest payloads when requeuing
and keep failures observable via tracing::error or by returning Err from the
enclosing async function.
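
One way to read the requeue suggestion for the throttle path is sketched below. It assumes a much simpler shape than the real worker: `Throttle`, `add`, `flush`, and `pending` are hypothetical names, the `send` closure stands in for the `iii.trigger` call, and `eprintln!` stands in for `tracing::error!`.

```rust
use std::collections::HashMap;

/// Hypothetical per-key accumulator for throttled increments.
#[derive(Default)]
pub struct Throttle {
    totals: HashMap<String, i64>,
}

impl Throttle {
    /// Adds an increment to the running total for `key`.
    pub fn add(&mut self, key: &str, by: i64) {
        *self.totals.entry(key.to_string()).or_insert(0) += by;
    }

    /// Flushes the accumulated total through `send`. On failure the total is
    /// folded back in so a later flush can retry instead of dropping it.
    pub fn flush<F>(&mut self, key: &str, send: F) -> Result<(), String>
    where
        F: FnOnce(i64) -> Result<(), String>,
    {
        let total = match self.totals.remove(key) {
            Some(total) => total,
            None => return Ok(()), // nothing accumulated for this key
        };
        if let Err(err) = send(total) {
            eprintln!("throttle flush failed for {key}; requeuing total {total}: {err}");
            *self.totals.entry(key.to_string()).or_insert(0) += total;
            return Err(err);
        }
        Ok(())
    }

    /// Total still waiting to be flushed for `key`.
    pub fn pending(&self, key: &str) -> i64 {
        self.totals.get(key).copied().unwrap_or(0)
    }
}
```

Folding the total back into the map, rather than overwriting, keeps any increments that arrived while the failed send was in progress.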


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 0d6e06d8-cd6f-46b4-a438-0b2eecd8ff3a

📥 Commits

Reviewing files that changed from the base of the PR and between 49621fc and 9fff5d0.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (11)
  • Cargo.toml
  • examples/gate-reinforcement-loop.ts
  • examples/gate-signal-debounce.py
  • workers/gate/Cargo.toml
  • workers/gate/PR_DESCRIPTION.md
  • workers/gate/README.md
  • workers/gate/src/config.rs
  • workers/gate/src/gate.rs
  • workers/gate/src/main.rs
  • workers/gate/src/structs.rs
  • workers/gate/tests/integration.rs
✅ Files skipped from review due to trivial changes (2)
  • workers/gate/Cargo.toml
  • workers/gate/PR_DESCRIPTION.md

Comment thread workers/gate/src/gate.rs
docbloxy added 4 commits May 21, 2026 13:42

- Remove duplicated word in PR_DESCRIPTION attribution
- Escape pipe inside README operations table cell
- Rename input param to params in Python example (avoid builtin shadow)
- Correct comment range in TS example (0.85-0.855)

Transient RPC failures were collapsed to None via .ok(), causing a forced
write on every read error. Now returns IIIError::Handler on failure; null
is mapped to None separately to preserve missing-key semantics.

state::update failures silently discarded the drained batch. Now captures
the Result; on Err, prepends the merged ops back before any newly-queued
ops, resets in_flight, and breaks so a future accumulate call can retry.

Buckets were iterated in HashMap order, so all Sets always emitted before
all Increments regardless of input sequence. Now tracks a per-bucket
first-seen insertion order and re-emits in that order, so Set(x) then
Increment(x) stays Set(x) then Increment(x) rather than being reordered.

Adds merge_preserves_first_seen_order_across_op_types as a regression
guard for this case.

Known limitation: Increment(x), Set(x), Increment(x) still merges both
increments into one bucket across the Set boundary, changing the result.
Cross-boundary mixed-type merging on the same path is a follow-on fix.
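
The first-seen ordering and the known limitation can both be seen in a small model. This is a sketch under assumptions, not the merge code in gate.rs: `Op`, `merge_ops`, and `apply` are hypothetical, values are plain integers, and `apply` assumes every op targets the same path.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical update ops; the real worker supports more kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum Op {
    Set { path: String, value: i64 },
    Increment { path: String, by: i64 },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Kind {
    Set,
    Increment,
}

/// Merges ops into one bucket per (kind, path) and re-emits the buckets in
/// the order each bucket was first seen.
pub fn merge_ops(ops: &[Op]) -> Vec<Op> {
    let mut buckets: HashMap<(Kind, String), i64> = HashMap::new();
    let mut order: Vec<(Kind, String)> = Vec::new();
    for op in ops {
        let (kind, path, amount) = match op {
            Op::Set { path, value } => (Kind::Set, path, *value),
            Op::Increment { path, by } => (Kind::Increment, path, *by),
        };
        let bucket = (kind, path.clone());
        match buckets.entry(bucket.clone()) {
            Entry::Occupied(mut slot) => {
                // Increments sum; sets keep only the last value.
                let merged = match kind {
                    Kind::Increment => *slot.get() + amount,
                    Kind::Set => amount,
                };
                slot.insert(merged);
            }
            Entry::Vacant(slot) => {
                order.push(bucket);
                slot.insert(amount);
            }
        }
    }
    order
        .into_iter()
        .map(|(kind, path)| {
            let amount = buckets[&(kind, path.clone())];
            match kind {
                Kind::Set => Op::Set { path, value: amount },
                Kind::Increment => Op::Increment { path, by: amount },
            }
        })
        .collect()
}

/// Applies ops in sequence to a single value (assumes one shared path).
pub fn apply(start: i64, ops: &[Op]) -> i64 {
    ops.iter().fold(start, |acc, op| match op {
        Op::Set { value, .. } => *value,
        Op::Increment { by, .. } => acc + by,
    })
}
```

In this model, Set(x) followed by Increment(x) keeps its order after merging. Increment(x, 1), Set(x, 10), Increment(x, 2) applied in sequence from 0 yields 12, while the merged form applies Increment(x, 3) then Set(x, 10) and yields 10, which is the cross-boundary limitation noted above.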

@coderabbitai coderabbitai Bot left a comment

🧹 Nitpick comments (1)
workers/gate/src/gate.rs (1)

379-388: ⚡ Quick win

Log the trigger failure on the recovery path.

The recovery correctly re-prepends merged ahead of any concurrently-accumulated e.pending (preserving time order) and clears in_flight so a future accumulate call re-triggers a drain. However the underlying iii.trigger error is silently discarded, making failed drains invisible in logs — the only observable sign is that pending grows. A single tracing::warn! keeps the recovery semantics intact while surfacing the failure (mirroring the suggestion in the past 184-199 / 262-272 thread).

♻️ Proposed fix
-        if result.is_err() {
+        if let Err(err) = &result {
+            tracing::warn!(
+                scope = %scope,
+                key = %key_str,
+                count,
+                error = %err,
+                "gate accumulate drain failed; requeuing ops"
+            );
             if let Some(mut e) = state.accumulate.get_mut(&map_key) {
                 let mut recovered = merged;
                 recovered.append(&mut e.pending);
                 e.pending = recovered;
                 e.in_flight = false;
+            } else {
+                tracing::error!(
+                    scope = %scope,
+                    key = %key_str,
+                    count,
+                    "gate accumulate entry missing during recovery; ops dropped"
+                );
             }
             break;
         }
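
The recovery ordering described above can be exercised in isolation. The sketch below is a simplified, synchronous model and not the worker's drain loop: `AccumulateEntry` and `drain_once` are hypothetical, ops are plain strings, the `send` closure stands in for the `iii.trigger` call and receives a handle to `pending` so a test can simulate ops arriving mid-flight, and `eprintln!` stands in for `tracing::warn!`.

```rust
/// Hypothetical per-key accumulate entry.
#[derive(Debug, Default)]
pub struct AccumulateEntry {
    pub pending: Vec<String>,
    pub in_flight: bool,
}

/// Attempts one drain of `entry.pending`. Returns the number of ops sent.
pub fn drain_once<F>(entry: &mut AccumulateEntry, send: F) -> Result<usize, String>
where
    F: FnOnce(&[String], &mut Vec<String>) -> Result<(), String>,
{
    entry.in_flight = true;
    // Take the current batch; ops queued during the send land in `pending`.
    let merged = std::mem::take(&mut entry.pending);
    let count = merged.len();
    match send(&merged, &mut entry.pending) {
        Ok(()) => {
            entry.in_flight = false;
            Ok(count)
        }
        Err(err) => {
            // Surface the failure instead of discarding it silently.
            eprintln!("gate accumulate drain failed; requeuing {count} ops: {err}");
            // Put the failed batch back ahead of anything queued meanwhile,
            // preserving time order, and allow a later call to retry.
            let mut recovered = merged;
            recovered.append(&mut entry.pending);
            entry.pending = recovered;
            entry.in_flight = false;
            Err(err)
        }
    }
}
```

Returning the error after requeuing keeps the failure observable to the caller as well as in the log, while the batch itself is preserved for the next drain.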
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/src/gate.rs` around lines 379 - 388, When result.is_err() is hit
after attempting iii.trigger, keep the existing recovery logic (re-prepend
merged onto e.pending and set e.in_flight = false) but also log the trigger
failure: capture the error from result (or the error returned by iii.trigger)
and emit a tracing::warn! (or tracing::error! per project convention) that
includes a short context string (e.g. "trigger failed during drain") plus the
error and the map_key or other identifying info; update the branch inside the if
let Some(mut e) = state.accumulate.get_mut(&map_key) block to add that single
warn log call while preserving the current mutation order and break.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: 8125b73f-1422-402a-9edf-68ef4e88f08f

📥 Commits

Reviewing files that changed from the base of the PR and between 0c3f1ee and 711c8f4.

📒 Files selected for processing (1)
  • workers/gate/src/gate.rs

docbloxy added 2 commits May 21, 2026 20:32

The previous if/else-if silently preferred value when both fields were
set and silently returned false when neither was set, letting malformed
input pass unnoticed. Replaced with match (op.value, op.op) so the
both-set and neither-set shapes are explicitly rejected as failed items.

set_if_changed and batch_commit took &GateState while the other three
handlers took Arc<GateState>. Unified all five on Arc<GateState> so the
ownership model is consistent across the worker. Call sites in main.rs
updated to pass the cloned Arc directly rather than a plain reference.
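
As a rough illustration of the unified ownership model, the sketch below passes a cloned `Arc` to every handler, whether it runs inline or on a spawned thread. The `GateState` fields and handler bodies here are placeholders invented for the example; only the `Arc<GateState>` parameter shape reflects the commit message.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Placeholder state; the real GateState fields are not shown in this thread.
#[derive(Default)]
pub struct GateState {
    pub writes: Mutex<u64>,
}

/// Inline handler: takes the shared state by Arc, like every other handler.
pub fn set_if_changed(state: Arc<GateState>) {
    *state.writes.lock().unwrap() += 1;
}

/// Deferred handler: the Arc moves into the spawned thread, which a plain
/// `&GateState` borrow could not do without a longer lifetime.
pub fn debounce(state: Arc<GateState>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        *state.writes.lock().unwrap() += 1;
    })
}

/// Call site: clone the Arc for each handler instead of lending a reference.
pub fn run_handlers() -> u64 {
    let state = Arc::new(GateState::default());
    set_if_changed(Arc::clone(&state));
    debounce(Arc::clone(&state)).join().unwrap();
    let total = *state.writes.lock().unwrap();
    total
}
```

Cloning an `Arc` only bumps a reference count, so the uniform signature costs little for the handlers that never spawn work.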

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (1)
workers/gate/src/gate.rs (1)

462-488: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject invalid BatchOp shapes instead of marking them skipped.

_ => false still treats malformed input as a normal skipped write. Because dedup already selected the last (scope, key), a bad final item can silently discard an earlier valid mutation for that key. Return a clear validation error, or at least a per-item validation failure, when both value and op are set or both are missing.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@workers/gate/src/gate.rs` around lines 462 - 488, The match arm handling
(op.value, op.op) currently falls through to _ => false which silently treats
malformed BatchOp (both value and op set or both missing) as a skipped write;
change this to perform explicit validation for the BatchOp shape inside the code
that matches (op.value, op.op) (the match around iii_clone.trigger calls) and
return a clear per-item validation failure (or propagate an error) when both are
Some or both are None instead of returning false—e.g., detect the invalid case
where (Some(_), Some(_)) or (None, None) and produce a validation error/result
for that item so earlier valid mutations for the same (scope, key) aren’t
silently discarded.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro Plus

Run ID: d01f5bca-433c-4146-ae59-05e98837c9bb

📥 Commits

Reviewing files that changed from the base of the PR and between 711c8f4 and cdcc0b9.

📒 Files selected for processing (2)
  • workers/gate/src/gate.rs
  • workers/gate/src/main.rs

docbloxy added 3 commits May 27, 2026 15:58
A malformed final item for a key could silently evict an earlier valid
mutation during the (scope, key) dedup pass, then report written:false —
indistinguishable from a normal suppressed write.

Fix: validate all BatchOp shapes up front via validate_batch_ops() before
the dedup loop. Items with both value and op set, or neither, are a caller
programming error; fail the whole batch fast with IIIError::Handler.

The _ => false arm in the worker-task match is now unreachable (comment
added). It is left as false rather than unreachable!() to avoid panicking
inside a spawned task if validation is ever bypassed.

Adds unit tests for both malformed shapes via the pure helper.
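
A minimal sketch of validate-then-dedup, under assumptions: the `BatchOp` fields below are simplified to strings, `dedup_last_wins` is a hypothetical helper, and errors are plain `String`s rather than `IIIError::Handler`. Only the name `validate_batch_ops` and the two rejected shapes come from the commit message.

```rust
use std::collections::HashMap;

/// Simplified batch item: exactly one of `value` / `op` should be set.
#[derive(Debug, Clone, PartialEq)]
pub struct BatchOp {
    pub scope: String,
    pub key: String,
    pub value: Option<String>,
    pub op: Option<String>,
}

/// Rejects the whole batch if any item has both fields set or neither.
pub fn validate_batch_ops(ops: &[BatchOp]) -> Result<(), String> {
    for (index, item) in ops.iter().enumerate() {
        match (&item.value, &item.op) {
            (Some(_), None) | (None, Some(_)) => {}
            (Some(_), Some(_)) => {
                return Err(format!("item {index}: both `value` and `op` are set"));
            }
            (None, None) => {
                return Err(format!("item {index}: neither `value` nor `op` is set"));
            }
        }
    }
    Ok(())
}

/// Validates first, then keeps only the last item per (scope, key),
/// returned in the order those surviving items appeared in the input.
pub fn dedup_last_wins(ops: &[BatchOp]) -> Result<Vec<BatchOp>, String> {
    validate_batch_ops(ops)?;
    let mut last_index: HashMap<(&str, &str), usize> = HashMap::new();
    for (index, item) in ops.iter().enumerate() {
        last_index.insert((item.scope.as_str(), item.key.as_str()), index);
    }
    let mut survivors: Vec<usize> = last_index.into_values().collect();
    survivors.sort_unstable();
    Ok(survivors.into_iter().map(|i| ops[i].clone()).collect())
}
```

Because validation runs before the dedup pass, a malformed final item can no longer evict an earlier valid mutation for the same key; the batch fails as a whole instead.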
@docbloxy
Author

@rohitg00 hey, CodeRabbit's review comments across the 11 commits are addressed; the last bot exchange (on the integration test mirror question) is the only thread intentionally left open, in case you have a view on it. Otherwise this is ready for your review whenever you have a moment. Thanks again for the invitation.
