Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ open class DefaultBrowser(
logger.debug("Closing connection...")
connection?.close()
connection = null
logger.debug("Closing HTTP API client...")
http?.close()
http = null
logger.debug("Canceling coroutine scope...")
coroutineScope.cancel()
logger.info("Browser process stopped")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import dev.kdriver.cdp.*
import dev.kdriver.cdp.domain.*
import dev.kdriver.core.browser.Browser
import dev.kdriver.core.browser.Config.Defaults
import dev.kdriver.core.exceptions.ConnectionClosedException
import io.ktor.util.logging.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
Expand Down Expand Up @@ -34,6 +35,9 @@ open class DefaultConnection(

private var socketSubscription: Job? = null

private val connectMutex = Mutex()
private val prepareMutex = Mutex()

private val currentIdMutex = Mutex()
private var currentId = 0L

Expand Down Expand Up @@ -63,8 +67,13 @@ open class DefaultConnection(

private suspend fun connect() {
if (transport.isActive) return
transport.connect()
startListening()
// Guard so concurrent first commands don't each open a session (which would leak the
// duplicate sockets/listeners). Double-checked: skip the lock once connected (ISSUE-4).
connectMutex.withLock {
if (transport.isActive) return@withLock
transport.connect()
startListening()
}
}

private fun startListening() {
Expand All @@ -86,22 +95,42 @@ open class DefaultConnection(
logger.debug("WebSocket exception while receiving message: {}", e)
}
}
// incoming() completed without error => the socket was closed. Fail any in-flight
// commands so their callers observe the disconnect instead of hanging (ISSUE-3).
failPendingRequests(ConnectionClosedException())
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
e.printStackTrace()
// Handle disconnect, maybe trigger reconnect logic here
logger.error("WebSocket receive loop terminated: {}", e)
failPendingRequests(ConnectionClosedException(cause = e))
}
}
}

/**
* Completes every in-flight request waiter exceptionally and clears the registry, so callers
* parked in [callCommand] observe a failure rather than hanging when the connection goes away.
*/
private suspend fun failPendingRequests(cause: Throwable) {
val pending = pendingRequestsMutex.withLock {
val snapshot = pendingRequests.values.toList()
pendingRequests.clear()
snapshot
}
pending.forEach { it.completeExceptionally(cause) }
}

@InternalCdpApi
override suspend fun callCommand(method: String, parameter: JsonElement?, mode: CommandMode): JsonElement? {
connect()

if (mode == CommandMode.DEFAULT) owner?.let { browser ->
if (browser.config.expert) prepareExpert()
if (browser.config.headless) prepareHeadless()
// Serialize preparation so concurrent first commands run it once, not N times (ISSUE-4).
// prepare* issue ONE_SHOT commands, which skip this block, so prepareMutex isn't re-entered.
prepareMutex.withLock {
if (browser.config.expert) prepareExpert()
if (browser.config.headless) prepareHeadless()
}
}

val requestId = currentIdMutex.withLock { currentId++ }
Expand All @@ -128,6 +157,8 @@ open class DefaultConnection(
transport.close()
socketSubscription?.cancel()
socketSubscription = null
// Fail any commands still awaiting a reply that will now never come (ISSUE-3).
failPendingRequests(ConnectionClosedException("Connection closed"))
}

override suspend fun updateTarget() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class KtorWebSocketTransport(
override suspend fun close() {
session?.close()
session = null
// Close the engine-backed client too, otherwise its threads/connection pool leak across
// browser create/stop cycles (ISSUE-9).
client.close()
}

private fun parseWebSocketUrl(url: String): WebSocketInfo {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.kdriver.core.exceptions

/**
* Thrown to a pending CDP command when the underlying WebSocket connection is closed (gracefully or
* by a disconnect) before its response is received.
*
* This lets callers observe a failure instead of hanging forever on a reply that will never arrive.
*/
class ConnectionClosedException(
message: String = "The connection to the browser was closed before a response was received",
cause: Throwable? = null,
) : RuntimeException(message, cause)
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package dev.kdriver.core.connection

import dev.kdriver.cdp.CommandMode
import dev.kdriver.core.exceptions.ConnectionClosedException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs

/**
* Lifecycle behavior of [DefaultConnection]: failing in-flight commands on disconnect/close
* (ISSUE-3) and not opening duplicate sessions under concurrent first commands (ISSUE-4).
*/
class ConnectionLifecycleTest {

private class FakeTransport(
/** When set, [connect] awaits this before marking the connection active. */
private val connectGate: CompletableDeferred<Unit>? = null,
) : WebSocketTransport {
private val channel = Channel<String>(Channel.UNLIMITED)
override var isActive: Boolean = false
private set
var connectCount: Int = 0
private set

override suspend fun connect() {
connectCount++
connectGate?.await()
isActive = true
}

override suspend fun send(message: String) {
// No-op: the test controls when/whether a response or disconnect happens.
}

override fun incoming(): Flow<String> = channel.receiveAsFlow()

/** Simulate the socket dropping (no graceful close()). */
fun simulateDisconnect() = channel.close()

override suspend fun close() {
isActive = false
channel.close()
}
}

private class TestConnection(
scope: CoroutineScope,
private val transport: FakeTransport,
) : DefaultConnection("ws://stub/devtools/page/stub", scope) {
override fun createTransport(): WebSocketTransport = transport
}

@Test
fun callCommand_failsWithConnectionClosed_onDisconnect() = runTest(UnconfinedTestDispatcher()) {
val transport = FakeTransport()
val connection = TestConnection(this, transport)

// Sends, then parks awaiting a reply that never comes. Capture the outcome here rather than
// via async{}.await(), whose exception would also propagate to (and fail) the test scope.
val outcome = CompletableDeferred<Throwable>()
launch {
try {
connection.callCommand("Some.method", null, CommandMode.ONE_SHOT)
} catch (e: Throwable) {
outcome.complete(e)
}
}

// The socket drops with the command still in flight.
transport.simulateDisconnect()

assertIs<ConnectionClosedException>(withTimeout(2_000) { outcome.await() })
}

@Test
fun callCommand_failsWithConnectionClosed_onClose() = runTest(UnconfinedTestDispatcher()) {
val transport = FakeTransport()
val connection = TestConnection(this, transport)

val outcome = CompletableDeferred<Throwable>()
launch {
try {
connection.callCommand("Some.method", null, CommandMode.ONE_SHOT)
} catch (e: Throwable) {
outcome.complete(e)
}
}

connection.close()

assertIs<ConnectionClosedException>(withTimeout(2_000) { outcome.await() })
}

@Test
fun connect_opensTransportOnce_underConcurrentFirstCommands() = runTest(UnconfinedTestDispatcher()) {
val gate = CompletableDeferred<Unit>()
val transport = FakeTransport(connectGate = gate)
val connection = TestConnection(this, transport)

// Two first commands race into connect() while it's still in progress (isActive == false).
val c1 = launch { runCatching { connection.callCommand("A.x", null, CommandMode.ONE_SHOT) } }
val c2 = launch { runCatching { connection.callCommand("B.y", null, CommandMode.ONE_SHOT) } }

// Let the first connect complete; the second must reuse it, not open a new session.
gate.complete(Unit)

assertEquals(1, transport.connectCount, "connect() must open the transport exactly once")

c1.cancel()
c2.cancel()
connection.close()
}
}
Loading