15 changes: 13 additions & 2 deletions src/app/v1/_lib/proxy/billing-header-rectifier.ts
Original file line number Diff line number Diff line change
@@ -17,6 +17,15 @@ export type BillingHeaderRectifierResult = {
};

const BILLING_HEADER_PATTERN = /^\s*x-anthropic-billing-header\s*:/i;
const REDACTED_BILLING_HEADER_VALUE = "x-anthropic-billing-header: [REDACTED]";

function redactBillingHeaderAuditValue(value: string): string {
if (!BILLING_HEADER_PATTERN.test(value)) {
return value.trim();
}

return REDACTED_BILLING_HEADER_VALUE;
}
Comment on lines +22 to +28
P2 Dead-code branch in redactBillingHeaderAuditValue

Every call site already guards with BILLING_HEADER_PATTERN.test(...) before invoking this function, so the if (!BILLING_HEADER_PATTERN.test(value)) branch can never be reached in practice. The return value.trim() path is unreachable dead code. The function could be simplified to:

Suggested change:

```diff
-function redactBillingHeaderAuditValue(value: string): string {
-  if (!BILLING_HEADER_PATTERN.test(value)) {
-    return value.trim();
-  }
-
-  return REDACTED_BILLING_HEADER_VALUE;
-}
+function redactBillingHeaderAuditValue(): string {
+  return REDACTED_BILLING_HEADER_VALUE;
+}
```

Or, alternatively, inline REDACTED_BILLING_HEADER_VALUE directly at the two call sites and remove the helper altogether. As-is, the function gives the misleading impression that it might return the original value, which could cause confusion during future refactors.


/**
* Remove x-anthropic-billing-header text blocks from the request system prompt.
@@ -35,7 +44,7 @@ export function rectifyBillingHeader(
// Case 2: system is a plain string
if (typeof system === "string") {
if (BILLING_HEADER_PATTERN.test(system)) {
const extractedValues = [system.trim()];
const extractedValues = [redactBillingHeaderAuditValue(system)];
delete message.system;
return { applied: true, removedCount: 1, extractedValues };
}
@@ -55,7 +64,9 @@
typeof (block as Record<string, unknown>).text === "string" &&
BILLING_HEADER_PATTERN.test((block as Record<string, unknown>).text as string)
) {
extractedValues.push(((block as Record<string, unknown>).text as string).trim());
extractedValues.push(
redactBillingHeaderAuditValue((block as Record<string, unknown>).text as string)
);
} else {
filtered.push(block);
}
9 changes: 9 additions & 0 deletions src/app/v1/_lib/proxy/circuit-breaker-accounting.ts
@@ -0,0 +1,9 @@
const PROVIDER_FAILURE_STATUSES = new Set([401, 402, 403, 408, 429, 451]);

export function shouldRecordProviderCircuitFailure(statusCode: number): boolean {
if (statusCode >= 500) {
return true;
}

return PROVIDER_FAILURE_STATUSES.has(statusCode);
}
Comment on lines +1 to +9
P2 HTTP 403 classification as a provider-side failure

403 Forbidden is included in PROVIDER_FAILURE_STATUSES, but 403 is ambiguous in a proxy context: it is often client-driven (e.g., a client API key that lacks permission for the requested resource / model), not a signal that the upstream provider itself is unhealthy. Treating every 403 as a provider circuit failure could trigger unnecessary failovers when many clients send permission-constrained requests.

The PR already correctly excludes clear client-error codes (400, 404, 409, 413, 415, 422), and the same reasoning applies to 403. If the intent is specifically to trip the circuit on upstream key-revocation responses, consider adding a comment explaining the reasoning, or narrow the check to only treat 403 as a provider failure when there is additional context (e.g., an upstream error body) indicating a key-level access denial rather than a scope/permission mismatch.
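A minimal sketch of the narrowed check this comment suggests — the status set mirrors the PR's, but the optional `upstreamBody` parameter and the denial-marker strings are illustrative assumptions, not the repository's actual upstream contract:

```typescript
// Sketch: count 403 as a provider-side failure only when the upstream error
// body suggests a key-level access denial. Marker strings are hypothetical.
const PROVIDER_FAILURE_STATUSES = new Set([401, 402, 408, 429, 451]);
const KEY_DENIAL_MARKERS = ["invalid api key", "api key revoked", "account disabled"];

function shouldRecordProviderCircuitFailure(statusCode: number, upstreamBody?: string): boolean {
  if (statusCode >= 500) {
    return true;
  }
  if (statusCode === 403) {
    // 403 is ambiguous in a proxy: only count it when the body indicates
    // the upstream key itself is dead, not a scope/permission mismatch.
    const body = (upstreamBody ?? "").toLowerCase();
    return KEY_DENIAL_MARKERS.some((marker) => body.includes(marker));
  }
  return PROVIDER_FAILURE_STATUSES.has(statusCode);
}
```

With this shape, a bare 403 no longer trips the circuit, while a body-confirmed key revocation still does.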

2 changes: 1 addition & 1 deletion src/app/v1/_lib/proxy/endpoint-family-catalog.ts
@@ -97,7 +97,7 @@ const KNOWN_ENDPOINT_FAMILIES: readonly EndpointFamily[] = Object.freeze([
{
id: "response-compact",
surface: "response",
accountingTier: "none",
accountingTier: "required_usage",
modelRequired: false,
rawPassthrough: true,
match: (pathname) => pathname === "/v1/responses/compact",
26 changes: 21 additions & 5 deletions src/app/v1/_lib/proxy/endpoint-policy.ts
@@ -5,7 +5,7 @@ export type EndpointGuardPreset = "chat" | "raw_passthrough";
export type EndpointPoolStrictness = "inherit" | "strict";

export interface EndpointPolicy {
readonly kind: "default" | "raw_passthrough";
readonly kind: "default" | "raw_passthrough" | "guarded_passthrough";
readonly guardPreset: EndpointGuardPreset;
readonly allowRetry: boolean;
readonly allowProviderSwitch: boolean;
@@ -46,10 +46,22 @@ const RAW_PASSTHROUGH_ENDPOINT_POLICY: EndpointPolicy = Object.freeze({
endpointPoolStrictness: "strict",
});

const rawPassthroughEndpointPathSet = new Set<string>([
V1_ENDPOINT_PATHS.MESSAGES_COUNT_TOKENS,
V1_ENDPOINT_PATHS.RESPONSES_COMPACT,
]);
const GUARDED_PASSTHROUGH_ENDPOINT_POLICY: EndpointPolicy = Object.freeze({
kind: "guarded_passthrough",
guardPreset: "chat",
allowRetry: false,
allowProviderSwitch: false,
allowCircuitBreakerAccounting: true,
Comment on lines +52 to +54
P2 Count guarded passthrough failures before aborting in forwarder

For /v1/responses/compact, this policy now says allowCircuitBreakerAccounting=true while keeping allowRetry=false. In ProxyForwarder.send(), every !allowRetry endpoint takes the raw-passthrough fast path and throws before any recordFailure() call, and doForward() turns non-2xx upstream responses into ProxyErrors before ProxyResponseHandler can make up the difference. In practice, repeated 401/429/500s on responses/compact will never open the provider circuit, so this endpoint keeps hammering a bad provider even though the new policy claims to restore breaker accounting.


trackConcurrentRequests: true,
bypassRequestFilters: false,
bypassForwarderPreprocessing: true,
bypassSpecialSettings: true,
bypassResponseRectifier: true,
endpointPoolStrictness: "strict",
});

const rawPassthroughEndpointPathSet = new Set<string>([V1_ENDPOINT_PATHS.MESSAGES_COUNT_TOKENS]);
const guardedPassthroughEndpointPathSet = new Set<string>([V1_ENDPOINT_PATHS.RESPONSES_COMPACT]);

export function isRawPassthroughEndpointPath(pathname: string): boolean {
return rawPassthroughEndpointPathSet.has(normalizeEndpointPath(pathname));
@@ -64,5 +76,9 @@ export function resolveEndpointPolicy(pathname: string): EndpointPolicy {
return RAW_PASSTHROUGH_ENDPOINT_POLICY;
}

if (guardedPassthroughEndpointPathSet.has(normalizeEndpointPath(pathname))) {
return GUARDED_PASSTHROUGH_ENDPOINT_POLICY;
}

return DEFAULT_ENDPOINT_POLICY;
}
34 changes: 23 additions & 11 deletions src/app/v1/_lib/proxy/response-handler.ts
@@ -39,6 +39,7 @@ import type { SessionUsageUpdate } from "@/types/session";
import type { LongContextPricingSpecialSetting } from "@/types/special-settings";
import { GeminiAdapter } from "../gemini/adapter";
import type { GeminiResponse } from "../gemini/types";
import { shouldRecordProviderCircuitFailure } from "./circuit-breaker-accounting";
import { isClientAbortError, isTransportError } from "./errors";
import type { ProxySession } from "./session";
import { consumeDeferredStreamingFinalization } from "./stream-finalization";
@@ -507,10 +508,12 @@ async function finalizeDeferredStreamingFinalizationIfNeeded(

const chainReason = effectiveStatusCode === 404 ? "resource_not_found" : "retry_failed";

// Record in the circuit breaker: lets subsequent requests correctly trigger failover / circuit breaking.
//
// Note: in the forwarder, 404 maps to RESOURCE_NOT_FOUND and is not counted toward the circuit breaker (to avoid treating "resource/model not found" as a provider failure).
if (effectiveStatusCode !== 404 && session.getEndpointPolicy().allowCircuitBreakerAccounting) {
// Record in the circuit breaker: count only provider/key-side failures;
// client-induced 4xx (400/404/409/413/415/422) must not trip the global provider circuit breaker.
if (
shouldRecordProviderCircuitFailure(effectiveStatusCode) &&
session.getEndpointPolicy().allowCircuitBreakerAccounting
) {
try {
// Dynamic import: avoids a potential circular dependency between the proxy and circuit-breaker modules.
const { recordFailure } = await import("@/lib/circuit-breaker");
@@ -559,9 +562,12 @@

const chainReason = effectiveStatusCode === 404 ? "resource_not_found" : "retry_failed";

// Record in the circuit breaker: lets subsequent requests correctly trigger failover / circuit breaking.
// Note: keep consistent with the forwarder's convention: 404 is not counted (a missing resource is not a provider failure).
if (effectiveStatusCode !== 404 && session.getEndpointPolicy().allowCircuitBreakerAccounting) {
// Record in the circuit breaker: count only provider/key-side failures;
// client-induced 4xx (400/404/409/413/415/422) must not trip the global provider circuit breaker.
if (
shouldRecordProviderCircuitFailure(effectiveStatusCode) &&
session.getEndpointPolicy().allowCircuitBreakerAccounting
) {
try {
const { recordFailure } = await import("@/lib/circuit-breaker");
await recordFailure(meta.providerId, new Error(errorMessage));
@@ -780,8 +786,11 @@ export class ProxyResponseHandler {
const detected = detectUpstreamErrorFromSseOrJsonText(responseText);
errorMessageForFinalize = detected.isError ? detected.code : `HTTP ${statusCode}`;

// Record in the circuit breaker
if (session.getEndpointPolicy().allowCircuitBreakerAccounting) {
// Record in the circuit breaker: count only provider/key-side failures.
if (
shouldRecordProviderCircuitFailure(statusCode) &&
session.getEndpointPolicy().allowCircuitBreakerAccounting
) {
try {
const { recordFailure } = await import("@/lib/circuit-breaker");
await recordFailure(provider.id, new Error(errorMessageForFinalize));
@@ -1106,8 +1115,11 @@
const detected = detectUpstreamErrorFromSseOrJsonText(responseText);
const errorMessageForDb = detected.isError ? detected.code : `HTTP ${statusCode}`;

// Record in the circuit breaker
if (session.getEndpointPolicy().allowCircuitBreakerAccounting) {
// Record in the circuit breaker: count only provider/key-side failures.
if (
shouldRecordProviderCircuitFailure(statusCode) &&
session.getEndpointPolicy().allowCircuitBreakerAccounting
) {
try {
const { recordFailure } = await import("@/lib/circuit-breaker");
await recordFailure(provider.id, new Error(errorMessageForDb));
93 changes: 6 additions & 87 deletions src/lib/session-manager.ts
@@ -391,96 +391,15 @@ export class SessionManager {
return clientSessionId;
}

// 2. Fallback: compute a content hash of messages (TC-047 warning: unreliable)
logger.warn(
"SessionManager: No client session ID, falling back to content hash (unreliable for compressed dialogs)",
{
keyId,
messagesLength: Array.isArray(messages) ? messages.length : 0,
}
);
const contentHash = SessionManager.calculateMessagesHash(messages);
if (!contentHash) {
// Fallback: hash cannot be computed, generate a new session
const newId = SessionManager.generateSessionId();
logger.warn("SessionManager: Cannot calculate hash, generating new session", {
sessionId: newId,
});
return newId;
}

// 3. Try to find an existing session in Redis
if (redis && redis.status === "ready") {
try {
const hashKey = `hash:${contentHash}:session`;
const existingSessionId = await redis.get(hashKey);

if (existingSessionId) {
// Found an existing session, refresh its TTL
await SessionManager.refreshSessionTTL(existingSessionId);
logger.trace("SessionManager: Reusing session via hash", {
sessionId: existingSessionId,
hash: contentHash,
});
return existingSessionId;
}

// Not found: create a new session
const newSessionId = SessionManager.generateSessionId();

// Store the mapping (async, non-blocking)
void SessionManager.storeSessionMapping(contentHash, newSessionId, keyId);

logger.trace("SessionManager: Created new session with hash", {
sessionId: newSessionId,
hash: contentHash,
});
return newSessionId;
} catch (error) {
logger.error("SessionManager: Redis error", { error });
// Fallback: Redis error, generate a new session
return SessionManager.generateSessionId();
}
}

// 4. Redis unavailable: fall back to generating a new session
// 2. Safe fallback: when the client provides no session_id, always generate a fresh opaque session.
// The old content-hash reuse would wrongly merge structurally identical requests from different keys/users into the same session.
logger.warn("SessionManager: No client session ID, generating fresh opaque session", {
keyId,
messagesLength,
});
return SessionManager.generateSessionId();
Comment on lines +394 to 400
P2 Preserve a reusable fallback session for clients without session_id

When clientSessionId is missing, this now always generates a brand-new opaque session. Successful responses only attach x-cch-session-id on >=400 responses (error-session-id.ts), and only Claude requests get any best-effort metadata completion, so OpenAI/Codex/Gemini clients that do not already send a session token can never learn or reuse the generated ID. For those callers, every 2xx turn becomes a fresh session, which breaks provider stickiness, request sequencing, and session grouping that previously worked via the fallback path.
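If the fallback must stay opaque, one way to restore reuse is to echo the generated ID back on successful responses as well, so session-less clients can resend it on the next turn. A minimal sketch, assuming the `x-cch-session-id` header name mentioned above and a simplified header map rather than the real response pipeline:

```typescript
// Sketch: expose the generated session ID on 2xx responses too, so
// OpenAI/Codex/Gemini clients that send no session token can learn it.
const SESSION_HEADER = "x-cch-session-id";

function attachSessionId(headers: Map<string, string>, sessionId: string | null): Map<string, string> {
  // Never overwrite an ID the client or an earlier layer already set.
  if (sessionId && !headers.has(SESSION_HEADER)) {
    headers.set(SESSION_HEADER, sessionId);
  }
  return headers;
}
```

A client that stores this header and replays it as its session token would land back on the `clientSessionId` fast path, preserving provider stickiness.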


}

/**
* Store the hash → session mapping
*/
private static async storeSessionMapping(
contentHash: string,
sessionId: string,
keyId: number
): Promise<void> {
const redis = getRedisClient();
if (!redis || redis.status !== "ready") return;

try {
const pipeline = redis.pipeline();
const hashKey = `hash:${contentHash}:session`;

// Store the mapping
pipeline.setex(hashKey, SessionManager.SESSION_TTL, sessionId);

// Initialize session metadata
pipeline.setex(`session:${sessionId}:key`, SessionManager.SESSION_TTL, keyId.toString());
pipeline.setex(
`session:${sessionId}:last_seen`,
SessionManager.SESSION_TTL,
Date.now().toString()
);

await pipeline.exec();
} catch (error) {
logger.error("SessionManager: Failed to store session mapping", {
error,
});
}
}

/**
* Refresh the session TTL (sliding window)
*/
3 changes: 2 additions & 1 deletion src/lib/utils/sse.ts
@@ -70,6 +70,7 @@ export function parseSSEData(sseText: string): ParsedSSEEvent[] {
* Only treat line-initial `event:` / `data:` (or a leading comment line `:`) as SSE, to avoid false positives when a JSON body contains "data:".
*/
export function isSSEText(text: string): boolean {
const sseFieldPrefixes = ["event:", "data:", "id:", "retry:"];
let start = 0;

for (let i = 0; i <= text.length; i += 1) {
@@ -81,7 +82,7 @@
if (!line) continue;
if (line.startsWith(":")) continue;

return line.startsWith("event:") || line.startsWith("data:");
return sseFieldPrefixes.some((prefix) => line.startsWith(prefix));
Comment on lines 73 to +85
P2 parseSSEData silently drops id: and retry: fields

isSSEText now correctly recognises responses whose first meaningful line starts with id: or retry:, which is good. However, parseSSEData does not handle these two fields — they fall through the if-else chain and are silently dropped. This matches the HTML SSE spec (these fields affect the connection-level reconnection state, not the event payload), but it is worth a comment to make the intent clear and to prevent a future contributor from treating the omission as a bug:

Suggested change:

```diff
   const sseFieldPrefixes = ["event:", "data:", "id:", "retry:"];
+  // Note: `id:` and `retry:` are valid SSE field prefixes and must be
+  // recognised here so isSSEText returns true for responses that begin
+  // with them. parseSSEData intentionally does NOT process these fields
+  // because they control SSE reconnection state, not event data.
   let start = 0;
```

}

return false;
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/lib/session-manager-session-id-fallback.test.ts
@@ -0,0 +1,63 @@
import { beforeEach, describe, expect, test, vi } from "vitest";

vi.mock("server-only", () => ({}));

vi.mock("@/lib/logger", () => ({
logger: {
warn: vi.fn(),
trace: vi.fn(),
info: vi.fn(),
error: vi.fn(),
},
}));

const getRedisClientMock = vi.fn();

vi.mock("@/lib/redis", () => ({
getRedisClient: getRedisClientMock,
}));

vi.mock("@/app/v1/_lib/proxy/errors", () => ({
sanitizeHeaders: vi.fn(() => "(empty)"),
sanitizeUrl: vi.fn((value: string) => value),
}));

vi.mock("@/lib/session-tracker", () => ({
SessionTracker: {
getActiveSessions: vi.fn(async () => []),
},
}));

describe("SessionManager.getOrCreateSessionId fallback", () => {
beforeEach(() => {
vi.clearAllMocks();
});

test("without client session id, identical messages should not reuse the same derived session id even when Redis is ready", async () => {
const { SessionManager } = await import("@/lib/session-manager");
const messages = [{ role: "user", content: "hello" }];
const hashMapping = new Map<string, string>();

const pipeline = {
setex: vi.fn((key: string, _ttl: number, value: string) => {
hashMapping.set(key, value);
return pipeline;
}),
exec: vi.fn(async () => []),
};

getRedisClientMock.mockReturnValue({
status: "ready",
get: vi.fn(async (key: string) => hashMapping.get(key) ?? null),
pipeline: vi.fn(() => pipeline),
});

const first = await SessionManager.getOrCreateSessionId(11, messages, null);
await new Promise((resolve) => setTimeout(resolve, 0));
const second = await SessionManager.getOrCreateSessionId(22, messages, null);

expect(first).toMatch(/^sess_/);
expect(second).toMatch(/^sess_/);
expect(first).not.toBe(second);
});
});