Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-scene-drops-white.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@viamrobotics/motion-tools': patch
---

Fix 3D scene going white by handling WebGL context loss and adding auto-reconnect to draw service streams
154 changes: 154 additions & 0 deletions src/lib/__tests__/retry-stream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { describe, expect, it, vi } from 'vitest'

import { retryStream } from '../retry-stream'

describe('retryStream', () => {
it('calls run and resolves when run succeeds', async () => {
const run = vi.fn().mockResolvedValue(undefined)
const controller = new AbortController()

// run resolves once, retryStream will call it again — abort after first call
run.mockImplementation(async () => {
controller.abort()
})

await retryStream(run, controller.signal)

expect(run).toHaveBeenCalledTimes(1)
})

it('retries when run throws', async () => {
vi.useFakeTimers()

const controller = new AbortController()
let callCount = 0

const run = vi.fn().mockImplementation(async () => {
callCount++
if (callCount < 3) {
throw new Error('stream error')
}
controller.abort()
})

const promise = retryStream(run, controller.signal)
// Advance through the backoff delays
await vi.advanceTimersByTimeAsync(1_000)
await vi.advanceTimersByTimeAsync(2_000)

await promise

expect(run).toHaveBeenCalledTimes(3)

vi.useRealTimers()
})

it('stops retrying when signal is aborted', async () => {
vi.useFakeTimers()

const controller = new AbortController()
const run = vi.fn().mockRejectedValue(new Error('stream error'))
const onRetry = vi.fn()

const promise = retryStream(run, controller.signal, onRetry)

// First call fails immediately, then waits for backoff
await vi.advanceTimersByTimeAsync(0)
expect(run).toHaveBeenCalledTimes(1)

// Abort during backoff wait
controller.abort()
await vi.advanceTimersByTimeAsync(1_000)

await promise

// Should have called onRetry once, but not retried run
expect(onRetry).toHaveBeenCalledTimes(1)
expect(run).toHaveBeenCalledTimes(1)

vi.useRealTimers()
})

it('calls onRetry with the current delay', async () => {
vi.useFakeTimers()

const controller = new AbortController()
let callCount = 0

const run = vi.fn().mockImplementation(async () => {
callCount++
if (callCount < 3) {
throw new Error('stream error')
}
controller.abort()
})

const onRetry = vi.fn()
const promise = retryStream(run, controller.signal, onRetry)

await vi.advanceTimersByTimeAsync(1_000)
await vi.advanceTimersByTimeAsync(2_000)

await promise

expect(onRetry).toHaveBeenCalledTimes(2)
expect(onRetry).toHaveBeenNthCalledWith(1, 1_000)
expect(onRetry).toHaveBeenNthCalledWith(2, 2_000)

vi.useRealTimers()
})

it('does not call onRetry and restarts immediately on clean stream end', async () => {
const controller = new AbortController()
let callCount = 0

const run = vi.fn().mockImplementation(async () => {
callCount++
if (callCount === 1) return // clean end — server closed the stream
controller.abort()
})

const onRetry = vi.fn()
await retryStream(run, controller.signal, onRetry)

expect(run).toHaveBeenCalledTimes(2)
expect(onRetry).not.toHaveBeenCalled()
})

it('resets delay after a successful run', async () => {
vi.useFakeTimers()

const controller = new AbortController()
let callCount = 0

const run = vi.fn().mockImplementation(async () => {
callCount++
// First call: fail
if (callCount === 1) throw new Error('fail')
// Second call: succeed (stream ended cleanly)
if (callCount === 2) return
// Third call: fail
if (callCount === 3) throw new Error('fail')
// Fourth call: abort
controller.abort()
})

const onRetry = vi.fn()
const promise = retryStream(run, controller.signal, onRetry)

// First failure + 1s backoff
await vi.advanceTimersByTimeAsync(1_000)
// Second call succeeds, delay resets. Third call fails, should use 1s again
await vi.advanceTimersByTimeAsync(1_000)
// Fourth call - abort
await vi.advanceTimersByTimeAsync(2_000)

await promise

// Both retries should have used 1000ms (reset after success)
expect(onRetry).toHaveBeenNthCalledWith(1, 1_000)
expect(onRetry).toHaveBeenNthCalledWith(2, 1_000)

vi.useRealTimers()
})
})
52 changes: 25 additions & 27 deletions src/lib/hooks/useDrawService.svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
uuidStringToBytes,
} from '$lib/draw'
import { traits, useWorld } from '$lib/ecs'
import { retryStream } from '$lib/retry-stream'

import { useCameraControls } from './useControls.svelte'
import { useDrawConnectionConfig } from './useDrawConnectionConfig.svelte'
Expand Down Expand Up @@ -320,33 +321,34 @@ export function provideDrawService() {
}

const streamEntityChanges = async (client: Client<typeof DrawService>, signal: AbortSignal) => {
try {
for await (const response of client.streamEntityChanges({}, { signal })) {
connectionStatus = ConnectionStatus.CONNECTED

const { entity } = response
if (!entity.case) continue

const uuid = UuidTool.toString([...(entity.value.uuid ?? [])])
pendingEvents.push({
uuid,
changeType: response.changeType,
entity,
updatedFields: response.updatedFields,
})
scheduleFlush()
}
} catch (error) {
if (!signal.aborted) {
console.error('Draw service entity stream error:', error)
await retryStream(
async (sig) => {
for await (const response of client.streamEntityChanges({}, { signal: sig })) {
connectionStatus = ConnectionStatus.CONNECTED

const { entity } = response
if (!entity.case) continue

const uuid = UuidTool.toString([...(entity.value.uuid ?? [])])
pendingEvents.push({
uuid,
changeType: response.changeType,
entity,
updatedFields: response.updatedFields,
})
scheduleFlush()
}
},
signal,
() => {
connectionStatus = ConnectionStatus.DISCONNECTED
}
}
)
}

const streamSceneChanges = async (client: Client<typeof DrawService>, signal: AbortSignal) => {
try {
for await (const response of client.streamSceneChanges({}, { signal })) {
await retryStream(async (sig) => {
for await (const response of client.streamSceneChanges({}, { signal: sig })) {
const { sceneMetadata } = response
if (!sceneMetadata) continue

Expand All @@ -361,11 +363,7 @@ export function provideDrawService() {
)
}
}
} catch (error) {
if (!signal.aborted) {
console.error('Draw service scene stream error:', error)
}
}
}, signal)
}

const createRelationship = async (
Expand Down
24 changes: 16 additions & 8 deletions src/lib/hooks/useGeometries.svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,27 @@ export const provideGeometries = (partID: () => string) => {
})
}

// Clean up owners whose queries disappeared entirely
// Clean up owners whose queries disappeared entirely.
// Guard: if ALL queries are gone (activeQueryKeys empty), the machine is likely
// temporarily disconnected — preserve entities so they reappear on reconnect.
// Only destroy when the partID changed (old-partID entities) or other queries
// are still active (connected machine, resource legitimately removed).
const anyQueriesActive = activeQueryKeys.size > 0
for (const [queryKey, keys] of queryEntityKeys) {
if (!activeQueryKeys.has(queryKey)) {
for (const key of keys) {
const entity = entities.get(key)
if (entity && world.has(entity)) {
entity.destroy()
const queryPartID = queryKey.split(':')[0]!
if (queryPartID !== currentPartID || anyQueriesActive) {
for (const key of keys) {
const entity = entities.get(key)
if (entity && world.has(entity)) {
entity.destroy()
}

entities.delete(key)
}

entities.delete(key)
queryEntityKeys.delete(queryKey)
}

queryEntityKeys.delete(queryKey)
}
}
})
Expand Down
16 changes: 12 additions & 4 deletions src/lib/hooks/usePointclouds.svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,21 @@ export const providePointclouds = (partID: () => string) => {
})
}

// clean up queries that disappeared entirely
// clean up queries that disappeared entirely.
// Guard: if ALL queries are gone (activeQueryKeys empty), the machine is likely
// temporarily disconnected — preserve entities so they reappear on reconnect.
// Only destroy when the partID changed (old-partID entities) or other queries
// are still active (connected machine, camera legitimately removed).
const anyQueriesActive = activeQueryKeys.size > 0
for (const [queryKey, entity] of entities) {
if (!activeQueryKeys.has(queryKey)) {
if (world.has(entity)) {
entity.destroy()
const queryPartID = queryKey.split(':')[0]!
if (queryPartID !== currentPartID || anyQueriesActive) {
if (world.has(entity)) {
entity.destroy()
}
entities.delete(queryKey)
}
entities.delete(queryKey)
}
}
})
Expand Down
51 changes: 51 additions & 0 deletions src/lib/retry-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const INITIAL_DELAY_MS = 1_000
const MAX_DELAY_MS = 30_000

/**
* Calls `run` in a loop, retrying with exponential backoff when it throws.
* - Clean stream end (server closed it): restarts immediately, delay resets.
* - Error: calls `onRetry`, waits with exponential backoff, then retries.
* Stops when the signal is aborted.
*/
export const retryStream = async (
run: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal,
onRetry?: (delay: number) => void
): Promise<void> => {
let delay = INITIAL_DELAY_MS

while (!signal.aborted) {
let errored = false
try {
await run(signal)
// Stream ended cleanly (server closed it) — restart immediately.
delay = INITIAL_DELAY_MS
} catch (error) {
if (signal.aborted) return
errored = true
console.warn('Stream error, retrying in', delay, 'ms:', error)
}

if (signal.aborted) return

if (errored) {
onRetry?.(delay)
await sleep(delay, signal)
delay = Math.min(delay * 2, MAX_DELAY_MS)
}
}
}

const sleep = (ms: number, signal: AbortSignal): Promise<void> => {
return new Promise((resolve) => {
const timer = setTimeout(resolve, ms)
signal.addEventListener(
'abort',
() => {
clearTimeout(timer)
resolve()
},
{ once: true }
)
})
}
Loading