Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/neat-rabbits-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

fix: resume joinContinuation when LEAVE received during reconnect handshake to avoid reconnection loop hanging issue
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.livekit.android.util.Either
import io.livekit.android.util.LKLog
import io.livekit.android.util.toHttpUrl
import io.livekit.android.util.toWebsocketUrl
import io.livekit.android.util.withDeadline
import io.livekit.android.webrtc.toProtoSessionDescription
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CoroutineDispatcher
Expand Down Expand Up @@ -63,6 +64,7 @@ import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton
import kotlin.coroutines.resumeWithException
import kotlin.time.Duration.Companion.milliseconds

/**
* SignalClient to LiveKit WS servers
Expand Down Expand Up @@ -186,17 +188,19 @@ constructor(
.addHeader("Authorization", "Bearer $token")
.build()

return suspendCancellableCoroutine { cont ->
// Wait for join response through WebSocketListener
joinContinuation = cont
cont.invokeOnCancellation {
// If the coroutine is cancelled, websocket needs to be cancelled.
// onFailure will handle cleanup.
LKLog.v { "connect cancelled, abort websocket" }
joinContinuation = null
currentWs?.cancel()
return withDeadline(SIGNAL_CONNECT_TIMEOUT.milliseconds) {
suspendCancellableCoroutine { cont ->
// Wait for join response through WebSocketListener
joinContinuation = cont
cont.invokeOnCancellation {
// If the coroutine is cancelled, websocket needs to be cancelled.
// onFailure will handle cleanup.
LKLog.v { "connect cancelled, abort websocket" }
joinContinuation = null
currentWs?.cancel()
}
currentWs = websocketFactory.newWebSocket(request, this@SignalClient)
}
currentWs = websocketFactory.newWebSocket(request, this@SignalClient)
}
}

Expand Down Expand Up @@ -691,6 +695,11 @@ constructor(
} else if (response.hasLeave()) {
// Some reconnects may immediately send leave back without a join response first.
handleSignalResponseImpl(ws, response)
val cont = joinContinuation
joinContinuation = null
cont?.resumeWithException(
RoomException.ConnectException("Received leave during reconnect: ${response.leave.reason}"),
)
} else if (isReconnecting) {
// When reconnecting, any message received means signal reconnected.
// Newer servers will send a reconnect response first
Expand Down Expand Up @@ -977,6 +986,7 @@ constructor(
// iceServer("stun:stun3.l.google.com:19302"),
// iceServer("stun:stun4.l.google.com:19302"),
)
private const val SIGNAL_CONNECT_TIMEOUT = 10000
const val CLOSE_REASON_NORMAL_CLOSURE = 1000
const val CLOSE_REASON_PING_TIMEOUT = 3000
const val CLOSE_REASON_WEBSOCKET_FAILURE = 3500
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ class RTCEngineMockE2ETest : MockE2ETest() {
connect()
wsFactory.listener.onFailure(wsFactory.ws, Exception(), null)

testScheduler.advanceUntilIdle()
testScheduler.advanceTimeBy(1000)
wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
simulateMessageFromServer(TestData.RECONNECT)

val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID)
assertEquals(TestData.JOIN.join.participant.sid, sid)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -345,13 +345,17 @@ class RoomMockE2ETest : MockE2ETest() {
callback.onAvailable(network)
}

coroutineRule.dispatcher.scheduler.advanceUntilIdle()
coroutineRule.dispatcher.scheduler.advanceTimeBy(1000)
wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
simulateMessageFromServer(TestData.RECONNECT)

val events = eventCollector.stopCollecting()

assertEquals(
listOf(
ConnectionState.CONNECTED,
ConnectionState.RESUMING,
ConnectionState.CONNECTED,
),
events,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2025 LiveKit, Inc.
* Copyright 2023-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockMediaStream
import io.livekit.android.test.mock.MockRtpReceiver
import io.livekit.android.test.mock.MockVideoStreamTrack
import io.livekit.android.test.mock.SignalRequestHandler
import io.livekit.android.test.mock.TestData
import io.livekit.android.test.mock.createMediaStreamId
import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack
Expand Down Expand Up @@ -197,8 +198,28 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
fun softReconnectResendsPackets() = runTest {
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)

val publisherOfferHandler: SignalRequestHandler = { request ->
if (request.hasOffer()) {
val answer = with(LivekitRtc.SignalResponse.newBuilder()) {
answer = with(LivekitRtc.SessionDescription.newBuilder()) {
sdp = "remote_answer"
type = "answer"
id = request.offer.id
build()
}
build()
}
wsFactory.receiveMessage(answer)
true
} else {
false
}
}
wsFactory.registerSignalRequestHandler(publisherOfferHandler)
connect()

val lastMessageSeq = TestData.RECONNECT.reconnect.lastMessageSeq

for (i in 1..5) {
assertTrue(room.localParticipant.publishData(ByteArray(i), reliability = DataPublishReliability.RELIABLE).isSuccess)
}
Expand All @@ -212,7 +233,8 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
val pubPeerConnection = getPublisherPeerConnection()
val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel

assertEquals(5, pubDataChannel.sentBuffers.size)
val expectedResentCount = (1..5).count { it > lastMessageSeq }
assertEquals(5 + expectedResentCount, pubDataChannel.sentBuffers.size)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.yield
import kotlinx.serialization.json.Json
import livekit.LivekitModels
import livekit.LivekitRtc
import livekit.org.webrtc.SessionDescription
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -429,6 +430,77 @@ class SignalClientTest : BaseTest() {
assertTrue(originalWs.isClosed)
}

/**
* Reconnect fails cleanly when server sends LEAVE as the initial response and closes WS cleanly.
*/
@Test
fun reconnectDoesNotHangOnLeaveStateMismatchWithCleanClose() = runTest {
val joinJob = async { client.join(EXAMPLE_URL, "") }
connectWebsocketAndJoin()
joinJob.await()

wsFactory.ws.cancel()

supervisorScope {
val reconnectJob = async {
client.reconnect(EXAMPLE_URL, "", "participant_sid")
}
yield()

client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
val leaveResponse = with(LivekitRtc.SignalResponse.newBuilder()) {
leave = with(LivekitRtc.LeaveRequest.newBuilder()) {
reason = LivekitModels.DisconnectReason.STATE_MISMATCH
action = LivekitRtc.LeaveRequest.Action.RECONNECT
build()
}
build()
}
wsFactory.receiveMessage(leaveResponse)

wsFactory.ws.close(1000, "normal")

val result = runCatching { reconnectJob.await() }
assertTrue("reconnect should have failed", result.isFailure)
}
}

/**
* Reconnect fails cleanly when server sends LEAVE as the initial response and drops the connection.
*/
@Test
fun reconnectDoesNotHangOnLeaveStateMismatchWithFailure() = runTest {
val joinJob = async { client.join(EXAMPLE_URL, "") }
connectWebsocketAndJoin()
joinJob.await()

wsFactory.ws.cancel()

supervisorScope {
val reconnectJob = async {
client.reconnect(EXAMPLE_URL, "", "participant_sid")
}
yield()

client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request))
val leaveResponse = with(LivekitRtc.SignalResponse.newBuilder()) {
leave = with(LivekitRtc.LeaveRequest.newBuilder()) {
reason = LivekitModels.DisconnectReason.STATE_MISMATCH
action = LivekitRtc.LeaveRequest.Action.RECONNECT
build()
}
build()
}
wsFactory.receiveMessage(leaveResponse)

// 5. Server drops connection (unclean close)
wsFactory.ws.cancel()

val result = runCatching { reconnectJob.await() }
assertTrue("reconnect should have failed", result.isFailure)
}
}

// mock data
companion object
}