Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti

case "thread.message-sent":
case "thread.proposed-plan-upserted":
case "thread.proposed-plan-removed":
case "thread.activity-appended":
case "thread.approval-response-requested":
case "thread.user-input-response-requested": {
Expand Down Expand Up @@ -871,6 +872,12 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
});
return;

case "thread.proposed-plan-removed":
yield* projectionThreadProposedPlanRepository.deleteByPlanId({
planId: event.payload.planId,
});
return;

case "thread.reverted": {
const existingRows = yield* projectionThreadProposedPlanRepository.listByThreadId({
threadId: event.payload.threadId,
Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/orchestration/Schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ThreadUnarchivedPayload as ContractsThreadUnarchivedPayloadSchema,
ThreadMessageSentPayload as ContractsThreadMessageSentPayloadSchema,
ThreadProposedPlanUpsertedPayload as ContractsThreadProposedPlanUpsertedPayloadSchema,
ThreadProposedPlanRemovedPayload as ContractsThreadProposedPlanRemovedPayloadSchema,
ThreadSessionSetPayload as ContractsThreadSessionSetPayloadSchema,
ThreadTurnDiffCompletedPayload as ContractsThreadTurnDiffCompletedPayloadSchema,
ThreadRevertedPayload as ContractsThreadRevertedPayloadSchema,
Expand All @@ -37,6 +38,7 @@ export const ThreadUnarchivedPayload = ContractsThreadUnarchivedPayloadSchema;

export const MessageSentPayloadSchema = ContractsThreadMessageSentPayloadSchema;
export const ThreadProposedPlanUpsertedPayload = ContractsThreadProposedPlanUpsertedPayloadSchema;
export const ThreadProposedPlanRemovedPayload = ContractsThreadProposedPlanRemovedPayloadSchema;
export const ThreadSessionSetPayload = ContractsThreadSessionSetPayloadSchema;
export const ThreadTurnDiffCompletedPayload = ContractsThreadTurnDiffCompletedPayloadSchema;
export const ThreadRevertedPayload = ContractsThreadRevertedPayloadSchema;
Expand Down
79 changes: 79 additions & 0 deletions apps/server/src/orchestration/decider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,85 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand"
};
}

case "thread.proposed-plan.promote": {
const thread = yield* requireThread({
readModel,
command,
threadId: command.threadId,
});
const message = thread.messages.find((entry) => entry.id === command.messageId);
if (!message) {
return yield* new OrchestrationCommandInvariantError({
commandType: command.type,
detail: `Message '${command.messageId}' not found in thread '${command.threadId}'.`,
});
}
if (message.role !== "assistant") {
return yield* new OrchestrationCommandInvariantError({
commandType: command.type,
detail: `Message '${command.messageId}' is not an assistant message.`,
});
}
const planMarkdown = message.text.trim();
if (planMarkdown.length === 0) {
return yield* new OrchestrationCommandInvariantError({
commandType: command.type,
detail: `Message '${command.messageId}' has no text to promote.`,
});
}
const planId = `plan:${command.threadId}:promoted:${command.messageId}`;
const existingPlan = thread.proposedPlans.find((entry) => entry.id === planId);
return {
...withEventBase({
aggregateKind: "thread",
aggregateId: command.threadId,
occurredAt: command.createdAt,
commandId: command.commandId,
}),
type: "thread.proposed-plan-upserted",
payload: {
threadId: command.threadId,
proposedPlan: {
id: planId,
turnId: message.turnId ?? null,
planMarkdown,
implementedAt: existingPlan?.implementedAt ?? null,
implementationThreadId: existingPlan?.implementationThreadId ?? null,
createdAt: existingPlan?.createdAt ?? command.createdAt,
updatedAt: command.createdAt,
},
},
};
}

case "thread.proposed-plan.revert": {
const thread = yield* requireThread({
readModel,
command,
threadId: command.threadId,
});
const existingPlan = thread.proposedPlans.find((entry) => entry.id === command.planId);
if (!existingPlan) {
return yield* new OrchestrationCommandInvariantError({
commandType: command.type,
detail: `Proposed plan '${command.planId}' not found in thread '${command.threadId}'.`,
});
}
return {
...withEventBase({
aggregateKind: "thread",
aggregateId: command.threadId,
occurredAt: command.createdAt,
commandId: command.commandId,
}),
type: "thread.proposed-plan-removed",
payload: {
threadId: command.threadId,
planId: command.planId,
},
};
}

case "thread.turn.diff.complete": {
yield* requireThread({
readModel,
Expand Down
26 changes: 26 additions & 0 deletions apps/server/src/orchestration/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
ThreadInteractionModeSetPayload,
ThreadMetaUpdatedPayload,
ThreadProposedPlanUpsertedPayload,
ThreadProposedPlanRemovedPayload,
ThreadRuntimeModeSetPayload,
ThreadUnarchivedPayload,
ThreadRevertedPayload,
Expand Down Expand Up @@ -499,6 +500,31 @@ export function projectEvent(
};
});

case "thread.proposed-plan-removed":
return Effect.gen(function* () {
const payload = yield* decodeForEvent(
ThreadProposedPlanRemovedPayload,
event.payload,
event.type,
"payload",
);
const thread = nextBase.threads.find((entry) => entry.id === payload.threadId);
if (!thread) {
return nextBase;
}
const proposedPlans = thread.proposedPlans.filter((entry) => entry.id !== payload.planId);
if (proposedPlans.length === thread.proposedPlans.length) {
return nextBase;
}
return {
...nextBase,
threads: updateThread(nextBase.threads, payload.threadId, {
proposedPlans,
updatedAt: event.occurredAt,
}),
};
});

case "thread.turn-diff-completed":
return Effect.gen(function* () {
const payload = yield* decodeForEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as SqlSchema from "effect/unstable/sql/SqlSchema";

import { toPersistenceSqlError } from "../Errors.ts";
import {
DeleteProjectionThreadProposedPlanByIdInput,
DeleteProjectionThreadProposedPlansInput,
ListProjectionThreadProposedPlansInput,
ProjectionThreadProposedPlan,
Expand Down Expand Up @@ -76,6 +77,14 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () {
`,
});

const deleteProjectionThreadProposedPlanRowById = SqlSchema.void({
Request: DeleteProjectionThreadProposedPlanByIdInput,
execute: ({ planId }) => sql`
DELETE FROM projection_thread_proposed_plans
WHERE plan_id = ${planId}
`,
});

const upsert: ProjectionThreadProposedPlanRepositoryShape["upsert"] = (row) =>
upsertProjectionThreadProposedPlanRow(row).pipe(
Effect.mapError(toPersistenceSqlError("ProjectionThreadProposedPlanRepository.upsert:query")),
Expand All @@ -97,10 +106,18 @@ const makeProjectionThreadProposedPlanRepository = Effect.gen(function* () {
),
);

const deleteByPlanId: ProjectionThreadProposedPlanRepositoryShape["deleteByPlanId"] = (input) =>
deleteProjectionThreadProposedPlanRowById(input).pipe(
Effect.mapError(
toPersistenceSqlError("ProjectionThreadProposedPlanRepository.deleteByPlanId:query"),
),
);

return {
upsert,
listByThreadId,
deleteByThreadId,
deleteByPlanId,
} satisfies ProjectionThreadProposedPlanRepositoryShape;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ export const DeleteProjectionThreadProposedPlansInput = Schema.Struct({
export type DeleteProjectionThreadProposedPlansInput =
typeof DeleteProjectionThreadProposedPlansInput.Type;

export const DeleteProjectionThreadProposedPlanByIdInput = Schema.Struct({
planId: OrchestrationProposedPlanId,
});
export type DeleteProjectionThreadProposedPlanByIdInput =
typeof DeleteProjectionThreadProposedPlanByIdInput.Type;

export interface ProjectionThreadProposedPlanRepositoryShape {
readonly upsert: (
proposedPlan: ProjectionThreadProposedPlan,
Expand All @@ -44,6 +50,9 @@ export interface ProjectionThreadProposedPlanRepositoryShape {
readonly deleteByThreadId: (
input: DeleteProjectionThreadProposedPlansInput,
) => Effect.Effect<void, ProjectionRepositoryError>;
readonly deleteByPlanId: (
input: DeleteProjectionThreadProposedPlanByIdInput,
) => Effect.Effect<void, ProjectionRepositoryError>;
}

export class ProjectionThreadProposedPlanRepository extends Context.Service<
Expand Down
13 changes: 12 additions & 1 deletion apps/server/src/provider/Layers/ClaudeAdapter.ts
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ interface ClaudeTurnState {
readonly assistantTextBlocks: Map<number, AssistantTextBlockState>;
readonly assistantTextBlockOrder: Array<AssistantTextBlockState>;
readonly capturedProposedPlanKeys: Set<string>;
readonly interactionMode: "plan" | "default";
nextSyntheticAssistantBlockIndex: number;
}

Expand Down Expand Up @@ -173,6 +174,7 @@ interface ClaudeSessionContext {
lastKnownTokenUsage: ThreadTokenUsageSnapshot | undefined;
lastAssistantUuid: string | undefined;
lastThreadStartedId: string | undefined;
currentInteractionMode: "plan" | "default";
stopped: boolean;
}

Expand Down Expand Up @@ -1359,7 +1361,10 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
input: {
readonly planMarkdown: string;
readonly toolUseId?: string | undefined;
readonly rawSource: "claude.sdk.message" | "claude.sdk.permission";
readonly rawSource:
| "claude.sdk.message"
| "claude.sdk.permission"
| "client.user-promoted";
readonly rawMethod: string;
readonly rawPayload: unknown;
},
Expand Down Expand Up @@ -1956,6 +1961,7 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
assistantTextBlocks: new Map(),
assistantTextBlockOrder: [],
capturedProposedPlanKeys: new Set(),
interactionMode: context.currentInteractionMode,
nextSyntheticAssistantBlockIndex: -1,
};
context.session = {
Expand Down Expand Up @@ -2973,6 +2979,7 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
lastKnownTokenUsage: undefined,
lastAssistantUuid: resumeState?.resumeSessionAt,
lastThreadStartedId: undefined,
currentInteractionMode: permissionMode === "plan" ? "plan" : "default",
stopped: false,
};
yield* Ref.set(contextRef, context);
Expand Down Expand Up @@ -3085,21 +3092,25 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* (
try: () => context.query.setPermissionMode("plan"),
catch: (cause) => toRequestError(input.threadId, "turn/setPermissionMode", cause),
});
context.currentInteractionMode = "plan";
} else if (input.interactionMode === "default") {
yield* Effect.tryPromise({
try: () => context.query.setPermissionMode(context.basePermissionMode ?? "default"),
catch: (cause) => toRequestError(input.threadId, "turn/setPermissionMode", cause),
});
context.currentInteractionMode = "default";
}

const turnId = TurnId.make(yield* Random.nextUUIDv4);
const resolvedInteractionMode = context.currentInteractionMode;
const turnState: ClaudeTurnState = {
turnId,
startedAt: yield* nowIso,
items: [],
assistantTextBlocks: new Map(),
assistantTextBlockOrder: [],
capturedProposedPlanKeys: new Set(),
interactionMode: resolvedInteractionMode,
nextSyntheticAssistantBlockIndex: -1,
};

Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ function isThreadDetailEvent(event: OrchestrationEvent): event is Extract<
type:
| "thread.message-sent"
| "thread.proposed-plan-upserted"
| "thread.proposed-plan-removed"
| "thread.activity-appended"
| "thread.turn-diff-completed"
| "thread.reverted"
Expand All @@ -93,6 +94,7 @@ function isThreadDetailEvent(event: OrchestrationEvent): event is Extract<
return (
event.type === "thread.message-sent" ||
event.type === "thread.proposed-plan-upserted" ||
event.type === "thread.proposed-plan-removed" ||
event.type === "thread.activity-appended" ||
event.type === "thread.turn-diff-completed" ||
event.type === "thread.reverted" ||
Expand Down
Loading
Loading