Skip to content
1 change: 1 addition & 0 deletions .changes/single-pc-connect-perf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Faster initial connect in single peer connection mode by skipping an unnecessary 20ms negotiate debounce"
41 changes: 20 additions & 21 deletions Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ private let dWs: BenchmarkMetric = .custom("D_WS_MS", polarity: .prefersSmaller,
private let dSignal: BenchmarkMetric = .custom("D_SIGNAL_MS", polarity: .prefersSmaller, useScalingFactor: false)
private let dTransport: BenchmarkMetric = .custom("D_TRANSPORT_MS", polarity: .prefersSmaller, useScalingFactor: false)
private let dIceDtls: BenchmarkMetric = .custom("D_ICE_DTLS_MS", polarity: .prefersSmaller, useScalingFactor: false)
private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false)
// `D_DC_MS` is not collected: the SDK does not block `connect()` on data channels
// opening, so a `dc_open` split would race with `splitMilliseconds` being read
// here. See spec/01-connection-time.md for the spec-defined `T_DC_OPEN`.
// private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false)

let connectionBenchmarks: @Sendable () -> Void = {
// BM-CONN-001: Dual PeerConnection, subscriber-primary (default)
Benchmark(
"BM-CONN-001-DualPC-SubscriberPrimary",
configuration: .init(
metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc],
metrics: .default + [dWs, dSignal, dTransport, dIceDtls],
timeUnits: .milliseconds,
units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count],
units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count],
warmupIterations: 5,
scalingFactor: .one,
maxDuration: .seconds(300),
Expand All @@ -67,21 +70,20 @@ let connectionBenchmarks: @Sendable () -> Void = {
let s = span.splitMilliseconds
let wsOpen = s["ws_open"] ?? 0
let joinRecv = s["signal"] ?? s["join_recv"] ?? 0
let answerSent = s["answer_sent"]
// Either side may initiate SDP — answer_sent in dual PC subscriber-primary
// (server-initiated offer), offer_sent in single PC / publisher-primary.
let sdpDispatched = s["answer_sent"] ?? s["offer_sent"]
let pcConnected = s["pc_connected"] ?? 0
let dcOpen = s["dc_open"]
// let dcOpen = s["dc_open"] // see note on `dDc` above

benchmark.measurement(dWs, Int(wsOpen))
benchmark.measurement(dSignal, Int(joinRecv - wsOpen))
benchmark.measurement(dTransport, Int(pcConnected - joinRecv))

if let answerSent {
benchmark.measurement(dIceDtls, Int(pcConnected - answerSent))
}

if let dcOpen {
benchmark.measurement(dDc, Int(dcOpen - pcConnected))
if let sdpDispatched {
benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched))
}
// if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) }
}

await room.disconnect()
Expand All @@ -93,9 +95,9 @@ let connectionBenchmarks: @Sendable () -> Void = {
Benchmark(
"BM-CONN-003-SinglePC",
configuration: .init(
metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc],
metrics: .default + [dWs, dSignal, dTransport, dIceDtls],
timeUnits: .milliseconds,
units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count],
units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count],
warmupIterations: 5,
scalingFactor: .one,
maxDuration: .seconds(300),
Expand All @@ -120,21 +122,18 @@ let connectionBenchmarks: @Sendable () -> Void = {
let s = span.splitMilliseconds
let wsOpen = s["ws_open"] ?? 0
let joinRecv = s["signal"] ?? s["join_recv"] ?? 0
let answerSent = s["answer_sent"]
let sdpDispatched = s["answer_sent"] ?? s["offer_sent"]
let pcConnected = s["pc_connected"] ?? 0
let dcOpen = s["dc_open"]
// let dcOpen = s["dc_open"] // see note on `dDc` above

benchmark.measurement(dWs, Int(wsOpen))
benchmark.measurement(dSignal, Int(joinRecv - wsOpen))
benchmark.measurement(dTransport, Int(pcConnected - joinRecv))

if let answerSent {
benchmark.measurement(dIceDtls, Int(pcConnected - answerSent))
}

if let dcOpen {
benchmark.measurement(dDc, Int(dcOpen - pcConnected))
if let sdpDispatched {
benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched))
}
// if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) }
}

await room.disconnect()
Expand Down
19 changes: 13 additions & 6 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ extension Room {
}
}

func publisherShouldNegotiate() async throws {
func publisherShouldNegotiate(force: Bool = false) async throws {
log()

let publisher = try requirePublisher()
await publisher.negotiate()
try await publisher.negotiate(force: force)
_state.mutate { $0.hasPublished = true }
}

Expand Down Expand Up @@ -165,6 +165,7 @@ extension Room {
guard let self else { return }
log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)")
try await signalClient.send(offer: offer, offerId: offerId)
connectSpan?.record("offer_sent")
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -196,10 +197,6 @@ extension Room {
_state.mutate { $0.transport = transport }

log("[Connect] Fast publish enabled: \(joinResponse.fastPublish ? "true" : "false")")
if isSinglePC || !isSubscriberPrimary || joinResponse.fastPublish {
// In single PC mode or when publisher is primary, negotiate immediately
try await publisherShouldNegotiate()
}

} else if case let .reconnect(reconnectResponse) = connectResponse {
log("[Connect] Configuring transports with RECONNECT response...")
Expand Down Expand Up @@ -286,6 +283,16 @@ extension Room {

// Resume after configuring transports...
await signalClient.resumeQueues()
try Task.checkCancellation()

// Eager publisher negotiation must run after `resumeQueues()` —
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subtle but important: moving the offer send out of Debounce.schedule { try? await action() } also surfaces errors that used to be silently swallowed (createOffer / setLocalDescription / WS send failures). Pairs with #987 (typed errors through AsyncCompleter) in covering connect-time failures.

// offers are not queueable, so sending while suspended drops them.
if case let .join(joinResponse) = connectResponse {
let isSubscriberPrimary = singlePC ? false : joinResponse.subscriberPrimary
if singlePC || !isSubscriberPrimary || joinResponse.fastPublish {
try await publisherShouldNegotiate(force: true)
}
}

// Wait for transport...
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
Expand Down
4 changes: 0 additions & 4 deletions Sources/LiveKit/Core/Room+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ extension Room: TransportDelegate {
case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel)
default: log("Unknown data channel label \(dataChannel.label)", .warning)
}

if subscriberDataChannel.isOpen {
connectSpan?.record("dc_open")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved to 769386a to make it consistent with the spec, I'm not sure if we should drop this data point if that's not really awaitable from Swift.

I may re-evaluate the spec (and other SDKs).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now I'm more for dropping it...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropped in b37099b

}
}

func transportShouldNegotiate(_: Transport) {}
Expand Down
12 changes: 9 additions & 3 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,15 @@ actor Transport: NSObject, Loggable {
_delegate.add(delegate: delegate)
}

func negotiate() async {
await _debounce.schedule {
try await self.createAndSendOffer()
func negotiate(force: Bool = false) async throws {
if force {
// Cancel any pending debounced negotiation; this call supersedes it.
await _debounce.cancel()
try await createAndSendOffer()
} else {
await _debounce.schedule {
try await self.createAndSendOffer()
}
}
}

Expand Down
Loading