Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions apps/array/src/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,33 @@ app.on("activate", () => {
createWindow();
}
});

// Handle process signals to ensure clean shutdown
const handleShutdownSignal = async (_signal: string) => {
try {
const agentService = container.get<AgentService>(MAIN_TOKENS.AgentService);
await agentService.cleanupAll();
} catch (_err) {}
process.exit(0);
};

process.on("SIGTERM", () => handleShutdownSignal("SIGTERM"));
process.on("SIGINT", () => handleShutdownSignal("SIGINT"));
process.on("SIGHUP", () => handleShutdownSignal("SIGHUP"));

// Handle uncaught exceptions to attempt cleanup before crash
process.on("uncaughtException", async (_error) => {
try {
const agentService = container.get<AgentService>(MAIN_TOKENS.AgentService);
await agentService.cleanupAll();
} catch (_cleanupErr) {}
process.exit(1);
});

process.on("unhandledRejection", async (_reason) => {
try {
const agentService = container.get<AgentService>(MAIN_TOKENS.AgentService);
await agentService.cleanupAll();
} catch (_cleanupErr) {}
process.exit(1);
});
29 changes: 25 additions & 4 deletions apps/array/src/main/services/agent/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,19 +551,40 @@ export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
}

async cleanupAll(): Promise<void> {
log.info("Cleaning up all agent sessions", {
sessionCount: this.sessions.size,
});

for (const [taskRunId, session] of this.sessions) {
// Step 1: Send ACP cancel notification for any ongoing prompt turns
try {
if (!session.connection.signal.aborted) {
await session.connection.cancel({ sessionId: taskRunId });
log.info("Sent ACP cancel for session", { taskRunId });
}
} catch (err) {
log.warn("Failed to send ACP cancel", { taskRunId, error: err });
}

// Step 2: Cancel via agent (triggers AbortController)
try {
session.agent.cancelTask(session.taskId);
} catch (err) {
log.warn("Failed to cancel session during cleanup", {
taskRunId,
error: err,
});
log.warn("Failed to cancel task", { taskRunId, error: err });
}

// Step 3: Cleanup agent connection (closes streams, aborts subprocess)
try {
await session.agent.cleanup();
} catch (err) {
log.warn("Failed to cleanup agent", { taskRunId, error: err });
}

this.cleanupMockNodeEnvironment(session.mockNodeDir);
}

this.sessions.clear();
log.info("All agent sessions cleaned up");
}

private setupEnvironment(
Expand Down
37 changes: 33 additions & 4 deletions packages/agent/src/adapters/claude/claude.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ type Session = {
sdkSessionId?: string;
lastPlanFilePath?: string;
lastPlanContent?: string;
abortController: AbortController;
};

type BackgroundTerminal =
Expand Down Expand Up @@ -300,18 +301,33 @@ export class ClaudeAcpAgent implements Agent {
this.sessionStore = sessionStore;
}

closeAllSessions(): void {
for (const [sessionId, session] of Object.entries(this.sessions)) {
try {
// Abort the session's AbortController - this signals the SDK to terminate the subprocess
session.abortController.abort();
this.logger.info("Aborted session", { sessionId });
} catch (err) {
this.logger.warn("Failed to abort session", { sessionId, error: err });
}
}
this.sessions = {};
}

createSession(
sessionId: string,
q: Query,
input: Pushable<SDKUserMessage>,
permissionMode: PermissionMode,
abortController: AbortController,
): Session {
const session: Session = {
query: q,
input,
cancelled: false,
permissionMode,
notificationHistory: [],
abortController,
};
this.sessions[sessionId] = session;
return session;
Expand Down Expand Up @@ -589,11 +605,13 @@ export class ClaudeAcpAgent implements Agent {
options.disallowedTools = disallowedTools;
}

// Handle abort controller from meta options
const abortController = userProvidedOptions?.abortController;
if (abortController?.signal.aborted) {
// Create or use provided abort controller - we need this for cleanup
const sessionAbortController =
userProvidedOptions?.abortController ?? new AbortController();
if (sessionAbortController.signal.aborted) {
throw new Error("Cancelled");
}
options.abortController = sessionAbortController;

// Clear statsig cache before creating query to avoid input_examples bug
clearStatsigCache();
Expand All @@ -603,7 +621,13 @@ export class ClaudeAcpAgent implements Agent {
options,
});

this.createSession(sessionId, q, input, ourPermissionMode);
this.createSession(
sessionId,
q,
input,
ourPermissionMode,
sessionAbortController,
);

// Register for S3 persistence if config provided
const persistence = params._meta?.persistence as
Expand Down Expand Up @@ -1515,6 +1539,10 @@ export class ClaudeAcpAgent implements Agent {
},
};

// Create abort controller for cleanup
const sessionAbortController = new AbortController();
options.abortController = sessionAbortController;

// Clear statsig cache before creating query to avoid input_examples bug
clearStatsigCache();

Expand All @@ -1530,6 +1558,7 @@ export class ClaudeAcpAgent implements Agent {
q,
input,
permissionMode,
sessionAbortController,
);

// Store SDK session ID if resuming
Expand Down
28 changes: 26 additions & 2 deletions packages/agent/src/adapters/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export type AcpConnectionConfig = {
export type InProcessAcpConnection = {
agentConnection: AgentSideConnection;
clientStreams: StreamPair;
cleanup: () => Promise<void>;
};

/**
Expand Down Expand Up @@ -79,10 +80,12 @@ export function createAcpConnection(

const agentStream = ndJsonStream(agentWritable, streams.agent.readable);

// Create the Claude agent
// Create the Claude agent - capture reference for cleanup
let claudeAgent: ClaudeAcpAgent | null = null;
const agentConnection = new AgentSideConnection((client) => {
logger.info("Creating Claude agent");
return new ClaudeAcpAgent(client, sessionStore);
claudeAgent = new ClaudeAcpAgent(client, sessionStore);
return claudeAgent;
}, agentStream);

return {
Expand All @@ -91,5 +94,26 @@ export function createAcpConnection(
readable: streams.client.readable,
writable: clientWritable,
},
cleanup: async () => {
logger.info("Cleaning up ACP connection");

// First close the agent sessions (aborts any running queries)
if (claudeAgent) {
claudeAgent.closeAllSessions();
}

// Then close the streams to properly terminate the ACP connection
// This signals the connection to close and cleanup
try {
await streams.client.writable.close();
} catch {
// Stream may already be closed
}
try {
await streams.agent.writable.close();
} catch {
// Stream may already be closed
}
},
};
}
4 changes: 4 additions & 0 deletions packages/agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,10 @@ This PR implements the changes described in the task.`;
}
}

async cleanup(): Promise<void> {
await this.acpConnection?.cleanup();
}

getTaskExecutionStatus(taskId: string): string | null {
// Find the execution for this task
for (const execution of this.taskManager.executionStates.values()) {
Expand Down