-
Notifications
You must be signed in to change notification settings - Fork 3
feat: add MCP injection support to responses streaming interceptor #143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
db0be87 to
2ca985c
Compare
2ca985c to
ba4e96d
Compare
mtojek
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First review round
| // Set responseID to the first response.id that is set. | ||
| if responseID == "" && ev.Response.ID != "" { | ||
| responseID = ev.Response.ID | ||
| for shouldLoop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need any security breaker for the maximum number of tools?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume something like maximum number of inner loop iterations? I don't think it is necessary.
If I understand MPC correctly there is no such limit when using it in responses API, by default provider will ask for approval but it can be disabled per tool: https://platform.openai.com/docs/guides/tools-connectors-mcp#approvals
So infinite MCP tool call loop in theory could happen when using provider directly with approval disabled.
I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think adding configurable max number of inner loop iterations breaker or maybe some more sophisticated solution as a follow up would be nice.
SGTM 👍
intercept/responses/streaming.go
Outdated
| respCopy = responseCopier{} | ||
| opts := i.requestOptions(&respCopy) | ||
| stream := i.newStream(ctx, srv, opts) | ||
| defer stream.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this defer will be called after leaving this function, but most likely you want to call it after iteration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Added stream.Close() after the iteration (stream can be safely closed multiple times). Though about moving stream processing into separate function but code seemed less clear to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Watch out for defer in loops: https://www.jetbrains.com/help/inspectopedia/GoDeferInLoop.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, although I don't think this loop should iterate much.
This is annoying 😅
I really want to have a defer just in case (very easy to forget in future code changes) but there are too many references to local variables to cleanly move it to separate function.
Would scoping stream processing it into a local func be ok?
| pending := i.getPendingInjectedToolCalls(response) | ||
| shouldLoop, err = i.handleInnerAgenticLoop(ctx, pending, response) | ||
| if err != nil { | ||
| i.sendCustomErr(ctx, w, http.StatusInternalServerError, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean that if one tool returns an error, we failed the entire prompt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depends what "returns an error" means.
- if the tool was called and MCP tool returned an error it will be forwarded as a tool call result (see *SingleInjectedToolError fixtures / test cases in responses_integration_test.go)
- if there was an error in code executing the MCP tool call eg: empty response or re-marshalling error we fail the prompt.
Thanks to this comment I've found re-marshaling error was ignored. Will add prepareRequestForAgenticLoop error being returned in handleInnerAgenticLoop.
| } | ||
| t.Cleanup(func() { _ = ln.Close() }) | ||
|
|
||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shouldn't we wait for graceful shutdown of this goroutine rather than leaving it on its own?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added wait to cleanup.
dannykopping
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @pawbana!
intercept/responses/streaming.go
Outdated
| respCopy = responseCopier{} | ||
| opts := i.requestOptions(&respCopy) | ||
| stream := i.newStream(ctx, srv, opts) | ||
| defer stream.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Watch out for defer in loops: https://www.jetbrains.com/help/inspectopedia/GoDeferInLoop.html
intercept/responses/streaming.go
Outdated
| // so events can be forwarded as soon as received. | ||
| // Otherwise loop could iterate so only last response will be forwarded. | ||
| // This is needed to keep consistency between response.id and response.previous_response_id fields. | ||
| if i.mcpProxy == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this doesn't really match the comment, or at least it could be more explicit.
Currently it requires knowledge about how this relates to tool injection.
i.e.
"If no MCP proxy is provided then no tools are injected, ..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed comment.
intercept/responses/streaming.go
Outdated
| err = fmt.Errorf("failed to relay chunk: %w", err) | ||
| return err | ||
| streamErr = stream.Err() | ||
| stream.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error is not handled.
I've been meaning to add a linter for a while, gonna do it now.
| // Append newly added items to reqPayload field | ||
| // New items are appended to limit Input re-marshaling. | ||
| // See responsesInterceptionBase.requestOptions for more details about marshaling issues. | ||
| for j := originalInputSize; j < len(i.req.Input.OfInputItemList); j++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌 nice
mtojek
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well done 👍

Adds suppport to MCP tool injection in responses streaming interceptor.
Fixes: #89