Skip to content

Conversation

@pawbana
Copy link
Contributor

@pawbana pawbana commented Jan 22, 2026

Adds suppport to MCP tool injection in responses streaming interceptor.

Fixes: #89

Copy link
Contributor Author

pawbana commented Jan 22, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

@pawbana pawbana marked this pull request as ready for review January 22, 2026 17:47
@pawbana pawbana force-pushed the pb/responses-mcp-streaming branch from db0be87 to 2ca985c Compare January 22, 2026 17:49
@pawbana pawbana force-pushed the pb/responses-mcp-streaming branch from 2ca985c to ba4e96d Compare January 22, 2026 17:54
Copy link
Member

@mtojek mtojek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First review round

// Set responseID to the first response.id that is set.
if responseID == "" && ev.Response.ID != "" {
responseID = ev.Response.ID
for shouldLoop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need any security breaker for the maximum number of tools?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume something like maximum number of inner loop iterations? I don't think it is necessary.
If I understand MPC correctly there is no such limit when using it in responses API, by default provider will ask for approval but it can be disabled per tool: https://platform.openai.com/docs/guides/tools-connectors-mcp#approvals

So infinite MCP tool call loop in theory could happen when using provider directly with approval disabled.

I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.

SGTM 👍

respCopy = responseCopier{}
opts := i.requestOptions(&respCopy)
stream := i.newStream(ctx, srv, opts)
defer stream.Close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this defer will be called after leaving this function, but most likely you want to call it after iteration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Added stream.Close() after the iteration (stream can be safely closed multiple times). Though about moving stream processing into separate function but code seemed less clear to me.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, although I don't think this loop should iterate much.

This is annoying 😅
I really want to have a defer just in case (very easy to forget in future code changes) but there are too many references to local variables to cleanly move it to separate function.
Would scoping stream processing it into a local func be ok?

pending := i.getPendingInjectedToolCalls(response)
shouldLoop, err = i.handleInnerAgenticLoop(ctx, pending, response)
if err != nil {
i.sendCustomErr(ctx, w, http.StatusInternalServerError, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that if one tool returns an error, we failed the entire prompt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends what "returns an error" means.

  • if the tool was called and MCP tool returned an error it will be forwarded as a tool call result (see *SingleInjectedToolError fixtures / test cases in ‎responses_integration_test.go)
  • if there was an error in code executing the MCP tool call eg: empty response or re-marshalling error we fail the prompt.

Thanks to this comment I've found re-marshaling error was ignored. Will add prepareRequestForAgenticLoop error being returned in handleInnerAgenticLoop.

}
t.Cleanup(func() { _ = ln.Close() })

go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldn't we wait for graceful shutdown of this goroutine rather than leaving it on its own?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added wait to cleanup.

Copy link
Collaborator

@dannykopping dannykopping left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @pawbana!

respCopy = responseCopier{}
opts := i.requestOptions(&respCopy)
stream := i.newStream(ctx, srv, opts)
defer stream.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// so events can be forwarded as soon as received.
// Otherwise loop could iterate so only last response will be forwarded.
// This is needed to keep consistency between response.id and response.previous_response_id fields.
if i.mcpProxy == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this doesn't really match the comment, or at least it could be more explicit.
Currently it requires knowledge about how this relates to tool injection.

i.e.

"If no MCP proxy is provided then no tools are injected, ..."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed comment.

err = fmt.Errorf("failed to relay chunk: %w", err)
return err
streamErr = stream.Err()
stream.Close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is not handled.
I've been meaning to add a linter for a while, gonna do it now.

// Append newly added items to reqPayload field
// New items are appended to limit Input re-marshaling.
// See responsesInterceptionBase.requestOptions for more details about marshaling issues.
for j := originalInputSize; j < len(i.req.Input.OfInputItemList); j++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 nice

Copy link
Member

@mtojek mtojek left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well done 👍

@dannykopping dannykopping self-requested a review January 23, 2026 14:29
Copy link
Contributor Author

pawbana commented Jan 23, 2026

Merge activity

  • Jan 23, 2:37 PM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Jan 23, 2:37 PM UTC: @pawbana merged this pull request with Graphite.

@pawbana pawbana merged commit b9555f3 into main Jan 23, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add MCP injection support to /v1/responses streaming interceptor.

4 participants