Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/llm/src/protocols/openai-responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ const lowerToolResultOutput = Effect.fn("OpenAIResponses.lowerToolResultOutput")
// Text/json/error results are encoded as a plain string for backward
// compatibility with existing cassettes and provider expectations.
if (part.result.type !== "content") return ProviderShared.toolResultText(part)
return yield* Effect.forEach(part.result.value, lowerToolResultContentItem)
const content: ReadonlyArray<ToolResultContentPart> = part.result.value
return yield* Effect.forEach(content, lowerToolResultContentItem)
})

const lowerMessages = Effect.fn("OpenAIResponses.lowerMessages")(function* (request: LLMRequest) {
Expand Down
46 changes: 9 additions & 37 deletions packages/llm/src/route/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ import {
TransportReason,
UnknownProviderReason,
} from "../schema"
import {
REDACTED,
REDACT_JSON_FIELD,
REDACT_QUERY_FIELD,
isSensitiveHeaderName,
isSensitiveQueryName,
redactHeaders,
redactUrl,
} from "./redaction"

export interface Interface {
readonly execute: (
Expand All @@ -35,43 +44,6 @@ const BODY_LIMIT = 16_384
const MAX_RETRIES = 2
const BASE_DELAY_MS = 500
const MAX_DELAY_MS = 10_000
const REDACTED = "<redacted>"

// One source of truth for what counts as a sensitive name across headers,
// URL query keys, and field names embedded inside request/response bodies.
//
// `SENSITIVE_NAME` is used as both a substring matcher (for free-form header
// names like `Authorization` / `X-API-Key`) and as the body-field alternation
// list. `SHORT_QUERY_NAME` covers anchored short keys like `?key=…` / `?sig=…`
// that are too generic to redact substring-style without false positives.
const SENSITIVE_NAME_SOURCE =
"authorization|api[-_]?key|access[-_]?token|refresh[-_]?token|id[-_]?token|token|secret|credential|signature|x-amz-signature"
const SENSITIVE_NAME = new RegExp(SENSITIVE_NAME_SOURCE, "i")
const SHORT_QUERY_NAME = /^(key|sig)$/i
const SENSITIVE_BODY_FIELD = new RegExp(`(?:${SENSITIVE_NAME_SOURCE}|key)`, "i")
const REDACT_JSON_FIELD = new RegExp(`("(?:${SENSITIVE_BODY_FIELD.source})"\\s*:\\s*)"[^"]*"`, "gi")
const REDACT_QUERY_FIELD = new RegExp(`((?:${SENSITIVE_BODY_FIELD.source})=)[^&\\s"]+`, "gi")

const isSensitiveHeaderName = (name: string) => SENSITIVE_NAME.test(name)

const isSensitiveQueryName = (name: string) => isSensitiveHeaderName(name) || SHORT_QUERY_NAME.test(name)

const redactHeaders = (headers: Headers.Headers, redactedNames: ReadonlyArray<string | RegExp>) =>
Object.fromEntries(
Object.entries(Headers.redact(headers, [...redactedNames, SENSITIVE_NAME])).map(([name, value]) => [
name,
String(value),
]),
)

const redactUrl = (value: string) => {
if (!URL.canParse(value)) return REDACTED
const url = new URL(value)
url.searchParams.forEach((_, key) => {
if (isSensitiveQueryName(key)) url.searchParams.set(key, REDACTED)
})
return url.toString()
}

const normalizedHeaders = (headers: Headers.Headers) =>
Object.fromEntries(Object.entries(headers).map(([key, value]) => [key.toLowerCase(), value]))
Expand Down
40 changes: 40 additions & 0 deletions packages/llm/src/route/redaction.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Headers } from "effect/unstable/http"

export const REDACTED = "<redacted>"

// One source of truth for what counts as a sensitive name across headers,
// URL query keys, and field names embedded inside request/response bodies.
//
// `SENSITIVE_NAME` is used as both a substring matcher (for free-form header
// names like `Authorization` / `X-API-Key`) and as the body-field alternation
// list. `SHORT_QUERY_NAME` covers anchored short keys like `?key=...` / `?sig=...`
// that are too generic to redact substring-style without false positives.
const SENSITIVE_NAME_SOURCE =
"authorization|api[-_]?key|access[-_]?token|refresh[-_]?token|id[-_]?token|token|secret|credential|signature|x-amz-signature"
const SENSITIVE_NAME = new RegExp(SENSITIVE_NAME_SOURCE, "i")
const SHORT_QUERY_NAME = /^(key|sig)$/i
const SENSITIVE_BODY_FIELD = new RegExp(`(?:${SENSITIVE_NAME_SOURCE}|key)`, "i")

export const REDACT_JSON_FIELD = new RegExp(`("(?:${SENSITIVE_BODY_FIELD.source})"\\s*:\\s*)"[^"]*"`, "gi")
export const REDACT_QUERY_FIELD = new RegExp(`((?:${SENSITIVE_BODY_FIELD.source})=)[^&\\s"]+`, "gi")

export const isSensitiveHeaderName = (name: string) => SENSITIVE_NAME.test(name)

export const isSensitiveQueryName = (name: string) => isSensitiveHeaderName(name) || SHORT_QUERY_NAME.test(name)

export const redactHeaders = (headers: Headers.Headers, redactedNames: ReadonlyArray<string | RegExp>) =>
Object.fromEntries(
Object.entries(Headers.redact(headers, [...redactedNames, SENSITIVE_NAME])).map(([name, value]) => [
name,
String(value),
]),
)

export const redactUrl = (value: string) => {
if (!URL.canParse(value)) return REDACTED
const url = new URL(value)
url.searchParams.forEach((_, key) => {
if (isSensitiveQueryName(key)) url.searchParams.set(key, REDACTED)
})
return url.toString()
}
23 changes: 15 additions & 8 deletions packages/llm/src/route/transport/websocket.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Cause, Context, Effect, Layer, Queue, Stream } from "effect"
import { Headers } from "effect/unstable/http"
import { LLMError, TransportReason } from "../../schema"
import { redactUrl } from "../redaction"
import * as HttpTransport from "./http"
import type { Transport } from "./index"

Expand Down Expand Up @@ -123,15 +124,21 @@ const webSocketUrl = (value: string) =>
})

export const open = (input: WebSocketRequest) =>
Effect.try({
try: () =>
new (globalThis.WebSocket as unknown as WebSocketConstructorWithHeaders)(input.url, { headers: input.headers }),
catch: (error) =>
transportError("open", error instanceof Error ? error.message : "Failed to construct WebSocket", {
url: input.url,
kind: "open",
Effect.logInfo("llm websocket open").pipe(
Effect.annotateLogs({ "llm.websocket.url": redactUrl(input.url) }),
Effect.andThen(
Effect.try({
try: () =>
new (globalThis.WebSocket as unknown as WebSocketConstructorWithHeaders)(input.url, { headers: input.headers }),
catch: (error) =>
transportError("open", error instanceof Error ? error.message : "Failed to construct WebSocket", {
url: input.url,
kind: "open",
}),
}),
}).pipe(Effect.flatMap((ws) => fromWebSocket(ws, input)))
),
Effect.flatMap((ws) => fromWebSocket(ws, input)),
)

export const layer: Layer.Layer<Service> = Layer.succeed(Service, Service.of({ open }))

Expand Down
47 changes: 45 additions & 2 deletions packages/llm/test/executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { describe, expect } from "bun:test"
import { Effect, Fiber, Layer, Random, Ref } from "effect"
import { Effect, Fiber, Layer, Logger, Random, Ref, References } from "effect"
import * as TestClock from "effect/testing/TestClock"
import { Headers, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
import { LLM, LLMError } from "../src"
import { LLMClient, RequestExecutor } from "../src/route"
import { LLMClient, RequestExecutor, WebSocketExecutor } from "../src/route"
import * as OpenAIChat from "../src/protocols/openai-chat"
import { dynamicResponse } from "./lib/http"
import { deltaChunk } from "./lib/openai-chunks"
Expand Down Expand Up @@ -73,6 +73,49 @@ const expectLLMError = (error: unknown) => {
const errorHttp = (error: LLMError) => ("http" in error.reason ? error.reason.http : undefined)

describe("RequestExecutor", () => {
it.effect("redacts sensitive WebSocket URL query params in open logs", () =>
Effect.gen(function* () {
class OpenWebSocket {
static readonly OPEN = 1
static readonly CLOSING = 2
static readonly CLOSED = 3
readyState = OpenWebSocket.OPEN
addEventListener() {}
removeEventListener() {}
send() {}
close() {
this.readyState = OpenWebSocket.CLOSED
}
}

const annotations: Array<Record<string, unknown>> = []
const logger = Logger.make((options) => {
annotations.push(options.fiber.getRef(References.CurrentLogAnnotations))
})

yield* Effect.acquireUseRelease(
Effect.sync(() => {
const original = globalThis.WebSocket
globalThis.WebSocket = OpenWebSocket as unknown as typeof globalThis.WebSocket
return original
}),
() =>
WebSocketExecutor.open({
url: "wss://provider.test/realtime?api_key=query-secret-123&key=short-secret&debug=1",
headers: Headers.empty,
}).pipe(Effect.flatMap((connection) => connection.close)),
(original) =>
Effect.sync(() => {
globalThis.WebSocket = original
}),
).pipe(Effect.provide(Logger.layer([logger])))

expect(annotations.find((item) => item["llm.websocket.url"])?.["llm.websocket.url"]).toBe(
"wss://provider.test/realtime?api_key=%3Credacted%3E&key=%3Credacted%3E&debug=1",
)
}),
)

it.effect("returns redacted diagnostics for retryable rate limits", () =>
Effect.gen(function* () {
const executor = yield* RequestExecutor.Service
Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/effect/runtime-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class Service extends ConfigService.Service<Service>()("@opencode/Runtime
outputTokenMax: positiveInteger("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX"),
bashDefaultTimeoutMs: positiveInteger("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS"),
experimentalNativeLlm: bool("OPENCODE_EXPERIMENTAL_NATIVE_LLM"),
experimentalOpenAIWebSocket: bool("OPENCODE_EXPERIMENTAL_OPENAI_WEBSOCKET"),
client: Config.string("OPENCODE_CLIENT").pipe(Config.withDefault("cli")),
}) {}

Expand Down
147 changes: 71 additions & 76 deletions packages/opencode/src/plugin/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const log = Log.create({ service: "plugin.codex" })

const CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
const ISSUER = "https://auth.openai.com"
const CODEX_API_ENDPOINT = "https://chatgpt.com/backend-api/codex/responses"
export const CODEX_API_ENDPOINT = "https://chatgpt.com/backend-api/codex/responses"
const OAUTH_PORT = 1455
const OAUTH_POLLING_SAFETY_MARGIN_MS = 3000
const ALLOWED_MODELS = new Set([
Expand Down Expand Up @@ -110,6 +110,28 @@ function buildAuthorizeUrl(redirectUri: string, pkce: PkceCodes, state: string):
return `${ISSUER}/oauth/authorize?${params.toString()}`
}


function codexBaseURL(codexApiEndpoint: string): string {
const url = new URL(codexApiEndpoint)
if (url.pathname.endsWith("/responses")) url.pathname = url.pathname.slice(0, -"/responses".length)
return url.toString().replace(/\/$/, "")
}

function headersInit(init: HeadersInit | undefined): Headers {
const headers = new Headers()
if (!init) return headers
if (init instanceof Headers) {
init.forEach((value, key) => headers.set(key, value))
return headers
}
if (Array.isArray(init)) {
for (const [key, value] of init) if (value !== undefined) headers.set(key, String(value))
return headers
}
for (const [key, value] of Object.entries(init)) if (value !== undefined) headers.set(key, String(value))
return headers
}

interface TokenResponse {
id_token: string
access_token: string
Expand Down Expand Up @@ -419,85 +441,58 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug
}>
| undefined

return {
apiKey: OAUTH_DUMMY_KEY,
async fetch(requestInput: RequestInfo | URL, init?: RequestInit) {
// Remove dummy API key authorization header
if (init?.headers) {
if (init.headers instanceof Headers) {
init.headers.delete("authorization")
init.headers.delete("Authorization")
} else if (Array.isArray(init.headers)) {
init.headers = init.headers.filter(([key]) => key.toLowerCase() !== "authorization")
} else {
delete init.headers["authorization"]
delete init.headers["Authorization"]
}
}

const currentAuth = await getAuth()
if (currentAuth.type !== "oauth") return fetch(requestInput, init)

// Cast to include accountId field
const authWithAccount = currentAuth as typeof currentAuth & { accountId?: string }

// Check if token needs refresh
if (!currentAuth.access || currentAuth.expires < Date.now()) {
if (!refreshPromise) {
log.info("refreshing codex access token")
refreshPromise = refreshAccessToken(currentAuth.refresh, issuer)
.then(async (tokens) => {
const accountId = extractAccountId(tokens) || authWithAccount.accountId
await input.client.auth.set({
path: { id: "openai" },
body: {
type: "oauth",
refresh: tokens.refresh_token,
access: tokens.access_token,
expires: Date.now() + (tokens.expires_in ?? 3600) * 1000,
...(accountId && { accountId }),
},
})
return {
const codexAuthHeaders = async (init?: HeadersInit) => {
const currentAuth = await getAuth()
if (currentAuth.type !== "oauth") return headersInit(init)

const authWithAccount = currentAuth as typeof currentAuth & { accountId?: string }
if (!currentAuth.access || currentAuth.expires < Date.now()) {
if (!refreshPromise) {
log.info("refreshing codex access token")
refreshPromise = refreshAccessToken(currentAuth.refresh, issuer)
.then(async (tokens) => {
const accountId = extractAccountId(tokens) || authWithAccount.accountId
await input.client.auth.set({
path: { id: "openai" },
body: {
type: "oauth",
refresh: tokens.refresh_token,
access: tokens.access_token,
accountId,
}
})
.finally(() => {
refreshPromise = undefined
expires: Date.now() + (tokens.expires_in ?? 3600) * 1000,
...(accountId && { accountId }),
},
})
}

const refreshed = await refreshPromise
currentAuth.access = refreshed.access
authWithAccount.accountId = refreshed.accountId
}

// Build headers
const headers = new Headers()
if (init?.headers) {
if (init.headers instanceof Headers) {
init.headers.forEach((value, key) => headers.set(key, value))
} else if (Array.isArray(init.headers)) {
for (const [key, value] of init.headers) {
if (value !== undefined) headers.set(key, String(value))
}
} else {
for (const [key, value] of Object.entries(init.headers)) {
if (value !== undefined) headers.set(key, String(value))
}
}
return { access: tokens.access_token, accountId }
})
.finally(() => {
refreshPromise = undefined
})
}

// Set authorization header with access token
headers.set("authorization", `Bearer ${currentAuth.access}`)

// Set ChatGPT-Account-Id header for organization subscriptions
if (authWithAccount.accountId) {
headers.set("ChatGPT-Account-Id", authWithAccount.accountId)
}
const refreshed = await refreshPromise
currentAuth.access = refreshed.access
authWithAccount.accountId = refreshed.accountId
}

const headers = headersInit(init)
headers.delete("authorization")
headers.delete("Authorization")
headers.set("authorization", `Bearer ${currentAuth.access}`)
if (authWithAccount.accountId) headers.set("ChatGPT-Account-Id", authWithAccount.accountId)
return headers
}

// Rewrite URL to Codex endpoint
return {
apiKey: OAUTH_DUMMY_KEY,
codexWebSocket: {
baseURL: codexBaseURL(codexApiEndpoint),
async headers(init?: HeadersInit) {
return Object.fromEntries((await codexAuthHeaders(init)).entries())
},
},
async fetch(requestInput: RequestInfo | URL, init?: RequestInit) {
const currentAuth = await getAuth()
if (currentAuth.type !== "oauth") return fetch(requestInput, init)
const parsed =
requestInput instanceof URL
? requestInput
Expand All @@ -509,7 +504,7 @@ export async function CodexAuthPlugin(input: PluginInput, options: CodexAuthPlug

return fetch(url, {
...init,
headers,
headers: await codexAuthHeaders(init?.headers),
})
},
}
Expand Down
Loading
Loading