Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions packages/opencode/src/plugin/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,16 @@ interface CodexAuthPluginOptions {
codexApiEndpoint?: string
}

async function exchangeCodeForTokens(code: string, redirectUri: string, pkce: PkceCodes): Promise<TokenResponse> {
const response = await fetch(`${ISSUER}/oauth/token`, {
async function exchangeCodeForTokens(
code: string,
redirectUri: string,
pkce: PkceCodes,
issuer = ISSUER,
signal?: AbortSignal,
): Promise<TokenResponse> {
const response = await fetch(`${issuer}/oauth/token`, {
method: "POST",
signal,
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "authorization_code",
Expand All @@ -140,9 +147,10 @@ async function exchangeCodeForTokens(code: string, redirectUri: string, pkce: Pk
return response.json()
}

async function refreshAccessToken(refreshToken: string, issuer = ISSUER): Promise<TokenResponse> {
async function refreshAccessToken(refreshToken: string, issuer = ISSUER, signal?: AbortSignal): Promise<TokenResponse> {
const response = await fetch(`${issuer}/oauth/token`, {
method: "POST",
signal,
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "refresh_token",
Expand Down Expand Up @@ -249,6 +257,7 @@ const HTML_ERROR = (error: string) => `<!doctype html>
interface PendingOAuth {
pkce: PkceCodes
state: string
issuer: string
resolve: (tokens: TokenResponse) => void
reject: (error: Error) => void
}
Expand Down Expand Up @@ -300,7 +309,7 @@ async function startOAuthServer(): Promise<{ port: number; redirectUri: string }
const current = pendingOAuth
pendingOAuth = undefined

exchangeCodeForTokens(code, `http://localhost:${OAUTH_PORT}/auth/callback`, current.pkce)
exchangeCodeForTokens(code, `http://localhost:${OAUTH_PORT}/auth/callback`, current.pkce, current.issuer)
.then((tokens) => current.resolve(tokens))
.catch((err) => current.reject(err))

Expand Down Expand Up @@ -341,7 +350,7 @@ function stopOAuthServer() {
}
}

function waitForOAuthCallback(pkce: PkceCodes, state: string): Promise<TokenResponse> {
function waitForOAuthCallback(pkce: PkceCodes, state: string, issuer: string): Promise<TokenResponse> {
return new Promise((resolve, reject) => {
const timeout = setTimeout(
() => {
Expand All @@ -356,6 +365,7 @@ function waitForOAuthCallback(pkce: PkceCodes, state: string): Promise<TokenResp
pendingOAuth = {
pkce,
state,
issuer,
resolve: (tokens) => {
clearTimeout(timeout)
resolve(tokens)
Expand Down Expand Up @@ -445,7 +455,7 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug
if (!currentAuth.access || currentAuth.expires < Date.now()) {
if (!refreshPromise) {
log.info("refreshing codex access token")
refreshPromise = refreshAccessToken(currentAuth.refresh, issuer)
refreshPromise = refreshAccessToken(currentAuth.refresh, issuer, init?.signal ?? undefined)
.then(async (tokens) => {
const accountId = extractAccountId(tokens) || authWithAccount.accountId
await input.client.auth.set({
Expand Down Expand Up @@ -522,9 +532,9 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug
const { redirectUri } = await startOAuthServer()
const pkce = await generatePKCE()
const state = generateState()
const authUrl = buildAuthorizeUrl(redirectUri, pkce, state)
const authUrl = buildAuthorizeUrl(redirectUri, pkce, state).replace(ISSUER, issuer)

const callbackPromise = waitForOAuthCallback(pkce, state)
const callbackPromise = waitForOAuthCallback(pkce, state, issuer)

return {
url: authUrl,
Expand Down
2 changes: 1 addition & 1 deletion packages/opencode/src/provider/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function isOpenAiErrorRetryable(e: APICallError) {
const status = e.statusCode
if (!status) return e.isRetryable
// openai sometimes returns 404 for models that are actually available
return status === 404 || e.isRetryable
return status === 404 || status === 429 || status === 502 || status === 503 || status === 504 || e.isRetryable
}

// Providers not reliably handled in this function:
Expand Down
47 changes: 33 additions & 14 deletions packages/opencode/src/provider/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { ModelStatus } from "./model-status"
import { RuntimeFlags } from "@/effect/runtime-flags"

const log = Log.create({ service: "provider" })
export const REQUEST_TIMEOUT = 300_000

function shouldUseCopilotResponsesApi(modelID: string): boolean {
const match = /^gpt-(\d+)/.exec(modelID)
Expand Down Expand Up @@ -1603,16 +1604,28 @@ export const layer = Layer.effect(
const chunkTimeout = options["chunkTimeout"]
delete options["chunkTimeout"]

options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
const fetchFn = customFetch ?? fetch
const opts = init ?? {}
options["fetch"] = async (input: URL | RequestInfo, init?: BunFetchRequestInit) => {
const fetchFn = (customFetch ?? fetch) as typeof fetch
const opts = { ...(init ?? {}) }
const signal = opts.signal
const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined
const timeout = options["timeout"] === false ? undefined : (options["timeout"] ?? REQUEST_TIMEOUT)
const timeoutAbortCtl = timeout === undefined ? undefined : new AbortController()
const timeoutID =
timeout === undefined || !timeoutAbortCtl
? undefined
: setTimeout(
timeoutAbortCtl.abort.bind(
timeoutAbortCtl,
new DOMException(`Provider request timed out after ${timeout}ms`, "TimeoutError"),
),
timeout,
)
const signals: AbortSignal[] = []

if (opts.signal) signals.push(opts.signal)
if (signal) signals.push(signal)
if (chunkAbortCtl) signals.push(chunkAbortCtl.signal)
if (options["timeout"] !== undefined && options["timeout"] !== null && options["timeout"] !== false)
signals.push(AbortSignal.timeout(options["timeout"]))
if (timeoutAbortCtl) signals.push(timeoutAbortCtl.signal)

const combined = signals.length === 0 ? null : signals.length === 1 ? signals[0] : AbortSignal.any(signals)
if (combined) opts.signal = combined
Expand All @@ -1635,14 +1648,20 @@ export const layer = Layer.effect(
}
}

const res = await fetchFn(input, {
...opts,
// @ts-ignore see here: https://github.com/oven-sh/bun/issues/16682
timeout: false,
})

if (!chunkAbortCtl) return res
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
try {
const res = await fetchFn(input, {
...opts,
// @ts-expect-error see here: https://github.com/oven-sh/bun/issues/16682
timeout: false,
})
if (!chunkAbortCtl) return res
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
} catch (e) {
if (signal?.aborted) throw signal.reason ?? e
throw e
} finally {
if (timeoutID) clearTimeout(timeoutID)
}
}

const bundledLoader = BUNDLED_PROVIDERS[model.api.npm]
Expand Down
13 changes: 13 additions & 0 deletions packages/opencode/src/session/message-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,19 @@ export function fromError(
cause: e,
},
).toObject()
case e instanceof DOMException && e.name === "TimeoutError":
return new APIError(
{
message: e.message || "Provider request timed out",
isRetryable: true,
metadata: {
name: e.name,
},
},
{
cause: e,
},
).toObject()
case OutputLengthError.isInstance(e):
return e
case LoadAPIKeyError.isInstance(e):
Expand Down
77 changes: 74 additions & 3 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Image } from "@/image/image"
import { Cause, Deferred, Effect, Exit, Layer, Context, Scope, Schema } from "effect"
import { Cause, Deferred, Effect, Exit, Layer, Context, Schedule, Scope, Schema } from "effect"
import * as Stream from "effect/Stream"
import { Agent } from "@/agent/agent"
import { Bus } from "@/bus"
Expand Down Expand Up @@ -31,6 +31,9 @@ import { Usage, type LLMEvent } from "@opencode-ai/llm"

const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
export const RETRY_MAX = 3
export const STREAM_FIRST_BYTE_TIMEOUT = 30_000
export const STREAM_IDLE_TIMEOUT = 120_000

export type Result = "compact" | "stop" | "continue"

Expand All @@ -56,6 +59,12 @@ type Input = {
assistantMessage: MessageV2.Assistant
sessionID: SessionID
model: Provider.Model
abort?: AbortSignal
retryMax?: number
timeout?: {
firstByte?: number
idle?: number
}
}

export interface Interface {
Expand All @@ -82,6 +91,64 @@ interface ProcessorContext extends Input {

type StreamEvent = LLMEvent

async function next<T>(input: {
iterator: AsyncIterator<T>
abort?: AbortSignal
timeout: number
message: string
}) {
input.abort?.throwIfAborted()
let id: ReturnType<typeof setTimeout> | undefined
let listener: (() => void) | undefined
const timeout = new Promise<never>((_, reject) => {
id = setTimeout(() => reject(new DOMException(input.message, "TimeoutError")), input.timeout)
listener = () => reject(input.abort?.reason ?? new DOMException("Aborted", "AbortError"))
input.abort?.addEventListener("abort", listener, { once: true })
})
try {
return await Promise.race([input.iterator.next(), timeout])
} finally {
if (id) clearTimeout(id)
if (listener) input.abort?.removeEventListener("abort", listener)
}
}

function watchStream<T>(
stream: Stream.Stream<T, unknown>,
input: {
abort?: AbortSignal
firstByte: number
idle: number
},
) {
return Stream.fromAsyncIterable(
{
async *[Symbol.asyncIterator]() {
const iterator = Stream.toAsyncIterable(stream)[Symbol.asyncIterator]()
let started = false
try {
while (true) {
const result = await next({
iterator,
abort: input.abort,
timeout: started ? input.idle : input.firstByte,
message: started
? `Stream idle timed out after ${input.idle}ms`
: `Stream first byte timed out after ${input.firstByte}ms`,
})
if (result.done) return
started = true
yield result.value
}
} finally {
await iterator.return?.()
}
},
},
(error) => error,
)
}

export class Service extends Context.Service<Service, Interface>()("@opencode/SessionProcessor") {}

export const layer = Layer.effect(
Expand Down Expand Up @@ -787,7 +854,11 @@ export const layer = Layer.effect(
ctx.currentText = undefined
ctx.reasoningMap = {}
yield* status.set(ctx.sessionID, { type: "busy" })
const stream = llm.stream(streamInput)
const stream = watchStream(llm.stream(streamInput), {
abort: input.abort,
firstByte: input.timeout?.firstByte ?? STREAM_FIRST_BYTE_TIMEOUT,
idle: input.timeout?.idle ?? STREAM_IDLE_TIMEOUT,
})

yield* stream.pipe(
Stream.tap((event) => handleEvent(event)),
Expand Down Expand Up @@ -836,7 +907,7 @@ export const layer = Layer.effect(
),
)
},
}),
}).pipe(Schedule.both(Schedule.recurs(input.retryMax ?? RETRY_MAX))),
),
Effect.catch(halt),
Effect.ensuring(cleanup()),
Expand Down
59 changes: 56 additions & 3 deletions packages/opencode/src/session/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export const RETRY_INITIAL_DELAY = 2000
export const RETRY_BACKOFF_FACTOR = 2
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
const VALUES_MAX_DEPTH = 32

function cap(ms: number) {
return Math.min(ms, RETRY_MAX_DELAY)
Expand Down Expand Up @@ -69,9 +70,6 @@ export function retryable(error: Err, provider: string) {
if (MessageV2.ContextOverflowError.isInstance(error)) return undefined
if (MessageV2.APIError.isInstance(error)) {
const status = error.data.statusCode
// 5xx errors are transient server failures and should always be retried,
// even when the provider SDK doesn't explicitly mark them as retryable.
if (!error.data.isRetryable && !(status !== undefined && status >= 500)) return undefined
if (error.data.responseBody?.includes("FreeUsageLimitError")) {
return {
message: GO_UPSELL_MESSAGE,
Expand Down Expand Up @@ -118,6 +116,17 @@ export function retryable(error: Err, provider: string) {
},
}
}
const transientMessage = transient({
statusCode: status,
message: error.data.message,
body: parseJSON(error.data.responseBody),
metadata: error.data.metadata,
})
if (transientMessage) return { message: transientMessage }

// 5xx errors are transient server failures and should always be retried,
// even when the provider SDK doesn't explicitly mark them as retryable.
if (!error.data.isRetryable && !(status !== undefined && status >= 500)) return undefined
return { message: error.data.message.includes("Overloaded") ? "Provider is overloaded" : error.data.message }
}

Expand All @@ -136,6 +145,8 @@ export function retryable(error: Err, provider: string) {

const json = parseJSON(msg)
if (!json || typeof json !== "object") return undefined
const transientMessage = transient(json)
if (transientMessage) return { message: transientMessage }
const code = typeof json.code === "string" ? json.code : ""

if (json.type === "error" && json.error?.type === "too_many_requests") {
Expand All @@ -161,6 +172,48 @@ function num(value: unknown) {
return parsed
}

function values(input: unknown, seen = new WeakSet<object>(), depth = 0): string[] {
if (input === undefined || input === null) return []
if (typeof input === "string" || typeof input === "number") return [String(input).toLowerCase()]
if (typeof input !== "object") return []
if (depth >= VALUES_MAX_DEPTH || seen.has(input)) return []
seen.add(input)
if (Array.isArray(input)) return input.flatMap((value) => values(value, seen, depth + 1))
return Object.values(input).flatMap((value) => values(value, seen, depth + 1))
}

function transient(input: unknown) {
const all = values(input)
if (
all.some(
(value) =>
value.includes("context_length") ||
value.includes("context window") ||
value.includes("insufficient_quota") ||
value.includes("usage_not_included") ||
value.includes("invalid_prompt"),
)
)
return undefined
if (all.some((value) => value.includes("too_many_requests"))) return "Too Many Requests"
if (all.some((value) => value === "429" || value.includes("rate_limit"))) return "Rate Limited"
if (
all.some(
(value) =>
value === "502" ||
value === "503" ||
value === "504" ||
value.includes("gateway timeout") ||
value.includes("upstream timeout") ||
value.includes("unavailable") ||
value.includes("overloaded") ||
value.includes("exhausted"),
)
)
return "Provider is overloaded"
return undefined
}

function parseJSON(value: unknown) {
return iife(() => {
try {
Expand Down
Loading
Loading