Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions src/bot/commands/abort.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { logger } from "../../utils/logger.js";
import { t } from "../../i18n/index.js";
import { foregroundSessionState } from "../../scheduled-task/foreground-state.js";
import { assistantRunState } from "../assistant-run-state.js";
import { markAttachedSessionIdle } from "../../attach/service.js";
import { clearPromptResponseMode } from "../handlers/prompt.js";
import { markUserAbortRequested } from "../utils/abort-error-suppression.js";

type SessionState = "idle" | "busy" | "not-found";

Expand All @@ -19,6 +22,13 @@ function abortLocalStreaming(): void {
clearAllInteractionState("abort_command");
}

async function releaseAbortBusyState(sessionId: string, reason: string): Promise<void> {
foregroundSessionState.markIdle(sessionId);
assistantRunState.clearRun(sessionId, reason);
await markAttachedSessionIdle(sessionId);
clearPromptResponseMode(sessionId);
}

async function pollSessionStatus(
sessionId: string,
directory: string,
Expand Down Expand Up @@ -92,6 +102,7 @@ export async function abortCurrentOperation(

const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 5000);
markUserAbortRequested(currentSession.id);

try {
const { data: abortResult, error: abortError } = await opencodeClient.session.abort(
Expand All @@ -106,13 +117,15 @@ export async function abortCurrentOperation(

if (abortError) {
logger.warn("[Abort] Abort request failed:", abortError);
await releaseAbortBusyState(currentSession.id, "abort_unconfirmed");
if (notifyUser && chatId !== null && waitingMessageId !== null) {
await ctx.api.editMessageText(chatId, waitingMessageId, t("stop.warn_unconfirmed"));
}
return;
}

if (abortResult !== true) {
await releaseAbortBusyState(currentSession.id, "abort_maybe_finished");
if (notifyUser && chatId !== null && waitingMessageId !== null) {
await ctx.api.editMessageText(chatId, waitingMessageId, t("stop.warn_maybe_finished"));
}
Expand All @@ -126,8 +139,7 @@ export async function abortCurrentOperation(
);

if (finalStatus === "idle" || finalStatus === "not-found") {
foregroundSessionState.markIdle(currentSession.id);
assistantRunState.clearRun(currentSession.id, "abort_confirmed");
await releaseAbortBusyState(currentSession.id, "abort_confirmed");
if (notifyUser && chatId !== null && waitingMessageId !== null) {
await ctx.api.editMessageText(chatId, waitingMessageId, t("stop.success"));
}
Expand All @@ -138,6 +150,7 @@ export async function abortCurrentOperation(
}
} catch (error) {
clearTimeout(timeoutId);
await releaseAbortBusyState(currentSession.id, "abort_error");

if (error instanceof Error && error.name === "AbortError") {
if (notifyUser && chatId !== null && waitingMessageId !== null) {
Expand Down
67 changes: 53 additions & 14 deletions src/bot/commands/opencode-start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,65 @@ import { logger } from "../../utils/logger.js";
import { t } from "../../i18n/index.js";
import { editBotText } from "../utils/telegram-text.js";

const SERVER_READY_TIMEOUT_MS = 10_000;
const SERVER_READY_POLL_INTERVAL_MS = 500;
const HEALTH_CHECK_TIMEOUT_MS = 3_000;
const HEALTH_CHECK_TIMED_OUT = Symbol("health-check-timed-out");

type HealthCheckResult = Awaited<ReturnType<typeof opencodeClient.global.health>>;

async function healthWithTimeout(
timeoutMs: number = HEALTH_CHECK_TIMEOUT_MS,
): Promise<HealthCheckResult | typeof HEALTH_CHECK_TIMED_OUT> {
const controller = new AbortController();
let timeout: ReturnType<typeof setTimeout> | undefined;

try {
return await Promise.race([
opencodeClient.global.health({ signal: controller.signal }),
new Promise<typeof HEALTH_CHECK_TIMED_OUT>((resolve) => {
timeout = setTimeout(() => {
controller.abort();
resolve(HEALTH_CHECK_TIMED_OUT);
}, timeoutMs);
}),
]);
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}

async function getHealthIfAvailable(): Promise<HealthCheckResult | null> {
try {
const result = await healthWithTimeout();
if (result === HEALTH_CHECK_TIMED_OUT) {
logger.warn(`[Bot] OpenCode health check timed out after ${HEALTH_CHECK_TIMEOUT_MS}ms`);
return null;
}

return result;
} catch {
return null;
}
}

/**
* Wait for OpenCode server to become ready by polling health endpoint
* @param maxWaitMs Maximum time to wait in milliseconds
* @returns true if server became ready, false if timeout
*/
async function waitForServerReady(maxWaitMs: number = 10000): Promise<boolean> {
const startTime = Date.now();
const pollInterval = 500;

while (Date.now() - startTime < maxWaitMs) {
try {
const { data, error } = await opencodeClient.global.health();

if (!error && data?.healthy) {
return true;
}
} catch {
// Server not ready yet
const health = await getHealthIfAvailable();
if (health?.data?.healthy) {
return true;
}

await new Promise((resolve) => setTimeout(resolve, pollInterval));
await new Promise((resolve) => setTimeout(resolve, SERVER_READY_POLL_INTERVAL_MS));
}

return false;
Expand All @@ -47,9 +85,10 @@ export async function opencodeStartCommand(ctx: CommandContext<Context>) {

// Check if server is already accessible.
try {
const { data, error } = await opencodeClient.global.health();
const health = await getHealthIfAvailable();
const data = health?.data;

if (!error && data?.healthy) {
if (data?.healthy) {
await ctx.reply(
t("opencode_start.already_running", { version: data.version || t("common.unknown") }),
);
Expand Down Expand Up @@ -82,7 +121,7 @@ export async function opencodeStartCommand(ctx: CommandContext<Context>) {
childProcess.unref();

logger.info("[Bot] Waiting for OpenCode server to become ready...");
const ready = await waitForServerReady(10000);
const ready = await waitForServerReady(SERVER_READY_TIMEOUT_MS);

if (!ready) {
await editBotText({
Expand All @@ -96,7 +135,7 @@ export async function opencodeStartCommand(ctx: CommandContext<Context>) {
return;
}

const { data: health } = await opencodeClient.global.health();
const health = (await getHealthIfAvailable())?.data;
await editBotText({
api: ctx.api,
chatId: ctx.chat.id,
Expand Down
8 changes: 8 additions & 0 deletions src/bot/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import { reconcileBusyState } from "./utils/busy-reconciliation.js";
import { finalizeAssistantResponse } from "./utils/finalize-assistant-response.js";
import { sendTtsResponseForSession } from "./utils/send-tts-response.js";
import { deliverThinkingMessage } from "./utils/thinking-message.js";
import { shouldSuppressUserAbortSessionError } from "./utils/abort-error-suppression.js";
import {
editRenderedBotPart,
getTelegramRenderedPartSignature,
Expand Down Expand Up @@ -940,6 +941,13 @@ async function ensureEventSubscription(directory: string): Promise<void> {
]);

const normalizedMessage = message.trim() || t("common.unknown_error");
if (shouldSuppressUserAbortSessionError(sessionId, normalizedMessage)) {
logger.debug(`[Bot] Suppressed user-initiated abort error: session=${sessionId}`);
foregroundSessionState.markIdle(sessionId);
await scheduledTaskRuntime.flushDeferredDeliveries();
return;
}

const truncatedMessage =
normalizedMessage.length > 3500
? `${normalizedMessage.slice(0, 3497)}...`
Expand Down
8 changes: 7 additions & 1 deletion src/bot/middleware/interaction-guard.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Context, NextFunction } from "grammy";
import { resolveInteractionGuardDecision } from "../../interaction/guard.js";
import type { BlockReason, InteractionKind } from "../../interaction/types.js";
import { reconcileForegroundBusyState } from "../utils/busy-guard.js";
import { logger } from "../../utils/logger.js";
import { t } from "../../i18n/index.js";

Expand Down Expand Up @@ -84,7 +85,12 @@ function getInteractionBlockedMessage(
}

export async function interactionGuardMiddleware(ctx: Context, next: NextFunction): Promise<void> {
const decision = resolveInteractionGuardDecision(ctx);
let decision = resolveInteractionGuardDecision(ctx);

if (!decision.allow && decision.busy) {
await reconcileForegroundBusyState();
decision = resolveInteractionGuardDecision(ctx);
}

if (decision.allow) {
await next();
Expand Down
40 changes: 40 additions & 0 deletions src/bot/utils/abort-error-suppression.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Covers abort request timeout, post-abort status polling, and delayed SSE reconnect delivery.
const USER_ABORT_SUPPRESSION_WINDOW_MS = 90_000;

const userAbortRequestedAtBySession = new Map<string, number>();

function deleteExpiredAbortRequests(now: number = Date.now()): void {
for (const [sessionId, requestedAt] of userAbortRequestedAtBySession) {
if (now - requestedAt > USER_ABORT_SUPPRESSION_WINDOW_MS) {
userAbortRequestedAtBySession.delete(sessionId);
}
}
}

export function markUserAbortRequested(sessionId: string): void {
const now = Date.now();
deleteExpiredAbortRequests(now);
userAbortRequestedAtBySession.set(sessionId, now);
}

export function shouldSuppressUserAbortSessionError(sessionId: string, message: string): boolean {
if (message.trim().toLowerCase() !== "aborted") {
return false;
}

const requestedAt = userAbortRequestedAtBySession.get(sessionId);
if (requestedAt === undefined) {
return false;
}

userAbortRequestedAtBySession.delete(sessionId);
return Date.now() - requestedAt <= USER_ABORT_SUPPRESSION_WINDOW_MS;
}

export function __resetUserAbortErrorSuppressionForTests(): void {
userAbortRequestedAtBySession.clear();
}

export function __getUserAbortErrorSuppressionSizeForTests(): number {
return userAbortRequestedAtBySession.size;
}
31 changes: 31 additions & 0 deletions src/bot/utils/busy-guard.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,43 @@
import type { Context } from "grammy";
import { foregroundSessionState } from "../../scheduled-task/foreground-state.js";
import { attachManager } from "../../attach/manager.js";
import { reconcileBusyStateNow } from "./busy-reconciliation.js";
import { t } from "../../i18n/index.js";
import { logger } from "../../utils/logger.js";

export function isForegroundBusy(): boolean {
return foregroundSessionState.isBusy() || attachManager.isBusy();
}

function getBusyDirectories(): string[] {
const directories = new Set<string>();

for (const session of foregroundSessionState.getBusySessions()) {
directories.add(session.directory);
}

const attached = attachManager.getSnapshot();
if (attached?.busy) {
directories.add(attached.directory);
}

return [...directories];
}

export async function reconcileForegroundBusyState(): Promise<void> {
if (!isForegroundBusy()) {
return;
}

for (const directory of getBusyDirectories()) {
try {
await reconcileBusyStateNow(directory);
} catch (error) {
logger.warn("[BusyGuard] Failed to reconcile foreground busy state", error);
}
}
}

export async function replyBusyBlocked(ctx: Context): Promise<void> {
const message = t("bot.session_busy");

Expand Down
Loading
Loading