Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export function CommandCenterSessionView({
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle,
errorMessage,
} = useSessionViewState(taskId, task);
Expand Down Expand Up @@ -67,7 +66,6 @@ export function CommandCenterSessionView({
onRetry={isCloud ? undefined : handleRetry}
onNewSession={isCloud ? undefined : handleNewSession}
isInitializing={isInitializing}
readOnlyMessage={readOnlyMessage}
compact
/>
</Flex>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ interface SessionViewProps {
onRetry?: () => void;
onNewSession?: () => void;
isInitializing?: boolean;
readOnlyMessage?: string;
slackThreadUrl?: string;
compact?: boolean;
}
Expand Down Expand Up @@ -87,7 +86,6 @@ export function SessionView({
onRetry,
onNewSession,
isInitializing = false,
readOnlyMessage,
slackThreadUrl,
compact = false,
}: SessionViewProps) {
Expand Down Expand Up @@ -506,17 +504,6 @@ export function SessionView({
/>
</Box>
</Box>
) : readOnlyMessage ? (
<Flex
align="center"
justify="center"
py="2"
className="border-gray-4 border-t"
>
<Text size="2" color="gray">
{readOnlyMessage}
</Text>
</Flex>
) : (
<Box className="relative border-gray-4 border-t">
<Box
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { useCwd } from "@features/sidebar/hooks/useCwd";
import { useWorkspace } from "@features/workspace/hooks/useWorkspace";
import type { Task } from "@shared/types";
import { useMemo } from "react";
import { useSessionForTask } from "../stores/sessionStore";

export function useSessionViewState(taskId: string, task: Task) {
Expand All @@ -20,9 +19,7 @@ export function useSessionViewState(taskId: string, task: Task) {
cloudStatus === "in_progress");
const isCloudRunTerminal = isCloud && !isCloudRunNotTerminal;

const isRunning = isCloud
? isCloudRunNotTerminal
: session?.status === "connected";
const isRunning = isCloud ? true : session?.status === "connected";
const hasError = isCloud ? false : session?.status === "error";

const events = session?.events ?? [];
Expand All @@ -46,12 +43,6 @@ export function useSessionViewState(taskId: string, task: Task) {
? (workspace?.baseBranch ?? task.latest_run?.branch ?? null)
: null;

const readOnlyMessage = useMemo(() => {
if (!isCloud) return undefined;
if (isCloudRunTerminal) return "This cloud run has finished";
return undefined;
}, [isCloud, isCloudRunTerminal]);

return {
session,
repoPath,
Expand All @@ -66,7 +57,6 @@ export function useSessionViewState(taskId: string, task: Task) {
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle: isCloud ? undefined : session?.errorTitle,
errorMessage: isCloud ? undefined : session?.errorMessage,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle,
errorMessage,
} = useSessionViewState(taskId, task);
Expand Down Expand Up @@ -157,7 +156,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
onRetry={isCloud ? undefined : handleRetry}
onNewSession={isCloud ? undefined : handleNewSession}
isInitializing={isInitializing}
readOnlyMessage={readOnlyMessage}
slackThreadUrl={slackThreadUrl}
/>
</ErrorBoundary>
Expand Down
123 changes: 119 additions & 4 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ export class AgentServer {
private questionRelayedToSlack = false;
private detectedPrUrl: string | null = null;
private resumeState: ResumeState | null = null;
// Guards against concurrent session initialization. autoInitializeSession() and
// the GET /events SSE handler can both call initializeSession() — the SSE connection
// often arrives while newSession() is still awaited (this.session is still null),
// causing a second session to be created and duplicate Slack messages to be sent.
private initializationPromise: Promise<void> | null = null;
private pendingEvents: Record<string, unknown>[] = [];

private emitConsoleLog = (
level: LogLevel,
Expand Down Expand Up @@ -264,6 +270,7 @@ export class AgentServer {
await this.initializeSession(payload, sseController);
} else {
this.session.sseController = sseController;
this.replayPendingEvents();
}

this.sendSseEvent(sseController, {
Expand Down Expand Up @@ -483,6 +490,8 @@ export class AgentServer {
`Processing user message (detectedPrUrl=${this.detectedPrUrl ?? "none"}): ${content.substring(0, 100)}...`,
);

this.session.logWriter.resetTurnMessages(this.session.payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: content }],
Expand All @@ -501,7 +510,31 @@ export class AgentServer {

this.broadcastTurnComplete(result.stopReason);

return { stopReason: result.stopReason };
if (result.stopReason === "end_turn") {
// Relay the response to Slack. For follow-ups this is the primary
// delivery path — the HTTP caller only handles reactions.
this.relayAgentResponse(this.session.payload).catch((err) =>
this.logger.warn("Failed to relay follow-up response", err),
);
}

// Flush logs and include the assistant's response text so callers
// (e.g. Slack follow-up forwarding) can extract it without racing
// against async log persistence to object storage.
let assistantMessage: string | undefined;
try {
await this.session.logWriter.flush(this.session.payload.run_id);
assistantMessage = this.session.logWriter.getFullAgentResponse(
this.session.payload.run_id,
);
} catch {
this.logger.warn("Failed to extract assistant message from logs");
}

return {
stopReason: result.stopReason,
...(assistantMessage && { assistant_message: assistantMessage }),
};
}

case POSTHOG_NOTIFICATIONS.CANCEL:
Expand Down Expand Up @@ -530,6 +563,40 @@ export class AgentServer {
private async initializeSession(
payload: JwtPayload,
sseController: SseController | null,
): Promise<void> {
// Race condition guard: autoInitializeSession() starts first, but while it awaits
// newSession() (which takes ~1-2s for MCP metadata fetch), the Temporal relay connects
// to GET /events. That handler sees this.session === null and calls initializeSession()
// again, creating a duplicate session that sends the same prompt twice — resulting in
// duplicate Slack messages. This lock ensures the second caller waits for the first
// initialization to finish and reuses the session.
if (this.initializationPromise) {
this.logger.info("Waiting for in-progress initialization", {
runId: payload.run_id,
});
await this.initializationPromise;
// After waiting, just attach the SSE controller if needed
if (this.session && sseController) {
this.session.sseController = sseController;
this.replayPendingEvents();
}
return;
}

this.initializationPromise = this._doInitializeSession(
payload,
sseController,
);
try {
await this.initializationPromise;
} finally {
this.initializationPromise = null;
}
}

private async _doInitializeSession(
payload: JwtPayload,
sseController: SseController | null,
): Promise<void> {
if (this.session) {
await this.cleanupSession();
Expand Down Expand Up @@ -770,6 +837,8 @@ export class AgentServer {
usedInitialPromptOverride: !!initialPromptOverride,
});

this.session.logWriter.resetTurnMessages(payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: initialPrompt }],
Expand Down Expand Up @@ -809,8 +878,8 @@ export class AgentServer {
const pendingUserMessage = this.getPendingUserMessage(taskRun);

const sandboxContext = this.resumeState.snapshotApplied
? `The sandbox environment (all files, packages, and code changes) has been fully restored from a snapshot.`
: `The sandbox could not be restored from a snapshot (it may have expired). You are starting with a fresh environment but have the full conversation history below.`;
? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

felt cleaner but can revert it tho

: `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`;

let resumePrompt: string;
if (pendingUserMessage) {
Expand Down Expand Up @@ -842,6 +911,8 @@ export class AgentServer {
// Clear resume state so it's not reused
this.resumeState = null;

this.session.logWriter.resetTurnMessages(payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: resumePrompt }],
Expand All @@ -852,6 +923,10 @@ export class AgentServer {
});

this.broadcastTurnComplete(result.stopReason);

if (result.stopReason === "end_turn") {
await this.relayAgentResponse(payload);
}
} catch (error) {
this.logger.error("Failed to send resume message", error);
if (this.session) {
Expand Down Expand Up @@ -992,6 +1067,27 @@ Important:
`;
}

if (!this.config.repositoryPath) {
return `
# Cloud Task Execution — No Repository Mode

You are a helpful assistant with access to PostHog via MCP tools. You can help with both code tasks and data/analytics questions.

When the user asks about analytics, data, metrics, events, funnels, dashboards, feature flags, experiments, or anything PostHog-related:
- Use your PostHog MCP tools to query data, search insights, and provide real answers
- Do NOT tell the user to check an external analytics platform — you ARE the analytics platform
- Use tools like insight-query, query-run, event-definitions-list, and others to answer questions directly

When the user asks for code changes or software engineering tasks:
- Let them know you can help but don't have a repository connected for this session
- Offer to write code snippets, scripts, or provide guidance

Important:
- Do NOT create branches, commits, or pull requests in this mode.
- Prefer using MCP tools to answer questions with real data over giving generic advice.
`;
}

return `
# Cloud Task Execution

Expand Down Expand Up @@ -1124,6 +1220,12 @@ Important:
},
};
},
extNotification: async (
method: string,
params: Record<string, unknown>,
) => {
this.logger.debug("Extension notification", { method, params });
},
sessionUpdate: async (params: {
sessionId: string;
update?: Record<string, unknown>;
Expand Down Expand Up @@ -1176,7 +1278,7 @@ Important:
});
}

const message = this.session.logWriter.getLastAgentMessage(payload.run_id);
const message = this.session.logWriter.getFullAgentResponse(payload.run_id);
if (!message) {
this.logger.warn("No agent message found for Slack relay", {
taskId: payload.task_id,
Expand Down Expand Up @@ -1385,6 +1487,7 @@ Important:
this.session.sseController.close();
}

this.pendingEvents = [];
this.session = null;
}

Expand Down Expand Up @@ -1444,6 +1547,18 @@ Important:
private broadcastEvent(event: Record<string, unknown>): void {
if (this.session?.sseController) {
this.sendSseEvent(this.session.sseController, event);
} else if (this.session) {
// Buffer events during initialization (sseController not yet attached)
this.pendingEvents.push(event);
}
}

private replayPendingEvents(): void {
if (!this.session?.sseController || this.pendingEvents.length === 0) return;
const events = this.pendingEvents;
this.pendingEvents = [];
for (const event of events) {
this.sendSseEvent(this.session.sseController, event);
}
}

Expand Down
20 changes: 17 additions & 3 deletions packages/agent/src/server/question-relay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ describe("Question relay", () => {
payload: TEST_PAYLOAD,
logWriter: {
flush: vi.fn().mockResolvedValue(undefined),
getLastAgentMessage: vi.fn().mockReturnValue("agent response"),
getFullAgentResponse: vi.fn().mockReturnValue("agent response"),
isRegistered: vi.fn().mockReturnValue(true),
},
};
Expand All @@ -269,7 +269,7 @@ describe("Question relay", () => {
payload: TEST_PAYLOAD,
logWriter: {
flush: vi.fn().mockResolvedValue(undefined),
getLastAgentMessage: vi.fn().mockReturnValue("agent response"),
getFullAgentResponse: vi.fn().mockReturnValue("agent response"),
isRegistered: vi.fn().mockReturnValue(true),
},
};
Expand All @@ -293,7 +293,7 @@ describe("Question relay", () => {
payload: TEST_PAYLOAD,
logWriter: {
flush: vi.fn().mockResolvedValue(undefined),
getLastAgentMessage: vi.fn().mockReturnValue(null),
getFullAgentResponse: vi.fn().mockReturnValue(null),
isRegistered: vi.fn().mockReturnValue(true),
},
};
Expand Down Expand Up @@ -323,6 +323,13 @@ describe("Question relay", () => {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);
Expand Down Expand Up @@ -350,6 +357,13 @@ describe("Question relay", () => {
payload: TEST_PAYLOAD,
acpSessionId: "acp-session",
clientConnection: { prompt: promptSpy },
logWriter: {
flushAll: vi.fn().mockResolvedValue(undefined),
getFullAgentResponse: vi.fn().mockReturnValue(null),
resetTurnMessages: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
isRegistered: vi.fn().mockReturnValue(true),
},
};

await server.sendInitialTaskMessage(TEST_PAYLOAD);
Expand Down
Loading
Loading