Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tangle-network/agent-runtime",
"version": "0.14.0",
"version": "0.14.1",
"description": "Reusable runtime lifecycle for domain-specific agents.",
"homepage": "https://github.com/tangle-network/agent-runtime#readme",
"repository": {
Expand Down
106 changes: 90 additions & 16 deletions src/backends.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ export interface BackendRetryPolicy {
jitter?: number
/** Status codes that trigger a retry. Default: 408, 425, 429, 500, 502, 503, 504. */
retryStatuses?: ReadonlyArray<number>
/**
* Per-attempt wall-clock deadline in ms. If a single fetch attempt does
* not return headers within this window the attempt is aborted and
* retried. Default 120000 (2 min). Without this a hung upstream blocks
* the attempt indefinitely — observed in production as a 15-minute
* `fetch failed` that burned an entire eval persona. Set to 0 to disable.
*/
requestTimeoutMs?: number
}

/** Fallback list of retryable HTTP status codes, used when a retry policy does not supply `retryStatuses`. */
const DEFAULT_RETRY_STATUSES = [408, 425, 429, 500, 502, 503, 504] as const
Expand All @@ -105,6 +113,41 @@ function pickRetryDelayMs(attempt: number, policy: Required<BackendRetryPolicy>)
return Math.max(0, Math.round(capped + jitter))
}

/**
 * Derive a per-attempt AbortSignal that fires when EITHER the caller's
 * signal aborts OR `timeoutMs` elapses.
 *
 * @param callerSignal Optional caller-owned signal. Its abort (and abort
 *   reason, when present) is forwarded to the derived signal.
 * @param timeoutMs Deadline in milliseconds. Any value that is not a
 *   positive finite number (`<= 0`, `NaN`, `Infinity`) disables the deadline;
 *   the caller signal still propagates. Finite values above the largest delay
 *   a timer can hold are clamped to that maximum.
 * @returns The derived `signal` and a `dispose()` that clears the timer and
 *   detaches the caller listener so a completed attempt doesn't leak a
 *   pending timeout.
 */
function withTimeout(
  callerSignal: AbortSignal | undefined,
  timeoutMs: number,
): { signal: AbortSignal; dispose: () => void } {
  // Timer delays are stored as a signed 32-bit integer. Larger (or NaN)
  // delays are coerced to ~1ms by the runtime, which would turn "wait
  // forever" into "abort immediately".
  const MAX_TIMER_DELAY_MS = 2_147_483_647

  // `!(x > 0)` is true for zero, negatives AND NaN.
  if (!(timeoutMs > 0) || timeoutMs === Infinity) {
    return { signal: callerSignal ?? new AbortController().signal, dispose: () => undefined }
  }

  const controller = new AbortController()

  // Caller already cancelled: hand back an aborted signal without arming a
  // timer or registering a listener, so there is nothing to clean up.
  if (callerSignal?.aborted) {
    controller.abort(callerSignal.reason ?? new Error('aborted'))
    return { signal: controller.signal, dispose: () => undefined }
  }

  const delayMs = Math.min(timeoutMs, MAX_TIMER_DELAY_MS)
  const timer = setTimeout(
    () => controller.abort(new Error(`request timed out after ${delayMs}ms`)),
    delayMs,
  )
  // Where timers expose `unref`, make sure a pending deadline never keeps the
  // process alive on its own.
  if (typeof (timer as { unref?: () => void }).unref === 'function') {
    ;(timer as { unref: () => void }).unref()
  }

  const onCallerAbort = () => controller.abort(callerSignal?.reason ?? new Error('aborted'))
  callerSignal?.addEventListener('abort', onCallerAbort, { once: true })

  return {
    signal: controller.signal,
    dispose: () => {
      clearTimeout(timer)
      callerSignal?.removeEventListener('abort', onCallerAbort)
    },
  }
}

function sleep(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
Expand Down Expand Up @@ -141,31 +184,54 @@ export function createOpenAICompatibleBackend<
maxBackoffMs: options.retry?.maxBackoffMs ?? 30000,
jitter: options.retry?.jitter ?? 0.25,
retryStatuses: options.retry?.retryStatuses ?? DEFAULT_RETRY_STATUSES,
requestTimeoutMs: options.retry?.requestTimeoutMs ?? 120_000,
}
return {
kind,
start(_input, context) {
return newRuntimeSession(kind, context.requestedSessionId)
},
async *stream(input, context) {
const url = `${options.baseUrl.replace(/\/$/, '')}/chat/completions`
const requestBody = JSON.stringify({
model: options.model,
stream: true,
messages: input.messages ?? [
{ role: 'user', content: input.message ?? context.task.intent },
],
})
let response: Response | undefined
let lastStatus = 0
// The last thrown transport error (timeout abort, DNS / connection
// failure). Network throws are retryable just like 5xx — without this
// a `fetch failed` propagated immediately and burned the attempt.
let lastThrown: unknown
for (let attempt = 1; attempt <= retryPolicy.maxAttempts; attempt++) {
response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, {
method: 'POST',
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: options.model,
stream: true,
messages: input.messages ?? [
{ role: 'user', content: input.message ?? context.task.intent },
],
}),
signal: context.signal,
})
lastThrown = undefined
// Per-attempt deadline: abort a hung upstream instead of waiting
// forever. Linked to context.signal so a caller cancel still wins.
const attemptSignal = withTimeout(context.signal, retryPolicy.requestTimeoutMs)
try {
response = await fetcher(url, {
method: 'POST',
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': 'application/json',
},
body: requestBody,
signal: attemptSignal.signal,
})
} catch (err) {
attemptSignal.dispose()
// A caller-initiated abort is terminal — do not retry it.
if (context.signal?.aborted) throw err
lastThrown = err
response = undefined
if (attempt === retryPolicy.maxAttempts) break
await sleep(pickRetryDelayMs(attempt, retryPolicy), context.signal)
continue
}
attemptSignal.dispose()
if (response.ok) break
lastStatus = response.status
if (!retryPolicy.retryStatuses.includes(response.status)) break
Expand All @@ -179,7 +245,15 @@ export function createOpenAICompatibleBackend<
const delayMs = pickRetryDelayMs(attempt, retryPolicy)
await sleep(delayMs, context.signal)
}
if (!response || !response.ok) {
if (!response) {
const reason = lastThrown instanceof Error ? lastThrown.message : String(lastThrown)
throw new BackendTransportError(
kind,
`chat backend unreachable after ${retryPolicy.maxAttempts} attempts: ${reason}`,
{ status: 0 },
)
}
if (!response.ok) {
throw new BackendTransportError(kind, `chat backend returned ${lastStatus || 'unknown'}`, {
status: lastStatus || 0,
})
Expand Down
88 changes: 88 additions & 0 deletions tests/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,94 @@ describe('runAgentTask', () => {
expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'hello' })
})

it('retries a thrown fetch error and succeeds on a later attempt', async () => {
  // Number of times the stubbed fetch has been invoked.
  let fetchCount = 0
  const backend = createOpenAICompatibleBackend({
    apiKey: 'sk-test',
    baseUrl: 'https://router.example/v1',
    model: 'model-a',
    retry: { initialBackoffMs: 1, maxBackoffMs: 2 },
    fetchImpl: async () => {
      fetchCount += 1
      // From the third call onward the stub answers with a real stream;
      // the earlier calls fail with the `fetch failed` network-error shape.
      if (fetchCount >= 3) {
        const body = 'data: {"choices":[{"delta":{"content":"ok"}}]}\n\ndata: [DONE]\n\n'
        return new Response(body, { status: 200 })
      }
      throw new TypeError('fetch failed')
    },
  })

  const stream = runAgentTaskStream({
    task: { id: 'retry-task', intent: 'go', requiredKnowledge: [readyReq] },
    backend,
    input: { message: 'hi' },
  })
  const events = await collect(stream)

  // Two failures plus the one success.
  expect(fetchCount).toBe(3)
  expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'ok' })
})

it('aborts a hung attempt via the per-attempt timeout, then retries', async () => {
  // Number of times the stubbed fetch has been invoked.
  let fetchCount = 0
  const backend = createOpenAICompatibleBackend({
    apiKey: 'sk-test',
    baseUrl: 'https://router.example/v1',
    model: 'model-a',
    retry: { initialBackoffMs: 1, maxBackoffMs: 2, requestTimeoutMs: 30 },
    fetchImpl: (_url, init) => {
      fetchCount += 1
      // Every call after the first answers immediately with a stream.
      if (fetchCount !== 1) {
        const body = 'data: {"choices":[{"delta":{"content":"recovered"}}]}\n\ndata: [DONE]\n\n'
        return Promise.resolve(new Response(body, { status: 200 }))
      }
      // The first call never resolves: it only rejects once the signal it
      // was handed is aborted.
      return new Promise((_resolve, reject) => {
        const attemptSignal = (init as RequestInit | undefined)?.signal
        if (!attemptSignal) return
        attemptSignal.addEventListener('abort', () => {
          reject(attemptSignal.reason ?? new Error('aborted'))
        })
      })
    },
  })

  const stream = runAgentTaskStream({
    task: { id: 'timeout-task', intent: 'go', requiredKnowledge: [readyReq] },
    backend,
    input: { message: 'hi' },
  })
  const events = await collect(stream)

  // One hung attempt plus the one that recovered.
  expect(fetchCount).toBe(2)
  expect(events.at(-1)).toMatchObject({ type: 'final', status: 'completed', text: 'recovered' })
})

it('throws BackendTransportError when every attempt throws', async () => {
  const backend = createOpenAICompatibleBackend({
    apiKey: 'sk-test',
    baseUrl: 'https://router.example/v1',
    model: 'model-a',
    retry: { maxAttempts: 2, initialBackoffMs: 1, maxBackoffMs: 2 },
    // Every attempt fails with the `fetch failed` network-error shape.
    fetchImpl: async () => {
      throw new TypeError('fetch failed')
    },
  })

  const stream = runAgentTaskStream({
    task: { id: 'dead-task', intent: 'go', requiredKnowledge: [readyReq] },
    backend,
    input: { message: 'hi' },
  })
  const events = await collect(stream)

  // The run must still end with a final event, and that event must not
  // report success.
  const lastEvent = events.at(-1)
  expect(lastEvent).toMatchObject({ type: 'final' })
  const finalStatus = lastEvent?.type === 'final' ? lastEvent.status : false
  expect(finalStatus).not.toBe('completed')
})

it('stops a backend and emits failed final event when streaming throws', async () => {
const store = new InMemoryRuntimeSessionStore()
const stopped: string[] = []
Expand Down
Loading