
Fix #599: per-conversation LLM reply executor (drop silo-wide inbox)#600

Closed
eanzhao wants to merge 3 commits into feature/lark-bot from
fix/2026-05-08_inbox-into-conversation-actor

Conversation

eanzhao commented May 8, 2026

Summary

Moves LLM reply work from a silo-wide hosted-service stream subscriber into a per-conversation executor seam owned by ConversationGAgent. Different conversations (different Lark users) now run their LLM replies in parallel instead of strict FIFO serialization across the whole silo.

Closes #599.

Root cause

ChannelLlmReplyInboxRuntime registered a single Orleans persistent-stream SubscribeAsync per silo (1 pod = 1 silo = 1 callback). Orleans serializes SubscribeAsync callbacks FIFO, and ProcessAsync synchronously awaited 60-300s LLM calls — so every Lark user's reply queued behind every other user's reply.

Production logs (60 min window 2026-05-08 08:28-09:28, pod aevatar-console-backend-7b69b89cd6-x9d9t): 6 LLM reply requests, 4 timeouts, 67% failure rate. Adjacent users' Processing timestamps showed strict back-to-back FIFO across distinct user IDs.
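
The failure shape is easy to reproduce in miniature. Below is a toy model, not the repo's code: a plain `foreach` stands in for Orleans delivering `SubscribeAsync` callbacks one at a time, and a 50 ms delay stands in for the 60-300 s LLM call — four users' replies cost roughly four times one reply's latency:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

var completionOrder = new List<int>();
var sw = Stopwatch.StartNew();

// Orleans invokes the single subscriber's callback one event at a time;
// awaiting the LLM call inside it means user 2 waits for user 1's reply.
foreach (var userId in new[] { 1, 2, 3, 4 })
{
    await FakeLlmCallAsync();        // 60-300 s in production; 50 ms here
    completionOrder.Add(userId);     // strict back-to-back FIFO across users
}

sw.Stop();
Console.WriteLine($"order={string.Join(",", completionOrder)}");
Console.WriteLine($"four replies cost ~4x one reply: {sw.ElapsedMilliseconds} ms");

static Task FakeLlmCallAsync() => Task.Delay(50);
```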

Approach (Solution C)

  • New IConversationLlmReplyExecutor seam in Aevatar.GAgents.Channel.Runtime. Single method: Task StartAsync(NeedsLlmReplyEvent, CancellationToken).
  • New ConversationLlmReplyExecutor implementation in Aevatar.GAgents.NyxidChat:
    • Pre-LLM gates (stale age, missing relay credential, malformed payload), bot-owner config enrichment, the LLM call itself, streaming-sink wiring, and dispatch of the terminal signal back to the originating actor — same logic the old ProcessAsync had.
    • StartAsync is non-blocking: it spawns a Task.Run for the LLM work and returns immediately. The actor turn does not wait on the 60-300s call.
    • The background task only does external I/O and finishes by dispatching LlmReplyReadyEvent (or DeferredLlmReplyDroppedEvent) to request.TargetActorId via IActorDispatchPort. It never reads or writes actor state — only signals — preserving the actor's single-threaded execution invariant per CLAUDE.md.
  • ConversationGAgent.DispatchPendingLlmReplyAsync now calls executor.StartAsync instead of inbox.EnqueueAsync.
  • Deleted ChannelLlmReplyInboxRuntime, ChannelLlmReplyInboxHostedService, and IChannelLlmReplyInbox.
  • DI registration drops the silo-wide hosted service; replaced with a singleton ConversationLlmReplyExecutor.
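
The executor's non-blocking contract can be sketched in miniature (every name below is an illustrative stand-in, not the repo's actual `NeedsLlmReplyEvent` / `IActorDispatchPort` types): `StartAsync` snapshots the request, schedules the work with `Task.Run`, and returns before the fake LLM gate ever opens:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var llmGate = new TaskCompletionSource();   // we control when the "LLM call" finishes
var terminal = new TaskCompletionSource();
var signals = new List<string>();

Task StartAsync(string correlationId, CancellationToken ct)
{
    ct.ThrowIfCancellationRequested();
    var snapshot = correlationId;                         // snapshot before hand-off
    _ = Task.Run(async () =>
    {
        try
        {
            await llmGate.Task.ConfigureAwait(false);     // "60-300 s LLM call"
            signals.Add($"LlmReplyReadyEvent:{snapshot}"); // signal the actor, never touch its state
            terminal.TrySetResult();
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);                  // log and swallow
        }
    });
    return Task.CompletedTask;                            // the actor turn never blocks
}

var start = StartAsync("corr-1", CancellationToken.None);
bool returnedBeforeLlm = start.IsCompleted;               // true: non-blocking
llmGate.SetResult();                                      // let the background work finish
await terminal.Task;
Console.WriteLine($"returnedBeforeLlm={returnedBeforeLlm} terminal={signals[0]}");
```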

Durability: the request is persisted in State.PendingLlmReplyRequests before the actor calls StartAsync, exactly as before. Pod restart mid-flight rehydrates the actor and re-kicks the executor on activation via the existing SchedulePendingLlmReplyDispatchesAsync path.

CLAUDE.md fit: "Actor is the business entity — one actor = one business entity (data and methods live together); never split a single business entity into multiple actors by technical function (read/write/projection)". The LLM reply is part of the conversation entity's behavior; the original host-level worker was the violation.

Files

  • agents/Aevatar.GAgents.Channel.Runtime/Conversation/IConversationLlmReplyExecutor.cs (new)
  • agents/Aevatar.GAgents.NyxidChat/ConversationLlmReplyExecutor.cs (renamed from ChannelLlmReplyInboxRuntime.cs)
  • agents/Aevatar.GAgents.Channel.Runtime/Conversation/IChannelLlmReplyInbox.cs (deleted)
  • agents/Aevatar.GAgents.Channel.Runtime/Conversation/ConversationGAgent.cs — DispatchPendingLlmReplyAsync rewired
  • agents/Aevatar.GAgents.NyxidChat/ServiceCollectionExtensions.cs — drop hosted service, register executor singleton
  • test/Aevatar.GAgents.ChannelRuntime.Tests/ConversationLlmReplyExecutorTests.cs (renamed from ChannelLlmReplyInboxRuntimeTests.cs, ported to executor-shape)
  • test/Aevatar.GAgents.Channel.Protocol.Tests/ConversationGAgentDedupTests.cs — RecordingInbox renamed to RecordingExecutor
  • Misc comment / proto-doc / allowlist renames

Test plan

  • dotnet build aevatar.slnx — 0 errors
  • dotnet test test/Aevatar.GAgents.ChannelRuntime.Tests/Aevatar.GAgents.ChannelRuntime.Tests.csproj — 895/895 pass
  • dotnet test test/Aevatar.GAgents.Channel.Protocol.Tests/Aevatar.GAgents.Channel.Protocol.Tests.csproj — 133/133 pass
  • dotnet test test/Aevatar.GAgents.Platform.Lark.Tests/Aevatar.GAgents.Platform.Lark.Tests.csproj — 17/17 pass
  • bash tools/ci/architecture_guards.sh — pass
  • Production verification (post-deploy): script 10 simultaneous Lark DMs to different users; expect their Processing timestamps to interleave (no longer strict FIFO) and failure rate < 5% in the same 60-min window.

Out of scope

…utor

Replace the silo-wide ChannelLlmReplyInboxRuntime stream subscriber with an
IConversationLlmReplyExecutor seam owned by each ConversationGAgent. The actor
calls executor.StartAsync from inside its turn; the executor schedules the LLM
work on a background task so the 60-300s call never pins the actor turn, then
dispatches LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent back to the actor
via IActorDispatchPort.

Why: the old hosted service registered a single Orleans persistent stream
subscription per silo, and Orleans serialized SubscribeAsync callbacks FIFO.
Every Lark user's LLM reply queued behind every other user's LLM reply,
producing a 67% timeout rate in production (issue #599). The bottleneck was
shape, not capacity — sharding by conversation alone is enough to recover
parallelism.

Compliant with CLAUDE.md "Actor is the business entity": the LLM reply is part of the
conversation entity's behavior; refactoring it into a host-level worker was
the original violation.

Closes #599

chatgpt-codex-connector Bot left a comment
💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 189a912363


Comment on lines +106 to +110
_ = Task.Run(async () =>
{
    try
    {
        await ProcessAsync(snapshot).ConfigureAwait(false);

P1 Badge Serialize in-flight replies per conversation

When the same conversation receives another activity while the previous LLM call is still running (for example, a user sends a follow-up within the 60–300s reply window), this Task.Run starts another background ProcessAsync after the actor turn returns. The actor only serializes the later ready/stream events, so the LLM/tool side effects and streaming chunks for the same conversation can race and replies can be delivered out of order; keep at most one in-flight LLM reply per ConversationGAgent while still allowing different conversation actors to run in parallel.


Comment on lines +112 to +123
catch (Exception ex)
{
    // ProcessAsync handles its own errors and dispatches a terminal signal;
    // an exception bubbling out here means dispatch itself or some outer
    // step failed unexpectedly. Log and swallow so the unobserved exception
    // does not crash the host.
    _logger.LogError(
        ex,
        "Conversation LLM reply executor crashed before dispatching terminal signal: correlation={CorrelationId} target={TargetActorId}",
        snapshot.CorrelationId,
        snapshot.TargetActorId);
}

P2 Badge Retry when the background task cannot signal the actor

If the background work reaches the terminal dispatch and IActorDispatchPort.DispatchAsync throws (for example, a transient Orleans/grain handoff failure), the exception lands here and is only logged. Because StartAsync already returned successfully, ConversationGAgent does not schedule the durable retry path, leaving the pending request stuck in State.PendingLlmReplyRequests until some later activation happens; the failure needs to be signaled back to the actor, or the executor should schedule the same retry path used when startup fails.


// ownership to a background task.
var snapshot = request.Clone();

// Use Task.Run so the LLM work runs on the thread pool, not on the caller's

[Consensus: 3 models] severity=minor, category=design

The new Task.Run launch ignores the caller's CancellationToken (passes CancellationToken.None) and has no host-shutdown coordination — neither caller-token cancellation nor application-stopping events can stop an in-flight LLM call. Three reviewers flagged variants of this with different fix proposals; pick whichever fits (or none, if accepted as durability-by-rehydration).

Per-model verbatim
  • deepseek-v4-pro: StartAsync spawns fire-and-forget Task.Run work with CancellationToken.None and no tracking (no ConcurrentBag<Task> / CountdownEvent / IHostedService.StopAsync integration). In-flight LLM calls (up to 300s) that outlive the host's graceful-shutdown window will run into a tearing-down IActorDispatchPort, producing ObjectDisposedException-style noise in shutdown logs. Durability is covered by actor state rehydration, so this won't cause data loss, but consider a minimal shutdown gate: track in-flight tasks, expose IAsyncDisposable that signals a shared CancellationTokenSource and awaits the pending set within a bounded drain window (e.g. 5s), then let any stragglers die naturally.

  • glm-5.1: StartAsync accepts a CancellationToken ct parameter but passes CancellationToken.None to Task.Run, completely ignoring the caller's token. At minimum, add ct.ThrowIfCancellationRequested() before the Task.Run call so that a caller whose turn is already cancelled does not wastefully spawn an LLM round that may race the rehydration path. More robustly, pass ct to Task.Run as the second argument so the task is never scheduled if cancellation is already signaled:

    ct.ThrowIfCancellationRequested();
    var snapshot = request.Clone();
    _ = Task.Run(async () => { ... }, ct);
  • mimo-v2.5-pro: _ = Task.Run(async () => { ... }, CancellationToken.None) — the fire-and-forget pattern is correct for the non-blocking requirement, but there is no coordinated shutdown path. If the host shuts down while LLM calls are in-flight, ProcessAsync runs to completion (or its internal timeout) with no host-level cancellation. The old IHostedService.StopAsync/DisposeAsync at least tore down the stream subscription. Consider accepting a CancellationToken from a shared host-lifetime source (e.g. IHostApplicationLifetime.ApplicationStopping) to allow the internal CancellationTokenSource in ProcessAsync to be cancelled early on shutdown.
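
Combining these suggestions, one hedged sketch of a cancellation-aware StartAsync — everything below is an illustrative stand-in, with a local CancellationTokenSource modeling IHostApplicationLifetime.ApplicationStopping:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var stopping = new CancellationTokenSource();   // stands in for ApplicationStopping
var observedCancel = new TaskCompletionSource();

Task StartAsync(Func<CancellationToken, Task> work, CancellationToken callerCt)
{
    callerCt.ThrowIfCancellationRequested();    // don't spawn a round for a moot turn
    // Either the caller's token or host shutdown can cancel the in-flight work.
    var linked = CancellationTokenSource.CreateLinkedTokenSource(callerCt, stopping.Token);
    _ = Task.Run(async () =>
    {
        try { await work(linked.Token).ConfigureAwait(false); }
        catch (OperationCanceledException) { /* expected on shutdown */ }
        finally { linked.Dispose(); }
    });
    return Task.CompletedTask;
}

await StartAsync(async ct =>
{
    try { await Task.Delay(Timeout.Infinite, ct); }   // stands in for the LLM call
    catch (OperationCanceledException) { observedCancel.TrySetResult(); throw; }
}, CancellationToken.None);

stopping.Cancel();              // host begins shutting down
await observedCancel.Task;      // in-flight work observed the cancellation
Console.WriteLine("in-flight LLM call observed shutdown cancellation");
```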


// very bottleneck this seam removes is reintroduced inside one actor). The
// background task only does external I/O and finishes by dispatching back to
// the actor — it never reads or writes actor state.
_ = Task.Run(async () =>

[codex] severity=major, category=concurrency

Task.Run returns immediately for every pending request, so one ConversationGAgent can now start multiple LLM replies for the same conversation before the first one finishes. That removes the only ordering boundary the old silo-FIFO provided and can produce out-of-order replies or interleaved streaming/ready signals for a single user (e.g. user types two messages quickly; both DispatchPendingLlmReplyAsync calls fire in turn order, but their Task.Run LLM calls finish in arbitrary order).

Keep the parallelism across different conversation actors (the headline win of this PR), but add an actor-owned in-flight gate in ConversationGAgent so only one correlation per conversation is started at a time — block subsequent StartAsync calls until the current correlation's LlmReplyReadyEvent / DeferredLlmReplyDroppedEvent retires it. The gate lives in actor state, which already tracks PendingLlmReplyRequests.

If you've decided per-conversation interleaving is acceptable (or already prevented elsewhere I'm not seeing), please add a comment in DispatchPendingLlmReplyAsync explaining why the FIFO loss is OK for one user's stream.

Timestamp = Timestamp.FromDateTimeOffset(_timeProvider.GetUtcNow()),
Payload = Any.Pack(ready),
- Route = EnvelopeRouteSemantics.CreateDirect(InboxStreamId, request.TargetActorId),
+ Route = EnvelopeRouteSemantics.CreateDirect(PublisherActorId, request.TargetActorId),

[Consensus: 2 models] severity=major, category=bug

If DispatchAsync (this line) or any code in ProcessAsync throws after the pre-LLM gates pass — e.g. transient IActorDispatchPort failure, target actor not yet initialized, network blip — the outer catch in StartAsync only logs and swallows the exception, so the IConversationLlmReplyExecutor contract's required terminal signal (LlmReplyReadyEvent or DeferredLlmReplyDroppedEvent) is never delivered. State.PendingLlmReplyRequests then leaks until the 5-minute stale-age retry fires.

Fix options:

  1. Dispatch a terminal DeferredLlmReplyDroppedEvent (reason like executor_crash) from the outer catch in StartAsync, so the actor can retire the pending entry immediately. Cheapest fix.
  2. Convert dispatch failures into an actor-owned durable retry path (more work, but matches the durability story for transient failures).

Either way, add a test with a throwing IActorDispatchPort to lock in the chosen behavior.

Per-model verbatim
  • kimi (at executor.cs:112, the outer catch): The outer catch block in StartAsync logs the crash but does not dispatch a terminal signal, violating the IConversationLlmReplyExecutor contract ("Implementations MUST eventually deliver one terminal signal... Without a terminal signal, the actor's pending entry would leak"). If ProcessAsync crashes after its internal gates but before dispatching (e.g. _actorDispatchPort throws on send), the actor's State.PendingLlmReplyRequests entry will linger until the 5-minute stale-age retry fires. Consider calling NotifyActorOfDropAsync in the catch block with reason executor_crash to honor the contract and allow the actor to retire the pending entry immediately.
  • codex (at executor.cs:303, the dispatch site): If DispatchAsync throws here (for example Orleans reports the target actor is not initialized, or dispatch has a transient failure), the outer fire-and-forget catch only logs and swallows it, so the IConversationLlmReplyExecutor contract's required terminal signal is never delivered and State.PendingLlmReplyRequests can leak indefinitely. Convert dispatch failure into an actor-owned durable retry path (or otherwise guarantee a terminal signal) and add a test with a throwing IActorDispatchPort.
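
Fix option 1 can be sketched with stand-in names (the dispatch signature and the "executor_crash" reason string are illustrative, not the repo's code): the outer catch dispatches a terminal drop instead of only logging, so the actor retires its pending entry at once:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var dispatched = new List<(string Kind, string Reason)>();
var done = new TaskCompletionSource();

Task Dispatch(string kind, string reason)
{
    dispatched.Add((kind, reason));
    done.TrySetResult();
    return Task.CompletedTask;
}

_ = Task.Run(async () =>
{
    try
    {
        // Stands in for ProcessAsync failing past the pre-LLM gates.
        await Task.FromException(new InvalidOperationException("transient dispatch-port blip"));
        await Dispatch("LlmReplyReadyEvent", "");
    }
    catch (Exception)
    {
        try
        {
            // Honor the "must deliver one terminal signal" contract on failure.
            await Dispatch("DeferredLlmReplyDroppedEvent", "executor_crash");
        }
        catch
        {
            // Even the drop dispatch failed: log only; the 5-minute
            // stale-age retry remains the backstop.
        }
    }
});

await done.Task;
Console.WriteLine($"terminal={dispatched[0].Kind} reason={dispatched[0].Reason}");
```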

/// </summary>
internal static readonly TimeSpan MetadataBuildBudget = TimeSpan.FromSeconds(15);

public Task StartAsync(NeedsLlmReplyEvent request, CancellationToken ct)

[kimi] severity=major, category=test

StartAsync — the primary public entry point of this seam — is not exercised by any unit test. The renamed ConversationLlmReplyExecutorTests still calls ProcessAsync directly, bypassing the Task.Run wrapping, exception-swallowing semantics, and non-blocking guarantee that are the whole point of this refactor.

Add at least these tests:

  1. StartAsync returns a completed task immediately without awaiting the LLM call (e.g. fake LLM that blocks on a TaskCompletionSource — verify StartAsync returns before the TCS is signaled).
  2. The background task eventually invokes ProcessAsync and dispatches the expected terminal signal.
  3. An unexpected exception inside ProcessAsync is logged and does not surface as an unobserved task exception (i.e. the outer catch does its job — pairs with Issue 3 above).
  4. The request is snapshotted (Clone()) before being handed to the background task, so post-StartAsync mutation by the caller can't affect the in-flight LLM call.

Without these, the non-blocking semantics that justify this PR are unverified — a future refactor could silently re-await the LLM call inside StartAsync and every existing test would still pass.
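
Suggested test 4 can be sketched as a plain console check (the dictionary is a stand-in for a mutable request type with Clone(); the real NeedsLlmReplyEvent and StartAsync signatures differ):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var request = new Dictionary<string, string> { ["prompt"] = "original" };  // mutable request stand-in
var gate = new TaskCompletionSource();
var seen = new TaskCompletionSource<string>();

// StartAsync-shaped helper: clone first, then hand off to a background task.
Task StartAsync(Dictionary<string, string> req)
{
    var snapshot = new Dictionary<string, string>(req);   // Clone() BEFORE Task.Run
    _ = Task.Run(async () =>
    {
        await gate.Task;                                  // background work starts later
        seen.TrySetResult(snapshot["prompt"]);            // what the in-flight call observes
    });
    return Task.CompletedTask;
}

await StartAsync(request);
request["prompt"] = "mutated-after-start";   // caller mutates after StartAsync returns
gate.SetResult();
var observed = await seen.Task;
Console.WriteLine($"in-flight call saw: {observed}");
```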


codecov Bot commented May 8, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 71.92%. Comparing base (59fd77a) to head (9c35ede).
⚠️ Report is 5 commits behind head on feature/lark-bot.

@@                 Coverage Diff                  @@
##           feature/lark-bot     #600      +/-   ##
====================================================
- Coverage             71.95%   71.92%   -0.04%     
====================================================
  Files                  1258     1258              
  Lines                 91512    91512              
  Branches              11998    11998              
====================================================
- Hits                  65851    65822      -29     
- Misses                20926    20958      +32     
+ Partials               4735     4732       -3     
Flag Coverage Δ
ci 71.92% <ø> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...r.AI.ToolProviders.NyxId/Tools/NyxIdSshExecTool.cs 99.41% <ø> (ø)

... and 7 files with indirect coverage changes


eanzhao and others added 2 commits May 8, 2026 18:23
Fix three concrete issues raised by the multi-model PR review:

1. (major) Outer catch in StartAsync now dispatches DeferredLlmReplyDroppedEvent
   with reason "executor_crash" so the actor's pending entry is retired
   immediately when the background task fails after the pre-LLM gates. Without
   this, the IConversationLlmReplyExecutor contract's "must deliver one terminal
   signal" guarantee was only honored on the success path; failures past the
   gates leaked State.PendingLlmReplyRequests until the 5-min stale-age fallback.

2. (minor) StartAsync now honors the caller's CancellationToken: throws
   immediately on a pre-cancelled token (avoids burning an LLM round for a turn
   that's already moot) and passes ct to Task.Run so a cancellation between
   StartAsync and scheduler pickup short-circuits the work.

3. (major) Add four direct tests for StartAsync covering the contracts the
   refactor introduced: non-blocking-while-LLM-runs, request snapshot semantics
   (post-StartAsync mutation doesn't leak into the in-flight task), executor-
   crash drop dispatch, and pre-cancelled-token short-circuit.

Document the per-conversation parallelism choice in
DispatchPendingLlmReplyAsync rather than adding an actor-owned in-flight gate.
The previous host-level FIFO serialized per-conversation only as a side effect
of the silo-wide bottleneck this PR removes; per-user reordering is uncommon
under normal usage (typing indicator, async ack), and adding a gate would
introduce head-of-line blocking on silent crashes. The comment makes the
trade-off explicit and notes the gate is a separate, deferred refinement.

Tests: 899 ChannelRuntime + 133 Protocol pass; test_stability_guards pass.
Addresses the unresolved item from PR #600 review: prior commit allowed
multiple concurrent executor.StartAsync calls per ConversationGAgent, so
two messages arriving in the same conversation within the previous reply
window could produce out-of-order or interleaved LLM replies.

Use the head of State.PendingLlmReplyRequests as the in-flight slot.
DispatchPendingLlmReplyAsync defers anything that isn't the current head;
when the head retires (success / terminal failure / executor crash drop),
the corresponding terminal handler calls DispatchHeadOfLlmReplyQueueIfChanged
to start the new head. Cross-conversation parallelism is preserved because
each actor owns its own queue.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
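
The head-of-queue gate described in this commit can be modeled in a few lines (illustrative names, not the repo's actual members — the Queue stands in for State.PendingLlmReplyRequests):

```csharp
using System;
using System.Collections.Generic;

var started = new List<string>();
var pending = new Queue<string>();   // stand-in for State.PendingLlmReplyRequests

void Enqueue(string correlationId)
{
    pending.Enqueue(correlationId);
    if (pending.Count == 1) started.Add(correlationId);   // only the head is started
}

void Retire(string correlationId)
{
    if (pending.Count == 0 || pending.Peek() != correlationId) return;  // ignore stale signals
    pending.Dequeue();
    if (pending.Count > 0) started.Add(pending.Peek());   // terminal handler starts the new head
}

Enqueue("corr-a");   // head: executor starts immediately
Enqueue("corr-b");   // deferred behind corr-a
Enqueue("corr-c");   // deferred
Retire("corr-a");    // terminal signal retires the head, starts corr-b
Retire("corr-b");    // starts corr-c
Console.WriteLine(string.Join(",", started));   // corr-a,corr-b,corr-c
```

Cross-conversation parallelism survives because each actor owns its own queue; within one conversation at most one correlation is in flight.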

eanzhao commented May 8, 2026

Superseded by #597 (already merged into feature/lark-bot as f078068e).

PR #597's Phase A explicitly retires ChannelLlmReplyInboxRuntime as a host-level singleton and replaces it with run-scoped AgentRunGAgent actors keyed by correlationId. Each correlationId getting its own actor + stream eliminates the silo-wide FIFO this PR was targeting, so the production failure mode reported in #599 (sequential 60-300s waits across distinct Lark users) no longer exists once feature/lark-bot deploys.

The two PRs touch the same files (proto/streaming sink/ConversationGAgent.DispatchPendingLlmReplyAsync/DI/dedup tests) but pick different seam shapes (IConversationLlmReplyExecutor here vs IChannelLlmReplyRunDispatcher there). #597 is already in, and its actor-per-run model fits CLAUDE.md's "actor is the business entity / run/session/task-scoped actor" guidance more strictly than this PR's Task.Run singleton, so this PR is closed rather than re-litigated.

No additional test coverage worth porting — AgentRunGAgentTests already covers the same scenarios (stale gate, missing relay token, malformed payload, streaming chunk + ready ordering, bot-owner LLM config, timeout fallback).
