Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SLACK_SOCKET_OPEN_URL=apps.connections.open
SLACK_INITIAL_THREAD_HISTORY_COUNT=8
SLACK_HISTORY_API_MAX_LIMIT=50
SLACK_ACTIVE_TURN_RECONCILE_INTERVAL_MS=15000
SLACK_STALE_ACTIVE_TURN_AFTER_MS=1800000
SLACK_PROGRESS_REMINDER_AFTER_MS=120000
SLACK_PROGRESS_REMINDER_REPEAT_MS=120000

Expand Down
44 changes: 44 additions & 0 deletions docs/proposals/active-turn-watchdog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Active Turn Watchdog Plan

## Goal

Recover Slack sessions whose broker state still points at an active agent turn even though the agent runtime has stopped producing trace activity. The broker should not leave new Slack messages permanently inflight behind a stale `activeTurnId` after an upstream stream disconnect or hung runtime turn.

## Current state

- Slack sessions persist `agentSessionId`, `activeTurnId`, and `activeTurnStartedAt`.
- The active-turn reconciler periodically reads the runtime snapshot for persisted active turns.
- If the runtime snapshot is `completed`, `interrupted`, or `failed`, the reconciler clears the active turn and marks or resets the inbound batch.
- If the runtime snapshot is `inProgress` or `unknown`, the reconciler always retains the turn.
- If the runtime stream disconnects without a terminal result and the runtime snapshot remains `inProgress`, the broker can keep the turn active indefinitely. Later Slack messages are joined to or queued behind that stale turn and never get a fresh dispatch.
- Manual session reset endpoints already exist, but this failure mode should recover automatically.

## Proposed changes

- Add a configurable stale-active-turn threshold:
- `SLACK_STALE_ACTIVE_TURN_AFTER_MS`
- default: `1800000` (30 minutes)
- values `<= 0` disable the watchdog
- Extend active-turn reconciliation for `inProgress`/`unknown` snapshots:
- compute the latest broker-observed activity for the active turn from `activeTurnStartedAt` and persisted agent trace events for the same `turnId`;
- ignore later Slack inbound-message timestamps as activity, because those can be user follow-ups that were blocked behind the stale turn;
- retain the turn if recent trace activity exists;
- when the latest activity exceeds the threshold, interrupt the runtime turn when possible, reset that turn's inflight inbound batch to pending, clear the broker `activeTurnId`, and let the existing pending-dispatch recovery path resume the work.
- Keep startup missing-turn recovery behavior unchanged: a missing snapshot is only treated as stale when the startup path explicitly asks for that.
- Document the new environment variable in `.env.example`.

## If we do not change it

A single lost runtime stream can leave a Slack thread wedged until a human notices and manually resets the session. Follow-up messages can make the thread look active while the broker still has no terminal signal to resume dispatch.

## After the change

A long-silent active turn is converted back into pending Slack work automatically. The existing queue drain then starts a new agent turn for the same Slack message batch, so the user sees progress without a manual broker reset.

## Acceptance criteria

- Recent `inProgress` turns are retained.
- Long-silent `inProgress` turns with an inflight batch are interrupted/reset/cleared.
- Runtime interrupt failure does not block broker recovery.
- The threshold is configurable and documented.
- Targeted automated tests and a manual validation command pass.
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export interface AppConfig {
readonly slackHistoryApiMaxLimit: number;
readonly slackActiveTurnReconcileIntervalMs: number;
readonly slackMissedThreadRecoveryIntervalMs: number;
readonly slackStaleActiveTurnAfterMs: number;
readonly stateDir: string;
readonly jobsRoot: string;
readonly sessionsRoot: string;
Expand Down Expand Up @@ -193,6 +194,7 @@ export function loadConfig(env = process.env): AppConfig {
"SLACK_MISSED_THREAD_RECOVERY_INTERVAL_MS",
5 * 60_000
),
slackStaleActiveTurnAfterMs: getNumber(env, "SLACK_STALE_ACTIVE_TURN_AFTER_MS", 30 * 60_000),
stateDir,
jobsRoot,
sessionsRoot,
Expand Down
3 changes: 2 additions & 1 deletion src/services/slack/slack-conversation-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ export class SlackConversationService {
this.#turnReconciler = new SlackTurnReconciler({
sessions: this.#sessions,
turnRunner: this.#turnRunner,
inboundStore: this.#inboundStore
inboundStore: this.#inboundStore,
staleActiveTurnAfterMs: this.#config.slackStaleActiveTurnAfterMs
});
this.#agentRuntimeEventHandler = (event) => {
this.#handleAgentRuntimeEvent(event);
Expand Down
113 changes: 112 additions & 1 deletion src/services/slack/slack-turn-reconciler.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
import { logger } from "../../logger.js";
import type { SlackSessionRecord } from "../../types.js";
import type { PersistedAgentTraceEvent, SlackSessionRecord } from "../../types.js";
import { SessionManager } from "../session-manager.js";
import { SlackInboundStore } from "./slack-inbound-store.js";
import { SlackTurnRunner } from "./slack-turn-runner.js";

const DEFAULT_TRACE_ACTIVITY_LOOKBACK = 500;

export class SlackTurnReconciler {
readonly #sessions: SessionManager;
readonly #turnRunner: SlackTurnRunner;
readonly #inboundStore: SlackInboundStore;
readonly #staleActiveTurnAfterMs: number;
readonly #now: () => Date;

constructor(options: {
readonly sessions: SessionManager;
readonly turnRunner: SlackTurnRunner;
readonly inboundStore: SlackInboundStore;
readonly staleActiveTurnAfterMs?: number | undefined;
readonly now?: (() => Date) | undefined;
}) {
this.#sessions = options.sessions;
this.#turnRunner = options.turnRunner;
this.#inboundStore = options.inboundStore;
this.#staleActiveTurnAfterMs = options.staleActiveTurnAfterMs ?? 0;
this.#now = options.now ?? (() => new Date());
}

async reconcileSingleActiveTurn(
Expand Down Expand Up @@ -62,6 +70,34 @@ export class SlackTurnReconciler {
}

if (snapshot.status === "inProgress" || snapshot.status === "unknown") {
const stale = this.#getStaleActiveTurn(session, activeTurnId);
if (stale) {
logger.warn("Resetting stale active agent turn after trace inactivity", {
sessionKey: session.key,
turnId: activeTurnId,
snapshotStatus: snapshot.status,
staleForMs: stale.staleForMs,
staleAfterMs: this.#staleActiveTurnAfterMs,
lastActivityAt: stale.lastActivityAt
});
try {
await this.#turnRunner.interrupt(hydratedSession);
} catch (error) {
logger.warn("Failed to interrupt stale active agent turn before broker reset", {
sessionKey: session.key,
turnId: activeTurnId,
error: error instanceof Error ? error.message : String(error)
});
}
await this.#inboundStore.resetTurnBatchToPending(hydratedSession, activeTurnId);
await this.#sessions.setActiveTurnId(
hydratedSession.channelId,
hydratedSession.rootThreadTs,
undefined
);
return "cleared";
}

return "retained";
}

Expand All @@ -84,4 +120,79 @@ export class SlackTurnReconciler {
);
return "cleared";
}

#getStaleActiveTurn(session: SlackSessionRecord, turnId: string): {
readonly lastActivityAt: string;
readonly staleForMs: number;
} | null {
if (this.#staleActiveTurnAfterMs <= 0) {
return null;
}

const lastActivityMs = this.#getLatestActiveTurnActivityMs(session, turnId);
if (lastActivityMs === undefined) {
return null;
}

const nowMs = this.#now().getTime();
if (!Number.isFinite(nowMs)) {
return null;
}

const staleForMs = nowMs - lastActivityMs;
if (staleForMs < this.#staleActiveTurnAfterMs) {
return null;
}

return {
lastActivityAt: new Date(lastActivityMs).toISOString(),
staleForMs
};
}

#getLatestActiveTurnActivityMs(session: SlackSessionRecord, turnId: string): number | undefined {
let latest = parseIsoTimestampMs(session.activeTurnStartedAt);
const traceEvents = this.#sessions.listAgentTraceEventsPage(session.key, {
limit: DEFAULT_TRACE_ACTIVITY_LOOKBACK
}).events;

for (const event of traceEvents) {
if (event.turnId !== turnId) {
continue;
}

latest = maxTimestampMs(latest, latestTraceEventTimestampMs(event));
}

return latest;
}
}

function latestTraceEventTimestampMs(event: PersistedAgentTraceEvent): number | undefined {
return maxTimestampMs(
maxTimestampMs(parseIsoTimestampMs(event.at), parseIsoTimestampMs(event.updatedAt)),
parseIsoTimestampMs(event.createdAt)
);
}

function maxTimestampMs(
left: number | undefined,
right: number | undefined
): number | undefined {
if (left === undefined) {
return right;
}
if (right === undefined) {
return left;
}
return Math.max(left, right);
}

function parseIsoTimestampMs(value: string | undefined): number | undefined {
if (!value) {
return undefined;
}

const parsed = Date.parse(value);
return Number.isFinite(parsed) ? parsed : undefined;
}
11 changes: 11 additions & 0 deletions test/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe("loadConfig", () => {
expect(config.slackHistoryApiMaxLimit).toBe(50);
expect(config.slackActiveTurnReconcileIntervalMs).toBe(15_000);
expect(config.slackMissedThreadRecoveryIntervalMs).toBe(5 * 60_000);
expect(config.slackStaleActiveTurnAfterMs).toBe(30 * 60_000);
expect(config.logLevel).toBe("info");
expect(config.logRawSlackEvents).toBe(true);
expect(config.logRawCodexRpc).toBe(true);
Expand Down Expand Up @@ -169,6 +170,16 @@ describe("loadConfig", () => {
expect(config.defaultGitHubToken).toBe("default-token");
});

it("loads an explicit stale active turn watchdog threshold", () => {
const config = loadConfig({
SLACK_APP_TOKEN: "xapp-test",
SLACK_BOT_TOKEN: "xoxb-test",
SLACK_STALE_ACTIVE_TURN_AFTER_MS: "600000"
} as NodeJS.ProcessEnv);

expect(config.slackStaleActiveTurnAfterMs).toBe(600_000);
});

it("parses disk cleanup configuration", () => {
const config = loadConfig({
SLACK_APP_TOKEN: "xapp-test",
Expand Down
Loading
Loading