Fix message-subscription races in callCommand and expect/intercept#73
Merged
Conversation
Two hangs shared one root cause: code acted first and only then subscribed to hear the result, and because messages flow through a replay-0 shared flow, a message arriving in that window was dropped. callCommand: it sent the request and only afterwards subscribed (responses.first) to await the reply. A reply that arrived before the collector subscribed was lost and callCommand hung forever. It now registers an id -> CompletableDeferred in a pendingRequests map *before* sending; the receive loop completes the matching deferred, so the reply is captured regardless of await timing. The map is also a keyed registry (O(1) routing, and a foundation for failing in-flight requests on disconnect and per-request timeouts later) rather than N broadcast collectors. expect/intercept: BaseRequestExpectation/BaseFetchInterception enabled the domain and only then launched the event collectors (lazily), so an event fired in that window was missed and getRequestEvent() hung. They now launch the collectors with CoroutineStart.UNDISPATCHED and *before* enable(), so the subscription is established before any event can fire. To make this testable without a real browser, the WebSocket is extracted behind a WebSocketTransport interface (default KtorWebSocketTransport) injected via a protected createTransport() factory on DefaultConnection. Verified red->green deterministically (UnconfinedTestDispatcher + a fake transport that delivers the reply/event in the exact race window): - CallCommandResponseRaceTest: RED (hang/timeout) on the old responses.first path, GREEN with the waiter map. - ExpectationSubscribeRaceTest: RED (timeout) on the old enable-then-subscribe ordering, GREEN with subscribe-before-enable. Full :core:jvmTest (real Chrome) + :opentelemetry:jvmTest pass; macOS native tests pass and Linux/MinGW targets compile.
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two hangs that share one root cause: code acts first and only then subscribes to hear the result. Because messages flow through a
replay = 0shared flow, a message arriving in that window is dropped.1.
callCommandresponse raceIt sent the request and only afterwards subscribed (
responses.first { it.id == … }) to await the reply. A reply that arrived before the collector subscribed was lost →callCommandhangs forever (no timeout).Fix: register an
id -> CompletableDeferred<Message.Response>in apendingRequestsmap (mutex-guarded) before sending; the receive loop completes the matching deferred, so the reply is captured regardless of await timing. The map is also a keyed registry — O(1) routing instead of N broadcast collectors, and a foundation for failing in-flight requests on disconnect / per-request timeouts (follow-ups).2.
expect/interceptsubscribe-after-enable raceBaseRequestExpectation/BaseFetchInterceptionenabled the domain and only then launched the event collectors (lazily), so an event fired in that window was missed →getRequestEvent()hangs.Fix: launch the collectors with
CoroutineStart.UNDISPATCHEDand beforeenable(), so the subscription is established before any event can fire.Testability seam
The WebSocket is extracted behind a
WebSocketTransportinterface (defaultKtorWebSocketTransport), injected via aprotected open fun createTransport()onDefaultConnection, so the message plumbing can be exercised without a real browser.Verification (deterministic red → green)
Both tests use
UnconfinedTestDispatcher+ a fake transport that delivers the reply/event in the exact race window, so they fail against the old code and pass against the fix (not just guard tests):CallCommandResponseRaceTest— delivers the response insidesend(): RED (timeout) onresponses.first, GREEN with the waiter map.ExpectationSubscribeRaceTest— firesrequestWillBeSentwhile handlingNetwork.enable: RED (timeout) on enable-then-subscribe, GREEN with subscribe-before-enable.Full
:core:jvmTest(real headless Chrome — incl.RequestExpectationTest,FetchInterceptionTest,BrowserTest) +:opentelemetry:jvmTestpass. macOS native tests pass; Linux/MinGW targets compile.Out of scope (tracked separately)
callCommanddeferreds on disconnect, and closing the WSHttpClient, are pre-existing limitations (the old code hung/leaked the same way). The new registry makes the disconnect fix clean — slated for the connection-lifecycle PR.Test plan
./gradlew :core:jvmTest --tests "*.CallCommandResponseRaceTest" --tests "*.ExpectationSubscribeRaceTest"— red on old code, green after./gradlew :core:jvmTest(real Chrome) — all pass./gradlew :opentelemetry:jvmTest— all pass./gradlew :core:macosArm64Test+ Linux/MinGW compile — pass