Skip to content
This repository was archived by the owner on Apr 18, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/soliplex_agent/lib/soliplex_agent.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ export 'package:soliplex_client/soliplex_client.dart'
convertToAgui,
defaultHttpTimeout,
fetchAuthProviders;
// Re-export logging types so consumers don't need a direct soliplex_logging
// dependency just to construct an AgentRuntime.
export 'package:soliplex_logging/soliplex_logging.dart' show LogManager, Logger;

// ── Host API ──
export 'src/host/agent_api.dart';
Expand Down
4 changes: 1 addition & 3 deletions packages/soliplex_agent/lib/src/host/agent_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ abstract interface class AgentApi {
Future<AgentResult> watchAgent(int handle, {Duration? timeout});

/// Cancels the agent identified by [handle].
///
/// Returns `true` if the agent was successfully cancelled.
Future<bool> cancelAgent(int handle);
Future<void> cancelAgent(int handle);

/// Returns the current lifecycle status of the agent as a string.
///
Expand Down
7 changes: 1 addition & 6 deletions packages/soliplex_agent/lib/src/host/fake_agent_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ class FakeAgentApi implements AgentApi {
this.spawnResult = 1,
this.waitAllResult = const [],
this.getResultResult = '',
this.cancelResult = true,
AgentResult? watchResult,
}) : watchResult = watchResult ??
const AgentSuccess(
Expand All @@ -29,9 +28,6 @@ class FakeAgentApi implements AgentApi {
/// Value returned by [getResult].
String getResultResult;

/// Value returned by [cancelAgent].
bool cancelResult;

/// Value returned by [watchAgent]. Defaults to a success result.
AgentResult watchResult;

Expand Down Expand Up @@ -74,9 +70,8 @@ class FakeAgentApi implements AgentApi {
}

@override
Future<bool> cancelAgent(int handle) async {
Future<void> cancelAgent(int handle) async {
calls['cancelAgent'] = [handle];
return cancelResult;
}

/// Value returned by [agentStatus].
Expand Down
4 changes: 1 addition & 3 deletions packages/soliplex_agent/lib/src/host/runtime_agent_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ class RuntimeAgentApi implements AgentApi {
}

@override
Future<bool> cancelAgent(int handle) async {
Future<void> cancelAgent(int handle) async {
_lookupSession(handle).cancel();
_handles.remove(handle);
return true;
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ class ChatFnLlmProvider implements AgentLlmProvider {
),
);
case final SystemMessage m:
result.add((role: 'system', content: m.content));
result.add(
(role: 'system', content: m.content),
);
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class RunOrchestrator {

final StreamController<RunState> _controller =
StreamController<RunState>.broadcast();
final StreamController<BaseEvent> _baseEventController =
StreamController<BaseEvent>.broadcast();

RunState _currentState = const IdleState();
bool _disposed = false;
Expand All @@ -104,6 +106,12 @@ class RunOrchestrator {
/// Broadcast stream of state transitions.
Stream<RunState> get stateChanges => _controller.stream;

/// Broadcast stream of raw AG-UI events received from the SSE connection.
///
/// Used by `AgentSession` to bridge server-side events into the
/// `ExecutionEvent` signal without duplicating event processing logic.
Stream<BaseEvent> get baseEvents => _baseEventController.stream;

/// The current cancellation token for the active run.
///
/// Returns a fresh (uncancelled) token if no run is active.
Expand Down Expand Up @@ -282,6 +290,9 @@ class RunOrchestrator {
if (!_controller.isClosed) {
unawaited(_controller.close());
}
if (!_baseEventController.isClosed) {
unawaited(_baseEventController.close());
}
_disposing = false;
}

Expand Down Expand Up @@ -567,6 +578,9 @@ class RunOrchestrator {
}

void _onEvent(BaseEvent event) {
if (!_baseEventController.isClosed) {
_baseEventController.add(event);
}
final running = _currentState;
if (running is! RunningState) return;
final result = processEvent(running.conversation, running.streaming, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ typedef StreamingChatFn = Stream<LlmEvent> Function({
/// and native tool calling (via open_responses).
class StreamingLlmProvider implements AgentLlmProvider {
/// Creates a [StreamingLlmProvider].
StreamingLlmProvider({
required StreamingChatFn chatFn,
this.systemPrompt,
}) : _chatFn = chatFn;
StreamingLlmProvider({required StreamingChatFn chatFn, this.systemPrompt})
: _chatFn = chatFn;

final StreamingChatFn _chatFn;

Expand Down Expand Up @@ -88,10 +86,7 @@ class StreamingLlmProvider implements AgentLlmProvider {
currentMsgId = msgId;
yield TextMessageStartEvent(messageId: msgId);
}
yield TextMessageContentEvent(
messageId: currentMsgId,
delta: text,
);
yield TextMessageContentEvent(messageId: currentMsgId, delta: text);

case LlmTextDone():
if (currentMsgId case final msgId?) {
Expand All @@ -105,10 +100,7 @@ class StreamingLlmProvider implements AgentLlmProvider {
yield TextMessageEndEvent(messageId: msgId);
currentMsgId = null;
}
yield ToolCallStartEvent(
toolCallId: callId,
toolCallName: name,
);
yield ToolCallStartEvent(toolCallId: callId, toolCallName: name);

case LlmToolCallArgsDelta(:final callId, :final delta):
yield ToolCallArgsEvent(toolCallId: callId, delta: delta);
Expand Down
9 changes: 7 additions & 2 deletions packages/soliplex_agent/lib/src/runtime/agent_runtime.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:signals_core/signals_core.dart';
import 'package:soliplex_agent/src/host/platform_constraints.dart';
import 'package:soliplex_agent/src/models/agent_result.dart';
import 'package:soliplex_agent/src/models/thread_key.dart';
import 'package:soliplex_agent/src/orchestration/ag_ui_llm_provider.dart';
import 'package:soliplex_agent/src/orchestration/agent_llm_provider.dart';
import 'package:soliplex_agent/src/orchestration/run_orchestrator.dart';
import 'package:soliplex_agent/src/runtime/agent_session.dart';
Expand Down Expand Up @@ -46,17 +47,21 @@ class AgentRuntime {
/// all its children are cancelled.
AgentRuntime({
required ServerConnection connection,
required AgentLlmProvider llmProvider,
required ToolRegistryResolver toolRegistryResolver,
required PlatformConstraints platform,
required Logger logger,
AgentLlmProvider? llmProvider,
SessionExtensionFactory? extensionFactory,
AgentUiDelegate? uiDelegate,
this.maxSpawnDepth = 10,
this.rootTimeout,
}) : serverId = connection.serverId,
_connection = connection,
_llmProvider = llmProvider,
_llmProvider = llmProvider ??
AgUiLlmProvider(
api: connection.api,
agUiStreamClient: connection.agUiStreamClient,
),
_toolRegistryResolver = toolRegistryResolver,
_extensionFactory = extensionFactory,
_uiDelegate = uiDelegate,
Expand Down
32 changes: 32 additions & 0 deletions packages/soliplex_agent/lib/src/runtime/agent_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class AgentSession implements ToolExecutionContext {
final List<AgentSession> _children = [];
final Completer<AgentResult> _resultCompleter = Completer<AgentResult>();
StreamSubscription<RunState>? _subscription;
StreamSubscription<BaseEvent>? _baseEventSubscription;
AgentSessionState _state = AgentSessionState.spawning;
bool _disposed = false;
final Signal<RunState> _runStateSignal = signal(const IdleState());
Expand Down Expand Up @@ -255,6 +256,7 @@ class AgentSession implements ToolExecutionContext {
}) async {
await _attachExtensions();
_subscription = _orchestrator.stateChanges.listen(_onStateChange);
_baseEventSubscription = _orchestrator.baseEvents.listen(_bridgeBaseEvent);
unawaited(
_orchestrator.runToCompletion(
key: threadKey,
Expand All @@ -280,6 +282,8 @@ class AgentSession implements ToolExecutionContext {
_disposeExtensions();
unawaited(_subscription?.cancel());
_subscription = null;
unawaited(_baseEventSubscription?.cancel());
_baseEventSubscription = null;
_orchestrator.dispose();
_completeIfPending();
_runStateSignal.dispose();
Expand Down Expand Up @@ -327,6 +331,34 @@ class AgentSession implements ToolExecutionContext {
}
}

/// Maps raw AG-UI [BaseEvent]s to [ExecutionEvent] emissions so that
/// consumers observing [lastExecutionEvent] see streaming text, thinking,
/// server tool calls, and terminal events without polling [runState].
void _bridgeBaseEvent(BaseEvent event) {
switch (event) {
case TextMessageContentEvent(:final delta):
emitEvent(TextDelta(delta: delta));
case ThinkingTextMessageStartEvent():
emitEvent(const ThinkingStarted());
case ThinkingTextMessageContentEvent(:final delta):
emitEvent(ThinkingContent(delta: delta));
case ToolCallStartEvent(:final toolCallId, :final toolCallName):
emitEvent(
ServerToolCallStarted(toolCallId: toolCallId, toolName: toolCallName),
);
case ToolCallResultEvent(:final toolCallId, :final content):
emitEvent(
ServerToolCallCompleted(toolCallId: toolCallId, result: content),
);
case RunFinishedEvent():
emitEvent(const RunCompleted());
case RunErrorEvent(:final message):
emitEvent(RunFailed(error: message));
default:
break;
}
}

// ---------------------------------------------------------------------------
// Tool execution (callback for runToCompletion)
// ---------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,7 @@ void main() {
expect(status, 'poll_timeout');

// Supervisor cancels the stuck worker.
final cancelled = await agent.cancelAgent(h);
expect(cancelled, isTrue);
await agent.cancelAgent(h);
});

test('cancels one stuck worker while collecting another', () async {
Expand Down
11 changes: 5 additions & 6 deletions packages/soliplex_agent/test/host/runtime_agent_api_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'package:mocktail/mocktail.dart';
import 'package:soliplex_agent/soliplex_agent.dart';
import 'package:soliplex_client/soliplex_client.dart'
show AgUiStreamClient, SoliplexApi;
import 'package:soliplex_logging/soliplex_logging.dart';
import 'package:test/test.dart';

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -149,8 +148,7 @@ void main() {
controller.add(const RunStartedEvent(threadId: _threadId, runId: _runId));
await Future<void>.delayed(Duration.zero);

final cancelled = await agentApi.cancelAgent(handle);
expect(cancelled, isTrue);
await agentApi.cancelAgent(handle);

await controller.close();
});
Expand All @@ -177,7 +175,7 @@ void main() {
expect(() => agentApi.getResult(h2), throwsA(isA<ArgumentError>()));
});

test('cancelAgent evicts handle', () async {
test('cancelAgent keeps handle for getResult', () async {
final controller = StreamController<BaseEvent>.broadcast();
when(
() => api.createThread(any()),
Expand All @@ -201,8 +199,9 @@ void main() {

await agentApi.cancelAgent(handle);

// Handle is evicted.
expect(() => agentApi.getResult(handle), throwsA(isA<ArgumentError>()));
// Handle survives cancel — getResult throws StateError for the
// cancelled session, then evicts the handle.
expect(() => agentApi.getResult(handle), throwsA(isA<StateError>()));

await controller.close();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ void main() {

test('tokenRefresher without getToken throws assertion', () {
expect(
() => createAgentHttpClient(
tokenRefresher: _MockTokenRefresher(),
),
() => createAgentHttpClient(tokenRefresher: _MockTokenRefresher()),
throwsA(isA<AssertionError>()),
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ class IntegrationHarness {
);
return AgentRuntime(
connection: connection,
llmProvider: AgUiLlmProvider(
api: api,
agUiStreamClient: agUiStreamClient,
),
toolRegistryResolver:
toolRegistryResolver ?? (_) async => const ToolRegistry(),
platform: platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,23 @@ void main() {

test('cancels a running agent', () async {
final handle = await agentApi.spawnAgent(
'echo',
'Write a very long essay about the history of computing.',
'plain',
'List every prime number between 1 and 100000. For each prime, '
'show a proof that it is not divisible by any smaller prime. '
'Format each entry on its own line.',
);
print('Spawned handle: $handle');

final cancelled = await agentApi.cancelAgent(handle);
expect(cancelled, isTrue);
print('Cancelled: $cancelled');
// Wait until the agent is running so the SSE stream has started.
var status = agentApi.agentStatus(handle);
while (status == 'spawning') {
await Future<void>.delayed(const Duration(milliseconds: 10));
status = agentApi.agentStatus(handle);
}
print('Status before cancel: $status');

await agentApi.cancelAgent(handle);
print('Cancelled');

// getResult should throw because the agent was cancelled.
expect(
Expand Down
Loading
Loading