Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
livekit-ffi: patch
webrtc-sys-build: patch
yuv-sys: patch
imgproc: patch
livekit-protocol: patch
webrtc-sys: patch
livekit: patch
libwebrtc: patch
livekit-wakeword: patch
soxr-sys: patch
livekit-api: patch
---

# fix PC timeout when connecting with can_subscribe=false

#955 by @s-hamdananwar

When a participant connects with `canSubscribe=false` in their token, the server sends `subscriber_primary=false` in the JoinResponse and does not send a subscriber offer. This results in `wait_pc_connection` timing out as it is expecting a subscriber PC even when the publisher PC is primary. This PR will skip waiting for subscriber PC when `subscriber_primary=false`.
9 changes: 6 additions & 3 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ struct SessionInner {
pending_requests: Mutex<HashMap<u32, oneshot::Sender<proto::RequestResponse>>>,

e2ee_manager: Option<E2eeManager>,
subscriber_primary: bool,
}

/// Information about the local participant needed for outgoing
Expand Down Expand Up @@ -431,6 +432,7 @@ impl RtcSession {
SignalClient::connect(url, token, options.signal_options.clone()).await?;
let signal_client = Arc::new(signal_client);
log::debug!("received JoinResponse: {:?}", join_response);
let subscriber_primary = join_response.subscriber_primary;

// Determine if single PC mode is active based on the path used
let single_pc_mode = signal_client.is_single_pc_mode_active();
Expand Down Expand Up @@ -517,6 +519,7 @@ impl RtcSession {
negotiation_queue: NegotiationQueue::new(),
pending_requests: Default::default(),
e2ee_manager,
subscriber_primary,
});

// Start session tasks
Expand All @@ -530,7 +533,7 @@ impl RtcSession {

// In single PC mode (or with fast_publish), trigger initial negotiation
// This matches JS SDK behavior: if (!this.subscriberPrimary || joinResponse.fastPublish) { this.negotiate(); }
if single_pc_mode || join_response.fast_publish {
if single_pc_mode || join_response.fast_publish || !subscriber_primary {
inner.publisher_negotiation_needed();
}

Expand Down Expand Up @@ -1750,8 +1753,8 @@ impl SessionInner {
}

let publisher_connected = self.publisher_pc.is_connected();
let subscriber_connected = if self.single_pc_mode {
true // No subscriber in single PC mode
let subscriber_connected = if self.single_pc_mode || !self.subscriber_primary {
true // No subscriber in single PC mode or if PC is publisher primary
} else {
self.subscriber_pc.as_ref().map(|pc| pc.is_connected()).unwrap_or(true)
};
Expand Down
44 changes: 44 additions & 0 deletions livekit/tests/peer_connection_signaling_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,17 @@ async fn test_v1_localhost_fallback_to_v0() -> Result<()> {
Ok(())
}

/// Test that a participant with can_subscribe=false in their token can connect without timing out.
#[test_log::test(tokio::test)]
async fn test_v0_connect_can_subscribe_false() -> Result<()> {
test_connect_can_subscribe_false_impl(SignalingMode::DualPC).await
}

#[test_log::test(tokio::test)]
async fn test_v1_connect_can_subscribe_false() -> Result<()> {
test_connect_can_subscribe_false_impl(SignalingMode::SinglePC).await
}

/// Corner case: reconnect twice in a row
#[test_log::test(tokio::test)]
async fn test_v0_double_reconnect() -> Result<()> {
Expand Down Expand Up @@ -772,6 +783,39 @@ async fn test_node_failure_impl(mode: SignalingMode) -> Result<()> {
Ok(())
}

/// Test that a participant with can_subscribe=false in their token can connect without timing out.
async fn test_connect_can_subscribe_false_impl(mode: SignalingMode) -> Result<()> {
let (url, api_key, api_secret) = get_env_for_mode(mode);
let room_name = format!("test_{:?}_no_subscribe_{}", mode, create_random_uuid());

let grants = VideoGrants {
room_join: true,
room: room_name.clone(),
can_publish: true,
can_subscribe: false,
..Default::default()
};
let token = AccessToken::with_api_key(&api_key, &api_secret)
.with_ttl(Duration::from_secs(30 * 60))
.with_grants(grants)
.with_identity("no-subscribe-participant")
.with_name("no-subscribe-participant")
.to_jwt()
.context("Failed to generate JWT")?;

log::info!("[{}] Connecting with can_subscribe=false", mode.name());
let (room, _events) = connect_room(&url, &token, mode).await?;

assert_eq!(
room.connection_state(),
ConnectionState::Connected,
"Room should be connected even when can_subscribe=false"
);

log::info!("[{}] Test passed - can_subscribe=false connects without timeout!", mode.name());
Ok(())
}

/// Test two sequential reconnect cycles on the same room connection
async fn test_double_reconnect_impl(mode: SignalingMode) -> Result<()> {
let (url, api_key, api_secret) = get_env_for_mode(mode);
Expand Down
Loading