Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions bridge_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,114 @@ func TestOpenAIChatCompletions(t *testing.T) {
})
}
})

t.Run("streaming injected tool call edge cases", func(t *testing.T) {
t.Parallel()

cases := []struct {
name string
fixture []byte
expectedArgs map[string]any
}{
{
name: "tool call no preamble",
fixture: fixtures.OaiChatStreamingInjectedToolNoPreamble,
expectedArgs: map[string]any{"owner": "me"},
},
{
name: "tool call with non-zero index",
fixture: fixtures.OaiChatStreamingInjectedToolNonzeroIndex,
expectedArgs: nil, // No arguments in this fixture
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

arc := txtar.Parse(tc.fixture)
t.Logf("%s: %s", t.Name(), arc.Comment)

files := filesMap(arc)
require.Len(t, files, 3)
require.Contains(t, files, fixtureRequest)
require.Contains(t, files, fixtureStreamingResponse)
require.Contains(t, files, fixtureStreamingToolResponse)

reqBody := files[fixtureRequest]

// Add the stream param to the request.
newBody, err := setJSON(reqBody, "stream", true)
require.NoError(t, err)
reqBody = newBody

ctx, cancel := context.WithTimeout(t.Context(), time.Second*30)
t.Cleanup(cancel)

// Setup mock server with response mutator for multi-turn interaction.
srv := newMockServer(ctx, t, files, func(reqCount uint32, resp []byte) []byte {
if reqCount == 1 {
// First request gets the tool call response
return resp
}
// Second request gets final response
return files[fixtureStreamingToolResponse]
})
t.Cleanup(srv.Close)

recorderClient := &testutil.MockRecorder{}

// Setup MCP proxies with the tool from the fixture
mcpProxiers, mcpCalls := setupMCPServerProxiesForTest(t, testTracer)
mcpMgr := mcp.NewServerProxyManager(mcpProxiers, testTracer)
require.NoError(t, mcpMgr.Init(ctx))

logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: false}).Leveled(slog.LevelDebug)
providers := []aibridge.Provider{provider.NewOpenAI(openaiCfg(srv.URL, apiKey))}
b, err := aibridge.NewRequestBridge(t.Context(), providers, recorderClient, mcpMgr, logger, nil, testTracer)
require.NoError(t, err)

mockSrv := httptest.NewUnstartedServer(b)
t.Cleanup(mockSrv.Close)
mockSrv.Config.BaseContext = func(_ net.Listener) context.Context {
return aibcontext.AsActor(ctx, userID, nil)
}
mockSrv.Start()

req := createOpenAIChatCompletionsReq(t, mockSrv.URL, reqBody)

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

// Consume the full response body to ensure the interception completes
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
resp.Body.Close()

// Verify the MCP tool was actually invoked
invocations := mcpCalls.getCallsByTool(mockToolName)
require.Len(t, invocations, 1, "expected MCP tool to be invoked")

// Verify tool was invoked with the expected args (if specified)
if tc.expectedArgs != nil {
expected, err := json.Marshal(tc.expectedArgs)
require.NoError(t, err)
actual, err := json.Marshal(invocations[0])
require.NoError(t, err)
require.EqualValues(t, expected, actual)
}

// Verify tool usage was recorded
toolUsages := recorderClient.RecordedToolUsages()
require.Len(t, toolUsages, 1)
assert.Equal(t, mockToolName, toolUsages[0].Tool)

recorderClient.VerifyAllInterceptionsEnded(t)
})
}
})
}

func TestSimple(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ var (

//go:embed openai/chatcompletions/non_stream_error.txtar
OaiChatNonStreamError []byte

//go:embed openai/chatcompletions/streaming_injected_tool_no_preamble.txtar
OaiChatStreamingInjectedToolNoPreamble []byte

//go:embed openai/chatcompletions/streaming_injected_tool_nonzero_index.txtar
OaiChatStreamingInjectedToolNonzeroIndex []byte
)

var (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
Streaming response where the provider returns an injected tool call as the first chunk with no text preamble.
This test ensures tool invocation continues even when no chunks are relayed to the client.

-- request --
{
"messages": [
{
"content": "<current_datetime>2026-01-22T18:35:17.612Z</current_datetime>\n\nlist all my coder workspaces",
"role": "user"
}
],
"model": "claude-haiku-4.5",
"n": 1,
"temperature": 1,
"parallel_tool_calls": false,
"stream_options": {
"include_usage": true
},
"stream": true
}

-- streaming --
data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01CvBi1d4qpKTG2PCuc9wDbZ","index":0,"type":"function"}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"{\"own"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":"er\": \"me\"}"},"index":0}]}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769106921,"id":"msg_vrtx_01UoiRJwj3JXcwNYAh3z7ARs","usage":{"completion_tokens":65,"prompt_tokens":25716,"prompt_tokens_details":{"cached_tokens":20470},"total_tokens":25781},"model":"claude-haiku-4.5"}

data: [DONE]

-- streaming/tool-call --
data: {"choices":[{"index":0,"delta":{"content":"You","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" have one","role":"assistant"}}],"created":1769198061,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" Coder workspace:","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n\n**test-scf** (","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"ID: a174a2e5","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"-5050-445d-89","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"ff-dd720e5b442","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"e)\n- Template: docker","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n- Template Version","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" ID","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":": ad1b5ab1-","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"fc18-4792-84f","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"7-797787607d30","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"\n- Status","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":": Up","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" to date","role":"assistant"}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","usage":{"completion_tokens":85,"prompt_tokens":25989,"prompt_tokens_details":{"cached_tokens":0},"total_tokens":26074},"model":"claude-haiku-4.5"}

data: [DONE]
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
Streaming response where the provider returns text content followed by an injected tool call at index 1 (instead of index 0).
This can happen when the provider incorrectly continues indexing from a previous response.
This tests that the nil entries which a non-zero starting index leaves in the tool calls array are removed.

-- request --
{
"messages": [
{
"content": "<current_datetime>2026-01-23T20:22:43.781Z</current_datetime>\n\nI want you to do to this in order:\n1) create a file in my current directory with name \"test.txt\"\n2) list all my coder workspaces",
"role": "user"
}
],
"model": "claude-haiku-4.5",
"n": 1,
"temperature": 1,
"parallel_tool_calls": false,
"stream_options": {
"include_usage": true
},
"stream": true
}

-- streaming --
data: {"choices":[{"index":0,"delta":{"content":"Now","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" listing","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" your","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" C","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"oder workspaces:","role":"assistant"}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"name":"bmcp_coder_coder_list_workspaces"},"id":"toolu_vrtx_01DbFqUgk6aAtJ4nDBqzFWDF","index":1,"type":"function"}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":null,"tool_calls":[{"function":{"arguments":""},"index":1}]}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"tool_calls","index":0,"delta":{"content":null}}],"created":1769199774,"id":"msg_vrtx_01Fiieb5Z3kqJf9a3FwvLkky","usage":{"completion_tokens":58,"prompt_tokens":25939,"prompt_tokens_details":{"cached_tokens":25429},"total_tokens":25997},"model":"claude-haiku-4.5"}

data: [DONE]

-- streaming/tool-call --
data: {"choices":[{"index":0,"delta":{"content":"Done","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"! I create","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"d `","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"test.txt` in","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" your current directory.","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" You","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" have","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" 1","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" ","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":"Coder workspace:\n\n-","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" **test-scf** (docker","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"index":0,"delta":{"content":" template)","role":"assistant"}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","model":"claude-haiku-4.5"}

data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769199776,"id":"msg_vrtx_01RVxamMyw1DBtpoENDpmnQK","usage":{"completion_tokens":39,"prompt_tokens":26166,"prompt_tokens_details":{"cached_tokens":25934},"total_tokens":26205},"model":"claude-haiku-4.5"}

data: [DONE]
47 changes: 36 additions & 11 deletions intercept/chatcompletions/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -148,16 +149,24 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
}
}

// Builtin tools are not intercepted.
if toolCall != nil && i.getInjectedToolByName(toolCall.Name) == nil {
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
InterceptionID: i.ID().String(),
MsgID: processor.getMsgID(),
Tool: toolCall.Name,
Args: i.unmarshalArgs(toolCall.Arguments),
Injected: false,
})
toolCall = nil
if toolCall != nil {
// Builtin tools are not intercepted.
if i.getInjectedToolByName(toolCall.Name) == nil {
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
InterceptionID: i.ID().String(),
MsgID: processor.getMsgID(),
Tool: toolCall.Name,
Args: i.unmarshalArgs(toolCall.Arguments),
Injected: false,
})
toolCall = nil
} else {
// Injected tools mark the stream as initiated so we continue to tool invocation.
// When the provider responds with a tool call as the first chunk (no text
// preamble), no chunks are relayed to the client. Marking as initiated
// ensures we continue to tool invocation instead of returning early.
events.MarkInitiated()
}
}

if prompt != nil {
Expand Down Expand Up @@ -239,7 +248,13 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re

// Invoke the injected tool, and use the tool result to make a subsequent request to the upstream.
// Append the completion from this stream as context.
i.req.Messages = append(i.req.Messages, processor.getLastCompletion().ToParam())
// Some providers may return tool calls with non-zero starting indices,
// resulting in nil entries in the array that must be removed.
completion := processor.getLastCompletion()
if completion != nil {
compactToolCalls(completion)
i.req.Messages = append(i.req.Messages, completion.ToParam())
}

id := toolCall.ID
args := i.unmarshalArgs(toolCall.Arguments)
Expand Down Expand Up @@ -486,3 +501,13 @@ func (s *streamProcessor) getLastUsage() openai.CompletionUsage {
// getCumulativeUsage returns the value currently held in the processor's
// cumulativeUsage field.
func (s *streamProcessor) getCumulativeUsage() openai.CompletionUsage {
	total := s.cumulativeUsage
	return total
}

// compactToolCalls filters msg.ToolCalls in place, dropping every entry whose
// ID is the empty string. A nil msg is left untouched, and a message with no
// tool calls ends up unchanged.
//
// The call site in ProcessRequest runs this on the last completion before it
// is appended to the request messages; it notes there that providers which
// start tool-call indices above zero leave such ID-less placeholder entries in
// the array.
func compactToolCalls(msg *openai.ChatCompletionMessage) {
	if msg == nil {
		return
	}

	// An entry without an ID is treated as a placeholder rather than a real call.
	lacksID := func(call openai.ChatCompletionMessageToolCallUnion) bool {
		return call.ID == ""
	}
	msg.ToolCalls = slices.DeleteFunc(msg.ToolCalls, lacksID)
}
9 changes: 9 additions & 0 deletions intercept/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ func (s *EventStream) IsStreaming() bool {
return s.initiated.Load() || len(s.eventsCh) > 0
}

// MarkInitiated flags the stream as initiated without requiring that any event
// has been sent to the client. It is meant for the case where an injected tool
// call is being processed and no chunks are relayed to the client, so that
// IsStreaming reports true regardless.
//
// The flag is set through initiateOnce, so repeated calls are no-ops.
//
// NOTE(review): this consumes initiateOnce. If another code path uses the same
// Once to perform additional first-event setup, that setup will not run after
// this call — confirm against the rest of EventStream.
func (s *EventStream) MarkInitiated() {
	setInitiated := func() {
		s.initiated.Store(true)
	}
	s.initiateOnce.Do(setInitiated)
}

// IsConnError checks if an error is related to client disconnection or context cancellation.
func IsConnError(err error) bool {
if err == nil {
Expand Down