Skip to content

feat(agent): bus + event observers on AgentRuntime#204

Open
runyaga wants to merge 2 commits into
soliplex:mainfrom
runyaga:feat/agent-runtime-observers
Open

feat(agent): bus + event observers on AgentRuntime#204
runyaga wants to merge 2 commits into
soliplex:mainfrom
runyaga:feat/agent-runtime-observers

Conversation

@runyaga
Copy link
Copy Markdown
Contributor

@runyaga runyaga commented May 1, 2026

Part of #202 (2 of 4 stacked PRs). Depends on #203 — merge that first then rebase this.

What this changes

AgentRuntime (in soliplex_agent) gains two optional observer hooks:

  • `ThreadBusObserver` — receives every per-thread `StateBus` commit
    with the originating `ThreadKey`. Wired into the bus the first time
    the runtime materialises a `ThreadState`. Uses the `tag` argument
    introduced in feat(client): StateBus observer + tag API #203.
  • `ThreadEventObserver` — receives every raw AG-UI `BaseEvent` paired
    with the `ThreadKey` it was processed on. Forwarded from
    `AgentSession._bridgeBaseEvent` via a package-private
    `notifyThreadEvent` helper. The runtime makes no behavioural
    decision on the event; it only fans out.

`seedThreadState` and `seedThreadHistory` writes are now tagged
`seed.initial` and `seed.history` respectively so consumers can
distinguish initial state restoration from in-session updates.

Why two hooks

For an inspector to render bus events as either `agui.snapshot` or
`agui.delta` we need to know what kind of AG-UI event drove each
commit. By the time `AgentSession._onStateChange` fires, both kinds
have been folded into `conversation.aguiState` and the source event
type is lost. Rather than thread that context through state objects,
exposing the raw event stream as a separate observer lets the
inspector do the correlation itself — and it opens the door for any
other diagnostic that wants the full event stream (activity timeline,
message ledger, tool-call inspector).

Why `bus.setAgentState(next)` is untagged here

Diagnostics consumers correlate the commit with the most recent
AG-UI event observed on the same thread to infer the tag. Keeping
the agent layer untagged keeps it diagnostics-agnostic.

Testing

Test exercises the bus-observer end-to-end via the seed APIs and
verifies the per-thread context (key + tag + snapshot) is delivered
correctly.

Stack

  1. feat(client): StateBus observer + tag API #203 — StateBus observer + tag API
  2. This PR — bus + event observers on AgentRuntime
  3. `feat/bus-inspector-diagnostics` — Bus Inspector module
  4. `feat/wire-bus-inspector` — wire into shell

runyaga added 2 commits May 1, 2026 14:10
Adds an additive `addObserver` hook on `StateBus` that fires after
every successful commit, plus an optional `tag` argument on
`setAgentState` and `update` so writers can label the source of each
write. Observers receive `(tag, snapshot)`; the snapshot is the
frozen post-commit map already exposed via `agentState`.

`addObserver` returns a disposer for symmetric tear-down. Detaching
during dispatch is safe (siblings still fire). Adding to a disposed
bus is a no-op.

Pure additive: existing call sites continue to work without
modification, and the new `BusObserver` typedef is exported via the
existing `application` barrel.

11 tests cover the observer dispatch ordering, dispose semantics,
self-detaching observers, tag preservation, and disposed-bus
safety.

Foundation for an upcoming Bus Inspector debug surface that needs
to record every state commit on a per-thread basis without forcing
a bus dependency on the agent layer's diagnostics.
Adds two optional observer hooks at the runtime layer for
diagnostics consumers (e.g. an in-app Bus Inspector) without
coupling the agent to any specific consumer:

- `ThreadBusObserver` — receives every per-thread `StateBus`
  commit with the originating `ThreadKey`. Wired into the bus the
  first time the runtime materialises a `ThreadState`. Supports
  the `tag:` argument added in the StateBus observer PR so seed
  paths and other writers label their commits.
- `ThreadEventObserver` — receives every raw AG-UI `BaseEvent`
  paired with the `ThreadKey` it was processed on. Forwarded from
  `AgentSession._bridgeBaseEvent` via a package-private
  `notifyThreadEvent` helper. The runtime makes no behavioural
  decision on the event; it only fans out.

Tags `seedThreadState` and `seedThreadHistory` writes as
`seed.initial` and `seed.history` respectively so consumers can
distinguish initial state restoration from in-session updates.

Bus writes inside `AgentSession._onStateChange` remain untagged at
this layer — diagnostics consumers correlate them with the most
recent AG-UI event observed via `eventObserver` to infer whether
the commit came from a `StateSnapshotEvent`, `StateDeltaEvent`,
or a non-state run-state transition. This keeps the agent
diagnostics-agnostic.

Test exercises the bus-observer wiring end-to-end via the seed
APIs and verifies the per-thread context (key + tag + snapshot)
is delivered to the observer.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant