Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dotnet/src/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance

// Create and register the session before issuing the RPC so that
// events emitted by the CLI (e.g. session.start) are not dropped.
var session = new CopilotSession(sessionId, connection.Rpc);
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
session.RegisterTools(config.Tools ?? []);
session.RegisterPermissionHandler(config.OnPermissionRequest);
if (config.OnUserInputRequest != null)
Expand Down Expand Up @@ -511,7 +511,7 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes

// Create and register the session before issuing the RPC so that
// events emitted by the CLI (e.g. session.start) are not dropped.
var session = new CopilotSession(sessionId, connection.Rpc);
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
session.RegisterTools(config.Tools ?? []);
session.RegisterPermissionHandler(config.OnPermissionRequest);
if (config.OnUserInputRequest != null)
Expand Down
165 changes: 113 additions & 52 deletions dotnet/src/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

using GitHub.Copilot.SDK.Rpc;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using StreamJsonRpc;
using System.Collections.Immutable;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Text.Json.Serialization;
using GitHub.Copilot.SDK.Rpc;
using System.Threading.Channels;

namespace GitHub.Copilot.SDK;

Expand Down Expand Up @@ -52,22 +55,27 @@ namespace GitHub.Copilot.SDK;
/// </example>
public sealed partial class CopilotSession : IAsyncDisposable
{
/// <summary>
/// Multicast delegate used as a thread-safe, insertion-ordered handler list.
/// The compiler-generated add/remove accessors use a lock-free CAS loop over the backing field.
/// Dispatch reads the field once (inherent snapshot, no allocation).
/// Expected handler count is small (typically 1–3), so Delegate.Combine/Remove cost is negligible.
/// </summary>
private event SessionEventHandler? EventHandlers;
private readonly Dictionary<string, AIFunction> _toolHandlers = [];
private readonly JsonRpc _rpc;
private readonly ILogger _logger;

private volatile PermissionRequestHandler? _permissionHandler;
private volatile UserInputHandler? _userInputHandler;
private ImmutableArray<SessionEventHandler> _eventHandlers = ImmutableArray<SessionEventHandler>.Empty;

private SessionHooks? _hooks;
private readonly SemaphoreSlim _hooksLock = new(1, 1);
private SessionRpc? _sessionRpc;
private int _isDisposed;

/// <summary>
/// Channel that serializes event dispatch. <see cref="DispatchEvent"/> enqueues;
/// a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues and
/// invokes handlers one at a time, preserving arrival order.
/// </summary>
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
new() { SingleReader = true });

/// <summary>
/// Gets the unique identifier for this session.
/// </summary>
Expand All @@ -93,15 +101,20 @@ public sealed partial class CopilotSession : IAsyncDisposable
/// </summary>
/// <param name="sessionId">The unique identifier for this session.</param>
/// <param name="rpc">The JSON-RPC connection to the Copilot CLI.</param>
/// <param name="logger">Logger for diagnostics.</param>
/// <param name="workspacePath">The workspace path if infinite sessions are enabled.</param>
/// <remarks>
/// This constructor is internal. Use <see cref="CopilotClient.CreateSessionAsync"/> to create sessions.
/// </remarks>
internal CopilotSession(string sessionId, JsonRpc rpc, string? workspacePath = null)
internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? workspacePath = null)
{
SessionId = sessionId;
_rpc = rpc;
_logger = logger;
WorkspacePath = workspacePath;

// Start the asynchronous processing loop.
_ = ProcessEventsAsync();
}

private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
Expand Down Expand Up @@ -186,7 +199,7 @@ public async Task<string> SendAsync(MessageOptions options, CancellationToken ca
CancellationToken cancellationToken = default)
{
var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(60);
var tcs = new TaskCompletionSource<AssistantMessageEvent?>();
var tcs = new TaskCompletionSource<AssistantMessageEvent?>(TaskCreationOptions.RunContinuationsAsynchronously);
AssistantMessageEvent? lastAssistantMessage = null;

void Handler(SessionEvent evt)
Expand Down Expand Up @@ -236,7 +249,9 @@ void Handler(SessionEvent evt)
/// Multiple handlers can be registered and will all receive events.
/// </para>
/// <para>
/// Handler exceptions are allowed to propagate so they are not lost.
/// Handlers are invoked serially in event-arrival order on a background thread.
/// A handler will never be called concurrently with itself or with other handlers
/// on the same session.
/// </para>
/// </remarks>
/// <example>
Expand All @@ -259,27 +274,53 @@ void Handler(SessionEvent evt)
/// </example>
public IDisposable On(SessionEventHandler handler)
{
EventHandlers += handler;
return new ActionDisposable(() => EventHandlers -= handler);
ImmutableInterlocked.Update(ref _eventHandlers, array => array.Add(handler));
return new ActionDisposable(() => ImmutableInterlocked.Update(ref _eventHandlers, array => array.Remove(handler)));
}

/// <summary>
/// Dispatches an event to all registered handlers.
/// Enqueues an event for serial dispatch to all registered handlers.
/// </summary>
/// <param name="sessionEvent">The session event to dispatch.</param>
/// <remarks>
/// This method is internal. Handler exceptions are allowed to propagate so they are not lost.
/// Broadcast request events (external_tool.requested, permission.requested) are handled
/// internally before being forwarded to user handlers.
/// This method is non-blocking. Broadcast request events (external_tool.requested,
/// permission.requested) are fired concurrently so that a stalled handler does not
/// block event delivery. The event is then placed into an in-memory channel and
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
/// which guarantees user handlers see events one at a time, in order.
/// </remarks>
internal void DispatchEvent(SessionEvent sessionEvent)
{
// Handle broadcast request events (protocol v3) before dispatching to user handlers.
// Fire-and-forget: the response is sent asynchronously via RPC.
HandleBroadcastEventAsync(sessionEvent);
// Fire broadcast work concurrently (fire-and-forget with error logging).
// This is done outside the channel so broadcast handlers don't block the
// consumer loop — important when a secondary client's handler intentionally
// never completes (multi-client permission scenario).
_ = HandleBroadcastEventAsync(sessionEvent);

// Queue the event for serial processing by user handlers.
_eventChannel.Writer.TryWrite(sessionEvent);
}

// Reading the field once gives us a snapshot; delegates are immutable.
EventHandlers?.Invoke(sessionEvent);
/// <summary>
/// Single-reader consumer loop that processes events from the channel.
/// Ensures user event handlers are invoked serially and in FIFO order.
/// </summary>
private async Task ProcessEventsAsync()
{
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync())
{
foreach (var handler in _eventHandlers)
{
try
{
handler(sessionEvent);
}
catch (Exception ex)
{
LogEventHandlerError(ex);
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -355,37 +396,44 @@ internal async Task<PermissionRequestResult> HandlePermissionRequestAsync(JsonEl
/// Implements the protocol v3 broadcast model where tool calls and permission requests
/// are broadcast as session events to all clients.
/// </summary>
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
private async Task HandleBroadcastEventAsync(SessionEvent sessionEvent)
{
switch (sessionEvent)
try
{
case ExternalToolRequestedEvent toolEvent:
{
var data = toolEvent.Data;
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
return;

var tool = GetTool(data.ToolName);
if (tool is null)
return; // This client doesn't handle this tool; another client will.

await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
break;
}

case PermissionRequestedEvent permEvent:
{
var data = permEvent.Data;
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
return;

var handler = _permissionHandler;
if (handler is null)
return; // This client doesn't handle permissions; another client will.

await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
break;
}
switch (sessionEvent)
{
case ExternalToolRequestedEvent toolEvent:
{
var data = toolEvent.Data;
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
return;

var tool = GetTool(data.ToolName);
if (tool is null)
return; // This client doesn't handle this tool; another client will.

await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
break;
}

case PermissionRequestedEvent permEvent:
{
var data = permEvent.Data;
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
return;

var handler = _permissionHandler;
if (handler is null)
return; // This client doesn't handle permissions; another client will.

await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
break;
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
LogBroadcastHandlerError(ex);
}
}

Expand Down Expand Up @@ -703,6 +751,11 @@ public async Task LogAsync(string message, SessionLogRequestLevel? level = null,
/// <returns>A task representing the dispose operation.</returns>
/// <remarks>
/// <para>
/// The caller should ensure the session is idle (e.g., <see cref="SendAndWaitAsync"/>
/// has returned) before disposing. If the session is not idle, in-flight event handlers
/// or tool handlers may observe failures.
/// </para>
/// <para>
/// Session state on disk (conversation history, planning state, artifacts) is
/// preserved, so the conversation can be resumed later by calling
/// <see cref="CopilotClient.ResumeSessionAsync"/> with the session ID. To
Expand Down Expand Up @@ -731,6 +784,8 @@ public async ValueTask DisposeAsync()
return;
}

_eventChannel.Writer.TryComplete();

try
{
await InvokeRpcAsync<object>(
Expand All @@ -745,12 +800,18 @@ await InvokeRpcAsync<object>(
// Connection is broken or closed
}

EventHandlers = null;
_eventHandlers = ImmutableInterlocked.InterlockedExchange(ref _eventHandlers, ImmutableArray<SessionEventHandler>.Empty);
_toolHandlers.Clear();

_permissionHandler = null;
}

[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in broadcast event handler")]
private partial void LogBroadcastHandlerError(Exception exception);

[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
private partial void LogEventHandlerError(Exception exception);

internal record SendMessageRequest
{
public string SessionId { get; init; } = string.Empty;
Expand Down
4 changes: 2 additions & 2 deletions dotnet/test/Harness/TestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static class TestHelper
CopilotSession session,
TimeSpan? timeout = null)
{
var tcs = new TaskCompletionSource<AssistantMessageEvent>();
var tcs = new TaskCompletionSource<AssistantMessageEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));

AssistantMessageEvent? finalAssistantMessage = null;
Expand Down Expand Up @@ -78,7 +78,7 @@ public static async Task<T> GetNextEventOfTypeAsync<T>(
CopilotSession session,
TimeSpan? timeout = null) where T : SessionEvent
{
var tcs = new TaskCompletionSource<T>();
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));

using var subscription = session.On(evt =>
Expand Down
8 changes: 4 additions & 4 deletions dotnet/test/MultiClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public async Task Both_Clients_See_Tool_Request_And_Completion_Events()
});

// Set up event waiters BEFORE sending the prompt to avoid race conditions
var client1Requested = new TaskCompletionSource<bool>();
var client2Requested = new TaskCompletionSource<bool>();
var client1Completed = new TaskCompletionSource<bool>();
var client2Completed = new TaskCompletionSource<bool>();
var client1Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var client2Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var client1Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var client2Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var sub1 = session1.On(evt =>
{
Expand Down
Loading
Loading