Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ export function CommandCenterSessionView({
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle,
errorMessage,
} = useSessionViewState(taskId, task);
Expand Down Expand Up @@ -67,7 +66,6 @@ export function CommandCenterSessionView({
onRetry={isCloud ? undefined : handleRetry}
onNewSession={isCloud ? undefined : handleNewSession}
isInitializing={isInitializing}
readOnlyMessage={readOnlyMessage}
compact
/>
</Flex>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ interface SessionViewProps {
onRetry?: () => void;
onNewSession?: () => void;
isInitializing?: boolean;
readOnlyMessage?: string;
slackThreadUrl?: string;
compact?: boolean;
}
Expand Down Expand Up @@ -87,7 +86,6 @@ export function SessionView({
onRetry,
onNewSession,
isInitializing = false,
readOnlyMessage,
slackThreadUrl,
compact = false,
}: SessionViewProps) {
Expand Down Expand Up @@ -506,17 +504,6 @@ export function SessionView({
/>
</Box>
</Box>
) : readOnlyMessage ? (
<Flex
align="center"
justify="center"
py="2"
className="border-gray-4 border-t"
>
<Text size="2" color="gray">
{readOnlyMessage}
</Text>
</Flex>
) : (
<Box className="relative border-gray-4 border-t">
<Box
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { useCwd } from "@features/sidebar/hooks/useCwd";
import { useWorkspace } from "@features/workspace/hooks/useWorkspace";
import type { Task } from "@shared/types";
import { useMemo } from "react";
import { useSessionForTask } from "../stores/sessionStore";

export function useSessionViewState(taskId: string, task: Task) {
Expand All @@ -20,9 +19,7 @@ export function useSessionViewState(taskId: string, task: Task) {
cloudStatus === "in_progress");
const isCloudRunTerminal = isCloud && !isCloudRunNotTerminal;

const isRunning = isCloud
? isCloudRunNotTerminal
: session?.status === "connected";
const isRunning = isCloud ? true : session?.status === "connected";
const hasError = isCloud ? false : session?.status === "error";

const events = session?.events ?? [];
Expand All @@ -46,12 +43,6 @@ export function useSessionViewState(taskId: string, task: Task) {
? (workspace?.baseBranch ?? task.latest_run?.branch ?? null)
: null;

const readOnlyMessage = useMemo(() => {
if (!isCloud) return undefined;
if (isCloudRunTerminal) return "This cloud run has finished";
return undefined;
}, [isCloud, isCloudRunTerminal]);

return {
session,
repoPath,
Expand All @@ -66,7 +57,6 @@ export function useSessionViewState(taskId: string, task: Task) {
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle: isCloud ? undefined : session?.errorTitle,
errorMessage: isCloud ? undefined : session?.errorMessage,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
promptStartedAt,
isInitializing,
cloudBranch,
readOnlyMessage,
errorTitle,
errorMessage,
} = useSessionViewState(taskId, task);
Expand Down Expand Up @@ -157,7 +156,6 @@ export function TaskLogsPanel({ taskId, task }: TaskLogsPanelProps) {
onRetry={isCloud ? undefined : handleRetry}
onNewSession={isCloud ? undefined : handleNewSession}
isInitializing={isInitializing}
readOnlyMessage={readOnlyMessage}
slackThreadUrl={slackThreadUrl}
/>
</ErrorBoundary>
Expand Down
123 changes: 119 additions & 4 deletions packages/agent/src/server/agent-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@
private questionRelayedToSlack = false;
private detectedPrUrl: string | null = null;
private resumeState: ResumeState | null = null;
// Guards against concurrent session initialization. autoInitializeSession() and
// the GET /events SSE handler can both call initializeSession() — the SSE connection
// often arrives while newSession() is still awaited (this.session is still null),
// causing a second session to be created and duplicate Slack messages to be sent.
private initializationPromise: Promise<void> | null = null;
private pendingEvents: Record<string, unknown>[] = [];

private emitConsoleLog = (
level: LogLevel,
Expand Down Expand Up @@ -264,6 +270,7 @@
await this.initializeSession(payload, sseController);
} else {
this.session.sseController = sseController;
this.replayPendingEvents();
}

this.sendSseEvent(sseController, {
Expand Down Expand Up @@ -483,6 +490,8 @@
`Processing user message (detectedPrUrl=${this.detectedPrUrl ?? "none"}): ${content.substring(0, 100)}...`,
);

this.session.logWriter.resetTurnMessages(this.session.payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: content }],
Expand All @@ -501,7 +510,31 @@

this.broadcastTurnComplete(result.stopReason);

return { stopReason: result.stopReason };
if (result.stopReason === "end_turn") {
// Relay the response to Slack. For follow-ups this is the primary
// delivery path — the HTTP caller only handles reactions.
this.relayAgentResponse(this.session.payload).catch((err) =>
this.logger.warn("Failed to relay follow-up response", err),
);
}

// Flush logs and include the assistant's response text so callers
// (e.g. Slack follow-up forwarding) can extract it without racing
// against async log persistence to object storage.
let assistantMessage: string | undefined;
try {
await this.session.logWriter.flush(this.session.payload.run_id);
assistantMessage = this.session.logWriter.getFullAgentResponse(
this.session.payload.run_id,
);
} catch {
this.logger.warn("Failed to extract assistant message from logs");
}

return {
stopReason: result.stopReason,
...(assistantMessage && { assistant_message: assistantMessage }),
};
}

case POSTHOG_NOTIFICATIONS.CANCEL:
Expand Down Expand Up @@ -530,6 +563,40 @@
private async initializeSession(
payload: JwtPayload,
sseController: SseController | null,
): Promise<void> {
// Race condition guard: autoInitializeSession() starts first, but while it awaits
// newSession() (which takes ~1-2s for MCP metadata fetch), the Temporal relay connects
// to GET /events. That handler sees this.session === null and calls initializeSession()
// again, creating a duplicate session that sends the same prompt twice — resulting in
// duplicate Slack messages. This lock ensures the second caller waits for the first
// initialization to finish and reuses the session.
if (this.initializationPromise) {
this.logger.info("Waiting for in-progress initialization", {
runId: payload.run_id,
});
await this.initializationPromise;
// After waiting, just attach the SSE controller if needed
if (this.session && sseController) {
this.session.sseController = sseController;
this.replayPendingEvents();
}
return;
}

this.initializationPromise = this._doInitializeSession(
payload,
sseController,
);
try {
await this.initializationPromise;
} finally {
this.initializationPromise = null;
}
}

private async _doInitializeSession(
payload: JwtPayload,
sseController: SseController | null,
): Promise<void> {
if (this.session) {
await this.cleanupSession();
Expand Down Expand Up @@ -770,6 +837,8 @@
usedInitialPromptOverride: !!initialPromptOverride,
});

this.session.logWriter.resetTurnMessages(payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: initialPrompt }],
Expand All @@ -787,7 +856,7 @@
} catch (error) {
this.logger.error("Failed to send initial task message", error);
if (this.session) {
await this.session.logWriter.flushAll();

Check failure on line 859 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > sendInitialTaskMessage prompt source > falls back to task description when override is missing

TypeError: Cannot read properties of undefined (reading 'flushAll') ❯ AgentServer.sendInitialTaskMessage src/server/agent-server.ts:859:38 ❯ src/server/question-relay.test.ts:355:7

Check failure on line 859 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > sendInitialTaskMessage prompt source > uses run state initial_prompt_override when present

TypeError: Cannot read properties of undefined (reading 'flushAll') ❯ AgentServer.sendInitialTaskMessage src/server/agent-server.ts:859:38 ❯ src/server/question-relay.test.ts:328:7

Check failure on line 859 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > sendInitialTaskMessage prompt source > falls back to task description when override is missing

TypeError: Cannot read properties of undefined (reading 'flushAll') ❯ AgentServer.sendInitialTaskMessage src/server/agent-server.ts:859:38 ❯ src/server/question-relay.test.ts:355:7

Check failure on line 859 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > sendInitialTaskMessage prompt source > uses run state initial_prompt_override when present

TypeError: Cannot read properties of undefined (reading 'flushAll') ❯ AgentServer.sendInitialTaskMessage src/server/agent-server.ts:859:38 ❯ src/server/question-relay.test.ts:328:7
}
await this.signalTaskComplete(payload, "error");
}
Expand All @@ -809,8 +878,8 @@
const pendingUserMessage = this.getPendingUserMessage(taskRun);

const sandboxContext = this.resumeState.snapshotApplied
? `The sandbox environment (all files, packages, and code changes) has been fully restored from a snapshot.`
: `The sandbox could not be restored from a snapshot (it may have expired). You are starting with a fresh environment but have the full conversation history below.`;
? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.`
: `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`;

let resumePrompt: string;
if (pendingUserMessage) {
Expand Down Expand Up @@ -842,6 +911,8 @@
// Clear resume state so it's not reused
this.resumeState = null;

this.session.logWriter.resetTurnMessages(payload.run_id);

const result = await this.session.clientConnection.prompt({
sessionId: this.session.acpSessionId,
prompt: [{ type: "text", text: resumePrompt }],
Expand All @@ -852,6 +923,10 @@
});

this.broadcastTurnComplete(result.stopReason);

if (result.stopReason === "end_turn") {
await this.relayAgentResponse(payload);
}
} catch (error) {
this.logger.error("Failed to send resume message", error);
if (this.session) {
Expand Down Expand Up @@ -992,6 +1067,27 @@
`;
}

if (!this.config.repositoryPath) {
return `
# Cloud Task Execution — No Repository Mode

You are a helpful assistant with access to PostHog via MCP tools. You can help with both code tasks and data/analytics questions.

When the user asks about analytics, data, metrics, events, funnels, dashboards, feature flags, experiments, or anything PostHog-related:
- Use your PostHog MCP tools to query data, search insights, and provide real answers
- Do NOT tell the user to check an external analytics platform — you ARE the analytics platform
- Use tools like insight-query, query-run, event-definitions-list, and others to answer questions directly

When the user asks for code changes or software engineering tasks:
- Let them know you can help but don't have a repository connected for this session
- Offer to write code snippets, scripts, or provide guidance

Important:
- Do NOT create branches, commits, or pull requests in this mode.
- Prefer using MCP tools to answer questions with real data over giving generic advice.
`;
}

return `
# Cloud Task Execution

Expand Down Expand Up @@ -1124,6 +1220,12 @@
},
};
},
extNotification: async (
method: string,
params: Record<string, unknown>,
) => {
this.logger.debug("Extension notification", { method, params });
},
sessionUpdate: async (params: {
sessionId: string;
update?: Record<string, unknown>;
Expand Down Expand Up @@ -1176,7 +1278,7 @@
});
}

const message = this.session.logWriter.getLastAgentMessage(payload.run_id);
const message = this.session.logWriter.getFullAgentResponse(payload.run_id);

Check failure on line 1281 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > relayAgentResponse duplicate suppression > does not relay when no agent message is available

TypeError: this.session.logWriter.getFullAgentResponse is not a function ❯ AgentServer.relayAgentResponse src/server/agent-server.ts:1281:44 ❯ src/server/question-relay.test.ts:302:7

Check failure on line 1281 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > relayAgentResponse duplicate suppression > relays normally when questionRelayedToSlack is not set

TypeError: this.session.logWriter.getFullAgentResponse is not a function ❯ AgentServer.relayAgentResponse src/server/agent-server.ts:1281:44 ❯ src/server/question-relay.test.ts:278:7

Check failure on line 1281 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > relayAgentResponse duplicate suppression > does not relay when no agent message is available

TypeError: this.session.logWriter.getFullAgentResponse is not a function ❯ AgentServer.relayAgentResponse src/server/agent-server.ts:1281:44 ❯ src/server/question-relay.test.ts:302:7

Check failure on line 1281 in packages/agent/src/server/agent-server.ts

View workflow job for this annotation

GitHub Actions / unit-test

src/server/question-relay.test.ts > Question relay > relayAgentResponse duplicate suppression > relays normally when questionRelayedToSlack is not set

TypeError: this.session.logWriter.getFullAgentResponse is not a function ❯ AgentServer.relayAgentResponse src/server/agent-server.ts:1281:44 ❯ src/server/question-relay.test.ts:278:7
if (!message) {
this.logger.warn("No agent message found for Slack relay", {
taskId: payload.task_id,
Expand Down Expand Up @@ -1385,6 +1487,7 @@
this.session.sseController.close();
}

this.pendingEvents = [];
this.session = null;
}

Expand Down Expand Up @@ -1444,6 +1547,18 @@
private broadcastEvent(event: Record<string, unknown>): void {
if (this.session?.sseController) {
this.sendSseEvent(this.session.sseController, event);
} else if (this.session) {
// Buffer events during initialization (sseController not yet attached)
this.pendingEvents.push(event);
}
}

private replayPendingEvents(): void {
if (!this.session?.sseController || this.pendingEvents.length === 0) return;
const events = this.pendingEvents;
this.pendingEvents = [];
for (const event of events) {
this.sendSseEvent(this.session.sseController, event);
}
}

Expand Down
18 changes: 17 additions & 1 deletion packages/agent/src/session-log-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ interface SessionState {
context: SessionContext;
chunkBuffer?: ChunkBuffer;
lastAgentMessage?: string;
currentTurnMessages: string[];
}

export class SessionLogWriter {
Expand Down Expand Up @@ -69,7 +70,7 @@ export class SessionLogWriter {
taskId: context.taskId,
runId: context.runId,
});
this.sessions.set(sessionId, { context });
this.sessions.set(sessionId, { context, currentTurnMessages: [] });

this.lastFlushAttemptTime.set(sessionId, Date.now());

Expand Down Expand Up @@ -127,6 +128,7 @@ export class SessionLogWriter {
const nonChunkAgentText = this.extractAgentMessageText(message);
if (nonChunkAgentText) {
session.lastAgentMessage = nonChunkAgentText;
session.currentTurnMessages.push(nonChunkAgentText);
}

const entry: StoredNotification = {
Expand Down Expand Up @@ -240,6 +242,7 @@ export class SessionLogWriter {
const { text, firstTimestamp } = session.chunkBuffer;
session.chunkBuffer = undefined;
session.lastAgentMessage = text;
session.currentTurnMessages.push(text);

const entry: StoredNotification = {
type: "notification",
Expand Down Expand Up @@ -270,6 +273,19 @@ export class SessionLogWriter {
return this.sessions.get(sessionId)?.lastAgentMessage;
}

getFullAgentResponse(sessionId: string): string | undefined {
const session = this.sessions.get(sessionId);
if (!session || session.currentTurnMessages.length === 0) return undefined;
return session.currentTurnMessages.join("\n\n");
}

resetTurnMessages(sessionId: string): void {
const session = this.sessions.get(sessionId);
if (session) {
session.currentTurnMessages = [];
}
}

private extractAgentMessageText(
message: Record<string, unknown>,
): string | null {
Expand Down
Loading