Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SLACK_SOCKET_OPEN_URL=apps.connections.open
SLACK_INITIAL_THREAD_HISTORY_COUNT=8
SLACK_HISTORY_API_MAX_LIMIT=50
SLACK_ACTIVE_TURN_RECONCILE_INTERVAL_MS=15000
SLACK_ACTIVE_TURN_STALL_TIMEOUT_MS=600000
SLACK_PROGRESS_REMINDER_AFTER_MS=120000
SLACK_PROGRESS_REMINDER_REPEAT_MS=120000

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Copy `.env.example` to `.env` and fill in:
- `SLACK_BOT_TOKEN`
- optional `SLACK_INITIAL_THREAD_HISTORY_COUNT`
- optional `SLACK_HISTORY_API_MAX_LIMIT`
- optional `SLACK_ACTIVE_TURN_STALL_TIMEOUT_MS` (defaults to 10 minutes)
- optional `SESSIONS_ROOT`
- optional `REPOS_ROOT`
- optional `LOG_DIR`
Expand Down
25 changes: 25 additions & 0 deletions docs/proposals/active-turn-watchdog-recovery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Active Turn Watchdog Recovery

## Goal
Keep Slack threads from getting permanently stuck when the agent runtime loses a terminal turn event or leaves a turn in `inProgress` after the agent has stopped producing activity.

## Current state
The broker clears `activeTurnId` when the runtime returns a terminal completion or when `thread/read` reports a terminal turn state. If the runtime keeps reporting an active turn as `inProgress` while no agent-runtime events arrive, newer Slack messages are delivered into that stale turn and the session remains blocked.

## Proposed changes
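- Add a configurable active-turn stall timeout (`SLACK_ACTIVE_TURN_STALL_TIMEOUT_MS`), measured from the latest agent-runtime trace event for the active turn and falling back to `activeTurnStartedAt`, then to the session's `createdAt`, when no such event exists.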
- During active-turn reconciliation, if `thread/read` still reports `inProgress` or `unknown` but the last agent activity is older than the timeout, treat the turn as stalled: reset its inflight messages to `pending`, clear `activeTurnId`, make a best-effort attempt to interrupt the stale turn, and resume dispatch.
- Add an internal manual repair endpoint for a single session that performs the same reset/resume flow without discarding Slack history or agent session state.
- Cover the flow with unit and e2e tests so follow-up Slack messages are eventually retried in a fresh turn instead of being lost in the stale active turn.

## If we do not change it
A lost terminal event or silent runtime stall can leave the thread stuck indefinitely; follow-up messages will keep joining the stale turn and users will not get the requested PR/final update.

## After the change
A silent active turn is recovered after the configured timeout. The user’s latest Slack messages remain pending and are redelivered to a fresh turn.

## Acceptance criteria
- A test can reproduce an active turn that remains `inProgress` with no agent-runtime activity and verify the broker clears it after the timeout.
- Inflight messages for the stale turn are reset to `pending` and processed by a replacement turn.
- A manual repair API can reset one stale active turn and resume pending work without a full session reset.
- Existing active-turn reconciliation behavior for terminal, missing, and temporarily omitted turns remains intact.
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface AppConfig {
readonly slackInitialThreadHistoryCount: number;
readonly slackHistoryApiMaxLimit: number;
readonly slackActiveTurnReconcileIntervalMs: number;
readonly slackActiveTurnStallTimeoutMs: number;
readonly slackMissedThreadRecoveryIntervalMs: number;
readonly stateDir: string;
readonly jobsRoot: string;
Expand Down Expand Up @@ -188,6 +189,7 @@ export function loadConfig(env = process.env): AppConfig {
slackInitialThreadHistoryCount: getNumber(env, "SLACK_INITIAL_THREAD_HISTORY_COUNT", 8),
slackHistoryApiMaxLimit: getNumber(env, "SLACK_HISTORY_API_MAX_LIMIT", 50),
slackActiveTurnReconcileIntervalMs: getNumber(env, "SLACK_ACTIVE_TURN_RECONCILE_INTERVAL_MS", 15_000),
slackActiveTurnStallTimeoutMs: getNumber(env, "SLACK_ACTIVE_TURN_STALL_TIMEOUT_MS", 10 * 60_000),
slackMissedThreadRecoveryIntervalMs: getNumber(
env,
"SLACK_MISSED_THREAD_RECOVERY_INTERVAL_MS",
Expand Down
45 changes: 45 additions & 0 deletions src/http/slack-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ export async function handleSlackRequest(
return true;
}

const matchedRepairActiveTurn = matchRepairActiveTurnPath(url.pathname);
if (method === "POST" && matchedRepairActiveTurn) {
await handleSlackRepairActiveTurnRequest(response, options, matchedRepairActiveTurn.sessionKey);
return true;
}

const matchedResetSession = matchResetSessionPath(url.pathname);
if (method === "POST" && matchedResetSession) {
await handleSlackResetSessionRequest(response, options, matchedResetSession.sessionKey);
Expand Down Expand Up @@ -112,6 +118,28 @@ async function handleSlackResumePendingSessionRequest(
}
}

/**
 * Handles `POST /slack/sessions/<key>/repair-active-turn`.
 *
 * Asks the bridge to repair the session's active turn and writes the outcome
 * as JSON: status 200 with `{ ok: true, sessionKey, repair }` on success, or
 * status 500 with `{ ok: false, error }` when anything inside the attempt throws.
 *
 * @param response - HTTP response the JSON body is written to.
 * @param options - Carries the bridge that performs the repair.
 * @param sessionKey - Key of the session whose active turn should be repaired.
 */
async function handleSlackRepairActiveTurnRequest(
  response: http.ServerResponse,
  options: {
    readonly bridge: SlackAgentBridge;
  },
  sessionKey: string
): Promise<void> {
  try {
    const { bridge } = options;
    const repair = await bridge.repairActiveTurn(sessionKey);
    const successBody = {
      ok: true,
      sessionKey,
      repair
    };
    // Kept inside the try so a failure while responding also takes the 500 path.
    respondJson(response, 200, successBody);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    respondJson(response, 500, {
      ok: false,
      error: message
    });
  }
}

async function handleSlackResetSessionRequest(
response: http.ServerResponse,
options: {
Expand Down Expand Up @@ -694,6 +722,23 @@ function matchResumeSessionPath(pathname: string): { readonly sessionKey: string
};
}

/**
 * Extracts the session key from a `/slack/sessions/<key>/repair-active-turn` path.
 *
 * @param pathname - URL pathname of the incoming request.
 * @returns The percent-decoded session key, or `null` when the path lacks the
 *   expected prefix/suffix, the key segment is empty, or the key segment is not
 *   valid percent-encoding.
 */
function matchRepairActiveTurnPath(pathname: string): { readonly sessionKey: string } | null {
  const prefix = "/slack/sessions/";
  const suffix = "/repair-active-turn";
  if (!pathname.startsWith(prefix) || !pathname.endsWith(suffix)) {
    return null;
  }

  const encodedKey = pathname.slice(prefix.length, -suffix.length);
  if (!encodedKey) {
    return null;
  }

  try {
    return {
      sessionKey: decodeURIComponent(encodedKey)
    };
  } catch (error) {
    // decodeURIComponent throws URIError on malformed escapes (e.g. a lone "%").
    // A key that cannot be decoded cannot name a session, so report "no match"
    // instead of letting the exception escape the route matcher.
    if (error instanceof URIError) {
      return null;
    }
    throw error;
  }
}

function matchResetSessionPath(pathname: string): { readonly sessionKey: string } | null {
const prefix = "/slack/sessions/";
const suffix = "/reset";
Expand Down
4 changes: 4 additions & 0 deletions src/services/slack/slack-agent-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ export class SlackAgentBridge {
return await this.#conversations.resumePendingSession(sessionKey);
}

/**
 * Repairs the active turn of one session by delegating to the conversation
 * service and returning its result unchanged.
 *
 * @param sessionKey - Key of the session to repair.
 */
async repairActiveTurn(sessionKey: string) {
  const repairResult = await this.#conversations.repairActiveTurn(sessionKey);
  return repairResult;
}

async resetSession(sessionKey: string) {
return await this.#conversations.resetSession(sessionKey);
}
Expand Down
72 changes: 71 additions & 1 deletion src/services/slack/slack-conversation-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ export class SlackConversationService {
this.#turnReconciler = new SlackTurnReconciler({
sessions: this.#sessions,
turnRunner: this.#turnRunner,
inboundStore: this.#inboundStore
inboundStore: this.#inboundStore,
activeTurnStallTimeoutMs: this.#config.slackActiveTurnStallTimeoutMs
});
this.#agentRuntimeEventHandler = (event) => {
this.#handleAgentRuntimeEvent(event);
Expand Down Expand Up @@ -312,6 +313,75 @@ export class SlackConversationService {
});
}

/**
 * Manually recovers a session whose active turn may be stuck.
 *
 * When the session has no active turn, nothing is reset; pending dispatch is
 * resumed (unless the session is auth-blocked) and `repaired` is `false`.
 *
 * Otherwise the steps run in this order: the turn's inflight messages are reset
 * to pending, the active turn id is cleared, runtime processing state and the
 * assistant status are reset, the turn is interrupted on a best-effort basis
 * (only when the session has an agent session id; failures are logged and
 * reported, not thrown), and finally pending dispatch is resumed unless the
 * session is auth-blocked.
 *
 * @param sessionKey - Key of the session to repair.
 * @returns A summary of what was reset, interrupted and resumed. `interruptError`
 *   is present only when the interrupt attempt failed with a non-empty message.
 */
async repairActiveTurn(sessionKey: string): Promise<{
  readonly repaired: boolean;
  readonly previousActiveTurnId: string | null;
  readonly resetInflightCount: number;
  readonly resumedCount: number;
  readonly interruptedActiveTurn: boolean;
  readonly interruptError?: string | undefined;
  readonly authBlocked: boolean;
}> {
  const session = this.#findSessionByKey(sessionKey);
  const staleTurnId = session.activeTurnId;
  const previousActiveTurnId = staleTurnId ?? null;
  const authBlocked = Boolean(session.authBlockedAt);

  // Shared by both paths: an auth-blocked session never resumes dispatch.
  const resumeUnlessAuthBlocked = async () =>
    authBlocked
      ? 0
      : await this.#resumePendingDispatch(session.key, {
          forceReset: true
        });

  if (!staleTurnId) {
    // Nothing to repair; just nudge pending work along.
    const resumedWithoutRepair = await resumeUnlessAuthBlocked();
    return {
      repaired: false,
      previousActiveTurnId,
      resetInflightCount: 0,
      resumedCount: resumedWithoutRepair,
      interruptedActiveTurn: false,
      authBlocked
    };
  }

  // Broker-side state is released first, so a failing interrupt cannot leave the session blocked.
  const resetInflightCount = await this.#inboundStore.resetTurnBatchToPending(session, staleTurnId);
  await this.#sessions.setActiveTurnId(session.channelId, session.rootThreadTs, undefined);
  this.#resetRuntimeProcessing(session.key);
  this.#clearAssistantStatus(session.channelId, session.rootThreadTs);

  let interruptError: string | undefined;
  let interruptedActiveTurn = false;
  if (session.agentSessionId) {
    try {
      await this.#turnRunner.interrupt(session);
      interruptedActiveTurn = true;
    } catch (error) {
      // Best effort only: record the failure and carry on with the repair.
      interruptError = error instanceof Error ? error.message : String(error);
      logger.warn("Failed to interrupt active turn during manual active-turn repair", {
        sessionKey: session.key,
        agentSessionId: session.agentSessionId,
        turnId: staleTurnId,
        error: interruptError
      });
    }
  }

  const resumedCount = await resumeUnlessAuthBlocked();

  const summary = {
    repaired: true,
    previousActiveTurnId,
    resetInflightCount,
    resumedCount,
    interruptedActiveTurn,
    authBlocked
  };
  return interruptError ? { ...summary, interruptError } : summary;
}

async resetSession(sessionKey: string): Promise<{
readonly clearedInboundCount: number;
readonly resetMessageTs: string;
Expand Down
5 changes: 3 additions & 2 deletions src/services/slack/slack-inbound-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ export class SlackInboundStore {
return session;
}

async resetTurnBatchToPending(session: SlackSessionRecord, turnId: string): Promise<void> {
await this.#sessions.resetInflightMessages(session.channelId, session.rootThreadTs, turnId);
async resetTurnBatchToPending(session: SlackSessionRecord, turnId: string): Promise<number> {
const resetMessages = await this.#sessions.resetInflightMessages(session.channelId, session.rootThreadTs, turnId);
return resetMessages.length;
}

async reconcileOrphanedInflightMessages(session: SlackSessionRecord): Promise<{
Expand Down
91 changes: 90 additions & 1 deletion src/services/slack/slack-turn-reconciler.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import { logger } from "../../logger.js";
import type { SlackSessionRecord } from "../../types.js";
import type { PersistedAgentTraceEvent, SlackSessionRecord } from "../../types.js";
import { SessionManager } from "../session-manager.js";
import { SlackInboundStore } from "./slack-inbound-store.js";
import { SlackTurnRunner } from "./slack-turn-runner.js";

// Page size requested from the session trace store when searching for the most
// recent agent-runtime event of the active turn during stall detection.
const ACTIVE_TURN_TRACE_LOOKBACK_LIMIT = 500;

export class SlackTurnReconciler {
// Session store: used to clear a session's active turn id and to read its agent trace events.
readonly #sessions: SessionManager;
// Used to interrupt a turn that has been judged stalled.
readonly #turnRunner: SlackTurnRunner;
// Used to put a turn's inflight messages back to pending.
readonly #inboundStore: SlackInboundStore;
// Inactivity threshold in milliseconds; a non-finite or negative value disables stall detection.
readonly #activeTurnStallTimeoutMs: number;
// Clock returning epoch milliseconds; injectable via the constructor, defaults to Date.now.
readonly #now: () => number;

/**
 * @param options.sessions - Session store used for active-turn state and trace events.
 * @param options.turnRunner - Runner used to interrupt stalled turns.
 * @param options.inboundStore - Store used to reset inflight messages to pending.
 * @param options.activeTurnStallTimeoutMs - Inactivity threshold in milliseconds;
 *   when omitted it defaults to positive infinity, which disables stall detection.
 * @param options.now - Clock returning epoch milliseconds; defaults to `Date.now`.
 */
constructor(options: {
  readonly sessions: SessionManager;
  readonly turnRunner: SlackTurnRunner;
  readonly inboundStore: SlackInboundStore;
  readonly activeTurnStallTimeoutMs?: number | undefined;
  readonly now?: (() => number) | undefined;
}) {
  const { sessions, turnRunner, inboundStore, activeTurnStallTimeoutMs, now } = options;

  this.#sessions = sessions;
  this.#turnRunner = turnRunner;
  this.#inboundStore = inboundStore;
  this.#activeTurnStallTimeoutMs = activeTurnStallTimeoutMs ?? Number.POSITIVE_INFINITY;
  this.#now = now ?? Date.now;
}

async reconcileSingleActiveTurn(
Expand Down Expand Up @@ -62,6 +70,33 @@ export class SlackTurnReconciler {
}

if (snapshot.status === "inProgress" || snapshot.status === "unknown") {
const stale = this.#getStalledActiveTurn(session, activeTurnId);
if (stale) {
logger.warn("Detected stalled active agent turn; resetting broker runtime state", {
sessionKey: session.key,
turnId: activeTurnId,
status: snapshot.status,
lastAgentRuntimeActivityAt: stale.lastActivityAt,
inactiveMs: stale.inactiveMs,
staleAfterMs: this.#activeTurnStallTimeoutMs
});
await this.#inboundStore.resetTurnBatchToPending(hydratedSession, activeTurnId);
await this.#sessions.setActiveTurnId(
hydratedSession.channelId,
hydratedSession.rootThreadTs,
undefined
);
try {
await this.#turnRunner.interrupt(hydratedSession);
} catch (error) {
logger.warn("Failed to interrupt stalled active agent turn during reconciliation", {
sessionKey: session.key,
turnId: activeTurnId,
error: error instanceof Error ? error.message : String(error)
});
}
return "cleared";
}
return "retained";
}

Expand All @@ -84,4 +119,58 @@ export class SlackTurnReconciler {
);
return "cleared";
}

/**
 * Decides whether the given active turn has been silent for at least the
 * configured stall timeout.
 *
 * @param session - Session that owns the turn.
 * @param turnId - Id of the active turn being checked.
 * @returns The last-activity timestamp and the elapsed inactive milliseconds
 *   when the turn is stalled; `null` when stall detection is disabled (timeout
 *   non-finite or negative), when no parseable activity timestamp is available,
 *   or when the turn has been inactive for less than the timeout.
 */
#getStalledActiveTurn(
  session: SlackSessionRecord,
  turnId: string
): {
  readonly lastActivityAt: string;
  readonly inactiveMs: number;
} | null {
  const timeoutMs = this.#activeTurnStallTimeoutMs;
  const detectionEnabled = Number.isFinite(timeoutMs) && timeoutMs >= 0;
  if (!detectionEnabled) {
    return null;
  }

  const lastActivityAt = this.#latestAgentRuntimeActivityAt(session, turnId);
  if (!lastActivityAt) {
    return null;
  }

  const lastActivityMs = Date.parse(lastActivityAt);
  if (!Number.isFinite(lastActivityMs)) {
    // Unparseable timestamp: there is no reliable basis for declaring a stall.
    return null;
  }

  const inactiveMs = this.#now() - lastActivityMs;
  if (!(inactiveMs >= timeoutMs)) {
    return null;
  }

  return { lastActivityAt, inactiveMs };
}

/**
 * Finds the timestamp of the most recent agent-runtime trace event for a turn.
 *
 * Only one page of trace events (up to `ACTIVE_TURN_TRACE_LOOKBACK_LIMIT`) is
 * inspected. When that page holds no matching event, the session's
 * `activeTurnStartedAt` is used, and after that its `createdAt`.
 *
 * @param session - Session whose trace events are read.
 * @param turnId - Turn whose events are considered.
 * @returns The chosen timestamp string, or `undefined` if none is available.
 */
#latestAgentRuntimeActivityAt(session: SlackSessionRecord, turnId: string): string | undefined {
  const page = this.#sessions.listAgentTraceEventsPage(session.key, {
    limit: ACTIVE_TURN_TRACE_LOOKBACK_LIMIT
  });

  const runtimeEventsForTurn = page.events.filter(
    (candidate) => candidate.turnId === turnId && candidate.source === "agent_runtime"
  );
  // filter() returned a fresh array, so sorting in place leaves the page untouched.
  runtimeEventsForTurn.sort(compareTraceActivity);
  const newest = runtimeEventsForTurn[runtimeEventsForTurn.length - 1];

  if (newest?.at != null) {
    return newest.at;
  }
  return session.activeTurnStartedAt ?? session.createdAt;
}
}

/**
 * Ascending comparator for trace events, oldest activity first.
 *
 * Ordering keys, in priority order:
 * 1. the parsed `at` timestamps, when both parse and differ;
 * 2. `sequence`, when the two values differ;
 * 3. `id` (a missing id is treated as an empty string), compared with `localeCompare`.
 *
 * @returns A negative number when `left` sorts first, a positive number when
 *   `right` sorts first, and 0 when all three keys tie.
 */
function compareTraceActivity(left: PersistedAgentTraceEvent, right: PersistedAgentTraceEvent): number {
  const leftMs = Date.parse(left.at);
  const rightMs = Date.parse(right.at);
  const bothParsed = Number.isFinite(leftMs) && Number.isFinite(rightMs);

  const timeDelta = bothParsed ? leftMs - rightMs : 0;
  if (timeDelta !== 0) {
    return timeDelta;
  }

  if (left.sequence === right.sequence) {
    const leftId = left.id ?? "";
    const rightId = right.id ?? "";
    return leftId.localeCompare(rightId);
  }

  return left.sequence - right.sequence;
}
Loading
Loading