Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "agent-eval-rpc"
version = "0.28.0"
version = "0.29.0"
description = "Python RPC client for @tangle-network/agent-eval — judge content against rubrics over HTTP or stdio RPC. Eval logic runs in the Node runtime; this package is a thin wire client."
readme = "README.md"
requires-python = ">=3.10"
Expand Down
2 changes: 1 addition & 1 deletion clients/python/src/agent_eval_rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
try:
__version__ = version("agent-eval-rpc")
except PackageNotFoundError:
__version__ = "0.28.0"
__version__ = "0.29.0"

__all__ = [
"Client",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tangle-network/agent-eval",
"version": "0.28.0",
"version": "0.29.0",
"description": "Substrate for self-improving agents: traces, verifiable rewards, preferences, GEPA / reflective mutation, auto-research, replay, sequential anytime-valid stats, and release gates.",
"homepage": "https://github.com/tangle-network/agent-eval#readme",
"repository": {
Expand Down
167 changes: 164 additions & 3 deletions src/analyst/analyst.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,9 @@ describe('AnalystHooks', () => {
// 1 from converted error + 1 from ok analyst
expect(result.findings).toHaveLength(2)
const byId = Object.fromEntries(result.per_analyst.map((s) => [s.analyst_id, s]))
expect(byId['a']?.status).toBe('failed')
expect(byId['a']?.findings_count).toBe(1) // surfaced from hook
expect(byId['b']?.status).toBe('ok')
expect(byId.a?.status).toBe('failed')
expect(byId.a?.findings_count).toBe(1) // surfaced from hook
expect(byId.b?.status).toBe('ok')
})

it('onAfter runs for skipped analysts too', async () => {
Expand Down Expand Up @@ -493,6 +493,167 @@ describe('diffFindings policy', () => {
})
})

describe('AnalystRegistry.runStream', () => {
/**
 * Test double: an analyst that consumes the `custom` input kind and whose
 * `analyze()` always resolves with the exact `findings` array it was given.
 */
function makeOkAnalyst(id: string, findings: AnalystFinding[]): Analyst {
  const analyze = async () => findings
  const stub = {
    id,
    description: `ok ${id}`,
    inputKind: 'custom',
    cost: { kind: 'deterministic' },
    version: '1.0.0',
    analyze,
  }
  // The stub is structurally loose on purpose; the cast keeps the fixture terse.
  return stub as never
}

/**
 * Test double: an analyst that consumes the `custom` input kind and whose
 * `analyze()` always rejects with the supplied `error`.
 */
function makeFailingAnalyst(id: string, error: Error): Analyst {
  const stub = {
    id,
    description: `failing ${id}`,
    inputKind: 'custom',
    cost: { kind: 'deterministic' },
    version: '1.0.0',
    // Rejects with the same instance every time so callers can assert on its class.
    analyze: () => Promise.reject(error),
  }
  return stub as never
}

/**
 * Test double: an analyst that declares the `trace-store` input kind and
 * whose `analyze()` resolves with an empty findings list.
 */
function makeMissingInputAnalyst(id: string): Analyst {
  const stub = {
    id,
    description: `needs trace-store ${id}`,
    inputKind: 'trace-store',
    cost: { kind: 'deterministic' },
    version: '1.0.0',
    analyze: () => Promise.resolve([]),
  }
  return stub as never
}

/**
 * Builds a low-severity finding through `makeFinding`, using `id` as the
 * claim text and `analystId` as both the owning analyst and the area.
 */
function mkFinding(id: string, analystId: string): AnalystFinding {
  const fields = {
    analyst_id: analystId,
    area: analystId,
    claim: id,
    severity: 'low' as const,
    confidence: 0.5,
    evidence_refs: [],
  }
  return makeFinding(fields)
}

// Shared inputs for the runStream tests below. Only the `custom` input kind
// is supplied, so an analyst declaring `inputKind: 'trace-store'` has nothing
// to consume — the analyst-skipped test relies on that.
// NOTE(review): the keys look like they mirror the analyst ids used in these
// tests (`c` is not registered by any test visible here) — confirm how the
// registry resolves `custom` input before trimming.
const inputs: AnalystRunInputs = {
custom: { a: 1, b: 2, c: 3, boom: 1, after: 1, 'needs-trace': 1 },
}

it('emits run-started → analyst-started → analyst-completed → run-completed for a clean run', async () => {
  // Two healthy analysts: 'a' reports two findings, 'b' reports one.
  const registry = new AnalystRegistry()
  registry.register(makeOkAnalyst('a', [mkFinding('f1', 'a'), mkFinding('f2', 'a')]))
  registry.register(makeOkAnalyst('b', [mkFinding('g1', 'b')]))

  const seen: import('./types').AnalystRunEvent[] = []
  for await (const event of registry.runStream('run-1', inputs)) {
    seen.push(event)
  }

  // Full lifecycle, one started/completed pair per analyst, in registration order.
  const eventTypes = seen.map((event) => event.type)
  expect(eventTypes).toEqual([
    'run-started',
    'analyst-started',
    'analyst-completed',
    'analyst-started',
    'analyst-completed',
    'run-completed',
  ])

  // Positions 0, 2 and 5 hold the events inspected below.
  const [opening, , firstCompleted, , , closing] = seen

  if (opening?.type !== 'run-started') throw new Error('order invariant')
  expect(opening.analyst_ids).toEqual(['a', 'b'])
  expect(opening.run_id).toBe('run-1')

  if (firstCompleted?.type !== 'analyst-completed') throw new Error('order invariant')
  expect(firstCompleted.summary.analyst_id).toBe('a')
  expect(firstCompleted.summary.status).toBe('ok')
  expect(firstCompleted.findings).toHaveLength(2)

  if (closing?.type !== 'run-completed') throw new Error('order invariant')
  expect(closing.result.findings).toHaveLength(3)
  const analystOrder = closing.result.per_analyst.map((summary) => summary.analyst_id)
  expect(analystOrder).toEqual(['a', 'b'])
})

it('emits analyst-skipped instead of analyst-started when input is missing', async () => {
  // The only registered analyst wants `trace-store`, which `inputs` does not provide.
  const registry = new AnalystRegistry()
  registry.register(makeMissingInputAnalyst('needs-trace'))

  const seen: import('./types').AnalystRunEvent[] = []
  for await (const event of registry.runStream('run-1', inputs)) {
    seen.push(event)
  }

  const eventTypes = seen.map((event) => event.type)
  expect(eventTypes).toEqual(['run-started', 'analyst-skipped', 'run-completed'])

  const [, skippedEvent] = seen
  if (skippedEvent?.type !== 'analyst-skipped') throw new Error('order invariant')
  const { summary } = skippedEvent
  expect(summary.status).toBe('skipped')
  expect(summary.reason).toMatch(/missing input/)
})

it('emits analyst-completed with status=failed when the analyst throws; siblings still run', async () => {
  // 'boom' throws; 'after' is registered second and must still be executed.
  const registry = new AnalystRegistry()
  registry.register(makeFailingAnalyst('boom', new TypeError('synthetic')))
  registry.register(makeOkAnalyst('after', [mkFinding('f1', 'after')]))

  const seen: import('./types').AnalystRunEvent[] = []
  for await (const event of registry.runStream('run-1', inputs)) {
    seen.push(event)
  }

  // Looks up the completion event emitted for one analyst id, if any.
  const completionOf = (analystId: string) =>
    seen.find(
      (event) => event.type === 'analyst-completed' && event.summary.analyst_id === analystId,
    )

  const boomEvent = completionOf('boom')
  if (!boomEvent || boomEvent.type !== 'analyst-completed') throw new Error('expected failed event')
  expect(boomEvent.summary.status).toBe('failed')
  expect(boomEvent.summary.error?.class).toBe('TypeError')

  const siblingEvent = completionOf('after')
  if (!siblingEvent || siblingEvent.type !== 'analyst-completed') throw new Error('expected after event')
  expect(siblingEvent.summary.status).toBe('ok')
})

it('run() returns the same envelope as the run-completed event from runStream()', async () => {
  // Collected path: a registry driven through run().
  const collectedRegistry = new AnalystRegistry()
  collectedRegistry.register(makeOkAnalyst('a', [mkFinding('f1', 'a')]))
  const result = await collectedRegistry.run('run-1', inputs)

  // Streamed path: an identically configured registry driven through runStream().
  const streamedRegistry = new AnalystRegistry()
  streamedRegistry.register(makeOkAnalyst('a', [mkFinding('f1', 'a')]))
  let streamResult: import('./types').AnalystRunResult | undefined
  for await (const event of streamedRegistry.runStream('run-1', inputs)) {
    if (event.type === 'run-completed') {
      streamResult = event.result
    }
  }

  const collectedIds = result.findings.map((finding) => finding.finding_id)
  const streamedIds = streamResult?.findings.map((finding) => finding.finding_id)
  expect(collectedIds).toEqual(streamedIds)
  expect(result.per_analyst).toEqual(streamResult?.per_analyst)
})

it('honours backpressure: slow consumer between events preserves ordering', async () => {
  const registry = new AnalystRegistry()
  registry.register(makeOkAnalyst('a', [mkFinding('f1', 'a')]))
  registry.register(makeOkAnalyst('b', [mkFinding('g1', 'b')]))

  const events: import('./types').AnalystRunEvent[] = []
  for await (const ev of registry.runStream('run-1', inputs)) {
    events.push(ev)
    // Simulate a slow consumer: pause before pulling the next event.
    // The executor parameter is named `resolve` so it does not shadow the registry.
    await new Promise((resolve) => setTimeout(resolve, 1))
  }

  // Ordering must match the fast-consumer case exactly.
  expect(events.map((e) => e.type)).toEqual([
    'run-started',
    'analyst-started',
    'analyst-completed',
    'analyst-started',
    'analyst-completed',
    'run-completed',
  ])
})
})

describe('RegistryRunOpts.priorFindings forwarding', () => {
function makeRecordingAnalyst(id: string): Analyst & {
seen: Array<ReadonlyArray<AnalystFinding> | undefined>
Expand Down
43 changes: 42 additions & 1 deletion src/analyst/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
Analyst,
AnalystContext,
AnalystFinding,
AnalystRunEvent,
AnalystRunInputs,
AnalystRunResult,
AnalystRunSummary,
Expand Down Expand Up @@ -148,6 +149,30 @@ export class AnalystRegistry {
inputs: AnalystRunInputs,
runOpts: RegistryRunOpts = {},
): Promise<AnalystRunResult> {
// Thin collector over `runStream`. Both surfaces share the same
// loop body so they cannot drift on isolation / hook order / cost.
for await (const ev of this.runStream(runId, inputs, runOpts)) {
if (ev.type === 'run-completed') return ev.result
}
throw new Error('AnalystRegistry.run: stream completed without run-completed event')
}

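/**
* Streaming counterpart to `run()`. Yields `AnalystRunEvent` values
* as the run progresses: `run-started` first, then for each analyst
* either `analyst-skipped` or `analyst-started` followed by
* `analyst-completed`, and finally a terminal `run-completed` whose
* payload is the full `AnalystRunResult`. UIs use this to render
* progress; persistence consumers use `run()` and read the result.
*
* Hooks (`onBeforeAnalyze` / `onAfterAnalyze` / `onError` /
* `onComplete`) fire as before — streaming is additive, not a hook
* replacement.
*
* Because this is an async generator, work only advances while the
* consumer keeps iterating: drain the stream through `run-completed`
* if every analyst and `onComplete` must run.
*/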
async *runStream(
runId: string,
inputs: AnalystRunInputs,
runOpts: RegistryRunOpts = {},
): AsyncGenerator<AnalystRunEvent, void, void> {
const correlationId = `ar_${randomUUID().slice(0, 12)}`
const log = this.options.log ?? (() => {})
const hooks = this.options.hooks ?? {}
Expand All @@ -158,6 +183,14 @@ export class AnalystRegistry {
const selected = this.selectAnalysts(runOpts)
const budget = runOpts.budget ?? this.options.defaultBudget

yield {
type: 'run-started',
run_id: runId,
correlation_id: correlationId,
started_at: startedAt,
analyst_ids: selected.map((a) => a.id),
}

const summaries: AnalystRunSummary[] = []
const allFindings: AnalystFinding[] = []
let totalCost = 0
Expand All @@ -178,6 +211,7 @@ export class AnalystRegistry {
summaries.push(summary)
log(`[analyst] skip ${analyst.id} — missing input`, { runId, kind: analyst.inputKind })
await hooks.onAfterAnalyze?.({ analyst, summary, findings: [], runId })
yield { type: 'analyst-skipped', summary }
continue
}

Expand All @@ -200,6 +234,11 @@ export class AnalystRegistry {
}

await hooks.onBeforeAnalyze?.({ analyst, ctx, runId })
yield {
type: 'analyst-started',
analyst_id: analyst.id,
started_at: new Date(t0).toISOString(),
}

try {
const findings = await (analyst as Analyst<unknown>).analyze(input.value, ctx)
Expand All @@ -223,6 +262,7 @@ export class AnalystRegistry {
cost_usd: cost,
})
await hooks.onAfterAnalyze?.({ analyst, summary, findings, runId })
yield { type: 'analyst-completed', summary, findings }
} catch (err) {
const latency = Date.now() - t0
const e = err instanceof Error ? err : new Error(String(err))
Expand All @@ -244,6 +284,7 @@ export class AnalystRegistry {
error: e.message,
})
await hooks.onAfterAnalyze?.({ analyst, summary, findings: hookFindings, runId })
yield { type: 'analyst-completed', summary, findings: hookFindings }
// Continue — isolation invariant.
}
}
Expand All @@ -258,7 +299,7 @@ export class AnalystRegistry {
total_cost_usd: totalCost,
}
await hooks.onComplete?.({ result })
return result
yield { type: 'run-completed', result }
}

private selectAnalysts(opts: RegistryRunOpts): Analyst[] {
Expand Down
44 changes: 44 additions & 0 deletions src/analyst/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,47 @@ export interface AnalystRunResult {
/** Total LLM cost in USD across all analysts in this registry.run(). */
total_cost_usd: number
}

// ── Streaming event envelope ────────────────────────────────────────

/**
* Events emitted by `AnalystRegistry.runStream(...)` in real time as
* the registry executes. UIs subscribe via `for await (const ev of
* registry.runStream(...))`; `registry.run(...)` is a thin collector
* over the same stream, so the two surfaces share their invariants.
*
* The union is discriminated on `type`; narrow on it before reading
* variant-specific fields.
*
* Per-finding events are intentionally omitted — analyzers are batch
* operations (an Ax actor returns the full `findings:json[]` at the
* end of the responder), so streaming inside one analyst would only
* emit partial JSON consumers can't render. The per-analyst
* `analyst-completed` event is the right granularity; subscribers
* wanting per-finding rendering iterate `event.findings` themselves.
*/
export type AnalystRunEvent =
| {
/** First event of a stream; emitted before any per-analyst event. */
type: 'run-started'
/** The run id the caller passed to `runStream` / `run`. */
run_id: string
/** Registry-generated id for this invocation. */
correlation_id: string
/** When the run began (timestamp string; exact format not shown here — confirm in registry.ts). */
started_at: string
/** The ordered list of analyst ids the registry will run. */
analyst_ids: ReadonlyArray<string>
}
| {
/** Emitted instead of `analyst-started` when the analyst's input is missing. */
type: 'analyst-skipped'
/** Summary for the skipped analyst (`status` is `'skipped'`). */
summary: AnalystRunSummary
}
| {
/** Emitted right before an analyst's `analyze` is invoked. */
type: 'analyst-started'
analyst_id: string
/** ISO-8601 timestamp of when this analyst started. */
started_at: string
}
| {
/** Emitted once per analyst that was started, whether it succeeded or threw. */
type: 'analyst-completed'
/** `summary.status` is `'ok'` for clean completion or `'failed'` for thrown analysts. */
summary: AnalystRunSummary
/** Findings attributed to this analyst; for a failed analyst, presumably whatever the error hook surfaced. */
findings: ReadonlyArray<AnalystFinding>
}
| {
/** Terminal event; `result` is the same envelope `run()` returns. */
type: 'run-completed'
result: AnalystRunResult
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export type {
AnalystFinding,
AnalystInputKind,
AnalystRequirements,
AnalystRunEvent,
AnalystRunInputs,
AnalystRunResult,
AnalystRunSummary,
Expand Down
Loading