-
Notifications
You must be signed in to change notification settings - Fork 168
added code to avoid answers / offers race on rust client, #915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ use tokio::sync::{mpsc, oneshot, watch, Notify}; | |
| use super::{rtc_events, EngineError, EngineOptions, EngineResult, SimulateScenario}; | ||
| use crate::{ | ||
| id::ParticipantIdentity, | ||
| room::participant::{decompress_rpc_payload_bytes, RpcErrorCode}, | ||
| utils::{ | ||
| ttl_map::TtlMap, | ||
| tx_queue::{TxQueue, TxQueueItem}, | ||
|
|
@@ -341,6 +342,16 @@ struct SessionInner { | |
| single_pc_mode: bool, | ||
| /// Mapping from SDP mid to track ID, used for track resolution in single PC mode | ||
| mid_to_track_id: Mutex<HashMap<String, String>>, | ||
| /// Offer/answer ordering guards for single-PC churn protection. | ||
| next_offer_id: AtomicU32, | ||
| latest_offer_id: AtomicU32, | ||
| latest_answer_id: AtomicU32, | ||
| /// Coalesced media-sections requirement while an offer is in flight. | ||
| pending_media_req_audio: AtomicU32, | ||
| pending_media_req_video: AtomicU32, | ||
| /// Recvonly slots added locally but not yet reflected in a sent offer. | ||
| pending_recv_slots_audio: AtomicU32, | ||
| pending_recv_slots_video: AtomicU32, | ||
|
|
||
| pending_tracks: Mutex<HashMap<String, oneshot::Sender<proto::TrackInfo>>>, | ||
|
|
||
|
|
@@ -434,7 +445,6 @@ impl RtcSession { | |
|
|
||
| // Determine if single PC mode is active based on the path used | ||
| let single_pc_mode = signal_client.is_single_pc_mode_active(); | ||
|
|
||
| let Some(participant_info) = SessionParticipantInfo::from_join(&join_response) else { | ||
| Err(EngineError::Internal("Join response missing participant info".into()))? | ||
| }; | ||
|
|
@@ -495,6 +505,13 @@ impl RtcSession { | |
| subscriber_pc, | ||
| single_pc_mode, | ||
| mid_to_track_id: Mutex::new(HashMap::new()), | ||
| next_offer_id: AtomicU32::new(1), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If you apply the suggestion to group these fields into their own struct, you can implement `Default` on that struct and initialize it here like this: sdp_exchange_state: SdpExchangeState {
next_offer_id: AtomicU32::new(1),
..Default::default()
}, |
||
| latest_offer_id: AtomicU32::new(0), | ||
| latest_answer_id: AtomicU32::new(0), | ||
| pending_media_req_audio: AtomicU32::new(0), | ||
| pending_media_req_video: AtomicU32::new(0), | ||
| pending_recv_slots_audio: AtomicU32::new(0), | ||
| pending_recv_slots_video: AtomicU32::new(0), | ||
| pending_tracks: Default::default(), | ||
| lossy_dc, | ||
| lossy_dc_buffered_amount_low_threshold: AtomicU64::new( | ||
|
|
@@ -964,6 +981,21 @@ impl SessionInner { | |
| ) -> EngineResult<()> { | ||
| match event { | ||
| proto::signal_response::Message::Answer(answer) => { | ||
| let answer_id = answer.id; | ||
| let latest_offer_id = self.latest_offer_id.load(Ordering::Acquire); | ||
| let latest_answer_id = self.latest_answer_id.load(Ordering::Acquire); | ||
| if answer_id > 0 && (answer_id < latest_offer_id || answer_id <= latest_answer_id) { | ||
| log::warn!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. question: Should this be logged at warn level? |
||
| "ignoring stale answer: answer_id={} latest_offer_id={} latest_answer_id={}", | ||
| answer_id, | ||
| latest_offer_id, | ||
| latest_answer_id | ||
| ); | ||
| return Ok(()); | ||
| } | ||
| if answer_id > 0 { | ||
| self.latest_answer_id.store(answer_id, Ordering::Release); | ||
| } | ||
| log::debug!("received publisher answer: {:?}", answer); | ||
|
|
||
| // Store mid_to_track_id mapping in single PC mode | ||
|
|
@@ -1105,34 +1137,29 @@ impl SessionInner { | |
| self: &Arc<Self>, | ||
| req: proto::MediaSectionsRequirement, | ||
| ) -> EngineResult<()> { | ||
| log::debug!( | ||
| "MediaSectionsRequirement: adding {} audio and {} video recvonly transceivers", | ||
| req.num_audios, | ||
| req.num_videos | ||
| ); | ||
|
|
||
| let recvonly_init = RtpTransceiverInit { | ||
| direction: RtpTransceiverDirection::RecvOnly, | ||
| stream_ids: Vec::new(), | ||
| send_encodings: Vec::new(), | ||
| }; | ||
|
|
||
| // Add audio transceivers | ||
| for _ in 0..req.num_audios { | ||
| self.publisher_pc | ||
| .peer_connection() | ||
| .add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; | ||
| // Coalesce duplicate requirements while waiting for the current answer. | ||
| if self.negotiation_queue.waiting_for_answer.load(Ordering::Acquire) { | ||
| let prev_audio = self.pending_media_req_audio.fetch_max(req.num_audios, Ordering::AcqRel); | ||
| let prev_video = self.pending_media_req_video.fetch_max(req.num_videos, Ordering::AcqRel); | ||
| log::debug!( | ||
| "coalescing media requirement while waiting for answer: req={{audio:{}, video:{}}} pending={{audio:{}->{}, video:{}->{}}}", | ||
| req.num_audios, req.num_videos, | ||
| prev_audio, prev_audio.max(req.num_audios), | ||
| prev_video, prev_video.max(req.num_videos) | ||
| ); | ||
| // Mirror JS renegotiate behavior: guarantee one more negotiation pass | ||
| // after the in-flight offer/answer completes. | ||
| self.publisher_negotiation_needed(); | ||
| return Ok(()); | ||
| } | ||
|
|
||
| // Add video transceivers | ||
| for _ in 0..req.num_videos { | ||
| self.publisher_pc | ||
| .peer_connection() | ||
| .add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; | ||
| } | ||
| let (add_audio, add_video) = | ||
| self.reconcile_recv_media_sections(req.num_audios, req.num_videos)?; | ||
|
|
||
| // Trigger renegotiation | ||
| self.publisher_negotiation_needed(); | ||
| if add_audio > 0 || add_video > 0 { | ||
| // Trigger renegotiation only when capacity changed. | ||
| self.publisher_negotiation_needed(); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
@@ -1183,13 +1210,26 @@ impl SessionInner { | |
| } | ||
| } | ||
| RtcEvent::Offer { offer, target: _ } => { | ||
| // If requirements arrived while we were waiting for answer, apply them once here. | ||
| self.apply_pending_media_sections_requirement()?; | ||
| // Newly added recvonly slots are now reflected in this offer. | ||
| self.pending_recv_slots_audio.store(0, Ordering::Release); | ||
| self.pending_recv_slots_video.store(0, Ordering::Release); | ||
| // Send the publisher offer to the server | ||
| log::debug!("sending publisher offer: {:?}", offer); | ||
| let mapping_size = self.mid_to_track_id.lock().len(); | ||
| let offer_id = self.next_offer_id.fetch_add(1, Ordering::AcqRel); | ||
| self.latest_offer_id.store(offer_id, Ordering::Release); | ||
| log::debug!( | ||
| "sending publisher offer: offer_id={} sdp_len={} cached_mid_to_track_id_count={}", | ||
| offer_id, | ||
| offer.to_string().len(), | ||
| mapping_size | ||
| ); | ||
| self.signal_client | ||
| .send(proto::signal_request::Message::Offer(proto::SessionDescription { | ||
| r#type: "offer".to_string(), | ||
| sdp: offer.to_string(), | ||
| id: 0, | ||
| id: offer_id, | ||
| mid_to_track_id: Default::default(), | ||
| })) | ||
| .await; | ||
|
|
@@ -1241,6 +1281,111 @@ impl SessionInner { | |
| Ok(()) | ||
| } | ||
|
|
||
| fn apply_pending_media_sections_requirement(&self) -> EngineResult<()> { | ||
| let pending_audio = self.pending_media_req_audio.swap(0, Ordering::AcqRel); | ||
| let pending_video = self.pending_media_req_video.swap(0, Ordering::AcqRel); | ||
| if pending_audio == 0 && pending_video == 0 { | ||
| return Ok(()); | ||
| } | ||
| log::debug!( | ||
| "applying coalesced media requirements before offer: audio={} video={}", | ||
| pending_audio, | ||
| pending_video | ||
| ); | ||
| self.reconcile_recv_media_sections(pending_audio, pending_video)?; | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn reconcile_recv_media_sections( | ||
| &self, | ||
| requested_audio: u32, | ||
| requested_video: u32, | ||
| ) -> EngineResult<(u32, u32)> { | ||
| let (available_audio, available_video) = self.available_recv_slots_by_kind(); | ||
| let pending_audio = self.pending_recv_slots_audio.load(Ordering::Acquire); | ||
| let pending_video = self.pending_recv_slots_video.load(Ordering::Acquire); | ||
| let add_audio = | ||
| requested_audio.saturating_sub(available_audio.saturating_add(pending_audio)); | ||
| let add_video = | ||
| requested_video.saturating_sub(available_video.saturating_add(pending_video)); | ||
|
|
||
| log::debug!( | ||
| "reconciling recv media sections: requested={{audio:{}, video:{}}} available={{audio:{}, video:{}}} pending={{audio:{}, video:{}}} will_add={{audio:{}, video:{}}}", | ||
| requested_audio, requested_video, | ||
| available_audio, available_video, | ||
| pending_audio, pending_video, | ||
| add_audio, add_video | ||
| ); | ||
|
|
||
| if add_audio == 0 && add_video == 0 { | ||
| return Ok((0, 0)); | ||
| } | ||
|
|
||
| let recvonly_init = RtpTransceiverInit { | ||
| direction: RtpTransceiverDirection::RecvOnly, | ||
| stream_ids: Vec::new(), | ||
| send_encodings: Vec::new(), | ||
| }; | ||
| for _ in 0..add_audio { | ||
| self.publisher_pc | ||
| .peer_connection() | ||
| .add_transceiver_for_media(MediaType::Audio, recvonly_init.clone())?; | ||
| } | ||
| for _ in 0..add_video { | ||
| self.publisher_pc | ||
| .peer_connection() | ||
| .add_transceiver_for_media(MediaType::Video, recvonly_init.clone())?; | ||
| } | ||
| if add_audio > 0 { | ||
| self.pending_recv_slots_audio.fetch_add(add_audio, Ordering::AcqRel); | ||
| } | ||
| if add_video > 0 { | ||
| self.pending_recv_slots_video.fetch_add(add_video, Ordering::AcqRel); | ||
| } | ||
| Ok((add_audio, add_video)) | ||
| } | ||
|
|
||
| fn available_recv_slots_by_kind(&self) -> (u32, u32) { | ||
| let mapping = self.mid_to_track_id.lock(); | ||
| let transceivers = self.publisher_pc.peer_connection().transceivers(); | ||
| let mut audio = 0u32; | ||
| let mut video = 0u32; | ||
|
|
||
| for t in transceivers { | ||
| let direction = t.direction(); | ||
| let current_direction = t.current_direction(); | ||
|
|
||
| // Check if transceiver is recv-capable: | ||
| // - Explicitly set to RecvOnly, or | ||
| // - Currently negotiated as RecvOnly (including munged inactive->recvonly) | ||
| let recv_capable = direction == RtpTransceiverDirection::RecvOnly | ||
| || current_direction == Some(RtpTransceiverDirection::RecvOnly); | ||
|
|
||
| if !recv_capable { | ||
| continue; | ||
| } | ||
|
|
||
| // Slot is occupied if server already mapped this MID to a concrete track. | ||
| if let Some(mid) = t.mid() { | ||
| if mapping.contains_key(&mid) { | ||
| continue; | ||
| } | ||
| } | ||
|
|
||
| // Only count transceivers that have tracks. Newly created transceivers without | ||
| // tracks yet are tracked via pending_recv_slots_* counters until negotiation. | ||
| let Some(track) = t.receiver().track() else { | ||
| continue; | ||
| }; | ||
| match track { | ||
| MediaStreamTrack::Audio(_) => audio += 1, | ||
| MediaStreamTrack::Video(_) => video += 1, | ||
| } | ||
| } | ||
|
|
||
| (audio, video) | ||
| } | ||
|
|
||
| fn emit_incoming_packet( | ||
| &self, | ||
| kind: DataPacketKind, | ||
|
|
@@ -1295,19 +1440,66 @@ impl SessionInner { | |
| } | ||
| proto::data_packet::Value::RpcRequest(rpc_request) => { | ||
| let caller_identity = participant_identity; | ||
| self.emitter.send(SessionEvent::RpcRequest { | ||
| caller_identity, | ||
| request_id: rpc_request.id.clone(), | ||
| method: rpc_request.method, | ||
| payload: rpc_request.payload, | ||
| response_timeout: Duration::from_millis(rpc_request.response_timeout_ms as u64), | ||
| version: rpc_request.version, | ||
| }) | ||
| // Prefer compressed payload when present. | ||
| // If decompression fails, only fall back to plain payload when it is non-empty. | ||
| let payload = if !rpc_request.compressed_payload.is_empty() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. question: Was adding this code in this PR intentional? |
||
| match decompress_rpc_payload_bytes(&rpc_request.compressed_payload) { | ||
| Ok(decompressed) => Some(decompressed), | ||
| Err(e) => { | ||
| if rpc_request.payload.is_empty() { | ||
| log::error!( | ||
| "Failed to decompress RPC request payload and plain payload is empty: {}", | ||
| e | ||
| ); | ||
| None | ||
| } else { | ||
| log::error!( | ||
| "Failed to decompress RPC request payload, falling back to plain payload: {}", | ||
| e | ||
| ); | ||
| Some(rpc_request.payload) | ||
| } | ||
| } | ||
| } | ||
| } else { | ||
| Some(rpc_request.payload) | ||
| }; | ||
| if let Some(payload) = payload { | ||
| self.emitter.send(SessionEvent::RpcRequest { | ||
| caller_identity, | ||
| request_id: rpc_request.id.clone(), | ||
| method: rpc_request.method, | ||
| payload, | ||
| response_timeout: Duration::from_millis( | ||
| rpc_request.response_timeout_ms as u64, | ||
| ), | ||
| version: rpc_request.version, | ||
| }) | ||
| } else { | ||
| Ok(()) | ||
| } | ||
| } | ||
| proto::data_packet::Value::RpcResponse(rpc_response) => { | ||
| let (payload, error) = match rpc_response.value { | ||
| None => (None, None), | ||
| Some(proto::rpc_response::Value::Payload(payload)) => (Some(payload), None), | ||
| Some(proto::rpc_response::Value::CompressedPayload(compressed)) => { | ||
| match decompress_rpc_payload_bytes(&compressed) { | ||
| Ok(decompressed) => (Some(decompressed), None), | ||
| Err(e) => { | ||
| log::error!("Failed to decompress RPC response payload: {}", e); | ||
| ( | ||
| None, | ||
| Some(proto::RpcError { | ||
| code: RpcErrorCode::ApplicationError as u32, | ||
| message: "Failed to decompress RPC response payload" | ||
| .to_string(), | ||
| data: e, | ||
| }), | ||
| ) | ||
| } | ||
| } | ||
| } | ||
| Some(proto::rpc_response::Value::Error(err)) => (None, Some(err)), | ||
| }; | ||
| self.emitter.send(SessionEvent::RpcResponse { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: I don’t have a strong opinion here, but since this adds a bunch of related fields, it might make sense to group them into their own struct (e.g., SdpExchangeState) and include that here instead. This would add no overhead and could help clarify that these fields are conceptually related.