Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions src/App/src/pages/PlanPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,37 @@ const PlanPage: React.FC = () => {
if (!planApprovalRequest) return;
dispatch(setProcessingApproval(true));
const id = showToast('Submitting Approval', 'progress');
try {
await apiService.approvePlan({

const submitApproval = () =>
apiService.approvePlan({
m_plan_id: planApprovalRequest.id,
plan_id: planData?.plan?.id ?? '',
approved: true,
feedback: 'Plan approved by user',
});

try {
await submitApproval();
dismissToast(id);
/* P0: single compound action replaces 3 separate dispatches */
dispatch(planApprovalAccepted());
} catch {
dismissToast(id);
showToast('Failed to submit approval', 'error');
} catch (firstError) {
// Approval failed — the backend may have timed out or the WS dropped.
// Reconnect the WebSocket and retry once before giving up.
try {
if (!webSocketService.isConnected() && planData?.plan?.id) {
await webSocketService.connect(planData.plan.id);
}
await submitApproval();
dismissToast(id);
dispatch(planApprovalAccepted());
} catch {
dismissToast(id);
showToast(
'Failed to submit approval. The plan may have timed out — please start a new task and try again.',
'error',
);
}
} finally {
dispatch(setProcessingApproval(false));
}
Expand Down
31 changes: 31 additions & 0 deletions src/App/src/store/WebSocketService.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class WebSocketService {
private listeners: Map<string, Set<(message: StreamMessage) => void>> = new Map();
private planSubscriptions: Set<string> = new Set();
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private keepaliveTimer: ReturnType<typeof setInterval> | null = null;
private isConnecting = false;
private intentionalDisconnect = false;
private lastPlanId: string | undefined;
Expand Down Expand Up @@ -59,6 +60,7 @@ class WebSocketService {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
this.startKeepalive();
this.emit('connection_status', { connected: true });
resolve();
};
Expand All @@ -75,6 +77,7 @@ class WebSocketService {
this.ws.onclose = (event) => {
this.isConnecting = false;
this.ws = null;
this.stopKeepalive();
this.emit('connection_status', { connected: false });
/* P1: Only auto-reconnect if not intentional and not a clean close */
if (!this.intentionalDisconnect && event.code !== 1000 &&
Expand All @@ -99,6 +102,7 @@ class WebSocketService {

disconnect(): void {
this.intentionalDisconnect = true;
this.stopKeepalive();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
Expand Down Expand Up @@ -197,6 +201,11 @@ class WebSocketService {
}

private handleMessage(message: StreamMessage): void {
// Ignore keepalive ping/pong messages from the server
const msgType = message.type as string;
if (msgType === 'ping' || msgType === 'pong') {
return;
}

switch (message.type) {
case WebsocketMessageType.PLAN_APPROVAL_REQUEST: {
Expand Down Expand Up @@ -277,6 +286,28 @@ class WebSocketService {
}
}

private startKeepalive(): void {
this.stopKeepalive();
// Send a ping every 30 seconds to prevent idle timeout from
// Azure Container Apps / reverse proxies closing the WebSocket.
this.keepaliveTimer = setInterval(() => {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
try {
this.ws.send(JSON.stringify({ type: 'pong' }));
} catch {
// If send fails, the onclose handler will trigger reconnect
}
}
}, 30_000);
}

private stopKeepalive(): void {
if (this.keepaliveTimer) {
clearInterval(this.keepaliveTimer);
this.keepaliveTimer = null;
}
}

private attemptReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.emit('error', { error: 'Max reconnection attempts reached' });
Expand Down
20 changes: 13 additions & 7 deletions src/backend/v4/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,23 @@ async def start_comms(
# Keep the connection open - FastAPI will close the connection if this returns
try:
# Keep the connection open - FastAPI will close the connection if this returns
# Send periodic pings to prevent idle timeout from Azure infra / reverse proxies.
ping_interval = 30 # seconds
while True:
# no expectation that we will receive anything from the client but this keeps
# the connection open and does not take cpu cycle
try:
message = await websocket.receive_text()
message = await asyncio.wait_for(
websocket.receive_text(), timeout=ping_interval
)
logging.debug(f"Received WebSocket message from {user_id}: {message}")
except asyncio.TimeoutError:
# Ignore timeouts to keep the WebSocket connection open, but avoid a tight loop.
logging.debug(
f"WebSocket receive timeout for user {user_id}, process {process_id}"
)
await asyncio.sleep(0.1)
# No message received within ping_interval — send a ping to keep alive.
try:
await websocket.send_text('{"type":"ping"}')
except Exception:
logging.info(f"Ping failed for {user_id}/{process_id}, connection likely closed")
break
except WebSocketDisconnect:
dc_props = {"process_id": process_id, "user_id": user_id}
if session_id:
Expand Down Expand Up @@ -537,7 +542,8 @@ async def plan_approval(
human_feedback.m_plan_id
)
raise HTTPException(
status_code=404, detail="No active plan found for approval"
status_code=404,
detail="No active plan found for approval. The plan may have timed out due to inactivity. Please start a new task.",
)
except Exception as e:
logging.error(f"Error processing plan approval: {e}")
Expand Down
4 changes: 2 additions & 2 deletions src/backend/v4/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def __init__(self):
self._approval_events: Dict[str, asyncio.Event] = {}
self._clarification_events: Dict[str, asyncio.Event] = {}

# Default timeout (seconds) for waiting operations
self.default_timeout: float = 300.0
# Default timeout (seconds) for waiting operations.
self.default_timeout: float = 1800.0

def get_current_orchestration(self, user_id: str) -> Any:
"""Get existing orchestration workflow instance for user_id."""
Expand Down
2 changes: 1 addition & 1 deletion src/tests/backend/v4/config/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_orchestration_config_creation(self):
self.assertEqual(config.max_rounds, 20)
self.assertIsInstance(config._approval_events, dict)
self.assertIsInstance(config._clarification_events, dict)
self.assertEqual(config.default_timeout, 300.0)
self.assertEqual(config.default_timeout, 1800.0)

def test_get_current_orchestration(self):
"""Test getting current orchestration."""
Expand Down