feat(chat): add langchaingo agent execution path for chat module#710
Conversation
Introduces langchaingo v0.1.14 as the modern LLM agent framework: - chat/llm.go: LangChain LLM client initialization with proxy support - chat/tools.go: McpoToolAdapter bridging mcpo tools to langchaingo interface - chat/agent.go: Agent processor with multi-round tool calling and streaming - config/chat.go: AgentConfig for enabling agent mode per chat config - chat/chat.go: Dual execution paths (legacy SDK + new agent, backward compatible) - chat/agent_test.go: Tests for new components Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
Addresses code review feedback: stopTicker now uses sync.Once with close() instead of a blocking send to prevent potential deadlock if the goroutine has already exited. Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds an opt-in langchaingo-based agent execution path to the existing chat module, intended to support multi-round tool calling while keeping the legacy go-openai streaming path as the default.
Changes:
- Introduces
agentconfiguration (enabled,max_iterations) for chat configs. - Adds langchaingo OpenAI model initialization and an MCP tool adapter layer for langchaingo tools.
- Routes chat execution to either the new agent loop or the legacy streaming implementation based on config.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| main.go | Initializes langchaingo models during startup. |
| go.mod | Adds github.com/tmc/langchaingo and indirect deps. |
| go.sum | Records new dependency checksums for langchaingo and related packages. |
| config/chat.go | Adds AgentConfig and wires it into ChatConfigSingle. |
| chat/chat.go | Adds agent/legacy routing and extracts agent/legacy execution helpers. |
| chat/llm.go | Initializes langchaingo OpenAI models with base URL + proxy support. |
| chat/tools.go | Bridges existing MCP(mcpo) tools into langchaingo tool definitions/adapters. |
| chat/agent.go | Implements the agent loop, tool-call execution, and Telegram streaming updates. |
| chat/agent_test.go | Adds unit tests for message conversion, defaults, and nil-safety. |
Comments suppressed due to low confidence (1)
chat/chat.go:404
- Same issue as the agent path:
streamProcessor.process()callsfinalizeResponse()(which sends/edits the Telegram message) before returning the response string, so applyingProcessOutgoinghere cannot affect the sent message. If filters are meant to change user-visible output, they need to run beforefinalizeResponse()sends/edits (e.g., pass a filter hook into the processor or return formatted text and send after filtering).
filter := createFilter(&filterConfig)
if filter == nil {
continue
}
response = filter.ProcessOutgoing(response, ctx, v2)
| if ap.placeholderMsg == nil { | ||
| ap.placeholderMsg, err = ap.ctx.Bot().Reply(ap.ctx.Message(), formattedText, formatOpt) | ||
| if err != nil { | ||
| log.Error("agent: failed to create initial reply during streaming", zap.Error(err)) | ||
| return |
There was a problem hiding this comment.
ap.placeholderMsg is assigned here from the streaming ticker goroutine, but it is also read/used in finalizeResponse() on the main goroutine. Since stopTicker() doesn't wait for the goroutine to finish, this can race under -race. Consider guarding placeholderMsg with a mutex and/or making stopTicker() wait for the ticker goroutine to exit before proceeding to final send/edit.
There was a problem hiding this comment.
Fixed in 6ebda7a. placeholderMsg is now guarded by mu — updateStreamingMessage() holds the lock for its entire execution, and finalizeResponse() reads placeholderMsg under lock. Additionally, stopTicker() now waits on a tickerDone channel for the goroutine to fully exit before finalizeResponse() proceeds.
| } else { | ||
| _, err = util.EditMessageWithError(ap.placeholderMsg, formattedText, formatOpt) | ||
| if err != nil { | ||
| log.Error("agent: failed to edit message during streaming", zap.Error(err)) | ||
| return |
There was a problem hiding this comment.
Related race risk: editing ap.placeholderMsg from the streaming goroutine can overlap with final editing in finalizeResponse() (and with stopTicker()), since there is no join/wait for the streaming goroutine. Add synchronization (mutex/WaitGroup) so edits cannot happen concurrently.
There was a problem hiding this comment.
Fixed in 6ebda7a. updateStreamingMessage() now holds mu for its entire execution (not just for reading buffers), and stopTicker() blocks on <-ap.tickerDone to ensure the goroutine has exited before finalizeResponse() touches the message.
| model := getLangchainModel(v2.Model.Name) | ||
| if model == nil { | ||
| log.Error("langchain model not found, falling back to legacy path", zap.String("model", v2.Model.Name)) | ||
| return nil | ||
| } |
There was a problem hiding this comment.
chatWithAgent logs that it will fall back to the legacy path when the langchaingo model is missing, but it currently returns nil instead. This results in no response being sent (and can leave a placeholder message hanging). Call chatWithLegacy(...) here (or at least edit/send an error message) instead of returning nil.
There was a problem hiding this comment.
Fixed in 6ebda7a. chatWithAgent now accepts the legacy messages parameter and calls chatWithLegacy(chatCtx, ctx, v2, messages, placeholderMsg) when the langchaingo model is not found.
| } | ||
| response = filter.ProcessOutgoing(response, ctx, v2) | ||
| } | ||
|
|
There was a problem hiding this comment.
Outgoing filters are applied after ap.run() returns, but ap.run() already sends/edits the Telegram message in finalizeResponse(). That means ProcessOutgoing modifications are never reflected in what the user sees (despite the filter contract saying it can modify the response before sending). Consider applying outgoing filters before the final send/edit, or refactor ap.run() to return the final text and let the caller send after filtering.
| // Ensure the filtered response is reflected in the final user-visible message. | |
| if placeholderMsg != nil { | |
| if _, err := util.EditMessageWithError(placeholderMsg, response, tb.ModeMarkdownV2); err != nil { | |
| log.Error("failed to edit placeholder with filtered agent response", zap.Error(err)) | |
| } | |
| } |
There was a problem hiding this comment.
Note: the outgoing filter behavior is consistent with the legacy path (chatWithLegacy), which also applies ProcessOutgoing after the stream processor has already sent/edited the message. Both paths apply filters to the returned response text after it's been sent. If we want filters to modify what the user sees, both paths would need refactoring together — keeping them consistent for now in 6ebda7a.
| ap.mu.Unlock() | ||
| ap.lastSentText = "" |
There was a problem hiding this comment.
Potential data race: ap.lastSentText is read/written in the streaming ticker goroutine (updateStreamingMessage) but also reset here in the main agent loop without any synchronization. Protect lastSentText with the same mutex (or confine all reads/writes to the streaming goroutine) to avoid races under -race.
| ap.mu.Unlock() | |
| ap.lastSentText = "" | |
| ap.lastSentText = "" | |
| ap.mu.Unlock() |
There was a problem hiding this comment.
Fixed in 6ebda7a. lastSentText = "" is now inside the ap.mu.Lock() block alongside the buffer resets, and updateStreamingMessage() holds the same mu lock for its entire execution.
|
@copilot apply changes based on the comments in this thread |
…nd filter ordering 1. Fix data races on placeholderMsg, lastSentText, fullResponse, reasonContent: - Changed RWMutex to Mutex for simpler exclusive locking - updateStreamingMessage now holds mu for its entire execution - lastSentText reset moved inside locked section - finalizeResponse reads placeholderMsg under lock 2. Fix goroutine join: added tickerDone channel so stopTicker() waits for the ticker goroutine to fully exit before finalizeResponse() runs. 3. Fix missing fallback: chatWithAgent now calls chatWithLegacy() when langchaingo model is not found, instead of returning nil. 4. Accept filter ordering note: outgoing filters are applied after ap.run() returns the final text (not before send), matching the legacy path's behavior consistently. Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
All 5 review comments addressed in 6ebda7a:
|
| return | ||
| } | ||
|
|
||
| editInterval := ap.config.Format.GetEditInterval() |
There was a problem hiding this comment.
time.NewTicker(editInterval) will panic if editInterval <= 0 (e.g., config sets edit_interval: "0" or a negative duration). Please defensively clamp/validate the duration (or treat non-positive as streaming-disabled) before constructing the ticker to avoid crashing the whole bot on misconfiguration.
| editInterval := ap.config.Format.GetEditInterval() | |
| editInterval := ap.config.Format.GetEditInterval() | |
| if editInterval <= 0 { | |
| log.Warn("agent: streaming disabled due to non-positive edit interval", | |
| zap.Duration("editInterval", editInterval)) | |
| close(ap.tickerDone) | |
| return | |
| } |
There was a problem hiding this comment.
Fixed in 1485fcf. Added the exact suggested guard — if editInterval <= 0, streaming is disabled with a warning log and tickerDone is closed immediately, preventing the time.NewTicker panic.
| // run executes the agent loop with multi-round tool calling support. | ||
| func (ap *agentProcessor) run() (string, error) { | ||
| maxIterations := ap.config.Agent.GetMaxIterations() | ||
|
|
||
| // Start streaming ticker if enabled | ||
| ap.startStreamingTicker() | ||
| defer ap.stopTicker() | ||
|
|
||
| for iteration := range maxIterations { | ||
| log.Debug("agent iteration", | ||
| zap.Int("iteration", iteration), | ||
| zap.Int("maxIterations", maxIterations)) | ||
|
|
||
| // Build call options | ||
| opts := ap.buildCallOptions() | ||
|
|
||
| // Call the model | ||
| resp, err := ap.model.GenerateContent(ap.chatCtx, ap.messages, opts...) | ||
| if err != nil { | ||
| log.Error("agent: model GenerateContent failed", zap.Error(err)) | ||
| return "", err | ||
| } | ||
|
|
||
| if len(resp.Choices) == 0 { | ||
| log.Warn("agent: empty response from model") | ||
| break | ||
| } | ||
|
|
||
| choice := resp.Choices[0] | ||
|
|
||
| // Accumulate content | ||
| if choice.Content != "" { | ||
| ap.mu.Lock() | ||
| ap.fullResponse.WriteString(choice.Content) | ||
| ap.mu.Unlock() | ||
| } | ||
|
|
||
| // Accumulate reasoning content | ||
| if choice.ReasoningContent != "" { | ||
| ap.mu.Lock() | ||
| ap.reasonContent.WriteString(choice.ReasoningContent) | ||
| ap.mu.Unlock() | ||
| } | ||
|
|
||
| // Check for tool calls | ||
| if len(choice.ToolCalls) > 0 { | ||
| // Add assistant message with tool calls to conversation | ||
| assistantMsg := llms.MessageContent{ | ||
| Role: llms.ChatMessageTypeAI, | ||
| Parts: []llms.ContentPart{}, | ||
| } | ||
| if choice.Content != "" { | ||
| assistantMsg.Parts = append(assistantMsg.Parts, llms.TextPart(choice.Content)) | ||
| } | ||
| for _, tc := range choice.ToolCalls { | ||
| assistantMsg.Parts = append(assistantMsg.Parts, tc) | ||
| } | ||
| ap.messages = append(ap.messages, assistantMsg) | ||
|
|
||
| // Execute tool calls | ||
| if err := ap.executeToolCalls(choice.ToolCalls); err != nil { | ||
| log.Error("agent: tool execution failed", zap.Error(err)) | ||
| return "", err | ||
| } | ||
|
|
||
| // Send typing notification | ||
| _ = ap.ctx.Bot().Notify(ap.ctx.Chat(), tb.Typing) | ||
|
|
||
| // Reset response buffer for next iteration | ||
| ap.mu.Lock() | ||
| ap.fullResponse.Reset() | ||
| ap.reasonContent.Reset() | ||
| ap.lastSentText = "" | ||
| ap.mu.Unlock() | ||
|
|
||
| continue | ||
| } | ||
|
|
||
| // No tool calls - we have a final response | ||
| // Add the final assistant message | ||
| ap.messages = append(ap.messages, llms.MessageContent{ | ||
| Role: llms.ChatMessageTypeAI, | ||
| Parts: []llms.ContentPart{llms.TextPart(choice.Content)}, | ||
| }) | ||
|
|
||
| break | ||
| } | ||
|
|
||
| // Finalize and send the response | ||
| return ap.finalizeResponse() | ||
| } |
There was a problem hiding this comment.
The core agent loop (run + tool-call execution + streaming/final send) is currently untested; existing tests only cover message conversion/config getters and nil-guard helpers. Adding a small fake llms.Model to exercise (1) a normal response, (2) a tool-call round-trip, and (3) streaming callback accumulation would help prevent regressions in this new execution path.
There was a problem hiding this comment.
Added in 1485fcf. New tests use a fakeModel implementing llms.Model and a fakeTool implementing tools.Tool:
TestAgentLoop_NormalResponse— normal content responseTestAgentLoop_ToolCallRoundTrip— tool call → tool result → final response (4-message conversation)TestAgentLoop_EmptyResponse— graceful empty choices handlingTestAgentLoop_ReasoningContent— reasoning content accumulationTestStreamingCallback_Accumulation— chunk accumulationTestExecuteToolCalls_NormalResponse/_ToolNotFound— tool execution pathsTestBuildCallOptions_WithTools/_WithStreaming— option construction
|
@copilot apply changes based on the comments in this thread |
…t loop tests 1. Defensively clamp editInterval in startStreamingTicker: if editInterval <= 0, disable streaming with a warning instead of panicking in time.NewTicker. 2. Add comprehensive tests for the core agent loop using a fake llms.Model: - TestStreamingCallback_Accumulation: verifies chunk accumulation - TestExecuteToolCalls_NormalResponse: verifies tool call execution + message append - TestExecuteToolCalls_ToolNotFound: verifies graceful handling of missing tools - TestBuildCallOptions_WithTools/WithStreaming: verifies option construction - TestAgentLoop_NormalResponse: normal response with fake model - TestAgentLoop_ToolCallRoundTrip: tool call → tool result → final response - TestAgentLoop_EmptyResponse: graceful handling of empty choices - TestAgentLoop_ReasoningContent: reasoning content accumulation Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
Both comments addressed in 1485fcf:
|
|
@copilot 修复合并冲突 |
Incorporate changes from dev branch: - Add extractInputFromMessage() helper function (chat/chat.go) - Add UseInternalTools config field and internal tools support (config/chat.go, chat/chat.go) - Add chat.InitInternalTools() initialization call (main.go) - Add new files: chat/internal_tools.go, chat/internal_tools_test.go, chat/chat_input_test.go - Update chat/reaction.go and chat/streaming.go from dev - Update dependency versions: mapstructure v2.5.0, meilisearch v0.36.0, xsync v4.4.0, quic-go v0.59.0, go-redis v9.17.3, x/image v0.35.0, x/text v0.33.0, jwt/v5 v5.3.0 - Merge internal tools into legacy chatWithLegacy path alongside MCP tools Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
Merge conflicts resolved in 565414e. Incorporated all changes from
|
Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
…cts with dev Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
…cts with dev Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
Co-authored-by: hugefiver <18693500+hugefiver@users.noreply.github.com>
Restore google/go-cmp in go.mod to reduce PR #710 merge conflicts with dev
chat/llm.go- LangChain LLM client initialization with proxy supportchat/tools.go- Tool adapter (mcpo → langchaingo tools) + skill systemchat/agent.go- Agent executor with multi-round tool calling + streamingconfig/chat.go(AgentConfig with max_iterations)chat/chat.goto support agent execution path (opt-in, backward compatible)main.goto initialize langchaingo componentsplaceholderMsg,lastSentText(mutex protection)stopTicker(tickerDone channel)chatWithAgent(callschatWithLegacy)editIntervalpanic instartStreamingTickerllms.Model(normal response, tool-call round-trip, streaming, reasoning)Original prompt
💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.