@@ -30,8 +30,8 @@ public sealed partial class ConversationGAgent : GAgentBase<ConversationGAgentSt
{
// Orleans Reminders (the durable scheduler backing ScheduleSelfDurableTimeoutAsync)
// round dueTime up to the local reminder service tick (typically ~1 minute), so
- // sub-minute schedules are unreliable. The inbox dispatch happens inline via
- // IChannelLlmReplyInbox; the durable timer is reserved for retry/rehydration.
+ // sub-minute schedules are unreliable. Per-turn LLM dispatch happens inline via
+ // IConversationLlmReplyExecutor; the durable timer is reserved for retry/rehydration.
private static readonly TimeSpan DeferredLlmDispatchRetryDelay = TimeSpan.FromSeconds(60);
// Pending LLM reply requests older than this are considered stale on rehydration:
// the user gave up, the relay reply_token (~30 min TTL) is likely already expired,
@@ -311,6 +311,7 @@ public async Task HandleDeferredLlmReplyDroppedAsync(DeferredLlmReplyDroppedEven
"Retired pending LLM reply after inbox drop: correlation={CorrelationId} reason={Reason}",
evt.CorrelationId,
reason);
+ await DispatchHeadOfLlmReplyQueueIfChangedAsync(evt.CorrelationId);
}

[EventHandler]
@@ -344,11 +345,35 @@ public async Task HandleDeferredInboundTurnRetryRequestedAsync(DeferredInboundTu

private async Task DispatchPendingLlmReplyAsync(NeedsLlmReplyEvent request, CancellationToken ct)
{
- var inbox = Services.GetService<IChannelLlmReplyInbox>();
- if (inbox is null)
+ // Per-conversation FIFO gate: at most one LLM reply per ConversationGAgent is
+ // dispatched to an executor at a time. The head of State.PendingLlmReplyRequests is
+ // the in-flight slot; everything behind it waits. When the head retires (success,
+ // terminal failure, or executor crash drop), the corresponding terminal handler
+ // calls DispatchHeadOfLlmReplyQueueIfChangedAsync to start the new head. Cross-
+ // conversation parallelism (the headline win of #599's fix) is unaffected because
+ // each ConversationGAgent owns its own queue.
+ var head = State.PendingLlmReplyRequests.FirstOrDefault();
+ if (head is null)
+ {
+ // The pending entry was already retired (e.g. a duplicate terminal event raced
+ // ahead of this dispatch). Nothing to do.
+ return;
+ }
+ if (!string.Equals(head.CorrelationId, request.CorrelationId, StringComparison.Ordinal))
+ {
+ Logger.LogInformation(
+ "Per-conversation FIFO gate: deferring LLM reply behind in-flight head: head={Head} pending={Pending} conversation={Key}",
+ head.CorrelationId,
+ request.CorrelationId,
+ State.Conversation?.CanonicalKey);
+ return;
+ }
+
+ var executor = Services.GetService<IConversationLlmReplyExecutor>();
+ if (executor is null)
{
Logger.LogWarning(
"Channel LLM reply inbox not registered; scheduling durable retry: correlation={CorrelationId}",
"Conversation LLM reply executor not registered; scheduling durable retry: correlation={CorrelationId}",
request.CorrelationId);
await ScheduleDeferredLlmReplyDispatchAsync(request, DeferredLlmDispatchRetryDelay, ct);
return;
@@ -357,15 +382,15 @@ private async Task DispatchPendingLlmReplyAsync(NeedsLlmReplyEvent request, Canc
// Retry and rehydration paths read `request` from State.PendingLlmReplyRequests,
// which always carries an empty ReplyToken (the inbound handler strips it before
// persist). If the actor is still alive and the in-memory dict still has the
- // token for this correlation, re-enrich the inbox copy so the subscriber's relay
+ // token for this correlation, re-enrich the executor copy so the executor's relay
// credential gate does not mistake a legitimate retry for a dead request.
var enriched = EnrichWithRuntimeReplyTokenIfNeeded(request);

try
{
- await inbox.EnqueueAsync(enriched.Clone(), ct);
+ await executor.StartAsync(enriched.Clone(), ct);
Logger.LogInformation(
"Enqueued LLM reply request to inbox: correlation={CorrelationId} conversation={Key} replyTokenSource={Source}",
"Started LLM reply work: correlation={CorrelationId} conversation={Key} replyTokenSource={Source}",
enriched.CorrelationId,
enriched.Activity?.Conversation?.CanonicalKey,
DescribeEnqueuedReplyTokenSource(request, enriched));
@@ -374,12 +399,31 @@ private async Task DispatchPendingLlmReplyAsync(NeedsLlmReplyEvent request, Canc
{
Logger.LogError(
ex,
"Failed to enqueue LLM reply request; scheduling durable retry: correlation={CorrelationId}",
"Failed to start LLM reply work; scheduling durable retry: correlation={CorrelationId}",
request.CorrelationId);
await ScheduleDeferredLlmReplyDispatchAsync(request, DeferredLlmDispatchRetryDelay, ct);
}
}

+ private async Task DispatchHeadOfLlmReplyQueueIfChangedAsync(string completedCorrelationId)
+ {
+ var head = State.PendingLlmReplyRequests.FirstOrDefault();
+ if (head is null)
+ return;
+ if (string.Equals(head.CorrelationId, completedCorrelationId, StringComparison.Ordinal))
+ {
+ // Head still alive (e.g. transient failure scheduled a durable retry); the
+ // retry trigger will redispatch when its delay elapses.
+ return;
+ }
+ Logger.LogInformation(
+ "Per-conversation FIFO gate advancing: completed={Completed} nextHead={NextHead} conversation={Key}",
+ completedCorrelationId,
+ head.CorrelationId,
+ State.Conversation?.CanonicalKey);
+ await DispatchPendingLlmReplyAsync(head.Clone(), CancellationToken.None);
+ }

private NeedsLlmReplyEvent EnrichWithRuntimeReplyTokenIfNeeded(NeedsLlmReplyEvent request)
{
if (!string.IsNullOrWhiteSpace(request.ReplyToken))
@@ -449,6 +493,7 @@ public async Task HandleLlmReplyReadyAsync(LlmReplyReadyEvent evt)
var streamingActivity = referenceActivity ?? evt.Activity;
if (streamingActivity is not null)
_ = ResolveRunner().OnReplyDeliveredAsync(streamingActivity, CancellationToken.None);
+ await DispatchHeadOfLlmReplyQueueIfChangedAsync(evt.CorrelationId);
return;
}

Expand Down Expand Up @@ -479,6 +524,7 @@ public async Task HandleLlmReplyReadyAsync(LlmReplyReadyEvent evt)
evt.CorrelationId,
result.SentActivityId,
completed.Conversation?.CanonicalKey);
+ await DispatchHeadOfLlmReplyQueueIfChangedAsync(evt.CorrelationId);
return;
}

Expand Down Expand Up @@ -519,6 +565,9 @@ await ScheduleDeferredLlmReplyDispatchAsync(
evt.CorrelationId,
result.ErrorCode,
result.FailureKind);
+ // Advance the per-conversation FIFO gate. Retryable failures don't move the head
+ // (the helper will no-op); terminal failures do.
+ await DispatchHeadOfLlmReplyQueueIfChangedAsync(evt.CorrelationId);
}

/// <summary>
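Reviewer note: the gate above is spread across several handlers, so here is a minimal, self-contained model of the intended queue discipline. `PendingRequest`, `LlmReplyFifoGate`, and the return-value-driven dispatch are illustrative stand-ins, not the actual grain state or handler signatures.

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustrative model of the per-conversation FIFO gate (names are hypothetical).
public sealed record PendingRequest(string CorrelationId);

public sealed class LlmReplyFifoGate
{
    // Mirrors State.PendingLlmReplyRequests: the head is the in-flight slot.
    private readonly List<PendingRequest> _pending = new();

    // Inbound turn: persist the request; dispatch only if it became the head.
    public PendingRequest? Enqueue(PendingRequest request)
    {
        _pending.Add(request);
        return _pending.Count == 1 ? request : null; // behind the head => wait
    }

    // Terminal handler: retire the completed entry (success, terminal failure,
    // or executor drop), then report the next head to dispatch.
    public PendingRequest? Retire(string correlationId)
    {
        _pending.RemoveAll(p => p.CorrelationId == correlationId);
        return HeadIfChanged(correlationId);
    }

    // Mirrors DispatchHeadOfLlmReplyQueueIfChangedAsync: no-op while the
    // completed correlation still owns the head (a durable retry is pending).
    public PendingRequest? HeadIfChanged(string completedCorrelationId)
    {
        var head = _pending.FirstOrDefault();
        if (head is null) return null;                                 // queue drained
        if (head.CorrelationId == completedCorrelationId) return null; // retry owns the slot
        return head;                                                   // new head: dispatch it
    }
}
```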

This file was deleted.

@@ -0,0 +1,30 @@
+ namespace Aevatar.GAgents.Channel.Runtime;
+
+ /// <summary>
+ /// Per-conversation LLM reply driver. Replaces the host-level inbox stream subscriber with a seam
+ /// that <see cref="ConversationGAgent"/> calls directly, so each conversation actor owns its own
+ /// LLM reply work and different conversations run in parallel rather than serializing on a single
+ /// silo-wide subscription.
+ /// </summary>
+ /// <remarks>
+ /// <para>
+ /// The actor must persist the request into <c>State.PendingLlmReplyRequests</c> before invoking
+ /// this seam: the executor runs the LLM call on a background task and only signals completion via
+ /// dispatch, so durability is provided by actor state, not by the executor.
+ /// </para>
+ /// <para>
+ /// Implementations MUST eventually deliver one terminal signal to <see cref="NeedsLlmReplyEvent.TargetActorId"/>:
+ /// either an <see cref="LlmReplyReadyEvent"/> (success or classified failure) or a
+ /// <see cref="DeferredLlmReplyDroppedEvent"/> (request rejected by a pre-LLM gate). Without a
+ /// terminal signal, the actor's pending entry would leak.
+ /// </para>
+ /// <para>
+ /// <see cref="StartAsync"/> returns once the work has been kicked off — not when the LLM call
+ /// completes. The actor turn must not block on the LLM call (60-300s) or it would re-introduce the
+ /// per-actor serial bottleneck this seam exists to remove.
+ /// </para>
+ /// </remarks>
+ public interface IConversationLlmReplyExecutor
+ {
+     Task StartAsync(NeedsLlmReplyEvent request, CancellationToken ct);
+ }
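A hedged sketch of what a conforming implementation could look like. The delegate seams and record shapes below are stand-ins invented for this sketch; the part kept faithful is the contract from the docs above: return at kick-off, and emit exactly one terminal signal per accepted or rejected request.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in shapes; the real types are the proto messages and the GAgent dispatch path.
public sealed record Request(string CorrelationId, string TargetActorId, string Prompt);
public abstract record TerminalSignal(string CorrelationId);
public sealed record ReplyReady(string CorrelationId, string? Text, string? ErrorCode) : TerminalSignal(CorrelationId);
public sealed record Dropped(string CorrelationId, string Reason) : TerminalSignal(CorrelationId);

public sealed class SketchExecutor
{
    private readonly Func<string, TerminalSignal, Task> _sendToActor; // actor id, signal
    private readonly Func<Request, CancellationToken, Task<string>> _llm;

    public SketchExecutor(
        Func<string, TerminalSignal, Task> sendToActor,
        Func<Request, CancellationToken, Task<string>> llm)
        => (_sendToActor, _llm) = (sendToActor, llm);

    // Returns once the work is kicked off, never when the LLM call completes;
    // blocking here would reintroduce the per-actor serial bottleneck.
    public Task StartAsync(Request request, CancellationToken ct)
    {
        // Pre-LLM gate: a rejection is still a terminal signal (Dropped).
        if (string.IsNullOrWhiteSpace(request.Prompt))
            return _sendToActor(request.TargetActorId,
                new Dropped(request.CorrelationId, "malformed payload"));

        _ = Task.Run(() => RunAndSignalAsync(request), CancellationToken.None);
        return Task.CompletedTask;
    }

    private async Task RunAndSignalAsync(Request request)
    {
        try
        {
            var text = await _llm(request, CancellationToken.None); // 60-300s, off the actor turn
            await _sendToActor(request.TargetActorId,
                new ReplyReady(request.CorrelationId, text, ErrorCode: null));
        }
        catch (Exception ex)
        {
            // A crash must still yield exactly one terminal signal, or the
            // actor's pending entry leaks until the stale-age sweep.
            await _sendToActor(request.TargetActorId,
                new ReplyReady(request.CorrelationId, Text: null, ErrorCode: ex.GetType().Name));
        }
    }
}
```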
12 changes: 6 additions & 6 deletions agents/Aevatar.GAgents.Channel.Runtime/IStreamingReplySink.cs
@@ -1,16 +1,16 @@
namespace Aevatar.GAgents.Channel.Runtime;

/// <summary>
- /// Receives per-delta streaming updates from <see cref="IConversationReplyGenerator"/> so the reply
- /// inbox can fan the accumulated text to the conversation actor as it is being generated. The
- /// actor is the sole holder of the relay reply token, so only it is allowed to drive the relay
+ /// Receives per-delta streaming updates from <see cref="IConversationReplyGenerator"/> so the LLM
+ /// reply executor can fan the accumulated text to the conversation actor as it is being generated.
+ /// The actor is the sole holder of the relay reply token, so only it is allowed to drive the relay
/// placeholder send and subsequent edit calls; this sink therefore fans out signals (chunk events)
/// and never touches the outbound port directly.
/// </summary>
/// <remarks>
- /// Implementations are per-turn and owned by the inbox runtime. A null sink signals that streaming
- /// is disabled for the turn (for example, the feature flag is off, the activity is not a relay
- /// turn, or an earlier failure invalidated the turn); generators must tolerate a null sink by
+ /// Implementations are per-turn and owned by the LLM reply executor. A null sink signals that
+ /// streaming is disabled for the turn (for example, the feature flag is off, the activity is not a
+ /// relay turn, or an earlier failure invalidated the turn); generators must tolerate a null sink by
/// simply accumulating the final text without calling any sink method.
/// </remarks>
public interface IStreamingReplySink
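To make the null-sink contract concrete, a sketch of a generator honoring it. The `IAsyncEnumerable<string>` delta source is an assumption (the generator's real signature is not in this diff); the sink calls match the `IStreamingReplySink` methods shown elsewhere in this PR.

```csharp
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class NullSinkTolerantGeneration
{
    public static async Task<string> GenerateAsync(
        IAsyncEnumerable<string> deltas,
        IStreamingReplySink? sink, // null => streaming disabled for this turn
        CancellationToken ct)
    {
        var accumulated = new StringBuilder();
        await foreach (var delta in deltas.WithCancellation(ct))
        {
            accumulated.Append(delta);
            if (sink is not null) // tolerate a null sink: just keep accumulating
                await sink.OnDeltaAsync(accumulated.ToString(), ct);
        }

        var finalText = accumulated.ToString();
        if (sink is not null)
            await sink.FinalizeAsync(finalText, ct); // bypasses the delta throttle
        return finalText;
    }
}
```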
@@ -32,7 +32,7 @@ namespace Aevatar.GAgents.Channel.Runtime;
/// <item><see cref="FinalizeAsync"/> bypasses the throttle so the actor sees the complete text
/// once the stream ends; if a dispatch is in flight, the final text reflushes after it and
/// <see cref="FinalizeAsync"/> awaits the dispatch loop's drain signal before returning so the
- /// caller (the inbox runtime) does not race the ready event past the final chunk.</item>
+ /// caller (the LLM reply executor) does not race the ready event past the final chunk.</item>
/// </list>
/// </para>
/// <para>
@@ -68,7 +68,7 @@ public sealed class TurnStreamingReplySink : IStreamingReplySink, IDisposable
private bool _dispatchInProgress;
private bool _disposed;
// Signaled by the dispatch loop when it fully drains. FinalizeAsync awaits this when a
- // dispatch is already in flight so the caller does not race the inbox runtime's
+ // dispatch is already in flight so the caller does not race the LLM reply executor's
// LlmReplyReadyEvent past the final chunk dispatch (the ConversationGAgent
// processed-command guard would otherwise drop the late chunk).
private TaskCompletionSource<bool>? _drainTcs;
@@ -116,7 +116,7 @@ public Task OnDeltaAsync(string accumulatedText, CancellationToken ct) =>
/// Applies the final accumulated text, bypassing the throttle so the actor can drive the final
/// edit once the stream ends. If a dispatch is already in flight, the final text is stashed and
/// this call awaits the dispatch loop's drain signal so the final chunk is on the wire before
- /// the caller proceeds (the inbox runtime sends LlmReplyReadyEvent immediately after).
+ /// the caller proceeds (the LLM reply executor sends LlmReplyReadyEvent immediately after).
/// </summary>
public Task FinalizeAsync(string finalText, CancellationToken ct) =>
FlushAsync(finalText, isFinal: true, ct);
@@ -188,7 +188,7 @@ private async Task FlushAsync(string text, bool isFinal, CancellationToken ct)
if (isFinal)
{
// Block FinalizeAsync until the dispatch loop drains the stashed final text.
- // Without this wait, ChannelLlmReplyInboxRuntime sends LlmReplyReadyEvent
+ // Without this wait, ConversationLlmReplyExecutor sends LlmReplyReadyEvent
// first and ConversationGAgent's processed-command guard drops the late
// final chunk.
_drainTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
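The drain handshake above is subtle, so here is a distilled model under simplifying assumptions: a single dispatch loop, with the delta throttle, disposal, and the real locking detail elided, and `_dispatch` standing in for the chunk send to the actor. It is a model of the ordering guarantee, not the class's actual code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class DrainHandshakeSketch
{
    private readonly object _gate = new();
    private readonly Func<string, Task> _dispatch;
    private bool _dispatchInProgress;
    private string? _stashedFinal;
    private TaskCompletionSource<bool>? _drainTcs;

    public DrainHandshakeSketch(Func<string, Task> dispatch) => _dispatch = dispatch;

    public Task OnDeltaAsync(string accumulatedText)
    {
        lock (_gate)
        {
            if (_dispatchInProgress)
                return Task.CompletedTask; // throttled; the real sink stashes the latest delta
            _dispatchInProgress = true;
        }
        return RunDispatchLoopAsync(accumulatedText);
    }

    // Does not return until the final text has been flushed, so the caller's
    // LlmReplyReadyEvent cannot overtake the final chunk.
    public async Task FinalizeAsync(string finalText)
    {
        Task? drain = null;
        lock (_gate)
        {
            if (_dispatchInProgress)
            {
                _stashedFinal = finalText; // the in-flight loop will reflush it
                _drainTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                drain = _drainTcs.Task;
            }
            else
            {
                _dispatchInProgress = true;
            }
        }
        if (drain is not null) { await drain; return; } // final chunk is on the wire
        await RunDispatchLoopAsync(finalText);
    }

    private async Task RunDispatchLoopAsync(string text)
    {
        while (true)
        {
            await _dispatch(text); // never awaited under the lock
            lock (_gate)
            {
                if (_stashedFinal is not null && _stashedFinal != text)
                {
                    text = _stashedFinal; // a final arrived mid-dispatch: reflush it
                    continue;
                }
                _stashedFinal = null;
                _dispatchInProgress = false;
                _drainTcs?.TrySetResult(true); // release FinalizeAsync: fully drained
                _drainTcs = null;
                return;
            }
        }
    }
}
```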
@@ -79,7 +79,7 @@ message LlmReplyReadyEvent {
int64 reply_token_expires_at_unix_ms = 11;
}

- // Per-delta streaming signal dispatched from the LLM inbox runtime to the conversation actor while
+ // Per-delta streaming signal dispatched from the LLM reply executor to the conversation actor while
// the reply is still being generated. The actor owns the outbound reply credential and the
// placeholder message identifier for the turn, so it must be the one issuing the relay placeholder
// send and subsequent edit calls. This message carries only the cumulative accumulated text for
@@ -154,7 +154,7 @@ message NyxRelayReplyTokenCleanupRequestedEvent {
int64 requested_at_unix_ms = 2;
}

- // Sent by ChannelLlmReplyInboxRuntime when its pre-LLM gates (stale age,
+ // Sent by ConversationLlmReplyExecutor when its pre-LLM gates (stale age,
// missing relay credential, malformed payload) refuse to process a deferred
// LLM reply. The actor consumes this to retire the matching pending entry
// from State.PendingLlmReplyRequests via a NotRetryable
@@ -1509,9 +1509,9 @@ private async Task<NeedsLlmReplyEvent> BuildLlmReplyRequestAsync(
RequestedAtUnixMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
};

- // Carry the relay reply credential through the inbox as transient inbox-only
+ // Carry the relay reply credential through to the executor as transient inbox-only
// fields. ConversationGAgent strips these before persisting NeedsLlmReplyEvent;
- // ChannelLlmReplyInboxRuntime echoes them into the LlmReplyReadyEvent so the
+ // ConversationLlmReplyExecutor echoes them into the LlmReplyReadyEvent so the
// outbound reply does not depend on the actor's in-memory token dict surviving
// deactivation.
if (runtimeContext.NyxRelayReplyToken is { } token &&
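Closing reviewer note on the credential flow: a sketch of the strip-before-persist / re-enrich-on-retry cycle these comments describe. The record and dictionary are illustrative; the invariant they model is the one in the diff: persisted pending entries never carry the reply token, and a still-alive actor re-enriches retries from its in-memory map.

```csharp
using System.Collections.Generic;

// Stand-in for NeedsLlmReplyEvent; only the two fields the cycle touches.
public sealed record RequestSketch(string CorrelationId, string ReplyToken);

public sealed class ReplyTokenCarrySketch
{
    private readonly Dictionary<string, string> _runtimeTokens = new(); // correlation id -> token

    // Inbound handler: remember the token in memory, persist a token-free copy.
    public RequestSketch StripForPersist(RequestSketch request)
    {
        if (!string.IsNullOrEmpty(request.ReplyToken))
            _runtimeTokens[request.CorrelationId] = request.ReplyToken;
        return request with { ReplyToken = "" };
    }

    // Retry/rehydration: state always hands back an empty token, so re-enrich
    // while the actor instance (and its in-memory dict) is still alive.
    public RequestSketch EnrichIfNeeded(RequestSketch fromState)
    {
        if (!string.IsNullOrEmpty(fromState.ReplyToken))
            return fromState;
        return _runtimeTokens.TryGetValue(fromState.CorrelationId, out var token)
            ? fromState with { ReplyToken = token }
            : fromState; // token lost to deactivation; the executor's credential gate decides
    }
}
```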