Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 228 additions & 36 deletions livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tokio::sync::{mpsc, oneshot, watch, Notify};
use super::{rtc_events, EngineError, EngineOptions, EngineResult, SimulateScenario};
use crate::{
id::ParticipantIdentity,
room::participant::{decompress_rpc_payload_bytes, RpcErrorCode},
utils::{
ttl_map::TtlMap,
tx_queue::{TxQueue, TxQueueItem},
Expand Down Expand Up @@ -341,6 +342,16 @@ struct SessionInner {
single_pc_mode: bool,
/// Mapping from SDP mid to track ID, used for track resolution in single PC mode
mid_to_track_id: Mutex<HashMap<String, String>>,
/// Offer/answer ordering guards for single-PC churn protection.
next_offer_id: AtomicU32,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I don’t have a strong opinion here, but since this adds a bunch of related fields, it might make sense to group them into their own struct (e.g., SDPExchangeState) and include that here instead. This would add no overhead and could help clarify that these fields are conceptually related.

latest_offer_id: AtomicU32,
latest_answer_id: AtomicU32,
/// Coalesced media-sections requirement while an offer is in flight.
pending_media_req_audio: AtomicU32,
pending_media_req_video: AtomicU32,
/// Recvonly slots added locally but not yet reflected in a sent offer.
pending_recv_slots_audio: AtomicU32,
pending_recv_slots_video: AtomicU32,

pending_tracks: Mutex<HashMap<String, oneshot::Sender<proto::TrackInfo>>>,

Expand Down Expand Up @@ -434,7 +445,6 @@ impl RtcSession {

// Determine if single PC mode is active based on the path used
let single_pc_mode = signal_client.is_single_pc_mode_active();

let Some(participant_info) = SessionParticipantInfo::from_join(&join_response) else {
Err(EngineError::Internal("Join response missing participant info".into()))?
};
Expand Down Expand Up @@ -495,6 +505,13 @@ impl RtcSession {
subscriber_pc,
single_pc_mode,
mid_to_track_id: Mutex::new(HashMap::new()),
next_offer_id: AtomicU32::new(1),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you apply the suggestion from above, you can implement default on the struct and initialize here like this:

sdp_exchange_state: SdpExchangeState {
    next_offer_id: AtomicU32::new(1),
    ..Default::default()
},

latest_offer_id: AtomicU32::new(0),
latest_answer_id: AtomicU32::new(0),
pending_media_req_audio: AtomicU32::new(0),
pending_media_req_video: AtomicU32::new(0),
pending_recv_slots_audio: AtomicU32::new(0),
pending_recv_slots_video: AtomicU32::new(0),
pending_tracks: Default::default(),
lossy_dc,
lossy_dc_buffered_amount_low_threshold: AtomicU64::new(
Expand Down Expand Up @@ -964,6 +981,21 @@ impl SessionInner {
) -> EngineResult<()> {
match event {
proto::signal_response::Message::Answer(answer) => {
let answer_id = answer.id;
let latest_offer_id = self.latest_offer_id.load(Ordering::Acquire);
let latest_answer_id = self.latest_answer_id.load(Ordering::Acquire);
if answer_id > 0 && (answer_id < latest_offer_id || answer_id <= latest_answer_id) {
log::warn!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Should this be logged at warn level?

"ignoring stale answer: answer_id={} latest_offer_id={} latest_answer_id={}",
answer_id,
latest_offer_id,
latest_answer_id
);
return Ok(());
}
if answer_id > 0 {
self.latest_answer_id.store(answer_id, Ordering::Release);
}
log::debug!("received publisher answer: {:?}", answer);

// Store mid_to_track_id mapping in single PC mode
Expand Down Expand Up @@ -1105,34 +1137,29 @@ impl SessionInner {
self: &Arc<Self>,
req: proto::MediaSectionsRequirement,
) -> EngineResult<()> {
log::debug!(
"MediaSectionsRequirement: adding {} audio and {} video recvonly transceivers",
req.num_audios,
req.num_videos
);

let recvonly_init = RtpTransceiverInit {
direction: RtpTransceiverDirection::RecvOnly,
stream_ids: Vec::new(),
send_encodings: Vec::new(),
};

// Add audio transceivers
for _ in 0..req.num_audios {
self.publisher_pc
.peer_connection()
.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?;
// Coalesce duplicate requirements while waiting for the current answer.
if self.negotiation_queue.waiting_for_answer.load(Ordering::Acquire) {
let prev_audio = self.pending_media_req_audio.fetch_max(req.num_audios, Ordering::AcqRel);
let prev_video = self.pending_media_req_video.fetch_max(req.num_videos, Ordering::AcqRel);
log::debug!(
"coalescing media requirement while waiting for answer: req={{audio:{}, video:{}}} pending={{audio:{}->{}, video:{}->{}}}",
req.num_audios, req.num_videos,
prev_audio, prev_audio.max(req.num_audios),
prev_video, prev_video.max(req.num_videos)
);
// Mirror JS renegotiate behavior: guarantee one more negotiation pass
// after the in-flight offer/answer completes.
self.publisher_negotiation_needed();
return Ok(());
}

// Add video transceivers
for _ in 0..req.num_videos {
self.publisher_pc
.peer_connection()
.add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?;
}
let (add_audio, add_video) =
self.reconcile_recv_media_sections(req.num_audios, req.num_videos)?;

// Trigger renegotiation
self.publisher_negotiation_needed();
if add_audio > 0 || add_video > 0 {
// Trigger renegotiation only when capacity changed.
self.publisher_negotiation_needed();
}

Ok(())
}
Expand Down Expand Up @@ -1183,13 +1210,26 @@ impl SessionInner {
}
}
RtcEvent::Offer { offer, target: _ } => {
// If requirements arrived while we were waiting for answer, apply them once here.
self.apply_pending_media_sections_requirement()?;
// Newly added recvonly slots are now reflected in this offer.
self.pending_recv_slots_audio.store(0, Ordering::Release);
self.pending_recv_slots_video.store(0, Ordering::Release);
// Send the publisher offer to the server
log::debug!("sending publisher offer: {:?}", offer);
let mapping_size = self.mid_to_track_id.lock().len();
let offer_id = self.next_offer_id.fetch_add(1, Ordering::AcqRel);
self.latest_offer_id.store(offer_id, Ordering::Release);
log::debug!(
"sending publisher offer: offer_id={} sdp_len={} cached_mid_to_track_id_count={}",
offer_id,
offer.to_string().len(),
mapping_size
);
self.signal_client
.send(proto::signal_request::Message::Offer(proto::SessionDescription {
r#type: "offer".to_string(),
sdp: offer.to_string(),
id: 0,
id: offer_id,
mid_to_track_id: Default::default(),
}))
.await;
Expand Down Expand Up @@ -1241,6 +1281,111 @@ impl SessionInner {
Ok(())
}

fn apply_pending_media_sections_requirement(&self) -> EngineResult<()> {
let pending_audio = self.pending_media_req_audio.swap(0, Ordering::AcqRel);
let pending_video = self.pending_media_req_video.swap(0, Ordering::AcqRel);
if pending_audio == 0 && pending_video == 0 {
return Ok(());
}
log::debug!(
"applying coalesced media requirements before offer: audio={} video={}",
pending_audio,
pending_video
);
self.reconcile_recv_media_sections(pending_audio, pending_video)?;
Ok(())
}

fn reconcile_recv_media_sections(
&self,
requested_audio: u32,
requested_video: u32,
) -> EngineResult<(u32, u32)> {
let (available_audio, available_video) = self.available_recv_slots_by_kind();
let pending_audio = self.pending_recv_slots_audio.load(Ordering::Acquire);
let pending_video = self.pending_recv_slots_video.load(Ordering::Acquire);
let add_audio =
requested_audio.saturating_sub(available_audio.saturating_add(pending_audio));
let add_video =
requested_video.saturating_sub(available_video.saturating_add(pending_video));

log::debug!(
"reconciling recv media sections: requested={{audio:{}, video:{}}} available={{audio:{}, video:{}}} pending={{audio:{}, video:{}}} will_add={{audio:{}, video:{}}}",
requested_audio, requested_video,
available_audio, available_video,
pending_audio, pending_video,
add_audio, add_video
);

if add_audio == 0 && add_video == 0 {
return Ok((0, 0));
}

let recvonly_init = RtpTransceiverInit {
direction: RtpTransceiverDirection::RecvOnly,
stream_ids: Vec::new(),
send_encodings: Vec::new(),
};
for _ in 0..add_audio {
self.publisher_pc
.peer_connection()
.add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?;
}
for _ in 0..add_video {
self.publisher_pc
.peer_connection()
.add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?;
}
if add_audio > 0 {
self.pending_recv_slots_audio.fetch_add(add_audio, Ordering::AcqRel);
}
if add_video > 0 {
self.pending_recv_slots_video.fetch_add(add_video, Ordering::AcqRel);
}
Ok((add_audio, add_video))
}

fn available_recv_slots_by_kind(&self) -> (u32, u32) {
let mapping = self.mid_to_track_id.lock();
let transceivers = self.publisher_pc.peer_connection().transceivers();
let mut audio = 0u32;
let mut video = 0u32;

for t in transceivers {
let direction = t.direction();
let current_direction = t.current_direction();

// Check if transceiver is recv-capable:
// - Explicitly set to RecvOnly, or
// - Currently negotiated as RecvOnly (including munged inactive->recvonly)
let recv_capable = direction == RtpTransceiverDirection::RecvOnly
|| current_direction == Some(RtpTransceiverDirection::RecvOnly);

if !recv_capable {
continue;
}

// Slot is occupied if server already mapped this MID to a concrete track.
if let Some(mid) = t.mid() {
if mapping.contains_key(&mid) {
continue;
}
}

// Only count transceivers that have tracks. Newly created transceivers without
// tracks yet are tracked via pending_recv_slots_* counters until negotiation.
let Some(track) = t.receiver().track() else {
continue;
};
match track {
MediaStreamTrack::Audio(_) => audio += 1,
MediaStreamTrack::Video(_) => video += 1,
}
}

(audio, video)
}

fn emit_incoming_packet(
&self,
kind: DataPacketKind,
Expand Down Expand Up @@ -1295,19 +1440,66 @@ impl SessionInner {
}
proto::data_packet::Value::RpcRequest(rpc_request) => {
let caller_identity = participant_identity;
self.emitter.send(SessionEvent::RpcRequest {
caller_identity,
request_id: rpc_request.id.clone(),
method: rpc_request.method,
payload: rpc_request.payload,
response_timeout: Duration::from_millis(rpc_request.response_timeout_ms as u64),
version: rpc_request.version,
})
// Prefer compressed payload when present.
// If decompression fails, only fall back to plain payload when it is non-empty.
let payload = if !rpc_request.compressed_payload.is_empty() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Was adding this code in this PR intentional?

match decompress_rpc_payload_bytes(&rpc_request.compressed_payload) {
Ok(decompressed) => Some(decompressed),
Err(e) => {
if rpc_request.payload.is_empty() {
log::error!(
"Failed to decompress RPC request payload and plain payload is empty: {}",
e
);
None
} else {
log::error!(
"Failed to decompress RPC request payload, falling back to plain payload: {}",
e
);
Some(rpc_request.payload)
}
}
}
} else {
Some(rpc_request.payload)
};
if let Some(payload) = payload {
self.emitter.send(SessionEvent::RpcRequest {
caller_identity,
request_id: rpc_request.id.clone(),
method: rpc_request.method,
payload,
response_timeout: Duration::from_millis(
rpc_request.response_timeout_ms as u64,
),
version: rpc_request.version,
})
} else {
Ok(())
}
}
proto::data_packet::Value::RpcResponse(rpc_response) => {
let (payload, error) = match rpc_response.value {
None => (None, None),
Some(proto::rpc_response::Value::Payload(payload)) => (Some(payload), None),
Some(proto::rpc_response::Value::CompressedPayload(compressed)) => {
match decompress_rpc_payload_bytes(&compressed) {
Ok(decompressed) => (Some(decompressed), None),
Err(e) => {
log::error!("Failed to decompress RPC response payload: {}", e);
(
None,
Some(proto::RpcError {
code: RpcErrorCode::ApplicationError as u32,
message: "Failed to decompress RPC response payload"
.to_string(),
data: e,
}),
)
}
}
}
Some(proto::rpc_response::Value::Error(err)) => (None, Some(err)),
};
self.emitter.send(SessionEvent::RpcResponse {
Expand Down
Loading