Skip to content

Commit b0ec624

Browse files
committed
Add Fireworks fallback for CanopyWave
1 parent b5d6411 commit b0ec624

2 files changed

Lines changed: 227 additions & 20 deletions

File tree

web/src/app/api/v1/chat/completions/__tests__/completions.test.ts

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
FREEBUFF_GLM_MODEL_ID,
77
isFreebuffDeploymentHours,
88
} from '@codebuff/common/constants/freebuff-models'
9+
import { env } from '@codebuff/internal/env'
910
import { formatQuotaResetCountdown, postChatCompletions } from '../_post'
1011
import {
1112
checkFreeModeRateLimit,
@@ -1075,6 +1076,116 @@ describe('/api/v1/chat/completions POST endpoint', () => {
10751076
})
10761077

10771078
describe('Successful responses', () => {
1079+
const withCanopyWaveApiKey = async (testFn: () => Promise<void>) => {
1080+
const previousCanopyWaveApiKey = env.CANOPYWAVE_API_KEY
1081+
env.CANOPYWAVE_API_KEY = 'test'
1082+
try {
1083+
await testFn()
1084+
} finally {
1085+
env.CANOPYWAVE_API_KEY = previousCanopyWaveApiKey
1086+
}
1087+
}
1088+
1089+
const createCanopyWaveFallbackRequest = (stream: boolean) =>
1090+
new NextRequest('http://localhost:3000/api/v1/chat/completions', {
1091+
method: 'POST',
1092+
headers: { Authorization: 'Bearer test-api-key-123' },
1093+
body: JSON.stringify({
1094+
model: 'minimax/minimax-m2.5',
1095+
stream,
1096+
codebuff_metadata: {
1097+
run_id: 'run-123',
1098+
client_id: 'test-client-id-123',
1099+
client_request_id: 'test-client-session-id-123',
1100+
},
1101+
}),
1102+
})
1103+
1104+
const createCanopyWaveNoWorkersThenFireworksFetch = (stream: boolean) => {
1105+
const fetchedBodies: Record<string, unknown>[] = []
1106+
const fetch = mock(
1107+
async (_url: string | URL | Request, init?: RequestInit) => {
1108+
fetchedBodies.push(JSON.parse(init?.body as string))
1109+
1110+
if (fetchedBodies.length === 1) {
1111+
return Response.json(
1112+
{
1113+
error: {
1114+
message: 'No available workers',
1115+
code: 'no_available_workers',
1116+
},
1117+
},
1118+
{ status: 503 },
1119+
)
1120+
}
1121+
1122+
if (!stream) {
1123+
return Response.json({
1124+
id: 'test-id',
1125+
model: 'accounts/fireworks/models/minimax-m2p5',
1126+
choices: [{ message: { content: 'fireworks response' } }],
1127+
usage: {
1128+
prompt_tokens: 10,
1129+
completion_tokens: 20,
1130+
total_tokens: 30,
1131+
},
1132+
})
1133+
}
1134+
1135+
const encoder = new TextEncoder()
1136+
const fireworksStream = new ReadableStream({
1137+
start(controller) {
1138+
controller.enqueue(
1139+
encoder.encode(
1140+
'data: {"id":"test-id","model":"accounts/fireworks/models/minimax-m2p5","choices":[{"delta":{"content":"test"}}]}\n\n',
1141+
),
1142+
)
1143+
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
1144+
controller.close()
1145+
},
1146+
})
1147+
1148+
return new Response(fireworksStream, {
1149+
status: 200,
1150+
headers: { 'Content-Type': 'text/event-stream' },
1151+
})
1152+
},
1153+
) as unknown as typeof globalThis.fetch
1154+
1155+
return { fetch, fetchedBodies }
1156+
}
1157+
1158+
const postCanopyWaveFallbackRequest = async ({
1159+
fetch,
1160+
stream,
1161+
}: {
1162+
fetch: typeof globalThis.fetch
1163+
stream: boolean
1164+
}) =>
1165+
postChatCompletions({
1166+
req: createCanopyWaveFallbackRequest(stream),
1167+
getUserInfoFromApiKey: mockGetUserInfoFromApiKey,
1168+
logger: mockLogger,
1169+
trackEvent: mockTrackEvent,
1170+
getUserUsageData: mockGetUserUsageData,
1171+
getAgentRunFromId: mockGetAgentRunFromId,
1172+
fetch,
1173+
insertMessageBigquery: mockInsertMessageBigquery,
1174+
loggerWithContext: mockLoggerWithContext,
1175+
checkSessionAdmissible: mockCheckSessionAdmissibleAllow,
1176+
})
1177+
1178+
const expectCanopyWaveThenFireworks = (
1179+
fetchedBodies: Record<string, unknown>[],
1180+
) => {
1181+
expect(fetchedBodies).toHaveLength(2)
1182+
expect(fetchedBodies[0].model).toBe('minimax/minimax-m2.5')
1183+
expect(fetchedBodies[1].model).toBe(
1184+
'accounts/fireworks/models/minimax-m2p5',
1185+
)
1186+
expect(mockLogger.warn).toHaveBeenCalled()
1187+
}
1188+
10781189
it('returns stream with correct headers', async () => {
10791190
const req = new NextRequest(
10801191
'http://localhost:3000/api/v1/chat/completions',
@@ -1158,6 +1269,48 @@ describe('/api/v1/chat/completions POST endpoint', () => {
11581269
},
11591270
FETCH_PATH_TEST_TIMEOUT_MS,
11601271
)
1272+
1273+
it(
1274+
'falls back to Fireworks when CanopyWave has no available workers for non-streaming requests',
1275+
async () => {
1276+
await withCanopyWaveApiKey(async () => {
1277+
const { fetch, fetchedBodies } =
1278+
createCanopyWaveNoWorkersThenFireworksFetch(false)
1279+
const response = await postCanopyWaveFallbackRequest({
1280+
fetch,
1281+
stream: false,
1282+
})
1283+
1284+
expect(response.status).toBe(200)
1285+
expectCanopyWaveThenFireworks(fetchedBodies)
1286+
1287+
const body = await response.json()
1288+
expect(body.model).toBe('minimax/minimax-m2.5')
1289+
expect(body.provider).toBe('Fireworks')
1290+
expect(body.choices[0].message.content).toBe('fireworks response')
1291+
})
1292+
},
1293+
FETCH_PATH_TEST_TIMEOUT_MS,
1294+
)
1295+
1296+
it(
1297+
'falls back to Fireworks when CanopyWave has no available workers for streaming requests',
1298+
async () => {
1299+
await withCanopyWaveApiKey(async () => {
1300+
const { fetch, fetchedBodies } =
1301+
createCanopyWaveNoWorkersThenFireworksFetch(true)
1302+
const response = await postCanopyWaveFallbackRequest({
1303+
fetch,
1304+
stream: true,
1305+
})
1306+
1307+
expect(response.status).toBe(200)
1308+
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
1309+
expectCanopyWaveThenFireworks(fetchedBodies)
1310+
})
1311+
},
1312+
FETCH_PATH_TEST_TIMEOUT_MS,
1313+
)
11611314
})
11621315

11631316
describe('Subscription limit enforcement', () => {

web/src/app/api/v1/chat/completions/_post.ts

Lines changed: 74 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,50 @@ export const formatQuotaResetCountdown = (
109109
return `in ${pluralize(minutes, 'minute')}`
110110
}
111111

112+
type ProviderHandlerArgs = Parameters<typeof handleCanopyWaveStream>[0]
113+
type ProviderHandler<T> = (args: ProviderHandlerArgs) => Promise<T>
114+
115+
function shouldFallbackCanopyWaveToFireworks(
116+
error: unknown,
117+
model: string,
118+
): error is CanopyWaveError {
119+
if (!(error instanceof CanopyWaveError) || !isFireworksModel(model)) {
120+
return false
121+
}
122+
const message = error.errorBody.error.message.toLowerCase()
123+
return (
124+
error.statusCode === 429 ||
125+
error.statusCode >= 500 ||
126+
message.includes('no available workers')
127+
)
128+
}
129+
130+
async function handleCanopyWaveWithFireworksFallback<T>(
131+
args: ProviderHandlerArgs,
132+
handleCanopyWave: ProviderHandler<T>,
133+
handleFireworks: ProviderHandler<T>,
134+
): Promise<T> {
135+
try {
136+
return await handleCanopyWave(args)
137+
} catch (error) {
138+
if (!shouldFallbackCanopyWaveToFireworks(error, args.body.model)) {
139+
throw error
140+
}
141+
142+
args.logger.warn(
143+
{
144+
error: getErrorObject(error),
145+
model: args.body.model,
146+
providerStatusCode: error.statusCode,
147+
providerStatusText: error.statusText,
148+
},
149+
'CanopyWave request failed, falling back to Fireworks',
150+
)
151+
152+
return handleFireworks(args)
153+
}
154+
}
155+
112156
export type CheckSessionAdmissibleFn = typeof checkSessionAdmissible
113157

114158
type GateRejectCode = Extract<SessionGateResult, { ok: false }>['code']
@@ -599,7 +643,8 @@ export async function postChatCompletions(params: {
599643
if (bodyStream) {
600644
// Streaming request — route to SiliconFlow/CanopyWave/Fireworks for supported models
601645
const useSiliconFlow = false // isSiliconFlowModel(typedBody.model)
602-
const useCanopyWave = isCanopyWaveModel(typedBody.model)
646+
const useCanopyWave =
647+
!!env.CANOPYWAVE_API_KEY && isCanopyWaveModel(typedBody.model)
603648
const useFireworks = !useCanopyWave && isFireworksModel(typedBody.model)
604649
const useOpenAIDirect =
605650
!useCanopyWave &&
@@ -616,15 +661,19 @@ export async function postChatCompletions(params: {
616661
insertMessageBigquery,
617662
})
618663
: useCanopyWave
619-
? await handleCanopyWaveStream({
620-
body: typedBody,
621-
userId,
622-
stripeCustomerId,
623-
agentId,
624-
fetch,
625-
logger,
626-
insertMessageBigquery,
627-
})
664+
? await handleCanopyWaveWithFireworksFallback(
665+
{
666+
body: typedBody,
667+
userId,
668+
stripeCustomerId,
669+
agentId,
670+
fetch,
671+
logger,
672+
insertMessageBigquery,
673+
},
674+
handleCanopyWaveStream,
675+
handleFireworksStream,
676+
)
628677
: useFireworks
629678
? await handleFireworksStream({
630679
body: typedBody,
@@ -678,7 +727,8 @@ export async function postChatCompletions(params: {
678727
// Non-streaming request — route to SiliconFlow/CanopyWave/Fireworks for supported models
679728
const model = typedBody.model
680729
const useSiliconFlow = false // isSiliconFlowModel(model)
681-
const useCanopyWave = isCanopyWaveModel(model)
730+
const useCanopyWave =
731+
!!env.CANOPYWAVE_API_KEY && isCanopyWaveModel(model)
682732
const useFireworks = !useCanopyWave && isFireworksModel(model)
683733
const shouldUseOpenAIEndpoint =
684734
!useCanopyWave && !useFireworks && isOpenAIDirectModel(model)
@@ -694,15 +744,19 @@ export async function postChatCompletions(params: {
694744
insertMessageBigquery,
695745
})
696746
: useCanopyWave
697-
? handleCanopyWaveNonStream({
698-
body: typedBody,
699-
userId,
700-
stripeCustomerId,
701-
agentId,
702-
fetch,
703-
logger,
704-
insertMessageBigquery,
705-
})
747+
? handleCanopyWaveWithFireworksFallback(
748+
{
749+
body: typedBody,
750+
userId,
751+
stripeCustomerId,
752+
agentId,
753+
fetch,
754+
logger,
755+
insertMessageBigquery,
756+
},
757+
handleCanopyWaveNonStream,
758+
handleFireworksNonStream,
759+
)
706760
: useFireworks
707761
? handleFireworksNonStream({
708762
body: typedBody,

0 commit comments

Comments
 (0)