Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 37 additions & 32 deletions web/src/hooks/useSessionStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ export function useSessionStream(
const disconnectRef = useRef<() => void>(() => undefined);
const reconnectRef = useRef<() => void>(() => undefined);
const resetStateRef = useRef<(preserveSlashCommands?: boolean) => void>(() => undefined);
// Refs for callbacks used inside connect() — keeps connect() stable across re-renders
const handleMessageRef = useRef<(data: string) => void>(() => undefined);
const onErrorRef = useRef<((error: Error) => void) | undefined>(undefined);
const sendInitializeRef = useRef<(ws: WebSocket) => void>(() => undefined);
const sendPendingMessageRef = useRef<(ws: WebSocket) => void>(() => undefined);
const getWebSocketUrlRef = useRef<(sid: string) => string>(() => "");
const completeStreamingMessagesRef = useRef<() => void>(() => undefined);
const historyCompleteTimeoutRef = useRef<number | null>(null);
const isReplayingRef = useRef(true); // Track if we're still replaying history
const pendingMessageRef = useRef<string | null>(null); // Message to send after connection
Expand Down Expand Up @@ -2221,6 +2228,9 @@ export function useSessionStream(
);

// Connect to WebSocket
// Uses refs for all callback dependencies to keep this callback stable and
// prevent reconnection storms caused by callback identity changes cascading
// through connect → reconnect → sendMessage → queue auto-send effects.
const connect = useCallback(() => {
if (!sessionId) return;

Expand All @@ -2238,12 +2248,12 @@ export function useSessionStream(
}

awaitingIdleRef.current = false;
resetState(true); // preserve slashCommands on reconnect
resetStateRef.current(true); // preserve slashCommands on reconnect
setMessages([]);
setStatus("submitted");
setAwaitingFirstResponse(Boolean(pendingMessageRef.current));

const wsUrl = getWebSocketUrl(sessionId);
const wsUrl = getWebSocketUrlRef.current(sessionId);

try {
const ws = new WebSocket(wsUrl);
Expand Down Expand Up @@ -2291,10 +2301,10 @@ export function useSessionStream(
watchdogIntervalRef.current = watchdogIntervalId;

// Send initialize message to get slash commands
sendInitialize(ws);
sendInitializeRef.current(ws);

// Send pending message immediately after connection
sendPendingMessage(ws);
sendPendingMessageRef.current(ws);
};

ws.onmessage = (event) => {
Expand All @@ -2303,7 +2313,7 @@ export function useSessionStream(
}

lastWsMessageTimeRef.current = Date.now();
handleMessage(event.data);
handleMessageRef.current(event.data);
};

ws.onerror = (event) => {
Expand All @@ -2314,7 +2324,7 @@ export function useSessionStream(
console.error("[SessionStream] WebSocket error:", event);
const err = new Error("WebSocket connection error");
setError(err);
onError?.(err);
onErrorRef.current?.(err);
setAwaitingFirstResponse(false);
awaitingIdleRef.current = false;
pendingMessageRef.current = null; // Clear pending message on error
Expand Down Expand Up @@ -2343,40 +2353,29 @@ export function useSessionStream(
if (event.code === 4004) {
const err = new Error("Session not found");
setError(err);
onError?.(err);
onErrorRef.current?.(err);
} else if (event.code === 4029) {
const err = new Error("Too many concurrent sessions");
setError(err);
onError?.(err);
onErrorRef.current?.(err);
}

// Mark all streaming/subagent messages as complete
completeStreamingMessages();
completeStreamingMessagesRef.current();
setStatus("ready");
};
} catch (err) {
console.error("[SessionStream] Failed to connect:", err);
const connectionError =
err instanceof Error ? err : new Error(String(err));
setError(connectionError);
onError?.(connectionError);
onErrorRef.current?.(connectionError);
awaitingIdleRef.current = false;
setAwaitingFirstResponse(false);
setStatus("error");
pendingMessageRef.current = null; // Clear pending message on error
}
}, [
sessionId,
resetState,
setMessages,
getWebSocketUrl,
handleMessage,
onError,
sendInitialize,
sendPendingMessage,
setAwaitingFirstResponse,
completeStreamingMessages,
]);
}, [sessionId, setMessages, setAwaitingFirstResponse]);

// Send cancel message to server
// Disconnect
Expand Down Expand Up @@ -2414,8 +2413,8 @@ export function useSessionStream(
}

// Mark all streaming/subagent messages as complete
completeStreamingMessages();
}, [completeStreamingMessages, setAwaitingFirstResponse, setMessages]);
completeStreamingMessagesRef.current();
}, [setAwaitingFirstResponse, setMessages]);

// Send cancel request or disconnect if stream not ready
const cancel = useCallback(() => {
Expand Down Expand Up @@ -2476,7 +2475,7 @@ export function useSessionStream(
return msg;
}),
);
disconnect();
disconnectRef.current();
return;
}

Expand Down Expand Up @@ -2551,23 +2550,29 @@ export function useSessionStream(
} catch (err) {
console.error("[SessionStream] Failed to send cancel request:", err);
}
}, [status, disconnect, setAwaitingFirstResponse, setMessages]);
}, [status, setAwaitingFirstResponse, setMessages]);

// Reconnect
const reconnect = useCallback(() => {
disconnect();
disconnectRef.current();
// Small delay before reconnecting
reconnectTimeoutRef.current = window.setTimeout(() => {
connect();
connectRef.current();
}, 100);
}, [disconnect, connect]);
}, []);

// Keep refs in sync so useLayoutEffect can use stable references
// Keep refs in sync so callbacks and useLayoutEffect can use stable references
connectRef.current = connect;
disconnectRef.current = disconnect;
reconnectRef.current = reconnect;
resetStateRef.current = resetState;
statusRef.current = status;
handleMessageRef.current = handleMessage;
onErrorRef.current = onError;
sendInitializeRef.current = sendInitialize;
sendPendingMessageRef.current = sendPendingMessage;
getWebSocketUrlRef.current = getWebSocketUrl;
completeStreamingMessagesRef.current = completeStreamingMessages;

// Send message to session (auto-connects if not connected)
const sendMessage = useCallback(
Expand All @@ -2584,7 +2589,7 @@ export function useSessionStream(
}

pendingMessageRef.current = trimmedText;
connect();
connectRef.current();
return;
}

Expand All @@ -2602,7 +2607,7 @@ export function useSessionStream(
awaitingIdleRef.current = false;
setStatus("streaming");
},
[sessionId, connect, setAwaitingFirstResponse],
[sessionId, setAwaitingFirstResponse],
);

// Clear messages
Expand Down
Loading