Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions apps/web/src/components/WebSocketConnectionSurface.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
type WsConnectionStatus,
type WsConnectionUiState,
useWsConnectionStatus,
WS_RECONNECT_MAX_ATTEMPTS,
} from "../rpc/wsConnectionState";
import { stackedThreadToast, toastManager } from "./ui/toast";
import { getPrimaryEnvironmentConnection } from "../environments/runtime";
Expand Down Expand Up @@ -42,11 +41,10 @@ function describeOfflineToast(): string {
}

function formatReconnectAttemptLabel(status: WsConnectionStatus): string {
const reconnectAttempt = Math.max(
1,
Math.min(status.reconnectAttemptCount, WS_RECONNECT_MAX_ATTEMPTS),
);
return `Attempt ${reconnectAttempt}/${status.reconnectMaxAttempts}`;
const reconnectAttempt = Math.max(1, status.reconnectAttemptCount);
return status.reconnectMaxAttempts === null
? `Attempt ${reconnectAttempt}`
: `Attempt ${Math.min(reconnectAttempt, status.reconnectMaxAttempts)}/${status.reconnectMaxAttempts}`;
}

function describeExhaustedToast(): string {
Expand Down
3 changes: 1 addition & 2 deletions apps/web/src/rpc/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
recordWsConnectionErrored,
recordWsConnectionOpened,
type WsConnectionMetadata,
WS_RECONNECT_MAX_RETRIES,
} from "./wsConnectionState";

export interface WsProtocolCloseContext {
Expand Down Expand Up @@ -213,7 +212,7 @@ export function createWsRpcProtocolLayer(
const socketLayer = Socket.layerWebSocket(resolvedUrl).pipe(
Layer.provide(trackingWebSocketConstructorLayer),
);
const retryPolicy = Schedule.addDelay(Schedule.recurs(WS_RECONNECT_MAX_RETRIES), (retryCount) =>
const retryPolicy = Schedule.addDelay(Schedule.forever, (retryCount) =>
Effect.succeed(Duration.millis(getWsReconnectDelayMsForRetry(retryCount) ?? 0)),
);
const protocolLayer = Layer.effect(
Expand Down
12 changes: 6 additions & 6 deletions apps/web/src/rpc/wsConnectionState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
recordWsConnectionOpened,
resetWsConnectionStateForTests,
setBrowserOnlineStatus,
WS_RECONNECT_MAX_ATTEMPTS,
WS_RECONNECT_MAX_DELAY_MS,
} from "./wsConnectionState";

describe("wsConnectionState", () => {
Expand Down Expand Up @@ -92,16 +92,16 @@ describe("wsConnectionState", () => {
});
});

it("marks the reconnect cycle as exhausted after the final attempt fails", () => {
for (let attempt = 0; attempt < WS_RECONNECT_MAX_ATTEMPTS; attempt += 1) {
it("continues scheduling reconnect attempts with a capped delay", () => {
for (let attempt = 0; attempt < 12; attempt += 1) {
recordWsConnectionAttempt("ws://localhost:3020/ws");
recordWsConnectionErrored("Unable to connect to the T3 server WebSocket.");
}

expect(getWsConnectionStatus()).toMatchObject({
nextRetryAt: null,
reconnectAttemptCount: WS_RECONNECT_MAX_ATTEMPTS,
reconnectPhase: "exhausted",
nextRetryAt: new Date(Date.now() + WS_RECONNECT_MAX_DELAY_MS).toISOString(),
reconnectAttemptCount: 12,
reconnectPhase: "waiting",
});
});
});
12 changes: 5 additions & 7 deletions apps/web/src/rpc/wsConnectionState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ export type WsReconnectPhase = "attempting" | "exhausted" | "idle" | "waiting";
export const WS_RECONNECT_INITIAL_DELAY_MS = 1_000;
export const WS_RECONNECT_BACKOFF_FACTOR = 2;
export const WS_RECONNECT_MAX_DELAY_MS = 64_000;
export const WS_RECONNECT_MAX_RETRIES = 7;
export const WS_RECONNECT_MAX_ATTEMPTS = WS_RECONNECT_MAX_RETRIES + 1;

export interface WsConnectionStatus {
readonly attemptCount: number;
Expand All @@ -26,7 +24,7 @@ export interface WsConnectionStatus {
readonly online: boolean;
readonly phase: "idle" | "connecting" | "connected" | "disconnected";
readonly reconnectAttemptCount: number;
readonly reconnectMaxAttempts: number;
readonly reconnectMaxAttempts: number | null;
readonly reconnectPhase: WsReconnectPhase;
readonly socketUrl: string | null;
}
Expand All @@ -45,7 +43,7 @@ const INITIAL_WS_CONNECTION_STATUS = Object.freeze<WsConnectionStatus>({
online: typeof navigator === "undefined" ? true : navigator.onLine !== false,
phase: "idle",
reconnectAttemptCount: 0,
reconnectMaxAttempts: WS_RECONNECT_MAX_ATTEMPTS,
reconnectMaxAttempts: null,
reconnectPhase: "idle",
socketUrl: null,
});
Expand Down Expand Up @@ -201,7 +199,7 @@ export function useWsConnectionStatus(): WsConnectionStatus {
}

export function getWsReconnectDelayMsForRetry(retryIndex: number): number | null {
if (!Number.isInteger(retryIndex) || retryIndex < 0 || retryIndex >= WS_RECONNECT_MAX_RETRIES) {
if (!Number.isInteger(retryIndex) || retryIndex < 0) {
return null;
}

Expand All @@ -220,7 +218,7 @@ function applyDisconnectState(
): WsConnectionStatus {
const disconnectedAt = current.disconnectedAt ?? isoNow();
const nextRetryDelayMs =
current.nextRetryAt !== null || current.reconnectPhase === "exhausted"
current.nextRetryAt !== null
? null
: getWsReconnectDelayMsForRetry(Math.max(0, current.reconnectAttemptCount - 1));

Expand All @@ -235,7 +233,7 @@ function applyDisconnectState(
: new Date(Date.now() + nextRetryDelayMs).toISOString(),
phase: "disconnected",
reconnectPhase:
current.reconnectPhase === "waiting" || current.reconnectPhase === "exhausted"
current.reconnectPhase === "waiting"
? current.reconnectPhase
: nextRetryDelayMs === null
? "exhausted"
Expand Down
52 changes: 52 additions & 0 deletions apps/web/src/rpc/wsTransport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "../rpc/requestLatencyState";
import {
getWsConnectionStatus,
getWsReconnectDelayMsForRetry,
getWsConnectionUiState,
resetWsConnectionStateForTests,
} from "../rpc/wsConnectionState";
Expand Down Expand Up @@ -108,6 +109,20 @@ async function waitFor(assertion: () => void, timeoutMs = 1_000): Promise<void>
}
}

async function waitForFakeTimerAssertion(assertion: () => void, timeoutMs = 1_000): Promise<void> {
let lastError: unknown;
for (let elapsedMs = 0; elapsedMs <= timeoutMs; elapsedMs += 10) {
try {
assertion();
return;
} catch (error) {
lastError = error;
await vi.advanceTimersByTimeAsync(10);
}
}
throw lastError;
}

function createTransport(...args: ConstructorParameters<typeof WsTransport>): WsTransport {
const transport = new WsTransport(...args);
transports.push(transport);
Expand Down Expand Up @@ -142,6 +157,7 @@ beforeEach(() => {
});

afterEach(async () => {
vi.useRealTimers();
await Promise.allSettled(transports.map((transport) => transport.dispose()));
transports.length = 0;
globalThis.WebSocket = originalWebSocket;
Expand Down Expand Up @@ -258,6 +274,42 @@ describe("WsTransport", () => {
await transport.dispose();
});

it("keeps reconnecting after the previous retry budget would have been exhausted", async () => {
const transport = createTransport("ws://localhost:3020");

await waitFor(() => {
expect(sockets).toHaveLength(1);
});

vi.useFakeTimers();

for (let retryIndex = 0; retryIndex < 8; retryIndex += 1) {
const socket = getSocket();
socket.error();
socket.close(1006, "server unavailable");

const retryDelayMs = getWsReconnectDelayMsForRetry(retryIndex);
if (retryDelayMs === null) {
throw new Error(`Expected reconnect delay for retry ${retryIndex}`);
}

await vi.advanceTimersByTimeAsync(retryDelayMs);
await waitForFakeTimerAssertion(() => {
expect(sockets).toHaveLength(retryIndex + 2);
});
}

expect(getWsConnectionStatus()).toMatchObject({
attemptCount: 9,
phase: "connecting",
reconnectAttemptCount: 9,
reconnectPhase: "attempting",
});

vi.useRealTimers();
await transport.dispose();
});

it("composes custom lifecycle handlers with default websocket state tracking", async () => {
const onOpen = vi.fn();
const onClose = vi.fn();
Expand Down
Loading