Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fix-duplicate-track-publish-guard
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Duplicate tracks published when setCameraEnabled called rapidly"
148 changes: 85 additions & 63 deletions lib/src/participant/local.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/platform.dart';
import '../support/serial_runner.dart';
import '../track/local/audio.dart';
import '../track/local/local.dart';
import '../track/local/video.dart';
Expand All @@ -67,6 +68,9 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
// RPC Pending Responses
final Map<String, Function(String? payload, RpcError? error)> _pendingResponses = {};

// Serializes publish operations to prevent duplicate tracks from concurrent calls
final _publishRunner = SerialRunner<LocalTrackPublication?>();

LocalParticipant._({
required Room room,
required String sid,
Expand Down Expand Up @@ -144,6 +148,14 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
/// Publishes [track] as a local audio track for this participant.
///
/// The actual publish work lives in [_publishAudioTrack]; this wrapper only
/// funnels the call through [_publishRunner] so that concurrent publish
/// operations run one at a time (guards against duplicate tracks from rapid
/// repeated calls).
///
/// NOTE(review): the `!` assumes the serialized audio publish never yields
/// null — TODO confirm no null-returning path exists for audio tracks.
Future<LocalTrackPublication<LocalAudioTrack>> publishAudioTrack(
  LocalAudioTrack track, {
  AudioPublishOptions? publishOptions,
}) async {
  final publication = await _publishRunner.run(
    () => _publishAudioTrack(track, publishOptions: publishOptions),
  );
  return publication! as LocalTrackPublication<LocalAudioTrack>;
}

Future<LocalTrackPublication<LocalAudioTrack>?> _publishAudioTrack(
LocalAudioTrack track, {
AudioPublishOptions? publishOptions,
}) async {
if (audioTrackPublications.any((e) => e.track?.mediaStreamTrack.id == track.mediaStreamTrack.id)) {
throw TrackPublishException('track already exists');
Expand Down Expand Up @@ -238,6 +250,14 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
/// Publishes [track] as a local video track for this participant.
///
/// The actual publish work lives in [_publishVideoTrack]; this wrapper only
/// funnels the call through [_publishRunner] so that concurrent publish
/// operations run one at a time (guards against duplicate tracks from rapid
/// repeated calls).
///
/// NOTE(review): the `!` assumes the serialized video publish never yields
/// null on this path — TODO confirm (the iOS broadcast-activation path can
/// return null inside setSourceEnabled, but should not reach here).
Future<LocalTrackPublication<LocalVideoTrack>> publishVideoTrack(
  LocalVideoTrack track, {
  VideoPublishOptions? publishOptions,
}) async {
  final publication = await _publishRunner.run(
    () => _publishVideoTrack(track, publishOptions: publishOptions),
  );
  return publication! as LocalTrackPublication<LocalVideoTrack>;
}

Future<LocalTrackPublication<LocalVideoTrack>?> _publishVideoTrack(
LocalVideoTrack track, {
VideoPublishOptions? publishOptions,
}) async {
if (videoTrackPublications.any((e) => e.track?.mediaStreamTrack.id == track.mediaStreamTrack.id)) {
throw TrackPublishException('track already exists');
Expand Down Expand Up @@ -710,77 +730,79 @@ class LocalParticipant extends Participant<LocalTrackPublication> {
{bool? captureScreenAudio,
AudioCaptureOptions? audioCaptureOptions,
CameraCaptureOptions? cameraCaptureOptions,
ScreenShareCaptureOptions? screenShareCaptureOptions}) async {
logger.fine('setSourceEnabled(source: $source, enabled: $enabled)');

if (TrackSource.screenShareVideo == source && lkPlatformIsWebMobile()) {
throw TrackCreateException('Screen sharing is not supported on mobile devices');
}
ScreenShareCaptureOptions? screenShareCaptureOptions}) {
return _publishRunner.run(() async {
if (TrackSource.screenShareVideo == source && lkPlatformIsWebMobile()) {
throw TrackCreateException('Screen sharing is not supported on mobile devices');
}

final publication = getTrackPublicationBySource(source);
if (publication != null) {
final stopOnMute = switch (publication.source) {
TrackSource.camera => cameraCaptureOptions?.stopCameraCaptureOnMute ?? true,
TrackSource.microphone => audioCaptureOptions?.stopAudioCaptureOnMute ?? true,
_ => true,
};
if (enabled) {
await publication.unmute(stopOnMute: stopOnMute);
} else {
if (source == TrackSource.screenShareVideo) {
await removePublishedTrack(publication.sid);
final screenAudio = getTrackPublicationBySource(TrackSource.screenShareAudio);
if (screenAudio != null) {
await removePublishedTrack(screenAudio.sid);
}
logger.fine('setSourceEnabled(source: $source, enabled: $enabled)');

final publication = getTrackPublicationBySource(source);
if (publication != null) {
final stopOnMute = switch (publication.source) {
TrackSource.camera => cameraCaptureOptions?.stopCameraCaptureOnMute ?? true,
TrackSource.microphone => audioCaptureOptions?.stopAudioCaptureOnMute ?? true,
_ => true,
};
if (enabled) {
await publication.unmute(stopOnMute: stopOnMute);
} else {
await publication.mute(stopOnMute: stopOnMute);
}
}
return publication;
} else if (enabled) {
if (source == TrackSource.camera) {
final CameraCaptureOptions captureOptions =
cameraCaptureOptions ?? room.roomOptions.defaultCameraCaptureOptions;
final track = await LocalVideoTrack.createCameraTrack(captureOptions);
return await publishVideoTrack(track);
} else if (source == TrackSource.microphone) {
final AudioCaptureOptions captureOptions = audioCaptureOptions ?? room.roomOptions.defaultAudioCaptureOptions;
final track = await LocalAudioTrack.create(captureOptions);
return await publishAudioTrack(track);
} else if (source == TrackSource.screenShareVideo) {
ScreenShareCaptureOptions captureOptions =
screenShareCaptureOptions ?? room.roomOptions.defaultScreenShareCaptureOptions;

if (lkPlatformIs(PlatformType.iOS) && !BroadcastManager().isBroadcasting) {
// Wait until broadcasting to publish track
await BroadcastManager().requestActivation();
return null;
if (source == TrackSource.screenShareVideo) {
await removePublishedTrack(publication.sid);
final screenAudio = getTrackPublicationBySource(TrackSource.screenShareAudio);
if (screenAudio != null) {
await removePublishedTrack(screenAudio.sid);
}
} else {
await publication.mute(stopOnMute: stopOnMute);
}
}
return publication;
} else if (enabled) {
if (source == TrackSource.camera) {
final CameraCaptureOptions captureOptions =
cameraCaptureOptions ?? room.roomOptions.defaultCameraCaptureOptions;
final track = await LocalVideoTrack.createCameraTrack(captureOptions);
return await _publishVideoTrack(track);
} else if (source == TrackSource.microphone) {
final AudioCaptureOptions captureOptions = audioCaptureOptions ?? room.roomOptions.defaultAudioCaptureOptions;
final track = await LocalAudioTrack.create(captureOptions);
return await _publishAudioTrack(track);
} else if (source == TrackSource.screenShareVideo) {
ScreenShareCaptureOptions captureOptions =
screenShareCaptureOptions ?? room.roomOptions.defaultScreenShareCaptureOptions;

if (lkPlatformIs(PlatformType.iOS) && !BroadcastManager().isBroadcasting) {
// Wait until broadcasting to publish track
await BroadcastManager().requestActivation();
return null;
}

/// When capturing chrome table audio, we can't capture audio/video
/// track separately, it has to be returned once in getDisplayMedia,
/// so we publish it twice here, but only return videoTrack to user.
if (captureScreenAudio ?? false) {
captureOptions = captureOptions.copyWith(captureScreenAudio: true);
final tracks = await LocalVideoTrack.createScreenShareTracksWithAudio(captureOptions);
LocalTrackPublication<LocalVideoTrack>? publication;
for (final track in tracks) {
if (track is LocalVideoTrack) {
publication = await publishVideoTrack(track);
} else if (track is LocalAudioTrack) {
await publishAudioTrack(track);
/// When capturing chrome table audio, we can't capture audio/video
/// track separately, it has to be returned once in getDisplayMedia,
/// so we publish it twice here, but only return videoTrack to user.
if (captureScreenAudio ?? false) {
captureOptions = captureOptions.copyWith(captureScreenAudio: true);
final tracks = await LocalVideoTrack.createScreenShareTracksWithAudio(captureOptions);
LocalTrackPublication<LocalVideoTrack>? publication;
for (final track in tracks) {
if (track is LocalVideoTrack) {
publication = await _publishVideoTrack(track);
} else if (track is LocalAudioTrack) {
await _publishAudioTrack(track);
}
}
}

/// just return the video track publication
return publication;
/// just return the video track publication
return publication;
}
final track = await LocalVideoTrack.createScreenShareTrack(captureOptions);
return await _publishVideoTrack(track);
}
final track = await LocalVideoTrack.createScreenShareTrack(captureOptions);
return await publishVideoTrack(track);
}
}
return null;
return null;
});
}

bool _allParticipantsAllowed = true;
Expand Down
51 changes: 51 additions & 0 deletions lib/src/support/serial_runner.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2025 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:async';

/// Serializes async operations so concurrent calls execute sequentially,
/// in strict first-in-first-out order.
///
/// When [run] is called while a previous operation is still in progress,
/// the new call waits for every previously-queued operation to complete
/// before executing. This prevents race conditions from concurrent calls
/// to the same async operation (e.g. duplicate track publishes).
///
/// Errors thrown by a block propagate to that block's caller only; they
/// never affect subsequently queued blocks.
///
/// NOTE: calling [run] reentrantly from inside a running block will
/// deadlock — the inner call waits on the outer one.
///
/// Equivalent to the Swift SDK's `SerialRunnerActor`.
class SerialRunner<T> {
  // Tail of the FIFO chain: completes when the most recently queued
  // operation finishes. Each new call chains onto this, which guarantees
  // strict arrival order — unlike a "loop until _pending is null" wait,
  // a caller arriving between completion and waiter wake-up cannot jump
  // the queue ahead of earlier waiters.
  Future<void>? _tail;

  // Number of operations queued or running; drives [isRunning].
  int _active = 0;

  /// Whether any operation is currently queued or in progress.
  bool get isRunning => _active > 0;

  /// Runs [block] after all previously queued operations complete.
  ///
  /// If nothing is queued, [block] starts immediately (its synchronous
  /// prefix runs before this method returns, matching the unqueued
  /// fast path of the original implementation).
  /// Errors from [block] propagate to this caller only, not to
  /// subsequent waiters.
  Future<T> run(Future<T> Function() block) {
    final previous = _tail;
    // Per-call gate: completes (always normally, even if block throws)
    // once this block is done, releasing exactly the next queued call.
    final gate = Completer<void>();
    _tail = gate.future;
    _active++;
    return _execute(previous, block).whenComplete(() {
      _active--;
      gate.complete();
      // If no further call chained onto us, reset so completed gates
      // don't linger while the runner is idle.
      if (identical(_tail, gate.future)) {
        _tail = null;
      }
    });
  }

  // Awaits the predecessor (if any), then invokes the block. Kept as a
  // separate async helper so [run] itself stays synchronous and can
  // update the chain state atomically before any suspension point.
  Future<T> _execute(Future<void>? previous, Future<T> Function() block) async {
    if (previous != null) {
      await previous;
    }
    return block();
  }
}
23 changes: 23 additions & 0 deletions test/core/room_e2e_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import '../mock/websocket_mock.dart';
import 'signal_client_test.dart';

void main() {
TestWidgetsFlutterBinding.ensureInitialized();
late E2EContainer container;
late Room room;
late MockWebSocketConnector ws;
Expand Down Expand Up @@ -213,6 +214,28 @@ void main() {
expect(trackSubscribed.publication.track, isNotNull);
});
});

group('publish guards', () {
  test('concurrent setSourceEnabled serializes calls', () async {
    final lp = room.localParticipant!;

    // Both calls will fail (no camera hardware in test), but the
    // SerialRunner should serialize them — the second waits for the
    // first to complete before executing.
    final future1 = lp.setSourceEnabled(TrackSource.camera, true);
    final future2 = lp.setSourceEnabled(TrackSource.camera, true);

    // Both calls must settle (no hang or deadlock in the runner).
    // Note: asserting `results.length == 2` would be vacuous —
    // Future.wait over two futures always yields two elements — so we
    // bound completion with a timeout instead.
    await Future.wait([
      future1.catchError((_) => null),
      future2.catchError((_) => null),
    ]).timeout(const Duration(seconds: 10));

    // The guard's purpose: rapid repeated calls must never produce
    // more than one camera publication.
    expect(
      lp.videoTrackPublications
          .where((p) => p.source == TrackSource.camera)
          .length,
      lessThanOrEqualTo(1),
    );
  });
});
}

class _FakeMediaStream extends rtc.MediaStream {
Expand Down
Loading