Skip to content
1 change: 1 addition & 0 deletions .changes/typed-disconnect-error
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Report transport-level disconnects as LiveKitError(.network) instead of LiveKitError(.cancelled) so consumers can distinguish network failures from user-initiated cancellation"
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}
}

func reset() {
func reset(throwing error: Error? = nil) {
let (lossy, reliable) = _state.mutate {
let result = ($0.lossy, $0.reliable)
$0.reliable = nil
Expand All @@ -312,7 +312,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
lossy?.close()
reliable?.close()

openCompleter.reset()
openCompleter.reset(throwing: error)
}

// MARK: - Send
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ extension Room {
}

// Resets state of transports
func cleanUpRTC() async {
func cleanUpRTC(withError disconnectError: Error? = nil) async {
// Close data channels
publisherDataChannel.reset()
subscriberDataChannel.reset()
publisherDataChannel.reset(throwing: disconnectError)
subscriberDataChannel.reset(throwing: disconnectError)

await _state.transport?.close()

Expand Down
8 changes: 6 additions & 2 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ extension Room: TransportDelegate {
func transport(_ transport: Transport, didUpdateState pcState: LKRTCPeerConnectionState) {
log("target: \(transport.target), connectionState: \(pcState.description)")

let pcError: LiveKitError? = _state.connectionState.isTearingDown ? nil : LiveKitError(
.network, message: "Transport \(transport.target) state changed to \(pcState.description)"
)

// primary connected
if transport.isPrimary {
if pcState.isConnected {
primaryTransportConnectedCompleter.resume(returning: ())
} else if pcState.isDisconnected {
primaryTransportConnectedCompleter.reset()
primaryTransportConnectedCompleter.reset(throwing: pcError)
}
}

Expand All @@ -46,7 +50,7 @@ extension Room: TransportDelegate {
if pcState.isConnected {
publisherTransportConnectedCompleter.resume(returning: ())
} else if pcState.isDisconnected {
publisherTransportConnectedCompleter.reset()
publisherTransportConnectedCompleter.reset(throwing: pcError)
}
}

Expand Down
9 changes: 5 additions & 4 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -545,15 +545,16 @@ extension Room {
log("withError: \(String(describing: disconnectError)), isFullReconnect: \(isFullReconnect)")

// Reset completers
_sidCompleter.reset()
primaryTransportConnectedCompleter.reset()
publisherTransportConnectedCompleter.reset()
_sidCompleter.reset(throwing: disconnectError)
primaryTransportConnectedCompleter.reset(throwing: disconnectError)
publisherTransportConnectedCompleter.reset(throwing: disconnectError)
await activeParticipantCompleters.reset(throwing: disconnectError)

await signalClient.cleanUp(withError: disconnectError)
// Cancel all track stats timers before closing transports to prevent
// stats collection from accessing destroyed WebRTC channels.
cancelTimers()
await cleanUpRTC()
await cleanUpRTC(withError: disconnectError)
await cleanUpParticipants(isFullReconnect: isFullReconnect)

// Cleanup for E2EE
Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ actor SignalClient: Loggable {
$0.lastJoinResponse = nil
}

_connectResponseCompleter.reset()
_connectResponseCompleter.reset(throwing: disconnectError)

await _addTrackCompleters.reset()
await _addTrackCompleters.reset(throwing: disconnectError)
await _requestQueue.clear()
await _responseQueue.clear()

Expand Down
18 changes: 11 additions & 7 deletions Sources/LiveKit/Support/Async/AsyncCompleter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ actor CompleterMapActor<T: Sendable> {
}

func resume(throwing error: any Error, for key: String) {
let completer = completer(for: key)
guard let completer = _completerMap[key] else { return }
completer.resume(throwing: error)
}

func reset() {
func reset(throwing error: Error? = nil) {
// Reset call completers...
for (_, value) in _completerMap {
value.reset()
value.reset(throwing: error)
}
// Clear all completers...
_completerMap.removeAll()
Expand All @@ -69,8 +69,8 @@ final class AsyncCompleter<T: Sendable>: @unchecked Sendable, Loggable {
let continuation: CheckedContinuation<T, Error>
let timeoutBlock: DispatchWorkItem

func cancel() {
continuation.resume(throwing: LiveKitError(.cancelled))
func cancel(throwing error: LiveKitError? = nil) {
continuation.resume(throwing: error ?? LiveKitError(.cancelled))
timeoutBlock.cancel()
}

Expand All @@ -96,6 +96,10 @@ final class AsyncCompleter<T: Sendable>: @unchecked Sendable, Loggable {

private let _lock: some Lock = createLock()

var waiterCount: Int {
_lock.sync { _entries.count }
}

init(label: String, defaultTimeout: TimeInterval) {
self.label = label
_defaultTimeout = defaultTimeout.toDispatchTimeInterval
Expand All @@ -111,10 +115,10 @@ final class AsyncCompleter<T: Sendable>: @unchecked Sendable, Loggable {
}
}

func reset() {
func reset(throwing error: Error? = nil) {
_lock.sync {
for entry in _entries.values {
entry.cancel()
entry.cancel(throwing: LiveKitError.from(error: error))
}
_entries.removeAll()
_result = nil
Expand Down
6 changes: 6 additions & 0 deletions Sources/LiveKit/Types/ConnectionState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ extension ConnectionState: Identifiable {
rawValue
}
}

extension ConnectionState {
var isTearingDown: Bool {
self == .disconnecting || self == .disconnected
}
}
36 changes: 18 additions & 18 deletions Tests/LiveKitAudioTests/PublishDeviceOptimization.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,60 +27,60 @@ import LiveKitTestSupport

// Default publish flow
@Test func defaultMicPublish() async throws {
var sw = Stopwatch(label: "Test: Normal publish sequence")
let span = Span(label: "Test: Normal publish sequence")

let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true)
try await TestEnvironment.withRooms([room1Opts]) { rooms in
sw.split(label: "Connected to room")
span.record("Connected to room")
// Alias to Rooms
let room1 = rooms[0]
try await room1.localParticipant.setMicrophone(enabled: true)
sw.split(label: "Did publish mic")
span.record("Did publish mic")
}
sw.split(label: "Sequence complete")
print(sw)
span.record("Sequence complete")
print(span)

print("Total time: \(sw.total())")
print("Total time: \(span.total())")
}

// No-VP publish flow
@Test func noVpMicPublish() async throws {
// Turn off Apple's VP
try AudioManager.shared.setVoiceProcessingEnabled(false)

var sw = Stopwatch(label: "Test: No-VP publish sequence")
let span = Span(label: "Test: No-VP publish sequence")

let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true)
try await TestEnvironment.withRooms([room1Opts]) { rooms in
sw.split(label: "Connected to room")
span.record("Connected to room")
// Alias to Rooms
let room1 = rooms[0]
try await room1.localParticipant.setMicrophone(enabled: true)
sw.split(label: "Did publish mic")
span.record("Did publish mic")
}
sw.split(label: "Sequence complete")
print(sw)
span.record("Sequence complete")
print(span)

print("Total time: \(sw.total())")
print("Total time: \(span.total())")
}

// Concurrent device acquisition publish flow
@Test func concurrentMicPublish() async throws {
var sw = Stopwatch(label: "Test: Normal publish sequence")
let span = Span(label: "Test: Normal publish sequence")

let room1Opts = RoomTestingOptions(url: url, token: token, enableMicrophone: true, canPublish: true)
try await TestEnvironment.withRooms([room1Opts]) { rooms in
sw.split(label: "Connected to room")
span.record("Connected to room")
// Alias to Rooms
let room1 = rooms[0]
// Mic should be already enabled at this point
let isMicEnabled = room1.localParticipant.isMicrophoneEnabled()
#expect(isMicEnabled, "Mic should be enabled at this point")
sw.split(label: "Did publish mic")
span.record("Did publish mic")
}
sw.split(label: "Sequence complete")
print(sw)
span.record("Sequence complete")
print(span)

print("Total time: \(sw.total())")
print("Total time: \(span.total())")
}
}
121 changes: 121 additions & 0 deletions Tests/LiveKitCoreTests/CompleterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,125 @@ struct CompleterTests {
print("Unknown error: \(error)")
}
}

@Test func resetThrowingPropagatesTypedError() async {
let completer = AsyncCompleter<Void>(label: "reset-throwing", defaultTimeout: 30)
let task = Task { try await completer.wait() }
await waitForRegistration(of: completer)

completer.reset(throwing: LiveKitError(.network, message: "transport failed"))

await expectLiveKitError(.network, from: task)
}

@Test func taskCancellationStillProducesCancelled() async {
let completer = AsyncCompleter<Void>(label: "task-cancel", defaultTimeout: 30)
let task = Task { try await completer.wait() }
await waitForRegistration(of: completer)

task.cancel()

await expectLiveKitError(.cancelled, from: task)
}

@Test func resetClearsResultForReuse() async throws {
let completer = AsyncCompleter<Void>(label: "reuse-after-throw", defaultTimeout: 30)

let firstTask = Task { try await completer.wait() }
await waitForRegistration(of: completer)
completer.reset(throwing: LiveKitError(.network))
_ = await firstTask.result

let secondTask = Task { try await completer.wait() }
await waitForRegistration(of: completer)
completer.resume(returning: ())
try await secondTask.value
}
}

private func waitForRegistration(of completer: AsyncCompleter<some Any>) async {
while completer.waiterCount == 0 {
await Task.yield()
}
}

private func expectLiveKitError(_ expected: LiveKitErrorType, from task: Task<some Sendable, Error>) async {
do {
_ = try await task.value
Issue.record("Expected LiveKitError(.\(expected)) to be thrown")
} catch let error as LiveKitError {
#expect(error.type == expected)
} catch {
Issue.record("Expected LiveKitError, got \(error)")
}
}

@Suite(.tags(.concurrency))
struct CompleterMapActorTests {
@Test func resetThrowingFanOutsTypedErrorToAllCompleters() async {
let map = CompleterMapActor<Void>(label: "map-test", defaultTimeout: 30)

let completerA = await map.completer(for: "a")
let completerB = await map.completer(for: "b")

let taskA = Task { try await completerA.wait() }
let taskB = Task { try await completerB.wait() }

await waitForRegistration(of: completerA)
await waitForRegistration(of: completerB)

await map.reset(throwing: LiveKitError(.network, message: "fan-out"))

await expectLiveKitError(.network, from: taskA)
await expectLiveKitError(.network, from: taskB)
}

@Test func resetWithoutErrorDefaultsToCancelled() async {
let map = CompleterMapActor<Void>(label: "map-test", defaultTimeout: 30)
let completer = await map.completer(for: "a")
let task = Task { try await completer.wait() }

await waitForRegistration(of: completer)

await map.reset()

await expectLiveKitError(.cancelled, from: task)
}

@Test func resumeThrowingForMissingKeyIsNoOp() async throws {
let map = CompleterMapActor<Void>(label: "no-op-test", defaultTimeout: 30)

// No completer for the key yet — resume(throwing:) must not auto-create.
await map.resume(throwing: LiveKitError(.participantRemoved), for: "absent")

// Subsequent wait on the same key must NOT see a stale "remembered" failure.
let completer = await map.completer(for: "absent")
let task = Task { try await completer.wait() }
await waitForRegistration(of: completer)
completer.resume(returning: ())
try await task.value
}

@Test func resumeReturningForMissingKeyRemembersSuccess() async throws {
let map = CompleterMapActor<Void>(label: "remember-success", defaultTimeout: 30)

// resume(returning:) on a missing key creates and remembers the value.
await map.resume(returning: (), for: "key")

// A later wait must see the success immediately.
let completer = await map.completer(for: "key")
try await completer.wait()
}

@Test func resumeThrowingReachesExistingWaiter() async {
let map = CompleterMapActor<Void>(label: "existing-waiter", defaultTimeout: 30)

let completer = await map.completer(for: "key")
let task = Task { try await completer.wait() }
await waitForRegistration(of: completer)

await map.resume(throwing: LiveKitError(.network), for: "key")

await expectLiveKitError(.network, from: task)
}
}
3 changes: 1 addition & 2 deletions Tests/LiveKitCoreTests/Room/RoomTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ private struct WeakRoomRefs: @unchecked Sendable {
localParticipant = room.localParticipant

for remoteParticipant in room.remoteParticipants.values {
weak var weakRP: RemoteParticipant? = remoteParticipant
remoteParticipantChecks.append { weakRP == nil }
remoteParticipantChecks.append { [weak remoteParticipant] in remoteParticipant == nil }
}

state = room._state
Expand Down
Loading