Fix #599: per-conversation LLM reply executor (drop silo-wide inbox) #600

eanzhao wants to merge 3 commits into feature/lark-bot from …
Conversation
…utor

Replace the silo-wide ChannelLlmReplyInboxRuntime stream subscriber with an IConversationLlmReplyExecutor seam owned by each ConversationGAgent. The actor calls executor.StartAsync from inside its turn; the executor schedules the LLM work on a background task so the 60-300s call never pins the actor turn, then dispatches LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent back to the actor via IActorDispatchPort.

Why: the old hosted service registered a single Orleans persistent stream subscription per silo, and Orleans serialized SubscribeAsync callbacks FIFO. Every Lark user's LLM reply queued behind every other user's LLM reply, producing a 67% timeout rate in production (issue #599). The bottleneck was shape, not capacity — sharding by conversation alone is enough to recover parallelism.

Compliant with CLAUDE.md "Actor 即业务实体" ("the actor is the business entity"): LLM reply is part of the conversation entity's behavior; refactoring it into a host-level worker was the original violation.

Closes #599
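For orientation, here is a minimal sketch of the seam this commit describes. The type and member shapes are reconstructed from the names above and are assumptions, not the actual implementation:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative stubs: the real NeedsLlmReplyEvent / IActorDispatchPort live in
// the PR; only the members this sketch touches are declared, and their shapes
// are assumptions.
public sealed record NeedsLlmReplyEvent(string CorrelationId, string TargetActorId)
{
    public NeedsLlmReplyEvent Clone() => this with { };
}

public interface IActorDispatchPort
{
    Task DispatchAsync(object signal, string targetActorId);
}

public interface IConversationLlmReplyExecutor
{
    // Must return quickly: ConversationGAgent calls this from inside its turn.
    Task StartAsync(NeedsLlmReplyEvent request, CancellationToken ct);
}

public sealed class ConversationLlmReplyExecutor : IConversationLlmReplyExecutor
{
    private readonly IActorDispatchPort _dispatchPort;

    public ConversationLlmReplyExecutor(IActorDispatchPort dispatchPort)
        => _dispatchPort = dispatchPort;

    public Task StartAsync(NeedsLlmReplyEvent request, CancellationToken ct)
    {
        var snapshot = request.Clone(); // background task owns an immutable copy

        // Fire-and-forget: the 60-300s LLM call runs on the thread pool, never
        // on the actor turn. ProcessAsync ends by dispatching a terminal signal
        // (LlmReplyReadyEvent or DeferredLlmReplyDroppedEvent) back to the actor.
        _ = Task.Run(() => ProcessAsync(snapshot));
        return Task.CompletedTask;
    }

    private async Task ProcessAsync(NeedsLlmReplyEvent snapshot)
    {
        // ... LLM call and gating elided; finishes with a terminal dispatch:
        await _dispatchPort.DispatchAsync(
            new object() /* stands in for LlmReplyReadyEvent */,
            snapshot.TargetActorId).ConfigureAwait(false);
    }
}
```

The design point is that `StartAsync` completes as soon as the work is scheduled, so the actor turn is released immediately.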
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 189a912363
    _ = Task.Run(async () =>
    {
        try
        {
            await ProcessAsync(snapshot).ConfigureAwait(false);
Serialize in-flight replies per conversation
When the same conversation receives another activity while the previous LLM call is still running (for example, a user sends a follow-up within the 60–300s reply window), this Task.Run starts another background ProcessAsync after the actor turn returns. The actor only serializes the later ready/stream events, so the LLM/tool side effects and streaming chunks for the same conversation can race and replies can be delivered out of order; keep at most one in-flight LLM reply per ConversationGAgent while still allowing different conversation actors to run in parallel.
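A sketch of the suggested gate, with invented field and method names; the real fix would live in `ConversationGAgent`'s persisted state rather than a plain field, and it reuses the stub types from the earlier sketch:

```csharp
// Hypothetical actor-owned gate: at most one in-flight LLM reply per
// conversation; different conversation actors stay fully parallel.
public sealed class ConversationGAgentSketch
{
    private readonly IConversationLlmReplyExecutor _executor;
    private string? _inFlightCorrelationId; // the real gate would live in actor state

    public ConversationGAgentSketch(IConversationLlmReplyExecutor executor)
        => _executor = executor;

    public async Task DispatchPendingLlmReplyAsync(
        NeedsLlmReplyEvent request, CancellationToken ct)
    {
        if (_inFlightCorrelationId is not null)
        {
            return; // one reply already running; request waits in the pending queue
        }
        _inFlightCorrelationId = request.CorrelationId;
        await _executor.StartAsync(request, ct);
    }

    // Called by the LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent handlers.
    public void RetireInFlightReply(string correlationId)
    {
        if (_inFlightCorrelationId == correlationId)
        {
            _inFlightCorrelationId = null;
            // ... then start the next State.PendingLlmReplyRequests entry (elided)
        }
    }
}
```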
        catch (Exception ex)
        {
            // ProcessAsync handles its own errors and dispatches a terminal signal;
            // an exception bubbling out here means dispatch itself or some outer
            // step failed unexpectedly. Log and swallow so the unobserved exception
            // does not crash the host.
            _logger.LogError(
                ex,
                "Conversation LLM reply executor crashed before dispatching terminal signal: correlation={CorrelationId} target={TargetActorId}",
                snapshot.CorrelationId,
                snapshot.TargetActorId);
        }
Retry when the background task cannot signal the actor
If the background work reaches the terminal dispatch and IActorDispatchPort.DispatchAsync throws (for example, a transient Orleans/grain handoff failure), the exception lands here and is only logged. Because StartAsync already returned successfully, ConversationGAgent does not schedule the durable retry path, leaving the pending request stuck in State.PendingLlmReplyRequests until some later activation happens; the failure needs to be signaled back to the actor, or the executor needs to schedule the same retry used when startup fails.
    // ownership to a background task.
    var snapshot = request.Clone();

    // Use Task.Run so the LLM work runs on the thread pool, not on the caller's
[Consensus: 3 models] severity=minor, category=design
The new Task.Run launch ignores the caller's CancellationToken (passes CancellationToken.None) and has no host-shutdown coordination — neither caller-token cancellation nor application-stopping events can stop an in-flight LLM call. Three reviewers flagged variants of this with different fix proposals; pick whichever fits (or none, if accepted as durability-by-rehydration).
Per-model verbatim

- deepseek-v4-pro: `StartAsync` spawns fire-and-forget `Task.Run` work with `CancellationToken.None` and no tracking (no `ConcurrentBag<Task>` / `CountdownEvent` / `IHostedService.StopAsync` integration). In-flight LLM calls (up to 300s) that outlive the host's graceful-shutdown window will run into a tearing-down `IActorDispatchPort`, producing `ObjectDisposedException`-style noise in shutdown logs. Durability is covered by actor state rehydration, so this won't cause data loss, but consider a minimal shutdown gate: track in-flight tasks, expose `IAsyncDisposable` that signals a shared `CancellationTokenSource` and awaits the pending set within a bounded drain window (e.g. 5s), then let any stragglers die naturally.
- glm-5.1: `StartAsync` accepts a `CancellationToken ct` parameter but passes `CancellationToken.None` to `Task.Run`, completely ignoring the caller's token. At minimum, add `ct.ThrowIfCancellationRequested()` before the `Task.Run` call so that a caller whose turn is already cancelled does not wastefully spawn an LLM round that may race the rehydration path. More robustly, pass `ct` to `Task.Run` as the second argument so the task is never scheduled if cancellation is already signaled: `ct.ThrowIfCancellationRequested(); var snapshot = request.Clone(); _ = Task.Run(async () => { ... }, ct);`
- mimo-v2.5-pro: `_ = Task.Run(async () => { ... }, CancellationToken.None)` — the fire-and-forget pattern is correct for the non-blocking requirement, but there is no coordinated shutdown path. If the host shuts down while LLM calls are in-flight, `ProcessAsync` runs to completion (or its internal timeout) with no host-level cancellation. The old `IHostedService.StopAsync` → `DisposeAsync` at least tore down the stream subscription. Consider accepting a `CancellationToken` from a shared host-lifetime source (e.g. `IHostApplicationLifetime.ApplicationStopping`) to allow the internal `CancellationTokenSource` in `ProcessAsync` to be cancelled early on shutdown.
    // very bottleneck this seam removes is reintroduced inside one actor). The
    // background task only does external I/O and finishes by dispatching back to
    // the actor — it never reads or writes actor state.
    _ = Task.Run(async () =>
[codex] severity=major, category=concurrency
Task.Run returns immediately for every pending request, so one ConversationGAgent can now start multiple LLM replies for the same conversation before the first one finishes. That removes the only ordering boundary the old silo-FIFO provided and can produce out-of-order replies or interleaved streaming/ready signals for a single user (e.g. user types two messages quickly; both DispatchPendingLlmReplyAsync calls fire in turn order, but their Task.Run LLM calls finish in arbitrary order).
Keep the parallelism across different conversation actors (the headline win of this PR), but add an actor-owned in-flight gate in ConversationGAgent so only one correlation per conversation is started at a time — block subsequent StartAsync calls until the current correlation's LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent retires it. The gate lives in actor state, which already tracks PendingLlmReplyRequests.
If you've decided per-conversation interleaving is acceptable (or already prevented elsewhere I'm not seeing), please add a comment in DispatchPendingLlmReplyAsync explaining why the FIFO loss is OK for one user's stream.
      Timestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
      Payload = Any.Pack(ready),
    - Route = EnvelopeRouteSemantics.CreateDirect(InboxStreamId, request.TargetActorId),
    + Route = EnvelopeRouteSemantics.CreateDirect(PublisherActorId, request.TargetActorId),
[Consensus: 2 models] severity=major, category=bug
If DispatchAsync (this line) or any code in ProcessAsync throws after the pre-LLM gates pass — e.g. transient IActorDispatchPort failure, target actor not yet initialized, network blip — the outer catch in StartAsync only logs and swallows the exception, so the IConversationLlmReplyExecutor contract's required terminal signal (LlmReplyReadyEvent or DeferredLlmReplyDroppedEvent) is never delivered. State.PendingLlmReplyRequests then leaks until the 5-minute stale-age retry fires.
Fix options:

- Dispatch a terminal `DeferredLlmReplyDroppedEvent` (reason like `executor_crash`) from the outer `catch` in `StartAsync`, so the actor can retire the pending entry immediately. Cheapest fix.
- Convert dispatch failures into an actor-owned durable retry path (more work, but matches the durability story for transient failures).

Either way, add a test with a throwing `IActorDispatchPort` to lock in the chosen behavior.
Per-model verbatim

- kimi (at executor.cs:112, the outer `catch`): The outer `catch` block in `StartAsync` logs the crash but does not dispatch a terminal signal, violating the `IConversationLlmReplyExecutor` contract ("Implementations MUST eventually deliver one terminal signal... Without a terminal signal, the actor's pending entry would leak"). If `ProcessAsync` crashes after its internal gates but before dispatching (e.g. `_actorDispatchPort` throws on send), the actor's `State.PendingLlmReplyRequests` entry will linger until the 5-minute stale-age retry fires. Consider calling `NotifyActorOfDropAsync` in the catch block with reason `executor_crash` to honor the contract and allow the actor to retire the pending entry immediately.
- codex (at executor.cs:303, the dispatch site): If `DispatchAsync` throws here (for example Orleans reports the target actor is not initialized, or dispatch has a transient failure), the outer fire-and-forget catch only logs and swallows it, so the `IConversationLlmReplyExecutor` contract's required terminal signal is never delivered and `State.PendingLlmReplyRequests` can leak indefinitely. Convert dispatch failure into an actor-owned durable retry path (or otherwise guarantee a terminal signal) and add a test with a throwing `IActorDispatchPort`.
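The cheapest fix option might look like this sketch: the background wrapper guarantees a terminal signal from its outer catch. The drop-event shape and the `DispatchAsync` signature are assumptions based on the names above:

```csharp
// Assumed shape for the PR's drop event; only what the sketch needs.
public sealed record DeferredLlmReplyDroppedEvent(string CorrelationId, string Reason);

// Sketch of fix option 1: the background wrapper always delivers a terminal signal.
private async Task RunWithTerminalSignalAsync(NeedsLlmReplyEvent snapshot)
{
    try
    {
        await ProcessAsync(snapshot).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "Executor crashed before terminal dispatch: correlation={CorrelationId}",
            snapshot.CorrelationId);
        try
        {
            var drop = new DeferredLlmReplyDroppedEvent(
                snapshot.CorrelationId, Reason: "executor_crash");
            await _dispatchPort.DispatchAsync(drop, snapshot.TargetActorId)
                .ConfigureAwait(false);
        }
        catch (Exception dropEx)
        {
            // Even the drop dispatch failed: the 5-minute stale-age retry is the backstop.
            _logger.LogError(dropEx,
                "Terminal drop dispatch failed; relying on stale-age retry");
        }
    }
}
```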
    /// </summary>
    internal static readonly TimeSpan MetadataBuildBudget = TimeSpan.FromSeconds(15);

    public Task StartAsync(NeedsLlmReplyEvent request, CancellationToken ct)
[kimi] severity=major, category=test
StartAsync — the primary public entry point of this seam — is not exercised by any unit test. The renamed ConversationLlmReplyExecutorTests still calls ProcessAsync directly, bypassing the Task.Run wrapping, exception-swallowing semantics, and non-blocking guarantee that are the whole point of this refactor.
Add at least these tests:
- `StartAsync` returns a completed task immediately without awaiting the LLM call (e.g. fake LLM that blocks on a `TaskCompletionSource` — verify `StartAsync` returns before the TCS is signaled).
- The background task eventually invokes `ProcessAsync` and dispatches the expected terminal signal.
- An unexpected exception inside `ProcessAsync` is logged and does not surface as an unobserved task exception (i.e. the outer catch does its job — pairs with Issue 3 above).
- The `request` is snapshotted (`Clone()`) before being handed to the background task, so post-`StartAsync` mutation by the caller can't affect the in-flight LLM call.

Without these, the non-blocking semantics that justify this PR are unverified — a future refactor could silently re-await the LLM call inside `StartAsync` and every existing test would still pass.
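A sketch of the first suggested test, assuming hypothetical helpers (`CreateExecutorWithFakeLlm`, `MakeRequest`) for injecting a fake LLM step gated on a `TaskCompletionSource`:

```csharp
// Sketch: StartAsync must return while the (fake) LLM call is still blocked.
[Fact]
public async Task StartAsync_ReturnsBeforeLlmCallCompletes()
{
    var llmGate = new TaskCompletionSource<string>(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Hypothetical factory: the executor's LLM step awaits llmGate.Task.
    var executor = CreateExecutorWithFakeLlm(() => llmGate.Task);

    var startTask = executor.StartAsync(MakeRequest(), CancellationToken.None);

    // StartAsync completes even though the fake LLM has not been unblocked.
    await startTask.WaitAsync(TimeSpan.FromSeconds(1));
    Assert.False(llmGate.Task.IsCompleted);

    llmGate.SetResult("reply"); // let the background task run to its terminal dispatch
}
```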
Codecov Report

✅ All modified and coverable lines are covered by tests.

@@              Coverage Diff               @@
##           feature/lark-bot     #600      +/-   ##
====================================================
- Coverage             71.95%   71.92%   -0.04%
====================================================
  Files                  1258     1258
  Lines                 91512    91512
  Branches              11998    11998
====================================================
- Hits                  65851    65822      -29
- Misses                20926    20958      +32
+ Partials               4735     4732       -3
... and 7 files with indirect coverage changes
Fix three concrete issues raised by the multi-model PR review:

1. (major) Outer catch in StartAsync now dispatches DeferredLlmReplyDroppedEvent with reason "executor_crash" so the actor's pending entry is retired immediately when the background task fails after the pre-LLM gates. Without this, the IConversationLlmReplyExecutor contract's "must deliver one terminal signal" guarantee was only honored on the success path; failures past the gates leaked State.PendingLlmReplyRequests until the 5-min stale-age fallback.
2. (minor) StartAsync now honors the caller's CancellationToken: throws immediately on a pre-cancelled token (avoids burning an LLM round for a turn that's already moot) and passes ct to Task.Run so a cancellation between StartAsync and scheduler pickup short-circuits the work.
3. (major) Add four direct tests for StartAsync covering the contracts the refactor introduced: non-blocking-while-LLM-runs, request snapshot semantics (post-StartAsync mutation doesn't leak into the in-flight task), executor-crash drop dispatch, and pre-cancelled-token short-circuit.

Document the per-conversation parallelism choice in DispatchPendingLlmReplyAsync rather than adding an actor-owned in-flight gate. The previous host-level FIFO serialized per-conversation only as a side effect of the silo-wide bottleneck this PR removes; per-user reordering is uncommon under normal usage (typing indicator, async ack), and adding a gate would introduce head-of-line blocking on silent crashes. The comment makes the trade-off explicit and notes the gate is a separate, deferred refinement.

Tests: 899 ChannelRuntime + 133 Protocol pass; test_stability_guards pass.
Addresses the unresolved item from PR #600 review: prior commit allowed multiple concurrent executor.StartAsync calls per ConversationGAgent, so two messages arriving in the same conversation within the previous reply window could produce out-of-order or interleaved LLM replies.

Use the head of State.PendingLlmReplyRequests as the in-flight slot. DispatchPendingLlmReplyAsync defers anything that isn't the current head; when the head retires (success / terminal failure / executor crash drop), the corresponding terminal handler calls DispatchHeadOfLlmReplyQueueIfChanged to start the new head. Cross-conversation parallelism is preserved because each actor owns its own queue.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
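A sketch of the head-of-queue slot this commit describes; the handler names come from the commit message, the bodies are illustrative:

```csharp
// Sketch: the head of the persisted queue is the in-flight slot. State and the
// _executor field are the PR's; only what the sketch needs is shown.
private async Task DispatchPendingLlmReplyAsync(
    NeedsLlmReplyEvent request, CancellationToken ct)
{
    var head = State.PendingLlmReplyRequests.FirstOrDefault();
    if (head is null || head.CorrelationId != request.CorrelationId)
    {
        return; // not the current head: deferred until the head retires
    }
    await _executor.StartAsync(request, ct);
}

// Called from every terminal handler (success / terminal failure / crash drop)
// after the retired entry has been removed from the queue.
private async Task DispatchHeadOfLlmReplyQueueIfChanged(CancellationToken ct)
{
    var newHead = State.PendingLlmReplyRequests.FirstOrDefault();
    if (newHead is not null)
    {
        await _executor.StartAsync(newHead, ct); // start the new head, if any
    }
}
```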
Superseded by #597 (already merged into …). PR #597's Phase A explicitly retires … The two PRs touch the same files (proto / streaming sink / …). No additional test coverage worth porting — …
Summary
Moves LLM reply work from a silo-wide hosted-service stream subscriber into a per-conversation executor seam owned by `ConversationGAgent`. Different conversations (different Lark users) now run their LLM replies in parallel instead of strict FIFO serialization across the whole silo.

Closes #599.
Root cause
`ChannelLlmReplyInboxRuntime` registered a single Orleans persistent-stream `SubscribeAsync` per silo (1 pod = 1 silo = 1 callback). Orleans serializes `SubscribeAsync` callbacks FIFO, and `ProcessAsync` synchronously awaited 60-300s LLM calls — so every Lark user's reply queued behind every other user's reply.

Production logs (60 min window 2026-05-08 08:28-09:28, pod `aevatar-console-backend-7b69b89cd6-x9d9t`): 6 LLM reply requests, 4 timeouts, 67% failure rate. Adjacent users' `Processing` timestamps showed strict back-to-back FIFO across distinct user IDs.

Approach (Solution C)
- New `IConversationLlmReplyExecutor` seam in `Aevatar.GAgents.Channel.Runtime`. Single method: `Task StartAsync(NeedsLlmReplyEvent, CancellationToken)`.
- `ConversationLlmReplyExecutor` implementation in `Aevatar.GAgents.NyxidChat`: same … `ProcessAsync` had.
  - `StartAsync` is non-blocking: it spawns a `Task.Run` for the LLM work and returns immediately. The actor turn does not wait on the 60-300s call.
  - Dispatches `LlmReplyReadyEvent` (or `DeferredLlmReplyDroppedEvent`) to `request.TargetActorId` via `IActorDispatchPort`. It never reads or writes actor state — only signals — preserving the actor's single-threaded execution invariant per CLAUDE.md.
- `ConversationGAgent.DispatchPendingLlmReplyAsync` now calls `executor.StartAsync` instead of `inbox.EnqueueAsync`.
- Removes `ChannelLlmReplyInboxRuntime`, `ChannelLlmReplyInboxHostedService`, and `IChannelLlmReplyInbox`; the processing logic lives on in `ConversationLlmReplyExecutor`.
State.PendingLlmReplyRequestsbefore the actor callsStartAsync, exactly as before. Pod restart mid-flight rehydrates the actor and re-kicks the executor on activation via the existingSchedulePendingLlmReplyDispatchesAsyncpath.CLAUDE.md fit: "Actor 即业务实体 — 一个 actor = 一个业务实体(数据与方法同住);禁止按技术功能(读/写/投影)拆分同一业务实体为多个 actor". The LLM reply is part of the conversation entity's behavior; the original host-level worker was the violation.
Files
- `agents/Aevatar.GAgents.Channel.Runtime/Conversation/IConversationLlmReplyExecutor.cs` (new)
- `agents/Aevatar.GAgents.NyxidChat/ConversationLlmReplyExecutor.cs` (renamed from `ChannelLlmReplyInboxRuntime.cs`)
- `agents/Aevatar.GAgents.Channel.Runtime/Conversation/IChannelLlmReplyInbox.cs` (deleted)
- `agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs` — `DispatchPendingLlmReplyAsync` rewired
- `agents/Aevatar.GAgents.NyxidChat/ServiceCollectionExtensions.cs` — drop hosted service, register executor singleton
- `test/Aevatar.GAgents.ChannelRuntime.Tests/ConversationLlmReplyExecutorTests.cs` (renamed from `ChannelLlmReplyInboxRuntimeTests.cs`, ported to executor-shape)
- `test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs` — `RecordingInbox` → `RecordingExecutor`

Test plan
- `dotnet build aevatar.slnx` — 0 errors
- `dotnet test test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj` — 895/895 pass
- `dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests/Aevatar.GAgents.Channel.Protocol.Tests.csproj` — 133/133 pass
- `dotnet test test/Aevatar.GAgents.Platform.Lark.Tests/Aevatar.GAgents.Platform.Lark.Tests.csproj` — 17/17 pass
- `bash tools/ci/architecture_guards.sh` — pass
- Production expectation: `Processing` timestamps interleave (no longer strict FIFO) and failure rate < 5% in the same 60-min window.

Out of scope
- `SkillRunnerGAgent` non-reentrant grain (mentioned in issue #599, "Lark bot LLM reply 全局串行:单 hosted-service 订阅一条共享 stream" ("Lark bot LLM replies globally serialized: a single hosted service subscribing to one shared stream"), as a secondary bottleneck) — separate fix.
- EventStore `OptimisticConcurrencyException` against `user-agent-catalog-read-model` — independent issue, low frequency (1/hr).