feat(agent): bus + event observers on AgentRuntime#204
Open
runyaga wants to merge 2 commits into
Open
Conversation
Adds an additive `addObserver` hook on `StateBus` that fires after every successful commit, plus an optional `tag` argument on `setAgentState` and `update` so writers can label the source of each write. Observers receive `(tag, snapshot)`; the snapshot is the frozen post-commit map already exposed via `agentState`. `addObserver` returns a disposer for symmetric tear-down. Detaching during dispatch is safe (siblings still fire). Adding to a disposed bus is a no-op. Pure additive: existing call sites continue to work without modification, and the new `BusObserver` typedef is exported via the existing `application` barrel. 11 tests cover the observer dispatch ordering, dispose semantics, self-detaching observers, tag preservation, and disposed-bus safety. Foundation for an upcoming Bus Inspector debug surface that needs to record every state commit on a per-thread basis without forcing a bus dependency on the agent layer's diagnostics.
Adds two optional observer hooks at the runtime layer for diagnostics consumers (e.g. an in-app Bus Inspector) without coupling the agent to any specific consumer: - `ThreadBusObserver` — receives every per-thread `StateBus` commit with the originating `ThreadKey`. Wired into the bus the first time the runtime materialises a `ThreadState`. Supports the `tag:` argument added in the StateBus observer PR so seed paths and other writers label their commits. - `ThreadEventObserver` — receives every raw AG-UI `BaseEvent` paired with the `ThreadKey` it was processed on. Forwarded from `AgentSession._bridgeBaseEvent` via a package-private `notifyThreadEvent` helper. The runtime makes no behavioural decision on the event; it only fans out. Tags `seedThreadState` and `seedThreadHistory` writes as `seed.initial` and `seed.history` respectively so consumers can distinguish initial state restoration from in-session updates. Bus writes inside `AgentSession._onStateChange` remain untagged at this layer — diagnostics consumers correlate them with the most recent AG-UI event observed via `eventObserver` to infer whether the commit came from a `StateSnapshotEvent`, `StateDeltaEvent`, or a non-state run-state transition. This keeps the agent diagnostics-agnostic. Test exercises the bus-observer wiring end-to-end via the seed APIs and verifies the per-thread context (key + tag + snapshot) is delivered to the observer.
This was referenced May 1, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Part of #202 (2 of 4 stacked PRs). Depends on #203 — merge that first then rebase this.
What this changes
AgentRuntime(insoliplex_agent) gains two optional observer hooks:with the originating `ThreadKey`. Wired into the bus the first time
the runtime materialises a `ThreadState`. Uses the `tag` argument
introduced in feat(client): StateBus observer + tag API #203.
with the `ThreadKey` it was processed on. Forwarded from
`AgentSession._bridgeBaseEvent` via a package-private
`notifyThreadEvent` helper. The runtime makes no behavioural
decision on the event; it only fans out.
`seedThreadState` and `seedThreadHistory` writes are now tagged
`seed.initial` and `seed.history` respectively so consumers can
distinguish initial state restoration from in-session updates.
Why two hooks
For an inspector to render bus events as either `agui.snapshot` or
`agui.delta` we need to know what kind of AG-UI event drove each
commit. By the time `AgentSession._onStateChange` fires, both kinds
have been folded into `conversation.aguiState` and the source event
type is lost. Rather than thread that context through state objects,
exposing the raw event stream as a separate observer lets the
inspector do the correlation itself — and it opens the door for any
other diagnostic that wants the full event stream (activity timeline,
message ledger, tool-call inspector).
Why `bus.setAgentState(next)` is untagged here
Diagnostics consumers correlate the commit with the most recent
AG-UI event observed on the same thread to infer the tag. Keeping
the agent layer untagged keeps it diagnostics-agnostic.
Testing
Test exercises the bus-observer end-to-end via the seed APIs and
verifies the per-thread context (key + tag + snapshot) is delivered
correctly.
Stack