Skip to content

Commit 66a959d

Browse files
fix(agent): address RDM message handling review feedback (#1647)
Addresses review comments on PR for RDM message handling and pipe passthrough logic. ## Safety & Correctness - **Integer overflow**: Use `saturating_mul` for timeout_ms calculation (prevents overflow when timeout_secs is large) - **Race condition**: Replace check-then-set pattern with `compare_exchange` in `process_app_start` to atomically transition Disconnected→Connecting - **State recovery**: Reset `connection_state` to Disconnected when `process_app_start_impl` fails, allowing retry - **Channel closure**: Explicitly handle `None` from `dvc_rx.recv()` to avoid busy loop when channel closes - **Path comparison**: Use `eq_ignore_ascii_case` for RDM process detection (Windows paths are case-insensitive) ## Code Quality - **Error visibility**: Log errors when READY notification fails instead of silent `let _ =` - **Documentation**: Clarify capabilities negotiation semantics (negotiated subset vs. downgraded) - **Magic numbers**: Document 0x700 offset in RDM MSI version encoding (empirically derived) - **Dead code**: Remove unused environment block creation and HashMap import ## Example: Race Condition Fix ```rust // Before: Non-atomic check-then-set allows duplicate spawns match current_state { Disconnected => { self.connection_state.store(Connecting, Ordering::Release); } } // After: Atomic transition prevents race loop { match current_state { Disconnected => { match self.connection_state.compare_exchange( Disconnected, Connecting, Ordering::AcqRel, Ordering::Acquire ) { Ok(_) => break, // Won the race Err(actual) => current_state = actual; continue, // Retry } } } } ``` <!-- START COPILOT CODING AGENT TIPS --> --- 💬 We'd love your input! Share your thoughts on Copilot coding agent in our [2 minute survey](https://gh.io/copilot-coding-agent-survey). --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: vnikonov-devolutions <246051166+vnikonov-devolutions@users.noreply.github.com>
1 parent 3146dc4 commit 66a959d

File tree

3 files changed

+73
-45
lines changed

3 files changed

+73
-45
lines changed

crates/devolutions-agent-shared/src/windows/registry.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ pub fn get_installed_product_version(
8787
})?;
8888

8989
// Convert encoded MSI version number to human-readable date.
90+
// The high byte encodes the year as an offset:
91+
// - Agent builds use a base year of 2000 (year = high_byte + 2000).
92+
// - RDM MSI packages use 0x700 (1792) as base, found empirically.
93+
// This offset must be preserved to correctly decode existing RDM installations.
9094
let short_year = match version_encoding {
9195
ProductVersionEncoding::Agent => (product_version >> 24) + 2000,
9296
ProductVersionEncoding::Rdm => (product_version >> 24) + 0x700,

devolutions-session/src/dvc/io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub fn run_dvc_io(
3232
trace!("DVC channel opened");
3333

3434
// All DVC messages should be under CHANNEL_CHUNK_LENGTH size, but sometimes RDP stack
35-
// a few messages together; 128Kb buffer should be enough to hold a few dozen messages.
35+
// sends a few messages together; 128Kb buffer should be enough to hold a few dozen messages.
3636
let mut pdu_chunk_buffer = [0u8; 128 * 1024];
3737
let mut overlapped = OVERLAPPED::default();
3838
let mut bytes_read: u32 = 0;

devolutions-session/src/dvc/rdm.rs

Lines changed: 68 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::HashMap;
21
use std::mem::size_of;
32
use std::sync::Arc;
43
use std::sync::atomic::{AtomicU8, Ordering};
@@ -89,7 +88,7 @@ impl RdmPipeConnection {
8988
/// The ready event is dropped after connection, allowing RDM to own it.
9089
async fn connect(timeout_secs: u32, ready_event: Event) -> anyhow::Result<Self> {
9190
let pipe_name = get_rdm_pipe_name()?;
92-
let timeout_ms = timeout_secs * 1000;
91+
let timeout_ms = timeout_secs.saturating_mul(1000);
9392

9493
info!(pipe_name, timeout_secs, "Waiting for RDM and connecting to pipe");
9594

@@ -207,7 +206,9 @@ fn validate_capset_response(message: NowMessage<'_>) -> anyhow::Result<NowChanne
207206

208207
/// Perform NOW protocol negotiation with RDM
209208
///
210-
/// Sends agent capabilities to RDM and receives RDM's downgraded capabilities.
209+
/// Sends the agent's proposed capabilities to RDM and receives RDM's negotiated
210+
/// capabilities for the connection (a final set that may be a downgraded subset
211+
/// of the agent's proposal and that both sides will use).
211212
/// This establishes the protocol version and capabilities for the connection.
212213
async fn negotiate_with_rdm(
213214
pipe: &mut RdmPipeConnection,
@@ -303,12 +304,20 @@ async fn run_rdm_to_dvc_passthrough(
303304
}
304305

305306
// Receive messages from channel and write to RDM pipe
306-
Some(message) = dvc_rx.recv() => {
307-
info!(pipe_name, "Forwarding message to RDM: {:?}", message);
308-
if let Err(error) = pipe.send_message(&message).await {
309-
error!(%error, pipe_name, "Failed to send message to RDM pipe");
310-
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
311-
break;
307+
message_opt = dvc_rx.recv() => {
308+
match message_opt {
309+
Some(message) => {
310+
info!(pipe_name, "Forwarding message to RDM: {:?}", message);
311+
if let Err(error) = pipe.send_message(&message).await {
312+
error!(%error, pipe_name, "Failed to send message to RDM pipe");
313+
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
314+
break;
315+
}
316+
}
317+
None => {
318+
info!(pipe_name, "DVC receiver channel closed; terminating RDM passthrough task");
319+
break;
320+
}
312321
}
313322
}
314323
}
@@ -420,25 +429,50 @@ impl RdmMessageProcessor {
420429
/// - If RDM is not started, launch RDM and start connection process.
421430
/// - Spawns a background task to handle the connection process.
422431
pub fn process_app_start(&mut self, rdm_app_start_msg: NowRdmAppStartMsg, agent_caps: NowChannelCapsetMsg) {
423-
let current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire));
432+
let mut current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire));
424433

425-
match current_state {
426-
RdmConnectionState::Ready => {
427-
info!("RDM already connected and ready, sending immediate READY notification");
428-
let dvc_tx = self.dvc_tx.clone();
429-
tokio::spawn(async move {
430-
let _ = send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await;
431-
});
432-
return;
433-
}
434-
RdmConnectionState::Connecting => {
435-
info!("RDM connection already in progress, ignoring duplicate app_start");
436-
return;
437-
}
438-
RdmConnectionState::Disconnected => {
439-
info!("Starting RDM connection process");
440-
self.connection_state
441-
.store(RdmConnectionState::Connecting.as_u8(), Ordering::Release);
434+
// Ensure that the transition from Disconnected to Connecting is done atomically
435+
// so that only one task is spawned to handle app_start.
436+
loop {
437+
match current_state {
438+
RdmConnectionState::Ready => {
439+
info!("RDM already connected and ready, sending immediate READY notification");
440+
let dvc_tx = self.dvc_tx.clone();
441+
tokio::spawn(async move {
442+
if let Err(error) =
443+
send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await
444+
{
445+
error!(%error, "Failed to send immediate RDM READY notification");
446+
}
447+
});
448+
return;
449+
}
450+
RdmConnectionState::Connecting => {
451+
info!("RDM connection already in progress, ignoring duplicate app_start");
452+
return;
453+
}
454+
RdmConnectionState::Disconnected => {
455+
info!("Starting RDM connection process");
456+
let disconnected = RdmConnectionState::Disconnected.as_u8();
457+
let connecting = RdmConnectionState::Connecting.as_u8();
458+
459+
match self.connection_state.compare_exchange(
460+
disconnected,
461+
connecting,
462+
Ordering::AcqRel,
463+
Ordering::Acquire,
464+
) {
465+
Ok(_) => {
466+
// Successfully claimed responsibility for starting the connection.
467+
break;
468+
}
469+
Err(actual) => {
470+
// State changed concurrently; re-evaluate the new state.
471+
current_state = RdmConnectionState::from_u8(actual);
472+
continue;
473+
}
474+
}
475+
}
442476
}
443477
}
444478

@@ -451,9 +485,11 @@ impl RdmMessageProcessor {
451485

452486
tokio::spawn(async move {
453487
if let Err(error) =
454-
process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state, rdm_rx).await
488+
process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state.clone(), rdm_rx).await
455489
{
456490
error!(%error, "RDM app start failed");
491+
// Ensure connection_state is reset so future app_start attempts are possible
492+
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
457493
}
458494
});
459495
}
@@ -587,21 +623,6 @@ async fn launch_rdm_process(rdm_app_start_msg: &NowRdmAppStartMsg) -> anyhow::Re
587623
.to_string_lossy()
588624
.to_string();
589625

590-
let mut env_vars = HashMap::new();
591-
592-
if rdm_app_start_msg.is_fullscreen() {
593-
env_vars.insert("RDM_OPT_FULLSCREEN".to_owned(), "1".to_owned());
594-
info!("Starting RDM in fullscreen mode");
595-
}
596-
597-
if rdm_app_start_msg.is_jump_mode() {
598-
env_vars.insert("RDM_OPT_JUMP".to_owned(), "1".to_owned());
599-
info!("Starting RDM in jump mode");
600-
}
601-
602-
// Create environment block
603-
let _env_block = crate::dvc::env::make_environment_block(env_vars)?;
604-
605626
// Convert command line to wide string
606627
let current_dir = WideString::from(&install_location);
607628

@@ -693,7 +714,10 @@ fn is_rdm_running() -> bool {
693714
};
694715

695716
// Compare the full paths case-insensitively
696-
if exe_path == rdm_exe_path {
717+
if exe_path
718+
.to_string_lossy()
719+
.eq_ignore_ascii_case(&rdm_exe_path.to_string_lossy())
720+
{
697721
info!(
698722
rdm_path = %rdm_exe_path.display(),
699723
found_path = %exe_path.display(),

0 commit comments

Comments
 (0)