This repository was archived by the owner on Jun 5, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 90
Expand file tree
/
Copy pathadapter.py
More file actions
275 lines (234 loc) · 10.5 KB
/
adapter.py
File metadata and controls
275 lines (234 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import copy
import json
import uuid
from abc import ABC, abstractmethod
from typing import Callable, Dict, Union
from urllib.parse import urljoin
import structlog
from fastapi.responses import JSONResponse, StreamingResponse
from litellm import ModelResponse
from litellm.types.utils import Delta, StreamingChoices
from ollama import ChatResponse, GenerateResponse
from codegate.db import models as db_models
from codegate.muxing import rulematcher
from codegate.providers.ollama.adapter import OLlamaToModel
# Module-level structlog logger obtained under the "codegate" name; used by the formatters below.
logger = structlog.get_logger("codegate")
class MuxingAdapterError(Exception):
    """Error raised by the muxing adapters defined in this module."""
class BodyAdapter:
    """
    Point a request body at the destination provider.

    The body is expected to always arrive in OpenAI format (the client has to be configured
    to send and expect OpenAI format), so the only work done here is filling in the
    destination provider details.
    """

    def _get_provider_formatted_url(self, model_route: rulematcher.ModelRoute) -> str:
        """
        Return the URL to use as `base_url` for the route's provider.

        The endpoint value itself comes from the DB.
        """
        destination = model_route.endpoint
        provider = destination.provider_type

        # OpenRouter routes get the "/api/v1" path.
        if provider == db_models.ProviderType.openrouter:
            return urljoin(destination.endpoint, "/api/v1")

        # OpenAI and vLLM routes get the "/v1" path.
        openai_like = (db_models.ProviderType.openai, db_models.ProviderType.vllm)
        if provider in openai_like:
            return urljoin(destination.endpoint, "/v1")

        # Every other provider uses the stored endpoint untouched.
        return destination.endpoint

    def set_destination_info(self, model_route: rulematcher.ModelRoute, data: dict) -> dict:
        """
        Return a deep copy of `data` with the destination `model` and `base_url` set.

        The `data` passed in is not modified.
        """
        routed_body = copy.deepcopy(data)
        routed_body.update(
            model=model_route.model.name,
            base_url=self._get_provider_formatted_url(model_route),
        )
        return routed_body
class OutputFormatter(ABC):
    """Abstract interface of the objects that format a provider response for the client."""

    @property
    @abstractmethod
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Mapping from provider to its format function. Every provider format function is
        expected to return the chunk in OpenAI format.
        """

    @abstractmethod
    def format(
        self, response: Union[StreamingResponse, JSONResponse], dest_prov: db_models.ProviderType
    ) -> Union[StreamingResponse, JSONResponse]:
        """Format the response to the client."""
class StreamChunkFormatter(OutputFormatter):
    """
    Format a single chunk from a stream to OpenAI format.
    We need to configure the client to expect the OpenAI format.
    In Continue this means setting "provider": "openai" in the config json file.
    """

    @property
    @abstractmethod
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Return the provider specific format functions. All providers format functions should
        return the chunk in OpenAI format.
        """
        pass

    @staticmethod
    def _strip_data_prefix(chunk: str) -> str:
        """
        Return the stripped payload that follows the first "data:" marker of the chunk.

        Only the first marker is treated as the prefix, so a payload whose own text contains
        "data:" is kept whole instead of being cut at the next occurrence. A chunk without
        any marker is returned stripped rather than raising IndexError.
        """
        _, marker, payload = chunk.partition("data:")
        if not marker:
            return chunk.strip()
        return payload.strip()

    def _format_openai(self, chunk: str) -> str:
        """
        The chunk is already in OpenAI format. To standardize remove the "data:" prefix.
        This function is used by both chat and FIM formatters
        """
        return self._strip_data_prefix(chunk)

    def _format_antropic(self, chunk: str) -> str:
        """
        Format the Anthropic chunk to OpenAI format.
        This function is used by both chat and FIM formatters

        Returns the serialized OpenAI chunk, an empty string when the chunk carries nothing
        to forward (the caller skips empty strings), or the cleaned payload unchanged when
        it cannot be parsed or serialized.
        """
        cleaned_chunk = self._strip_data_prefix(chunk)
        try:
            # Use `strict=False` to allow the JSON payload to contain
            # newlines, tabs and other valid characters that might
            # come from Anthropic returning code.
            chunk_dict = json.loads(cleaned_chunk, strict=False)
        except Exception as e:
            logger.warning(f"Error parsing Anthropic chunk: {chunk}. Error: {e}")
            return cleaned_chunk

        # A JSON payload that is not an object has no fields to read; pass it through.
        if not isinstance(chunk_dict, dict):
            return cleaned_chunk

        msg_type = chunk_dict.get("type", "")
        finish_reason = "stop" if msg_type == "message_stop" else None

        # In type == "content_block_start" the content comes in "content_block"
        # In type == "content_block_delta" the content comes in "delta"
        msg_content_dict = chunk_dict.get("delta", {}) or chunk_dict.get("content_block", {})
        # We couldn't obtain the content from the chunk. Skip it, unless it is the final
        # "message_stop" event: that one has no content but must still be forwarded so the
        # client receives finish_reason="stop".
        if not msg_content_dict and finish_reason is None:
            return ""
        msg_content = (msg_content_dict or {}).get("text", "")

        open_ai_chunk = ModelResponse(
            id=f"anthropic-chat-{str(uuid.uuid4())}",
            model="anthropic-muxed-model",
            object="chat.completion.chunk",
            choices=[
                StreamingChoices(
                    finish_reason=finish_reason,
                    index=0,
                    delta=Delta(content=msg_content, role="assistant"),
                    logprobs=None,
                )
            ],
        )

        try:
            return open_ai_chunk.model_dump_json(exclude_none=True, exclude_unset=True)
        except Exception as e:
            logger.warning(f"Error serializing Anthropic chunk: {chunk}. Error: {e}")
            return cleaned_chunk

    def _format_as_openai_chunk(self, formatted_chunk: str) -> str:
        """Format the chunk as OpenAI chunk. This is the format how the clients expect the data."""
        chunk_to_send = f"data: {formatted_chunk}\n\n"
        return chunk_to_send

    async def _format_streaming_response(
        self, response: StreamingResponse, dest_prov: db_models.ProviderType
    ):
        """
        Format the streaming response to OpenAI format.

        Yields every non-empty formatted chunk wrapped as "data: ...". If formatting or
        iterating fails, the error is logged and its message is sent to the client as a
        chunk. The stream is always terminated with a "[DONE]" chunk unless the last chunk
        sent already contained it.
        """
        format_func = self.provider_format_funcs.get(dest_prov)
        done_sent = False
        try:
            if format_func is None:
                # Raised inside the try so it is reported to the client like any other error,
                # with a message more useful than "'NoneType' object is not callable".
                raise MuxingAdapterError(f"No format function for provider: {dest_prov}")
            async for chunk in response.body_iterator:
                openai_chunk = format_func(chunk)
                # Sometimes for Anthropic we couldn't get content from the chunk. Skip it.
                if not openai_chunk:
                    continue
                done_sent = "[DONE]" in openai_chunk
                yield self._format_as_openai_chunk(openai_chunk)
        except Exception as e:
            logger.error(f"Error sending chunk in muxing: {e}")
            done_sent = False
            yield self._format_as_openai_chunk(str(e))

        # Make sure the last chunk is always [DONE]. This is tracked with a flag instead of
        # the last formatted value so that streams ending on a skipped chunk, empty streams
        # and early errors are terminated too. It is deliberately not in a `finally`:
        # yielding while the generator is being closed raises RuntimeError.
        if not done_sent:
            yield self._format_as_openai_chunk("[DONE]")

    def format(
        self, response: StreamingResponse, dest_prov: db_models.ProviderType
    ) -> StreamingResponse:
        """
        Format the response to the client.

        Returns a new StreamingResponse that streams the reformatted chunks and reuses the
        status code, headers, background task and media type of the original response.
        """
        return StreamingResponse(
            self._format_streaming_response(response, dest_prov),
            status_code=response.status_code,
            headers=response.headers,
            background=response.background,
            media_type=response.media_type,
        )
class ChatStreamChunkFormatter(StreamChunkFormatter):
    """
    Formatter of the stream chunks produced when the request was a chat.
    """

    @property
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Mapping from provider to its format function. Every provider format function is
        expected to return the chunk in OpenAI format.
        """
        provider_types = db_models.ProviderType
        format_funcs = {
            provider_types.ollama: self._format_ollama,
            provider_types.openai: self._format_openai,
            provider_types.anthropic: self._format_antropic,
        }
        # Our Llamacpp provider emits OpenAI chunks; OpenRouter and VLLM are dialects of
        # OpenAI. All three reuse the OpenAI format function.
        openai_compatible = (
            provider_types.llamacpp,
            provider_types.openrouter,
            provider_types.vllm,
        )
        for provider in openai_compatible:
            format_funcs[provider] = self._format_openai
        return format_funcs

    def _format_ollama(self, chunk: str) -> str:
        """Convert an Ollama chat chunk to OpenAI format."""
        try:
            parsed = json.loads(chunk)
            normalized = OLlamaToModel.normalize_chat_chunk(ChatResponse(**parsed))
            return normalized.model_dump_json(exclude_none=True, exclude_unset=True)
        except Exception as e:
            if "data:" not in chunk:
                logger.warning(f"Error formatting Ollama chunk: {chunk}. Error: {e}")
                return chunk
            # Sometimes ollama hands us a chunk that is already OpenAI formatted,
            # specifically when talking to Cline or Kodu. Treat it as an OpenAI chunk.
            return self._format_openai(chunk)
class FimStreamChunkFormatter(StreamChunkFormatter):
    """
    Format a single chunk from a stream to OpenAI format given that the request was a FIM.
    """

    @property
    def provider_format_funcs(self) -> Dict[str, Callable]:
        """
        Return the provider specific format functions. All providers format functions should
        return the chunk in OpenAI format.
        """
        return {
            db_models.ProviderType.ollama: self._format_ollama,
            db_models.ProviderType.openai: self._format_openai,
            # Our Lllamacpp provider emits OpenAI chunks
            db_models.ProviderType.llamacpp: self._format_openai,
            # OpenRouter is a dialect of OpenAI
            db_models.ProviderType.openrouter: self._format_openai,
            # VLLM is a dialect of OpenAI
            db_models.ProviderType.vllm: self._format_openai,
            db_models.ProviderType.anthropic: self._format_antropic,
        }

    def _format_ollama(self, chunk: str) -> str:
        """
        Format the Ollama chunk to OpenAI format.

        Returns the compact JSON of the normalized chunk. When the chunk cannot be parsed
        or normalized, falls back the same way the chat formatter does: a chunk that
        already starts with the "data:" prefix is handled as an OpenAI chunk (so the prefix
        is not added twice when the chunk is sent), anything else is logged and returned
        unchanged.
        """
        try:
            chunk_dict = json.loads(chunk)
            ollama_chunk = GenerateResponse(**chunk_dict)
            open_ai_chunk = OLlamaToModel.normalize_fim_chunk(ollama_chunk)
            return json.dumps(open_ai_chunk, separators=(",", ":"), indent=None)
        except Exception as e:
            if isinstance(chunk, str) and chunk.lstrip().startswith("data:"):
                return self._format_openai(chunk)
            # Keep the best-effort pass-through, but leave a trace instead of failing silently.
            logger.warning(f"Error formatting Ollama chunk: {chunk}. Error: {e}")
            return chunk
class ResponseAdapter:
    """Select the formatter that matches a response and use it to format the response."""

    def _get_formatter(
        self, response: Union[StreamingResponse, JSONResponse], is_fim_request: bool
    ) -> OutputFormatter:
        """
        Return the formatter for the request type: FIM or chat.

        Raises MuxingAdapterError when the response is not a StreamingResponse.
        """
        if not isinstance(response, StreamingResponse):
            raise MuxingAdapterError("Only streaming responses are supported.")
        formatter_cls = FimStreamChunkFormatter if is_fim_request else ChatStreamChunkFormatter
        return formatter_cls()

    def format_response_to_client(
        self,
        response: Union[StreamingResponse, JSONResponse],
        dest_prov: db_models.ProviderType,
        is_fim_request: bool,
    ) -> Union[StreamingResponse, JSONResponse]:
        """Format the response to the client using the formatter for the request type."""
        return self._get_formatter(response, is_fim_request).format(response, dest_prov)