Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 80 additions & 46 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protocol DataChannelDelegate: AnyObject, Sendable {
/// and serializes all outgoing sends through a private FIFO event loop.
///
/// ## Send flow
/// `send(dataPacket:)` builds a `PublishDataRequest` and yields a `.publishData`
/// `send(dataPacket:)` builds a `PublishDataRequest` and yields a `.sendRequested`
/// event onto the internal `AsyncStream`. The event loop's single observer is the
/// only mutator of `Buffers` (lossy / reliable send buffers + reliable retry
/// buffer), so all bookkeeping is race-free without locks.
Expand Down Expand Up @@ -66,7 +66,7 @@ protocol DataChannelDelegate: AnyObject, Sendable {
/// ## Permanent teardown
/// `reset(throwing:)` clears channel references, yields a `.drain(error)`
/// event, then resets the completer. The drain runs through the same FIFO
/// stream, so it's ordered after any `.publishData` enqueues already in
/// stream, so it's ordered after any `.sendRequested` enqueues already in
/// flight from concurrent callers. Every parked request's continuation is
/// resumed with `error` (or `LiveKitError(.cancelled)` if `nil`), so no
/// continuation leaks across a disconnect / full reconnect.
Expand All @@ -93,15 +93,14 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {

var isOpen: Bool { _state.isOpen }

var e2eeManager: E2EEManager?

// MARK: - Private

private struct State {
var lossy: LKRTCDataChannel?
var reliable: LKRTCDataChannel?
var reliableDataSequence: UInt32 = 1
var reliableReceivedState: TTLDictionary<String, UInt32> = TTLDictionary(ttl: reliableReceivedStateTTL)
var e2eeManager: E2EEManager?

var isOpen: Bool {
guard let lossy, let reliable else { return false }
Expand Down Expand Up @@ -195,16 +194,41 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
let detail: Detail

enum Detail {
case publishData(PublishDataRequest)
case publishedData(PublishDataRequest)
/// Yielded by `send(dataPacket:)`. Consumer enqueues the request
/// into the matching `SendBuffer` and tries to flush.
case sendRequested(PublishDataRequest)

/// Yielded by `processSendQueue` after a successful
/// `channel.sendData`. Consumer copies reliable requests into the
/// retry buffer for potential SCTP-level replay on resume;
/// lossy requests are a no-op (they aren't replayed).
case sendDispatched(PublishDataRequest)

/// Yielded by `LKRTCDataChannelDelegate.didChangeBufferedAmount`
/// when WebRTC reports its outbound buffer has drained. Consumer
/// updates `SendBuffer.rtcAmount` (the backpressure target) and,
/// on the reliable channel, trims the retry buffer down to the
/// new bound so only un-acked bytes are retained.
case bufferedAmountChanged(UInt64)

/// Yielded by `retryReliable(lastSequence:)` when the server
/// asks the client to replay packets after a reconnect resume.
/// Consumer re-enqueues every retry-buffer entry whose sequence
/// is greater than `lastSeq` as a fresh `.sendRequested`.
case retryRequested(UInt32)
/// A data channel transitioned into `.open`; flush both send buffers
/// so requests parked while `channel(for:)` returned `nil` can ship.

/// Yielded when channel readiness *may* have improved
/// (`set(reliable:)`, `set(lossy:)`, `dataChannelDidChangeState`).
/// The case itself is a no-op; the common flush at the end of
/// `processEvent` re-runs `processSendQueue` to ship anything
/// that was parked while `channel(for:)` returned `nil`.
case wakeup
/// Fail every parked send-buffer request with `error`; yielded by
/// `reset(throwing:)` so callers don't hang on continuations queued
/// for now-discarded channels.

/// Yielded by `reset(throwing:)` to permanently fail every
/// parked send-buffer request with `error` (or
/// `LiveKitError(.cancelled)` when `nil`). Routed through the
/// stream so it's ordered after any in-flight `.sendRequested`
/// enqueues from concurrent callers.
case drain(Error?)
}
}
Expand All @@ -214,12 +238,12 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
// swiftlint:disable:next cyclomatic_complexity
private func processEvent(_ event: ChannelEvent, buffers: inout Buffers) {
switch event.detail {
case let .publishData(request):
case let .sendRequested(request):
switch event.channelKind {
case .lossy: buffers.lossyBuffer.enqueue(request)
case .reliable: buffers.reliableBuffer.enqueue(request)
}
case let .publishedData(request):
case let .sendDispatched(request):
switch event.channelKind {
case .lossy: ()
case .reliable: buffers.reliableRetryBuffer.enqueue(request)
Expand Down Expand Up @@ -280,22 +304,24 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
buffer.enqueueFront(request)
return
}
buffer.rtcAmount += UInt64(request.data.data.count)

guard channel.sendData(request.data) else {
request.continuation?.resume(
throwing: LiveKitError(.invalidState, message: "sendData failed")
)
return
}
// Bytes are now in WebRTC's SCTP queue; account for them so the
// backpressure check below kicks in for subsequent iterations.
buffer.rtcAmount += UInt64(request.data.data.count)
request.continuation?.resume()

let event = ChannelEvent(channelKind: kind, detail: .publishedData(request))
let event = ChannelEvent(channelKind: kind, detail: .sendDispatched(request))
eventContinuation.yield(event)
}
}

// MARK: - Cache
// MARK: - Buffer helpers

private func updateTarget(
buffer: inout SendBuffer,
Expand All @@ -319,7 +345,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
while let request = buffer.dequeue() {
assert(request.continuation == nil, "Continuation may fire multiple times while retrying causing crash")
if request.sequence > lastSeq {
let event = ChannelEvent(channelKind: .reliable, detail: .publishData(request))
let event = ChannelEvent(channelKind: .reliable, detail: .sendRequested(request))
eventContinuation.yield(event)
}
}
Expand Down Expand Up @@ -349,23 +375,19 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
}

func set(reliable channel: LKRTCDataChannel?) {
let isOpen = _state.mutate {
$0.reliable = channel
return $0.isOpen
}

channel?.delegate = self

if isOpen {
// Wake parked sends — pairs with `dataChannelDidChangeState`.
openCompleter.resume(returning: ())
eventContinuation.yield(ChannelEvent(channelKind: .reliable, detail: .wakeup))
}
setChannel(channel, kind: .reliable)
}

func set(lossy channel: LKRTCDataChannel?) {
setChannel(channel, kind: .lossy)
}

private func setChannel(_ channel: LKRTCDataChannel?, kind: ChannelKind) {
let isOpen = _state.mutate {
$0.lossy = channel
switch kind {
case .reliable: $0.reliable = channel
case .lossy: $0.lossy = channel
}
return $0.isOpen
}

Expand All @@ -374,10 +396,14 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
if isOpen {
// Wake parked sends — pairs with `dataChannelDidChangeState`.
openCompleter.resume(returning: ())
eventContinuation.yield(ChannelEvent(channelKind: .lossy, detail: .wakeup))
eventContinuation.yield(ChannelEvent(channelKind: kind, detail: .wakeup))
}
}

func set(e2eeManager: E2EEManager?) {
_state.mutate { $0.e2eeManager = e2eeManager }
}

func reset(throwing error: Error? = nil) {
let (lossy, reliable) = _state.mutate {
let result = ($0.lossy, $0.reliable)
Expand All @@ -392,7 +418,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
reliable?.close()

// Drain parked sends through the same event stream so they're ordered
// after any in-flight `publishData` enqueues from concurrent callers.
// after any in-flight `.sendRequested` enqueues from concurrent callers.
eventContinuation.yield(ChannelEvent(channelKind: .reliable, detail: .drain(error)))

openCompleter.reset(throwing: error)
Expand Down Expand Up @@ -420,14 +446,14 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
)
let event = ChannelEvent(
channelKind: ChannelKind(packet.kind), // TODO: field is deprecated
detail: .publishData(request)
detail: .sendRequested(request)
)
eventContinuation.yield(event)
}
}

private func withEncryption(_ packet: Livekit_DataPacket) throws -> Livekit_DataPacket {
guard let e2eeManager, e2eeManager.isDataChannelEncryptionEnabled,
guard let e2eeManager = _state.e2eeManager, e2eeManager.isDataChannelEncryptionEnabled,
let payload = Livekit_EncryptedPacketPayload(dataPacket: packet) else { return packet }
var packet = packet
do {
Expand Down Expand Up @@ -458,16 +484,18 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
// MARK: - Sync state

func infos() -> [Livekit_DataChannelInfo] {
_state.read { [$0.lossy, $0.reliable] }
.compactMap(\.self)
.map { $0.toLKInfoType() }
_state.read { state in
[state.lossy, state.reliable].compactMap { $0?.toLKInfoType() }
}
}

func receiveStates() -> [Livekit_DataChannelReceiveState] {
_state.reliableReceivedState.map { sid, seq in
Livekit_DataChannelReceiveState.with {
$0.publisherSid = sid
$0.lastSeq = seq
_state.read { state in
state.reliableReceivedState.map { sid, seq in
Livekit_DataChannelReceiveState.with {
$0.publisherSid = sid
$0.lastSeq = seq
}
}
}
}
Expand Down Expand Up @@ -516,17 +544,23 @@ extension DataChannelPair: LKRTCDataChannelDelegate {
}

if dataChannel.kind == .reliable, dataPacket.sequence > 0, !dataPacket.participantSid.isEmpty {
if let lastSeq = _state.reliableReceivedState[dataPacket.participantSid], dataPacket.sequence <= lastSeq {
// Check and update in one locked step so two concurrent receives
// for the same sender can't both pass the dedup gate.
let isDuplicate = _state.mutate { state -> Bool in
if let lastSeq = state.reliableReceivedState[dataPacket.participantSid], dataPacket.sequence <= lastSeq {
return true
}
state.reliableReceivedState[dataPacket.participantSid] = dataPacket.sequence
return false
}
if isDuplicate {
log("Ignoring duplicate/out-of-order reliable data message", .warning)
return
}
_state.mutate {
$0.reliableReceivedState[dataPacket.participantSid] = dataPacket.sequence
}
}

if let encryptedPacket = dataPacket.encryptedPacketOrNil,
let e2eeManager
let e2eeManager = _state.e2eeManager
{
do {
let decryptedData = try e2eeManager.handle(encryptedData: encryptedPacket.toRTCEncryptedPacket(), participantIdentity: dataPacket.participantIdentity)
Expand Down
9 changes: 7 additions & 2 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ extension Room {
try await publisherShouldNegotiate()
}

try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
try await publisherDataChannel.openCompleter.wait()
// Single combined gate: wait for both the publisher PC to be ICE-
// connected *and* the data channels to reach `.open` concurrently.
// Mirrors the prevailing pattern in client-sdk-js / -rust, where a
// single poll loop checks both conditions before any send proceeds.
async let transportReady: Void = publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
async let dataChannelReady: Void = publisherDataChannel.openCompleter.wait()
_ = try await (transportReady, dataChannelReady)
}

try await ensurePublisherConnected()
Expand Down
8 changes: 4 additions & 4 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,13 @@ public class Room: NSObject, @unchecked Sendable, ObservableObject, Loggable {
e2eeManager = E2EEManager(options: encryptionOptions)
e2eeManager!.setup(room: self)

subscriberDataChannel.e2eeManager = e2eeManager
publisherDataChannel.e2eeManager = e2eeManager
subscriberDataChannel.set(e2eeManager: e2eeManager)
publisherDataChannel.set(e2eeManager: e2eeManager)
} else {
e2eeManager = nil

subscriberDataChannel.e2eeManager = nil
publisherDataChannel.e2eeManager = nil
subscriberDataChannel.set(e2eeManager: nil)
publisherDataChannel.set(e2eeManager: nil)
}

_state.mutate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ actor IncomingStreamManager: Loggable {
let info: StreamInfo
let openTime: TimeInterval
let continuation: StreamReaderSource.Continuation
var task: AnyTaskCancellable?
var readLength = 0
}

Expand Down Expand Up @@ -143,16 +142,17 @@ actor IncomingStreamManager: Loggable {
continuation = $0
}

let descriptor = Descriptor(
openStreams[info.id] = Descriptor(
info: info,
openTime: Date.timeIntervalSinceReferenceDate,
continuation: continuation,
task: Task {
try await handler(source, identity)
}.cancellable()
continuation: continuation
)

openStreams[info.id] = descriptor
// Detached: handler lifetime is not tied to the descriptor — abnormal stream
// conditions are signalled through `source` throwing instead.
Task.detachedDiscarding {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes a pre-existing abnormal-cancellation bug exposed by RPC v2 benchmarks (failing run) — the old Task was wrapped in AnyTaskCancellable whose deinit cancelled mid-handler awaits when the trailer arrived.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try await handler(source, identity)
}
}

/// Close the stream with the given id.
Expand Down
4 changes: 3 additions & 1 deletion Sources/LiveKit/Extensions/TimeInterval.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public extension TimeInterval {

static let defaultJoinResponse: Self = 7
static let defaultTransportState: Self = 10
static let defaultPublisherDataChannelOpen: Self = 7
// Matches client-sdk-js (`peerConnectionTimeout`) and client-sdk-rust
// (`ICE_CONNECT_TIMEOUT`); client-sdk-android uses 20s.
static let defaultPublisherDataChannelOpen: Self = 15
static let resolveSid: Self = 7 + 5 // Join response + 5
static let defaultPublish: Self = 10
static let defaultCaptureStart: Self = 10
Expand Down
Loading
Loading