Skip to content
1 change: 1 addition & 0 deletions .changes/fix-buffer-status-busy-wait
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix waitForBufferStatusLow busy-wait after engine close"
1 change: 1 addition & 0 deletions .changes/fix-connected-server-address
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix connected server address using wrong peer connection"
1 change: 1 addition & 0 deletions .changes/fix-deferred-track-listener-leak
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix deferred track listener leak across reconnects"
1 change: 1 addition & 0 deletions .changes/fix-log-interpolation
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix string interpolation in forceRelay log messages"
1 change: 1 addition & 0 deletions .changes/fix-premature-publication-dispose
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix premature publication dispose during unpublish"
1 change: 1 addition & 0 deletions .changes/fix-reconnect-counter
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix reconnect counter null assertion on first reconnect attempt"
1 change: 1 addition & 0 deletions .changes/fix-region-failover-condition
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix region failover condition allowing null provider dereference"
1 change: 1 addition & 0 deletions .changes/fix-send-sync-state-return-type
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Fix sendSyncState returning void instead of Future"
77 changes: 38 additions & 39 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,19 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

late EventsListener<SignalEvent> _signalListener = signalClient.createListener(synchronized: true);

int? reconnectAttempts;

Timer? reconnectTimeout;
DateTime? reconnectStart;
int _reconnectAttempts = 0;
Timer? _reconnectTimeout;
DateTime? _reconnectStart;

bool _isClosed = false;

bool get isClosed => _isClosed;

bool get isPendingReconnect => reconnectStart != null && reconnectTimeout != null;
bool get isPendingReconnect => _reconnectStart != null && _reconnectTimeout != null;

final int _reconnectCount = defaultRetryDelaysInMs.length;

bool attemptingReconnect = false;
bool _attemptingReconnect = false;

RegionUrlProvider? _regionUrlProvider;

Expand Down Expand Up @@ -179,17 +178,17 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
return null;
}

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
reconnectTimeout = null;
void _clearReconnectTimeout() {
if (_reconnectTimeout != null) {
_reconnectTimeout?.cancel();
_reconnectTimeout = null;
}
}

void clearPendingReconnect() {
clearReconnectTimeout();
reconnectAttempts = 0;
reconnectStart = null;
void _clearPendingReconnect() {
_clearReconnectTimeout();
_reconnectAttempts = 0;
_reconnectStart = null;
}

Engine({
Expand Down Expand Up @@ -290,15 +289,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await signalClient.cleanUp();

fullReconnectOnNext = false;
attemptingReconnect = false;
_attemptingReconnect = false;

// Reset reliability state
_reliableDataSequence = 1;
_reliableMessageBuffer.clear();
_reliableReceivedState.clear();
_isReconnecting = false;

clearPendingReconnect();
_clearPendingReconnect();
}

@internal
Expand Down Expand Up @@ -357,7 +356,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

events.once<EngineClosingEvent>((e) => onClosing());

while (!_dcBufferStatus[kind]!) {
while (!completer.isCompleted && !_dcBufferStatus[kind]!) {
await Future.delayed(const Duration(milliseconds: 10));
}
if (completer.isCompleted) {
Expand Down Expand Up @@ -725,7 +724,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
signalClient.connectionState == ConnectionState.connecting) {
final track = event.track;
final receiver = event.receiver;
events.on<EngineConnectedEvent>((event) async {
events.once<EngineConnectedEvent>((event) async {
Timer(const Duration(milliseconds: 10), () {
events.emit(EngineTrackAddedEvent(
track: track,
Expand Down Expand Up @@ -829,7 +828,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

Future<void> _handleGettingConnectedServerAddress(rtc.RTCPeerConnection pc) async {
try {
final remoteAddress = await getConnectedAddress(publisher!.pc);
final remoteAddress = await getConnectedAddress(pc);
logger.fine('Connected address: $remoteAddress');
if (_connectedServerAddress == null || _connectedServerAddress != remoteAddress) {
_connectedServerAddress = remoteAddress;
Expand Down Expand Up @@ -999,11 +998,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

_isReconnecting = true;

if (reconnectAttempts == 0) {
reconnectStart = DateTime.timestamp();
if (_reconnectAttempts == 0) {
_reconnectStart = DateTime.timestamp();
}

if (reconnectAttempts! >= _reconnectCount) {
if (_reconnectAttempts >= _reconnectCount) {
logger.fine('reconnectAttempts exceeded, disconnecting...');
_isClosed = true;
await cleanUp();
Expand All @@ -1014,26 +1013,26 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
return;
}

var delay = defaultRetryDelaysInMs[reconnectAttempts!];
var delay = defaultRetryDelaysInMs[_reconnectAttempts];
// Add random jitter to prevent thundering herd on reconnect
if (reconnectAttempts! > 1) {
if (_reconnectAttempts > 1) {
delay += math.Random().nextInt(1000);
}

events.emit(EngineAttemptReconnectEvent(
attempt: reconnectAttempts! + 1,
attempt: _reconnectAttempts + 1,
maxAttempts: _reconnectCount,
nextRetryDelaysInMs: delay,
));

clearReconnectTimeout();
_clearReconnectTimeout();
if (token != null && _regionUrlProvider != null) {
// token may have been refreshed, we do not want to recreate the regionUrlProvider
// since the current engine may have inherited a regional url
_regionUrlProvider!.updateToken(token!);
}
logger.fine('WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
logger.fine('WebSocket reconnecting in $delay ms, retry times $_reconnectAttempts');
_reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
await attemptReconnect(
reason,
reconnectReason: reconnectReason,
Expand All @@ -1051,7 +1050,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

// guard for attempting reconnection multiple times while one attempt is still not finished
if (attemptingReconnect) {
if (_attemptingReconnect) {
return;
}

Expand All @@ -1065,7 +1064,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

try {
attemptingReconnect = true;
_attemptingReconnect = true;

if (await signalClient.networkIsAvailable() == false) {
logger.fine('no internet connection, waiting...');
Expand All @@ -1086,11 +1085,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
reconnectReason: reconnectReason,
);
}
clearPendingReconnect();
attemptingReconnect = false;
_clearPendingReconnect();
_attemptingReconnect = false;
_isReconnecting = false;
} catch (e) {
reconnectAttempts = reconnectAttempts! + 1;
_reconnectAttempts = _reconnectAttempts + 1;
bool recoverable = true;
if (e is WebSocketException || e is MediaConnectException) {
// cannot resume connection, need to do full reconnect
Expand All @@ -1111,7 +1110,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
}
} finally {
attemptingReconnect = false;
_attemptingReconnect = false;
}
}

Expand Down Expand Up @@ -1230,7 +1229,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
void sendSyncState({
Future<void> sendSyncState({
required lk_rtc.UpdateSubscription subscription,
required Iterable<lk_rtc.TrackPublishedResponse>? publishTracks,
required List<String> trackSidsDisabled,
Expand Down Expand Up @@ -1281,7 +1280,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('onConnected subscriberPrimary: ${_subscriberPrimary}, '
'serverVersion: ${event.response.serverVersion}, '
'iceServers: ${event.response.iceServers}, '
'forceRelay: $event.response.clientConfiguration.forceRelay');
'forceRelay: ${event.response.clientConfiguration.forceRelay}');

final rtcConfiguration = await _buildRtcConfiguration(
serverResponseForceRelay: event.response.clientConfiguration.forceRelay,
Expand Down Expand Up @@ -1313,7 +1312,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

logger.fine('Handle ReconnectResponse: '
'iceServers: ${event.response.iceServers}, '
'forceRelay: $event.response.clientConfiguration.forceRelay, '
'forceRelay: ${event.response.clientConfiguration.forceRelay}, '
'lastMessageSeq: ${event.response.lastMessageSeq}');

final rtcConfiguration = await _buildRtcConfiguration(
Expand All @@ -1336,7 +1335,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
})
..on<SignalConnectedEvent>((event) async {
logger.fine('Signal connected');
reconnectAttempts = 0;
_reconnectAttempts = 0;
events.emit(const EngineConnectedEvent());
})
..on<SignalConnectingEvent>((event) async {
Expand Down Expand Up @@ -1456,7 +1455,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('disconnect: Cancel the reconnection processing!');
await signalClient.cleanUp();
await _signalListener.cancelAll();
clearPendingReconnect();
_clearPendingReconnect();
}
await cleanUp();
events.emit(EngineDisconnectedEvent(reason: reason));
Expand Down
6 changes: 3 additions & 3 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
);
} catch (e) {
logger.warning('could not connect to $url $e');
if (_regionUrlProvider != null && e is WebSocketException ||
(e is ConnectException && e.reason != ConnectionErrorReason.NotAllowed)) {
if (_regionUrlProvider != null &&
(e is WebSocketException || (e is ConnectException && e.reason != ConnectionErrorReason.NotAllowed))) {
String? nextUrl;
try {
nextUrl = await _regionUrlProvider!.getNextBestRegionUrl();
Expand Down Expand Up @@ -971,7 +971,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
}
}

engine.sendSyncState(
await engine.sendSyncState(
subscription: lk_rtc.UpdateSubscription(
participantTracks: [],
trackSids: trackSids,
Expand Down
2 changes: 0 additions & 2 deletions lib/src/participant/local.dart
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,6 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
logger.warning('Publication not found $trackSid');
return;
}
await pub.dispose();

final track = pub.track;
if (track != null) {
if (room.roomOptions.stopLocalTrackOnUnpublish) {
Expand Down
2 changes: 0 additions & 2 deletions lib/src/participant/remote.dart
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ class RemoteParticipant extends Participant<RemoteTrackPublication> {
logger.warning('Publication not found $trackSid');
return;
}
await pub.dispose();

final track = pub.track;
// if has track
if (track != null) {
Expand Down
Loading