Skip to content
This repository was archived by the owner on Apr 18, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 29 additions & 24 deletions packages/soliplex_client/lib/src/api/soliplex_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -640,40 +640,45 @@ class SoliplexApi {
return allEvents;
}

/// Extracts user messages from run_input and creates synthetic events.
/// Extracts the initiating user message from run_input and creates
/// synthetic events.
///
/// Each run's `run_input.messages` contains the full conversation context,
/// but only the last user message initiated THIS run. Prior user messages
/// were already processed in earlier runs.
List<Map<String, dynamic>> _extractUserMessageEvents(
Map<String, dynamic> rawRun,
) {
final runInput = rawRun['run_input'] as Map<String, dynamic>?;
if (runInput == null) return [];

final messages = runInput['messages'] as List<dynamic>? ?? [];
final syntheticEvents = <Map<String, dynamic>>[];

for (var i = 0; i < messages.length; i++) {
// Find the last user message — the one that initiated this run.
Map<String, dynamic>? lastUserMessage;
for (var i = messages.length - 1; i >= 0; i--) {
final raw = messages[i];
if (raw is! Map<String, dynamic>) continue; // Skip malformed entries
final msgMap = raw;
final role = msgMap['role'] as String? ?? 'user';

// Only process user messages - assistant messages come from events
if (role != 'user') continue;

final id = msgMap['id'] as String? ?? 'user-$i';
final content = msgMap['content'] as String? ?? '';

// Create synthetic TEXT_MESSAGE events (START, CONTENT, END)
syntheticEvents
..add({'type': 'TEXT_MESSAGE_START', 'messageId': id, 'role': role})
..add({
'type': 'TEXT_MESSAGE_CONTENT',
'messageId': id,
'delta': content,
})
..add({'type': 'TEXT_MESSAGE_END', 'messageId': id});
if (raw is! Map<String, dynamic>) continue;
if ((raw['role'] as String? ?? 'user') == 'user') {
lastUserMessage = raw;
break;
}
}

return syntheticEvents;
if (lastUserMessage == null) return [];

final runId = rawRun['run_id'] as String? ?? 'unknown';
final id = lastUserMessage['id'] as String? ?? 'user-$runId';
final content = lastUserMessage['content'] as String? ?? '';

return [
{'type': 'TEXT_MESSAGE_START', 'messageId': id, 'role': 'user'},
{
'type': 'TEXT_MESSAGE_CONTENT',
'messageId': id,
'delta': content,
},
{'type': 'TEXT_MESSAGE_END', 'messageId': id},
];
}

/// Replays events to reconstruct thread history (messages + AG-UI state).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:developer' as developer;

import 'package:ag_ui/ag_ui.dart';
import 'package:meta/meta.dart';
import 'package:soliplex_client/src/application/json_patch.dart';
Expand Down Expand Up @@ -279,6 +281,20 @@ EventProcessingResult _processTextEnd(
String messageId,
) {
if (streaming is TextStreaming && streaming.messageId == messageId) {
// Skip if a message with this ID already exists — idempotency guard
// against duplicate events (e.g. from history replay).
if (conversation.messages.any((m) => m.id == messageId)) {
developer.log(
'Skipped duplicate message ID: $messageId',
name: 'soliplex_client.event_processor',
level: 800,
);
return EventProcessingResult(
conversation: conversation,
streaming: const AwaitingText(),
);
}

final newMessage = TextMessage.create(
id: messageId,
user: streaming.user,
Expand Down
197 changes: 185 additions & 12 deletions packages/soliplex_client/test/api/soliplex_api_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,177 @@ void main() {
expect(assistantMessage.text, equals('Hello from assistant'));
});

// Regression: https://github.com/soliplex/frontend/issues/33
test('does not duplicate user messages across multi-run history',
() async {
// Thread with 3 runs
when(
() => mockTransport.request<Map<String, dynamic>>(
'GET',
Uri.parse(
'https://api.example.com/api/v1/rooms/room-123/agui/thread-456',
),
cancelToken: any(named: 'cancelToken'),
fromJson: any(named: 'fromJson'),
body: any(named: 'body'),
headers: any(named: 'headers'),
timeout: any(named: 'timeout'),
),
).thenAnswer(
(_) async => {
'room_id': 'room-123',
'thread_id': 'thread-456',
'runs': {
'run-1': {
'run_id': 'run-1',
'created': '2026-01-07T01:00:00.000Z',
'finished': '2026-01-07T01:01:00.000Z',
},
'run-2': {
'run_id': 'run-2',
'created': '2026-01-07T02:00:00.000Z',
'finished': '2026-01-07T02:01:00.000Z',
},
'run-3': {
'run_id': 'run-3',
'created': '2026-01-07T03:00:00.000Z',
'finished': '2026-01-07T03:01:00.000Z',
},
},
},
);

// Run 1: run_input has only user-msg-A
when(
() => mockTransport.request<Map<String, dynamic>>(
'GET',
Uri.parse(
'https://api.example.com/api/v1/rooms/room-123/agui/thread-456/run-1',
),
cancelToken: any(named: 'cancelToken'),
fromJson: any(named: 'fromJson'),
body: any(named: 'body'),
headers: any(named: 'headers'),
timeout: any(named: 'timeout'),
),
).thenAnswer(
(_) async => {
'run_id': 'run-1',
'run_input': {
'messages': [
{'id': 'user-msg-A', 'role': 'user', 'content': 'msg A'},
],
},
'events': [
{
'type': 'TEXT_MESSAGE_START',
'messageId': 'asst-1',
'role': 'assistant',
},
{
'type': 'TEXT_MESSAGE_CONTENT',
'messageId': 'asst-1',
'delta': 'response 1',
},
{'type': 'TEXT_MESSAGE_END', 'messageId': 'asst-1'},
],
},
);

// Run 2: run_input has user-msg-A (prior) AND user-msg-B (new)
when(
() => mockTransport.request<Map<String, dynamic>>(
'GET',
Uri.parse(
'https://api.example.com/api/v1/rooms/room-123/agui/thread-456/run-2',
),
cancelToken: any(named: 'cancelToken'),
fromJson: any(named: 'fromJson'),
body: any(named: 'body'),
headers: any(named: 'headers'),
timeout: any(named: 'timeout'),
),
).thenAnswer(
(_) async => {
'run_id': 'run-2',
'run_input': {
'messages': [
{'id': 'user-msg-A', 'role': 'user', 'content': 'msg A'},
{'id': 'user-msg-B', 'role': 'user', 'content': 'msg B'},
],
},
'events': [
{
'type': 'TEXT_MESSAGE_START',
'messageId': 'asst-2',
'role': 'assistant',
},
{
'type': 'TEXT_MESSAGE_CONTENT',
'messageId': 'asst-2',
'delta': 'response 2',
},
{'type': 'TEXT_MESSAGE_END', 'messageId': 'asst-2'},
],
},
);

// Run 3: run_input has all 3 prior user messages
when(
() => mockTransport.request<Map<String, dynamic>>(
'GET',
Uri.parse(
'https://api.example.com/api/v1/rooms/room-123/agui/thread-456/run-3',
),
cancelToken: any(named: 'cancelToken'),
fromJson: any(named: 'fromJson'),
body: any(named: 'body'),
headers: any(named: 'headers'),
timeout: any(named: 'timeout'),
),
).thenAnswer(
(_) async => {
'run_id': 'run-3',
'run_input': {
'messages': [
{'id': 'user-msg-A', 'role': 'user', 'content': 'msg A'},
{'id': 'user-msg-B', 'role': 'user', 'content': 'msg B'},
{'id': 'user-msg-C', 'role': 'user', 'content': 'msg C'},
],
},
'events': [
{
'type': 'TEXT_MESSAGE_START',
'messageId': 'asst-3',
'role': 'assistant',
},
{
'type': 'TEXT_MESSAGE_CONTENT',
'messageId': 'asst-3',
'delta': 'response 3',
},
{'type': 'TEXT_MESSAGE_END', 'messageId': 'asst-3'},
],
},
);

final history = await api.getThreadHistory('room-123', 'thread-456');

// Should be exactly 6 messages: A, resp1, B, resp2, C, resp3
// NOT 10 (with A appearing 3 times and B appearing 2 times)
expect(history.messages.length, equals(6));

expect((history.messages[0] as TextMessage).id, equals('user-msg-A'));
expect((history.messages[0] as TextMessage).text, equals('msg A'));
expect((history.messages[1] as TextMessage).id, equals('asst-1'));
expect((history.messages[2] as TextMessage).id, equals('user-msg-B'));
expect((history.messages[2] as TextMessage).text, equals('msg B'));
expect((history.messages[3] as TextMessage).id, equals('asst-2'));
expect((history.messages[4] as TextMessage).id, equals('user-msg-C'));
expect((history.messages[4] as TextMessage).text, equals('msg C'));
expect((history.messages[5] as TextMessage).id, equals('asst-3'));
});

test('skips non-user messages from run_input.messages', () async {
// Thread endpoint
when(
Expand Down Expand Up @@ -1711,14 +1882,17 @@ void main() {
(_) async => {
'run_id': 'run-1',
'run_input': {
// Message without id or role - should use fallbacks
// Only the last user message is extracted.
// The first is a prior user message (would be from an
// earlier run), the assistant is skipped, and the last
// user message has no id — should use run-based fallback.
'messages': [
{'content': 'Message without id or role'},
{'id': 'has-id', 'content': 'Message with id, no role'},
{'id': 'has-id', 'content': 'Prior user message'},
{
'role': 'assistant',
'content': 'Assistant message (should be skipped)',
},
{'content': 'Message without id or role'},
],
},
'events': [
Expand All @@ -1739,16 +1913,15 @@ void main() {

final messages = await api.getThreadHistory('room-123', 'thread-456');

// Two user messages (with fallback ids) + one assistant from events
expect(messages.messages.length, equals(3));
// First message uses index-based fallback id
expect(messages.messages[0].id, equals('user-0'));
// Only the last user message is extracted (the one that initiated
// this run). The assistant message is skipped, so the last user
// message is the one without id/role. Plus one assistant from events.
expect(messages.messages.length, equals(2));
// Last user message uses run-based fallback id, fallback role
expect(messages.messages[0].id, equals('user-run-1'));
expect(messages.messages[0].user, equals(ChatUser.user));
// Second message uses provided id, fallback role
expect(messages.messages[1].id, equals('has-id'));
expect(messages.messages[1].user, equals(ChatUser.user));
// Third is from events
expect(messages.messages[2].id, equals('m1'));
// Assistant from events
expect(messages.messages[1].id, equals('m1'));
},
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,40 @@ void main() {
},
);

// Regression: https://github.com/soliplex/frontend/issues/33
test('TextMessageEndEvent skips duplicate message ID', () {
// Simulate a conversation that already contains msg-1
final existing = TextMessage.create(
id: 'msg-1',
user: ChatUser.user,
text: 'original',
);
final conversationWithMsg = conversation.withAppendedMessage(existing);

// Stream a duplicate msg-1
const streamingState = app_streaming.TextStreaming(
messageId: 'msg-1',
user: ChatUser.user,
text: 'duplicate',
);
const event = TextMessageEndEvent(messageId: 'msg-1');

final result = processEvent(
conversationWithMsg,
streamingState,
event,
);

// Should skip — conversation still has exactly 1 message
expect(result.conversation.messages, hasLength(1));
expect(
(result.conversation.messages.first as TextMessage).text,
equals('original'),
);
// Streaming should still reset to AwaitingText
expect(result.streaming, isA<app_streaming.AwaitingText>());
});

test('TextMessageStartEvent maps user role to ChatUser.user', () {
const event = TextMessageStartEvent(
messageId: 'msg-1',
Expand Down
Loading