Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,282 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
),
);

it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-active-turn-segment-")))(
"OrchestrationProjectionPipeline",
(it) => {
it.effect("keeps the active running turn open across non-streaming assistant segments", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const appendAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
eventStore
.append(event)
.pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent)));

const projectId = ProjectId.make("project-active-turn-segment");
const threadId = ThreadId.make("thread-active-turn-segment");
const turnId = TurnId.make("turn-active-segment");
const userMessageId = MessageId.make("user-active-segment");
const interimMessageId = MessageId.make("assistant-active-segment-interim");
const finalMessageId = MessageId.make("assistant-active-segment-final");

yield* appendAndProject({
type: "project.created",
eventId: EventId.make("evt-active-segment-1"),
aggregateKind: "project",
aggregateId: projectId,
occurredAt: "2026-05-05T03:20:00.000Z",
commandId: CommandId.make("cmd-active-segment-1"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-1"),
metadata: {},
payload: {
projectId,
title: "Active Turn Segment Project",
workspaceRoot: "/tmp/active-turn-segment",
defaultModelSelection: null,
scripts: [],
createdAt: "2026-05-05T03:20:00.000Z",
updatedAt: "2026-05-05T03:20:00.000Z",
},
});

yield* appendAndProject({
type: "thread.created",
eventId: EventId.make("evt-active-segment-2"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:01.000Z",
commandId: CommandId.make("cmd-active-segment-2"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-2"),
metadata: {},
payload: {
threadId,
projectId,
title: "Active Turn Segment Thread",
modelSelection: {
instanceId: ProviderInstanceId.make("codex"),
model: "gpt-5-codex",
},
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt: "2026-05-05T03:20:01.000Z",
updatedAt: "2026-05-05T03:20:01.000Z",
},
});

yield* appendAndProject({
type: "thread.turn-start-requested",
eventId: EventId.make("evt-active-segment-3"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:04.714Z",
commandId: CommandId.make("cmd-active-segment-3"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-3"),
metadata: {},
payload: {
threadId,
messageId: userMessageId,
runtimeMode: "full-access",
createdAt: "2026-05-05T03:20:04.714Z",
},
});

yield* appendAndProject({
type: "thread.session-set",
eventId: EventId.make("evt-active-segment-4"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:20.549Z",
commandId: CommandId.make("cmd-active-segment-4"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-4"),
metadata: {},
payload: {
threadId,
session: {
threadId,
status: "running",
providerName: "codex",
runtimeMode: "full-access",
activeTurnId: turnId,
lastError: null,
updatedAt: "2026-05-05T03:20:20.549Z",
},
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-active-segment-5"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:31.834Z",
commandId: CommandId.make("cmd-active-segment-5"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-5"),
metadata: {},
payload: {
threadId,
messageId: interimMessageId,
role: "assistant",
text: "",
turnId,
streaming: false,
createdAt: "2026-05-05T03:20:31.834Z",
updatedAt: "2026-05-05T03:20:31.834Z",
},
});

const runningRows = yield* sql<{
readonly state: string;
readonly completedAt: string | null;
readonly assistantMessageId: string | null;
readonly sessionStatus: string;
readonly activeTurnId: string | null;
}>`
SELECT
turns.state,
turns.completed_at AS "completedAt",
turns.assistant_message_id AS "assistantMessageId",
sessions.status AS "sessionStatus",
sessions.active_turn_id AS "activeTurnId"
FROM projection_turns AS turns
JOIN projection_thread_sessions AS sessions
ON sessions.thread_id = turns.thread_id
WHERE turns.thread_id = ${threadId}
AND turns.turn_id = ${turnId}
`;

assert.deepEqual(runningRows, [
{
state: "running",
completedAt: null,
assistantMessageId: "assistant-active-segment-interim",
sessionStatus: "running",
activeTurnId: "turn-active-segment",
},
]);

yield* appendAndProject({
type: "thread.session-set",
eventId: EventId.make("evt-active-segment-6"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:45.000Z",
commandId: CommandId.make("cmd-active-segment-6"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-6"),
metadata: {},
payload: {
threadId,
session: {
threadId,
status: "ready",
providerName: "codex",
runtimeMode: "full-access",
activeTurnId: null,
lastError: null,
updatedAt: "2026-05-05T03:20:45.000Z",
},
},
});

yield* appendAndProject({
type: "thread.message-sent",
eventId: EventId.make("evt-active-segment-7"),
aggregateKind: "thread",
aggregateId: threadId,
occurredAt: "2026-05-05T03:20:46.000Z",
commandId: CommandId.make("cmd-active-segment-7"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-active-segment-7"),
metadata: {},
payload: {
threadId,
messageId: finalMessageId,
role: "assistant",
text: "",
turnId,
streaming: false,
createdAt: "2026-05-05T03:20:46.000Z",
updatedAt: "2026-05-05T03:20:46.000Z",
},
});

const completedRows = yield* sql<{
readonly state: string;
readonly completedAt: string | null;
readonly assistantMessageId: string | null;
readonly sessionStatus: string;
readonly activeTurnId: string | null;
}>`
SELECT
turns.state,
turns.completed_at AS "completedAt",
turns.assistant_message_id AS "assistantMessageId",
sessions.status AS "sessionStatus",
sessions.active_turn_id AS "activeTurnId"
FROM projection_turns AS turns
JOIN projection_thread_sessions AS sessions
ON sessions.thread_id = turns.thread_id
WHERE turns.thread_id = ${threadId}
AND turns.turn_id = ${turnId}
`;

assert.deepEqual(completedRows, [
{
state: "completed",
completedAt: "2026-05-05T03:20:45.000Z",
assistantMessageId: "assistant-active-segment-final",
sessionStatus: "ready",
activeTurnId: null,
},
]);

yield* sql`DELETE FROM projection_turns`;
yield* sql`DELETE FROM projection_thread_sessions`;
yield* sql`DELETE FROM projection_state`;
yield* projectionPipeline.bootstrap;

const rebuiltRows = yield* sql<{
readonly state: string;
readonly completedAt: string | null;
readonly assistantMessageId: string | null;
readonly sessionStatus: string;
readonly activeTurnId: string | null;
}>`
SELECT
turns.state,
turns.completed_at AS "completedAt",
turns.assistant_message_id AS "assistantMessageId",
sessions.status AS "sessionStatus",
sessions.active_turn_id AS "activeTurnId"
FROM projection_turns AS turns
JOIN projection_thread_sessions AS sessions
ON sessions.thread_id = turns.thread_id
WHERE turns.thread_id = ${threadId}
AND turns.turn_id = ${turnId}
`;

assert.deepEqual(rebuiltRows, [
{
state: "completed",
completedAt: "2026-05-05T03:20:45.000Z",
assistantMessageId: "assistant-active-segment-final",
sessionStatus: "ready",
activeTurnId: null,
},
]);
}),
);
},
);

const engineLayer = it.layer(
OrchestrationEngineLive.pipe(
Layer.provide(OrchestrationProjectionSnapshotQueryLive),
Expand Down
52 changes: 38 additions & 14 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,25 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
case "thread.session-set": {
const turnId = event.payload.session.activeTurnId;
if (turnId === null || event.payload.session.status !== "running") {
if (event.payload.session.status === "running") {
return;
}
const existingTurns = yield* projectionTurnRepository.listByThreadId({
threadId: event.payload.threadId,
});
const runningTurn = existingTurns
.toReversed()
.find((entry) => entry.turnId !== null && entry.state === "running");
if (!runningTurn || runningTurn.turnId === null) {
return;
}

yield* projectionTurnRepository.upsertByTurnId({
...runningTurn,
turnId: runningTurn.turnId,
state: event.payload.session.status === "error" ? "error" : "completed",
completedAt: runningTurn.completedAt ?? event.payload.session.updatedAt,
});
return;
}

Expand All @@ -1004,13 +1023,11 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
threadId: event.payload.threadId,
});
if (Option.isSome(existingTurn)) {
const nextState =
existingTurn.value.state === "completed" || existingTurn.value.state === "error"
? existingTurn.value.state
: "running";
const nextState = existingTurn.value.state === "error" ? "error" : "running";
yield* projectionTurnRepository.upsertByTurnId({
...existingTurn.value,
state: nextState,
completedAt: nextState === "running" ? null : existingTurn.value.completedAt,
pendingMessageId:
existingTurn.value.pendingMessageId ??
(Option.isSome(pendingTurnStart) ? pendingTurnStart.value.messageId : null),
Expand Down Expand Up @@ -1079,19 +1096,26 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
turnId: event.payload.turnId,
});
if (Option.isSome(existingTurn)) {
const preserveSettledFailure =
existingTurn.value.state === "interrupted" || existingTurn.value.state === "error";
const nextState = event.payload.streaming
? existingTurn.value.state
: existingTurn.value.state === "running"
? "running"
: preserveSettledFailure
? existingTurn.value.state
: "completed";
const completedAt =
event.payload.streaming || existingTurn.value.state === "running"
? existingTurn.value.completedAt
: preserveSettledFailure
? existingTurn.value.completedAt
: (existingTurn.value.completedAt ?? event.payload.updatedAt);
yield* projectionTurnRepository.upsertByTurnId({
...existingTurn.value,
assistantMessageId: event.payload.messageId,
state: event.payload.streaming
? existingTurn.value.state
: existingTurn.value.state === "interrupted"
? "interrupted"
: existingTurn.value.state === "error"
? "error"
: "completed",
completedAt: event.payload.streaming
? existingTurn.value.completedAt
: (existingTurn.value.completedAt ?? event.payload.updatedAt),
state: nextState,
completedAt,
startedAt: existingTurn.value.startedAt ?? event.payload.createdAt,
requestedAt: existingTurn.value.requestedAt ?? event.payload.createdAt,
});
Expand Down
Loading