Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tangle-network/agent-runtime",
"version": "0.12.0",
"version": "0.12.1",
"description": "Reusable runtime lifecycle for domain-specific agents.",
"homepage": "https://github.com/tangle-network/agent-runtime#readme",
"repository": {
Expand Down
113 changes: 95 additions & 18 deletions src/backends.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,59 @@ export function createSandboxPromptBackend<
}

/** @stable */
/**
* Retry policy for transient transport errors (rate limits, upstream
* timeouts). Defaults to 5 attempts with exponential backoff starting at
* 1s, ±25% jitter, capped at 30s. Set `maxAttempts: 1` to disable retries.
*
* Retried status codes:
* - 408 Request Timeout
* - 425 Too Early
* - 429 Too Many Requests
* - 500 / 502 / 503 / 504 — upstream transient failures
*
* Hard failures (401, 403, 4xx other than the above) propagate immediately.
*/
export interface BackendRetryPolicy {
/** Total attempts including the first try. Default 5. */
maxAttempts?: number
/** Initial backoff in ms before the second attempt. Default 1000. */
initialBackoffMs?: number
/** Hard ceiling on backoff in ms. Default 30000. */
maxBackoffMs?: number
/** Jitter fraction in [0, 1]. Default 0.25 (±25%). */
jitter?: number
/** Status codes that trigger a retry. Default: 408, 425, 429, 500, 502, 503, 504. */
retryStatuses?: ReadonlyArray<number>
}

const DEFAULT_RETRY_STATUSES = [408, 425, 429, 500, 502, 503, 504] as const

function pickRetryDelayMs(attempt: number, policy: Required<BackendRetryPolicy>): number {
const exp = policy.initialBackoffMs * 2 ** (attempt - 1)
const capped = Math.min(exp, policy.maxBackoffMs)
const jitter = capped * policy.jitter * (Math.random() * 2 - 1)
return Math.max(0, Math.round(capped + jitter))
}

function sleep(ms: number, signal?: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal?.aborted) {
reject(signal.reason ?? new Error('aborted'))
return
}
const t = setTimeout(() => {
signal?.removeEventListener('abort', onAbort)
resolve()
}, ms)
const onAbort = () => {
clearTimeout(t)
reject(signal?.reason ?? new Error('aborted'))
}
signal?.addEventListener('abort', onAbort, { once: true })
})
}

export function createOpenAICompatibleBackend<
TInput extends AgentBackendInput = AgentBackendInput,
>(options: {
Expand All @@ -78,33 +131,57 @@ export function createOpenAICompatibleBackend<
model: string
kind?: string
fetchImpl?: typeof fetch
retry?: BackendRetryPolicy
}): AgentExecutionBackend<TInput> {
const fetcher = options.fetchImpl ?? fetch
const kind = options.kind ?? 'tcloud'
const retryPolicy: Required<BackendRetryPolicy> = {
maxAttempts: options.retry?.maxAttempts ?? 5,
initialBackoffMs: options.retry?.initialBackoffMs ?? 1000,
maxBackoffMs: options.retry?.maxBackoffMs ?? 30000,
jitter: options.retry?.jitter ?? 0.25,
retryStatuses: options.retry?.retryStatuses ?? DEFAULT_RETRY_STATUSES,
}
return {
kind,
start(_input, context) {
return newRuntimeSession(kind, context.requestedSessionId)
},
async *stream(input, context) {
const response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, {
method: 'POST',
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: options.model,
stream: true,
messages: input.messages ?? [
{ role: 'user', content: input.message ?? context.task.intent },
],
}),
signal: context.signal,
})
if (!response.ok) {
throw new BackendTransportError(kind, `chat backend returned ${response.status}`, {
status: response.status,
let response: Response | undefined
let lastStatus = 0
for (let attempt = 1; attempt <= retryPolicy.maxAttempts; attempt++) {
response = await fetcher(`${options.baseUrl.replace(/\/$/, '')}/chat/completions`, {
method: 'POST',
headers: {
Authorization: `Bearer ${options.apiKey}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: options.model,
stream: true,
messages: input.messages ?? [
{ role: 'user', content: input.message ?? context.task.intent },
],
}),
signal: context.signal,
})
if (response.ok) break
lastStatus = response.status
if (!retryPolicy.retryStatuses.includes(response.status)) break
if (attempt === retryPolicy.maxAttempts) break
// Drain the failed body so the connection can be reused.
try {
await response.body?.cancel()
} catch {
// Best-effort — some runtimes don't expose cancel.
}
const delayMs = pickRetryDelayMs(attempt, retryPolicy)
await sleep(delayMs, context.signal)
}
if (!response || !response.ok) {
throw new BackendTransportError(kind, `chat backend returned ${lastStatus || 'unknown'}`, {
status: lastStatus || 0,
})
}
yield* streamResponseEvents(response, context)
Expand Down
Loading