Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,7 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
yield* Effect.gen(function* () {
const eventStore = yield* OrchestrationEventStore;
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const sql = yield* SqlClient.SqlClient;

yield* eventStore.append({
type: "thread.turn-start-requested",
Expand All @@ -1755,6 +1756,26 @@ it.effect("restores pending turn-start metadata across projection pipeline resta
});

yield* projectionPipeline.bootstrap;

const sessionRows = yield* sql<{
readonly status: string;
readonly runtimeMode: string;
readonly providerName: string | null;
}>`
SELECT
status,
runtime_mode AS "runtimeMode",
provider_name AS "providerName"
FROM projection_thread_sessions
WHERE thread_id = ${threadId}
`;
assert.deepEqual(sessionRows, [
{
status: "starting",
runtimeMode: "approval-required",
providerName: null,
},
]);
}).pipe(Effect.provide(firstProjectionLayer));

const turnRows = yield* Effect.gen(function* () {
Expand Down
44 changes: 33 additions & 11 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -751,18 +751,40 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
_attachmentSideEffects,
) =>
Effect.gen(function* () {
if (event.type !== "thread.session-set") {
return;
switch (event.type) {
case "thread.turn-start-requested": {
const existingRow = yield* projectionThreadSessionRepository.getByThreadId({
threadId: event.payload.threadId,
});
yield* projectionThreadSessionRepository.upsert({
threadId: event.payload.threadId,
status: "starting",
providerName:
event.payload.provider ??
(Option.isSome(existingRow) ? existingRow.value.providerName : null),
runtimeMode: event.payload.runtimeMode,
activeTurnId: null,
lastError: null,
updatedAt: event.payload.createdAt,
});
return;
}

case "thread.session-set":
yield* projectionThreadSessionRepository.upsert({
threadId: event.payload.threadId,
status: event.payload.session.status,
providerName: event.payload.session.providerName,
runtimeMode: event.payload.session.runtimeMode,
activeTurnId: event.payload.session.activeTurnId,
lastError: event.payload.session.lastError,
updatedAt: event.payload.session.updatedAt,
});
return;

default:
return;
}
yield* projectionThreadSessionRepository.upsert({
threadId: event.payload.threadId,
status: event.payload.session.status,
providerName: event.payload.session.providerName,
runtimeMode: event.payload.session.runtimeMode,
activeTurnId: event.payload.session.activeTurnId,
lastError: event.payload.session.lastError,
updatedAt: event.payload.session.updatedAt,
});
});

const applyThreadTurnsProjection: ProjectorDefinition["apply"] = (
Expand Down
66 changes: 66 additions & 0 deletions apps/server/src/orchestration/projector.test.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Low

it("keeps projector forward-compatible for unhandled event types", async () => {

The test "keeps projector forward-compatible for unhandled event types" at line 125 uses thread.turn-start-requested, which now has a handler (added in this diff). The test passes only because the thread doesn't exist, causing the handler to return nextBase unchanged—not because it falls through to the default case. The test description is now misleading and no longer validates forward-compatibility behavior. Consider using an actually unhandled event type or updating the test description to reflect what it actually verifies.

🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file apps/server/src/orchestration/projector.test.ts around line 125:

The test "keeps projector forward-compatible for unhandled event types" at line 125 uses `thread.turn-start-requested`, which now has a handler (added in this diff). The test passes only because the thread doesn't exist, causing the handler to return `nextBase` unchanged—not because it falls through to the `default` case. The test description is now misleading and no longer validates forward-compatibility behavior. Consider using an actually unhandled event type or updating the test description to reflect what it actually verifies.

Evidence trail:
- Test at apps/server/src/orchestration/projector.test.ts:125-151 - uses `thread.turn-start-requested` with an empty model, expects threads to remain empty
- New handler added at apps/server/src/orchestration/projector.ts:394-424 (see git_diff MERGE_BASE..REVIEWED_COMMIT) - contains `if (!thread) { return nextBase; }` early return
- Default case at apps/server/src/orchestration/projector.ts:657 - `default: return Effect.succeed(nextBase);` is the actual forward-compatibility mechanism

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it for me

Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,72 @@ describe("orchestration projector", () => {
expect(thread?.session?.status).toBe("running");
});

it("marks a thread session as starting when a turn is requested", async () => {
const createdAt = "2026-02-23T08:00:00.000Z";
const requestedAt = "2026-02-23T08:00:02.000Z";
const model = createEmptyReadModel(createdAt);

const afterCreate = await Effect.runPromise(
projectEvent(
model,
makeEvent({
sequence: 1,
type: "thread.created",
aggregateKind: "thread",
aggregateId: "thread-1",
occurredAt: createdAt,
commandId: "cmd-create",
payload: {
threadId: "thread-1",
projectId: "project-1",
title: "demo",
model: "gpt-5.3-codex",
runtimeMode: "full-access",
branch: null,
worktreePath: null,
createdAt,
updatedAt: createdAt,
},
}),
),
);

const afterTurnRequested = await Effect.runPromise(
projectEvent(
afterCreate,
makeEvent({
sequence: 2,
type: "thread.turn-start-requested",
aggregateKind: "thread",
aggregateId: "thread-1",
occurredAt: requestedAt,
commandId: "cmd-turn-requested",
payload: {
threadId: "thread-1",
messageId: "message-1",
provider: "codex",
runtimeMode: "approval-required",
interactionMode: "plan",
assistantDeliveryMode: "streaming",
createdAt: requestedAt,
},
}),
),
);

const thread = afterTurnRequested.threads[0];
expect(thread?.session).toEqual({
threadId: "thread-1",
status: "starting",
providerName: "codex",
runtimeMode: "approval-required",
activeTurnId: null,
lastError: null,
updatedAt: requestedAt,
});
expect(thread?.latestTurn).toBeNull();
});

it("updates canonical thread runtime mode from thread.runtime-mode-set", async () => {
const createdAt = "2026-02-23T08:00:00.000Z";
const updatedAt = "2026-02-23T08:00:05.000Z";
Expand Down
33 changes: 33 additions & 0 deletions apps/server/src/orchestration/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ThreadRevertedPayload,
ThreadSessionSetPayload,
ThreadTurnDiffCompletedPayload,
ThreadTurnStartRequestedPayload,
} from "./Schemas.ts";

type ThreadPatch = Partial<Omit<OrchestrationThread, "id" | "projectId">>;
Expand Down Expand Up @@ -391,6 +392,38 @@ export function projectEvent(
};
});

case "thread.turn-start-requested":
return Effect.gen(function* () {
const payload = yield* decodeForEvent(
ThreadTurnStartRequestedPayload,
event.payload,
event.type,
"payload",
);
const thread = nextBase.threads.find((entry) => entry.id === payload.threadId);
if (!thread) {
return nextBase;
}

const session: OrchestrationSession = {
threadId: payload.threadId,
status: "starting",
providerName: payload.provider ?? thread.session?.providerName ?? null,
runtimeMode: payload.runtimeMode,
activeTurnId: null,
lastError: null,
updatedAt: payload.createdAt,
};

return {
...nextBase,
threads: updateThread(nextBase.threads, payload.threadId, {
session,
updatedAt: event.occurredAt,
}),
};
});

case "thread.session-set":
return Effect.gen(function* () {
const payload = yield* decodeForEvent(
Expand Down
77 changes: 77 additions & 0 deletions apps/web/src/components/ChatView.browser.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,20 @@ function addThreadToSnapshot(
};
}

function mapThreadInSnapshot(
snapshot: OrchestrationReadModel,
threadId: ThreadId,
mapper: (
thread: OrchestrationReadModel["threads"][number],
) => OrchestrationReadModel["threads"][number],
): OrchestrationReadModel {
return {
...snapshot,
snapshotSequence: snapshot.snapshotSequence + 1,
threads: snapshot.threads.map((thread) => (thread.id === threadId ? mapper(thread) : thread)),
};
}

function createDraftOnlySnapshot(): OrchestrationReadModel {
const snapshot = createSnapshotForTargetUser({
targetMessageId: "msg-user-draft-target" as MessageId,
Expand Down Expand Up @@ -1201,6 +1215,69 @@ describe("ChatView timeline estimator parity (full app)", () => {
}
});

it("re-enables sending after a turn completes without an observed running phase", async () => {
useComposerDraftStore.getState().setPrompt(THREAD_ID, "first prompt");

const mounted = await mountChatView({
viewport: DEFAULT_VIEWPORT,
snapshot: createSnapshotForTargetUser({
targetMessageId: "msg-user-fast-complete" as MessageId,
targetText: "fast complete target",
}),
});

try {
const sendButton = await waitForSendButton();
expect(sendButton.disabled).toBe(false);
sendButton.click();

const completedSnapshot = mapThreadInSnapshot(fixture.snapshot, THREAD_ID, (thread) => ({
...thread,
latestTurn: {
turnId: "turn-fast-complete" as never,
state: "completed",
requestedAt: isoAt(2_000),
startedAt: isoAt(2_001),
completedAt: isoAt(2_003),
assistantMessageId: null,
},
session: {
...(thread.session ?? {
threadId: THREAD_ID,
status: "ready",
providerName: "codex",
runtimeMode: "full-access",
activeTurnId: null,
lastError: null,
updatedAt: NOW_ISO,
}),
status: "ready",
activeTurnId: null,
lastError: null,
updatedAt: isoAt(2_003),
},
updatedAt: isoAt(2_003),
}));
fixture.snapshot = completedSnapshot;
useStore.getState().syncServerReadModel(completedSnapshot);

useComposerDraftStore.getState().setPrompt(THREAD_ID, "second prompt");

await vi.waitFor(
() => {
const nextSendButton = document.querySelector<HTMLButtonElement>(
'button[aria-label="Send message"]',
);
expect(nextSendButton).toBeTruthy();
expect(nextSendButton?.disabled).toBe(false);
},
{ timeout: 8_000, interval: 16 },
);
} finally {
await mounted.cleanup();
}
});

it("shows a pointer cursor for the running stop button", async () => {
const mounted = await mountChatView({
viewport: DEFAULT_VIEWPORT,
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/components/ChatView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import {
deriveActivePlanState,
findLatestProposedPlan,
deriveWorkLogEntries,
hasLatestTurnObservationSince,
hasToolActivityForTurn,
isLatestTurnSettled,
formatElapsed,
Expand Down Expand Up @@ -2051,6 +2052,7 @@ export default function ChatView({ threadId }: ChatViewProps) {
}
if (
phase === "running" ||
hasLatestTurnObservationSince(activeLatestTurn, sendStartedAt) ||
activePendingApproval !== null ||
activePendingUserInput !== null ||
activeThread?.error
Expand All @@ -2060,9 +2062,11 @@ export default function ChatView({ threadId }: ChatViewProps) {
}, [
activePendingApproval,
activePendingUserInput,
activeLatestTurn,
activeThread?.error,
phase,
resetSendPhase,
sendStartedAt,
sendPhase,
]);

Expand Down
Loading
Loading