Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/desktop/src/clientPersistence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ function makeSecretStorage(available: boolean): DesktopSecretStorage {

const clientSettings: ClientSettings = {
autoOpenPlanSidebar: false,
codexUsageIndicatorMode: "five-hour",
confirmThreadArchive: true,
confirmThreadDelete: false,
diffIgnoreWhitespace: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ function createProviderServiceHarness(
continuationKey: `${providerName}:instance:${instanceId}`,
},
}),
getCodexUsage: () => Effect.succeed(null),
rollbackConversation,
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ describe("ProviderCommandReactor", () => {
},
});
},
getCodexUsage: () => Effect.succeed(null),
rollbackConversation: () => unsupported(),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ function createProviderServiceHarness() {
},
});
},
getCodexUsage: () => Effect.succeed(null),
rollbackConversation: () => unsupported(),
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
Expand Down
67 changes: 67 additions & 0 deletions apps/server/src/provider/Layers/CodexAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { it, vi } from "@effect/vitest";

import { Context, Effect, Exit, Fiber, Layer, Option, Queue, Schema, Scope, Stream } from "effect";
import * as CodexErrors from "effect-codex-app-server/errors";
import type * as EffectCodexSchema from "effect-codex-app-server/schema";

import { ServerConfig } from "../../config.ts";
import { ServerSettingsService } from "../../serverSettings.ts";
Expand Down Expand Up @@ -92,6 +93,15 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape {
}),
);

public readonly readAccountRateLimitsImpl = vi.fn(
(): Promise<EffectCodexSchema.V2GetAccountRateLimitsResponse> =>
Promise.resolve({
rateLimits: {
primary: { usedPercent: 25, windowDurationMins: 300 },
},
}),
);

public readonly respondToRequestImpl = vi.fn(
(_requestId: ApprovalRequestId, _decision: ProviderApprovalDecision): Promise<void> =>
Promise.resolve(undefined),
Expand Down Expand Up @@ -130,6 +140,8 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape {
return Effect.promise(() => this.rollbackThreadImpl(numTurns));
}

readAccountRateLimits = Effect.promise(() => this.readAccountRateLimitsImpl());

respondToRequest(requestId: ApprovalRequestId, decision: ProviderApprovalDecision) {
return Effect.promise(() => this.respondToRequestImpl(requestId, decision));
}
Expand Down Expand Up @@ -159,6 +171,7 @@ function makeRuntimeFactory() {

return {
factory,
runtimes,
get lastRuntime(): FakeCodexRuntime | undefined {
return runtimes.at(-1);
},
Expand Down Expand Up @@ -348,6 +361,60 @@ sessionErrorLayer("CodexAdapterLive session errors", (it) => {
}),
);

it.effect("reads and normalizes account rate limits through the active runtime", () =>
Effect.gen(function* () {
const adapter = yield* CodexAdapter;
yield* adapter.startSession({
provider: ProviderDriverKind.make("codex"),
threadId: asThreadId("usage-thread"),
runtimeMode: "full-access",
});
const runtime = sessionRuntimeFactory.lastRuntime;
assert.ok(runtime);
runtime.readAccountRateLimitsImpl.mockResolvedValueOnce({
rateLimits: {
primary: { usedPercent: 30, windowDurationMins: 300 },
secondary: { usedPercent: 80, windowDurationMins: 10_080 },
},
});

const snapshot = yield* adapter.readCodexUsage!();

assert.equal(runtime.readAccountRateLimitsImpl.mock.calls.length, 1);
assert.deepStrictEqual(
snapshot?.windows.map((window) => ({
kind: window.kind,
remainingPercent: window.remainingPercent,
})),
[
{ kind: "five-hour", remainingPercent: 70 },
{ kind: "weekly", remainingPercent: 20 },
],
);
}),
);

it.effect("reads account rate limits even before a Codex thread session exists", () =>
Effect.gen(function* () {
const adapter = yield* CodexAdapter;
yield* adapter.stopAll();
const snapshot = yield* adapter.readCodexUsage!();
const runtime = sessionRuntimeFactory.lastRuntime;

assert.ok(runtime);
assert.equal(runtime.options.threadId, asThreadId("codex-usage"));
assert.equal(runtime.readAccountRateLimitsImpl.mock.calls.length, 1);
assert.equal(runtime.closeImpl.mock.calls.length, 1);
assert.deepStrictEqual(snapshot?.windows[0], {
kind: "five-hour",
usedPercent: 25,
remainingPercent: 75,
resetsAt: null,
windowDurationMins: 300,
});
}),
);

it.effect("maps codex model options for the adapter's bound custom instance id", () => {
const customInstanceId = ProviderInstanceId.make("codex_personal");
const customRuntimeFactory = makeRuntimeFactory();
Expand Down
86 changes: 86 additions & 0 deletions apps/server/src/provider/Layers/CodexAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
type ProviderRuntimeEvent,
type ProviderRequestKind,
type ThreadTokenUsageSnapshot,
type CodexUsageSnapshot,
type ProviderUserInputAnswers,
RuntimeItemId,
RuntimeRequestId,
Expand Down Expand Up @@ -54,6 +55,7 @@ import {
type CodexSessionRuntimeShape,
} from "./CodexSessionRuntime.ts";
import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts";
import { normalizeCodexUsageSnapshot } from "../codexUsage.ts";

const PROVIDER = ProviderDriverKind.make("codex");

Expand Down Expand Up @@ -1350,6 +1352,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
options?.nativeEventLogger === undefined ? nativeEventLogger : undefined;
const runtimeEventQueue = yield* Queue.unbounded<ProviderRuntimeEvent>();
const sessions = new Map<ThreadId, CodexAdapterSessionContext>();
let cachedCodexUsage: CodexUsageSnapshot | null = null;

const startSession: CodexAdapterShape["startSession"] = (input) =>
Effect.scoped(
Expand Down Expand Up @@ -1409,6 +1412,19 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
const eventFiber = yield* Stream.runForEach(runtime.events, (event) =>
Effect.gen(function* () {
yield* writeNativeEvent(event);
if (event.method === "account/rateLimits/updated") {
const payload = readPayload(
EffectCodexSchema.V2AccountRateLimitsUpdatedNotification,
event.payload,
);
if (payload) {
cachedCodexUsage = normalizeCodexUsageSnapshot({
providerInstanceId: boundInstanceId,
payload,
source: "notification",
});
}
}
const runtimeEvents = mapToRuntimeEvents(event, event.threadId);
if (runtimeEvents.length === 0) {
yield* Effect.logDebug("ignoring unhandled Codex provider event", {
Expand Down Expand Up @@ -1644,6 +1660,75 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
const hasSession: CodexAdapterShape["hasSession"] = (threadId) =>
Effect.succeed(Boolean(sessions.get(threadId) && !sessions.get(threadId)?.stopped));

const readCodexUsageWithoutSession = Effect.fn("readCodexUsageWithoutSession")(function* () {
const usageThreadId = ThreadId.make("codex-usage");
const createRuntime = options?.makeRuntime ?? makeCodexSessionRuntime;
return yield* Effect.acquireUseRelease(
Scope.make("sequential"),
(usageScope) =>
Effect.gen(function* () {
const runtime = yield* createRuntime({
threadId: usageThreadId,
providerInstanceId: boundInstanceId,
cwd: process.cwd(),
binaryPath: codexConfig.binaryPath,
...(options?.environment ? { environment: options.environment } : {}),
...(codexConfig.homePath ? { homePath: codexConfig.homePath } : {}),
runtimeMode: "full-access",
}).pipe(
Effect.provideService(Scope.Scope, usageScope),
Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, childProcessSpawner),
Effect.mapError(
(cause) =>
new ProviderAdapterProcessError({
provider: PROVIDER,
threadId: usageThreadId,
detail: cause.message,
cause,
}),
),
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Critical Layers/CodexAdapter.ts:1690

In readCodexUsageWithoutSession, the runtime is used to call readAccountRateLimits without first calling runtime.start(). This skips the JSON-RPC initialize handshake, so the RPC call will fail when sent to an uninitialized server. The startSession function at lines 1442–1459 correctly calls runtime.start() after creating the runtime.

🤖 Copy this AI Prompt to have your agent fix this:
In file apps/server/src/provider/Layers/CodexAdapter.ts around line 1690:

In `readCodexUsageWithoutSession`, the `runtime` is used to call `readAccountRateLimits` without first calling `runtime.start()`. This skips the JSON-RPC `initialize` handshake, so the RPC call will fail when sent to an uninitialized server. The `startSession` function at lines 1442–1459 correctly calls `runtime.start()` after creating the runtime.

Evidence trail:
CodexAdapter.ts lines 1663-1706: `readCodexUsageWithoutSession` creates runtime (line 1670) and calls `runtime.readAccountRateLimits` (line 1691) without `runtime.start()`. CodexSessionRuntime.ts lines 1157-1160: `start()` performs `client.request("initialize", ...)` and `client.notify("initialized", undefined)`. CodexSessionRuntime.ts line 1293: `readAccountRateLimits: client.request("account/rateLimits/read", undefined)`. CodexAdapter.ts line 1442-1459: `startSession` correctly calls `runtime.start()`. CodexProvider.ts line 268-275: `probeCodexAppServerProvider` also calls `client.request("initialize", ...)` before other requests.

const payload = yield* runtime.readAccountRateLimits.pipe(
Effect.mapError((cause) =>
mapCodexRuntimeError(usageThreadId, "account/rateLimits/read", cause),
),
Effect.ensuring(runtime.close),
);
return normalizeCodexUsageSnapshot({
providerInstanceId: boundInstanceId,
payload,
source: "read",
});
}),
(usageScope) => Scope.close(usageScope, Exit.void),
);
});

const readCodexUsage: CodexAdapterShape["readCodexUsage"] = Effect.fn("readCodexUsage")(
function* () {
const session = Array.from(sessions.values()).findLast((candidate) => !candidate.stopped);
if (!session) {
const snapshot = yield* readCodexUsageWithoutSession();
cachedCodexUsage = snapshot ?? cachedCodexUsage;
return (
snapshot ?? (cachedCodexUsage ? { ...cachedCodexUsage, source: "cache" as const } : null)
);
}
const payload = yield* session.runtime.readAccountRateLimits.pipe(
Effect.mapError((cause) =>
mapCodexRuntimeError(session.threadId, "account/rateLimits/read", cause),
),
);
const snapshot = normalizeCodexUsageSnapshot({
providerInstanceId: boundInstanceId,
payload,
source: "read",
});
cachedCodexUsage = snapshot;
return snapshot;
},
);

const stopAll: CodexAdapterShape["stopAll"] = () =>
Effect.forEach(Array.from(sessions.values()), stopSessionInternal, {
concurrency: 1,
Expand Down Expand Up @@ -1673,6 +1758,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* (
stopSession,
listSessions,
hasSession,
readCodexUsage,
stopAll,
get streamEvents() {
return Stream.fromQueue(runtimeEventQueue);
Expand Down
5 changes: 5 additions & 0 deletions apps/server/src/provider/Layers/CodexSessionRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ export interface CodexSessionRuntimeShape {
readonly rollbackThread: (
numTurns: number,
) => Effect.Effect<CodexThreadSnapshot, CodexSessionRuntimeError>;
readonly readAccountRateLimits: Effect.Effect<
EffectCodexSchema.V2GetAccountRateLimitsResponse,
CodexSessionRuntimeError
>;
readonly respondToRequest: (
requestId: ApprovalRequestId,
decision: ProviderApprovalDecision,
Expand Down Expand Up @@ -1286,6 +1290,7 @@ export const makeCodexSessionRuntime = (
});
return parseThreadSnapshot(response);
}),
readAccountRateLimits: client.request("account/rateLimits/read", undefined),
respondToRequest: (requestId, decision) =>
Effect.gen(function* () {
const pending = (yield* Ref.get(pendingApprovalsRef)).get(requestId);
Expand Down
35 changes: 35 additions & 0 deletions apps/server/src/provider/Layers/ProviderService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type {
ProviderSendTurnInput,
ProviderSession,
ProviderTurnStartResult,
CodexUsageSnapshot,
} from "@t3tools/contracts";
import {
ApprovalRequestId,
Expand Down Expand Up @@ -191,6 +192,24 @@ function makeFakeCodexAdapter(provider: ProviderDriverKind = CODEX_DRIVER) {
sessions.clear();
}),
);
const readCodexUsage = vi.fn(
(): Effect.Effect<CodexUsageSnapshot | null, ProviderAdapterError> =>
Effect.succeed({
providerInstanceId: codexInstanceId,
checkedAt: "2026-05-04T00:00:00.000Z",
windows: [
{
kind: "five-hour",
usedPercent: 25,
remainingPercent: 75,
resetsAt: null,
windowDurationMins: 300,
},
],
rateLimitReachedType: null,
source: "read",
}),
);

const adapter: ProviderAdapterShape<ProviderAdapterError> = {
provider,
Expand All @@ -207,6 +226,7 @@ function makeFakeCodexAdapter(provider: ProviderDriverKind = CODEX_DRIVER) {
hasSession,
readThread,
rollbackThread,
...(provider === CODEX_DRIVER ? { readCodexUsage } : {}),
stopAll,
get streamEvents() {
return Stream.fromPubSub(runtimeEventPubSub);
Expand Down Expand Up @@ -243,6 +263,7 @@ function makeFakeCodexAdapter(provider: ProviderDriverKind = CODEX_DRIVER) {
readThread,
rollbackThread,
stopAll,
readCodexUsage,
};
}

Expand Down Expand Up @@ -772,6 +793,20 @@ it.effect(
);

routing.layer("ProviderServiceLive routing", (it) => {
it.effect("returns usage for Codex instances and null for non-Codex instances", () =>
Effect.gen(function* () {
const provider = yield* ProviderService;

const codexUsage = yield* provider.getCodexUsage(codexInstanceId);
const claudeUsage = yield* provider.getCodexUsage(claudeAgentInstanceId);

assert.equal(codexUsage?.windows[0]?.remainingPercent, 75);
assert.equal(routing.codex.readCodexUsage.mock.calls.length, 1);
assert.equal(claudeUsage, null);
assert.equal(routing.claude.readCodexUsage.mock.calls.length, 0);
}),
);

it.effect("routes provider operations and rollback conversation", () =>
Effect.gen(function* () {
const provider = yield* ProviderService;
Expand Down
21 changes: 20 additions & 1 deletion apps/server/src/provider/Layers/ProviderService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import {
ProviderSessionStartInput,
ProviderStopSessionInput,
type ProviderInstanceId,
type ProviderDriverKind,
type ProviderRuntimeEvent,
type ProviderSession,
ProviderDriverKind,
} from "@t3tools/contracts";
import { Cause, Effect, Layer, Option, PubSub, Ref, Schema, SchemaIssue, Stream } from "effect";

Expand Down Expand Up @@ -922,6 +922,24 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
const getInstanceInfo: ProviderServiceShape["getInstanceInfo"] = (instanceId) =>
registry.getInstanceInfo(instanceId);

const getCodexUsage: ProviderServiceShape["getCodexUsage"] = Effect.fn(
"ProviderService.getCodexUsage",
)(function* (instanceId) {
const info = yield* registry
.getInstanceInfo(instanceId)
.pipe(Effect.catch(() => Effect.succeed(null)));
if (!info || info.driverKind !== ProviderDriverKind.make("codex")) {
return null;
}
const adapter = yield* registry
.getByInstance(instanceId)
.pipe(Effect.catch(() => Effect.succeed(null)));
if (!adapter?.readCodexUsage) {
return null;
}
return yield* adapter.readCodexUsage().pipe(Effect.catch(() => Effect.succeed(null)));
});

const rollbackConversation: ProviderServiceShape["rollbackConversation"] = Effect.fn(
"rollbackConversation",
)(function* (rawInput) {
Expand Down Expand Up @@ -1022,6 +1040,7 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
listSessions,
getCapabilities,
getInstanceInfo,
getCodexUsage,
rollbackConversation,
// Each access creates a fresh PubSub subscription so that multiple
// consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each
Expand Down
Loading
Loading