Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 90 additions & 59 deletions crates/datadog-trace-agent/src/mini_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ impl MiniAgent {
// start our trace flusher. receives trace payloads and handles buffering + deciding when to
// flush to backend.
let trace_flusher = self.trace_flusher.clone();
let trace_flusher_handle = tokio::spawn(async move {
#[allow(unused_mut)]
let mut trace_flusher_handle = tokio::spawn(async move {
trace_flusher.start_trace_flusher(trace_rx).await;
});

Expand All @@ -84,7 +85,8 @@ impl MiniAgent {
// start our stats flusher.
let stats_flusher = self.stats_flusher.clone();
let stats_config = self.config.clone();
let stats_flusher_handle = tokio::spawn(async move {
#[allow(unused_mut)]
let mut stats_flusher_handle = tokio::spawn(async move {
stats_flusher
.start_stats_flusher(stats_config, stats_rx)
.await;
Expand Down Expand Up @@ -128,53 +130,73 @@ impl MiniAgent {
)
});

// Determine which transport to use based on configuration
// Determine which transports to use based on configuration
#[cfg(any(all(windows, feature = "windows-pipes"), test))]
let pipe_name_opt = self.config.dd_apm_windows_pipe_name.as_ref();
#[cfg(not(any(all(windows, feature = "windows-pipes"), test)))]
let pipe_name_opt: Option<&String> = None;

if let Some(pipe_name) = pipe_name_opt {
debug!("Mini Agent started: listening on named pipe {}", pipe_name);
} else {
debug!(
"Mini Agent started: listening on port {}",
self.config.dd_apm_receiver_port
);
}
debug!(
"Time taken to start the Mini Agent: {} ms",
now.elapsed().as_millis()
);

// Always start TCP listener
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port));
let tcp_listener = tokio::net::TcpListener::bind(&addr).await?;
debug!(
"Mini Agent listening on TCP port {}",
self.config.dd_apm_receiver_port
);

if let Some(pipe_name) = pipe_name_opt {
// Windows named pipe transport
// Both TCP and named pipe transports
debug!("Mini Agent also listening on named pipe {}", pipe_name);

#[cfg(all(windows, feature = "windows-pipes"))]
{
Self::serve_named_pipe(
pipe_name,
service,
trace_flusher_handle,
stats_flusher_handle,
)
.await?;
let tcp_service = service.clone();
let pipe_service = service;

let mut tcp_handle =
tokio::spawn(Self::serve_accept_loop_tcp(tcp_listener, tcp_service));

let mut pipe_handle = tokio::spawn(Self::serve_accept_loop_named_pipe(
pipe_name.clone(),
pipe_service,
));

// Monitor all tasks — if any critical task dies, shut down
tokio::select! {
result = &mut tcp_handle => {
error!("TCP accept loop died: {:?}", result);
return Err("TCP accept loop terminated unexpectedly".into());
},
result = &mut pipe_handle => {
error!("Named pipe accept loop died: {:?}", result);
return Err("Named pipe accept loop terminated unexpectedly".into());
},
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
}
}
#[cfg(not(all(windows, feature = "windows-pipes")))]
{
let _ = pipe_name; // Suppress unused variable warning
let _ = pipe_name;
unreachable!(
"Named pipes are only supported on Windows with the windows-pipes feature \
enabled, cannot use pipe: {}.",
pipe_name
"Named pipes are only supported on Windows with the windows-pipes feature enabled."
);
}
} else {
// TCP transport
let addr = SocketAddr::from(([127, 0, 0, 1], self.config.dd_apm_receiver_port));
let listener = tokio::net::TcpListener::bind(&addr).await?;

// TCP-only transport
Self::serve_tcp(
listener,
tcp_listener,
service,
trace_flusher_handle,
stats_flusher_handle,
Expand All @@ -191,6 +213,38 @@ impl MiniAgent {
mut trace_flusher_handle: tokio::task::JoinHandle<()>,
mut stats_flusher_handle: tokio::task::JoinHandle<()>,
) -> Result<(), Box<dyn std::error::Error>>
where
S: hyper::service::Service<
hyper::Request<hyper::body::Incoming>,
Response = hyper::Response<hyper_migration::Body>,
> + Clone
+ Send
+ 'static,
S::Future: Send,
S::Error: std::error::Error + Send + Sync + 'static,
{
let mut tcp_handle = tokio::spawn(Self::serve_accept_loop_tcp(listener, service));

tokio::select! {
result = &mut tcp_handle => {
error!("TCP accept loop died: {:?}", result);
return Err("TCP accept loop terminated unexpectedly".into());
},
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
}
}

async fn serve_accept_loop_tcp<S>(
listener: tokio::net::TcpListener,
service: S,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
S: hyper::service::Service<
hyper::Request<hyper::body::Incoming>,
Expand Down Expand Up @@ -218,7 +272,7 @@ impl MiniAgent {
continue;
}
Err(e) => {
error!("Server error: {e}");
error!("TCP server error: {e}");
return Err(e.into());
}
Ok((conn, _)) => conn,
Expand All @@ -234,34 +288,24 @@ impl MiniAgent {
},
Ok(()) | Err(_) => continue,
},
// If there's some error in the background tasks, we can't send data
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
};
let conn = hyper_util::rt::TokioIo::new(conn);
let server = server.clone();
let service = service.clone();
joinset.spawn(async move {
if let Err(e) = server.serve_connection(conn, service).await {
error!("Connection error: {e}");
error!("TCP connection error: {e}");
}
});
}
}

/// Named pipe accept loop without flusher monitoring, for use when running alongside TCP.
#[cfg(all(windows, feature = "windows-pipes"))]
async fn serve_named_pipe<S>(
pipe_name: &str,
async fn serve_accept_loop_named_pipe<S>(
pipe_name: String,
service: S,
mut trace_flusher_handle: tokio::task::JoinHandle<()>,
mut stats_flusher_handle: tokio::task::JoinHandle<()>,
) -> Result<(), Box<dyn std::error::Error>>
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
S: hyper::service::Service<
hyper::Request<hyper::body::Incoming>,
Expand All @@ -276,9 +320,7 @@ impl MiniAgent {
let mut joinset = tokio::task::JoinSet::new();

loop {
// Create a new pipe instance
// pipe_name already includes \\.\pipe\ prefix from config
let pipe = match ServerOptions::new().create(pipe_name) {
let pipe = match ServerOptions::new().create(&pipe_name) {
Ok(pipe) => {
debug!("Created pipe server instance '{}' in byte mode", pipe_name);
pipe
Expand All @@ -289,7 +331,6 @@ impl MiniAgent {
}
};

// Wait for client connection
let conn = tokio::select! {
connect_res = pipe.connect() => match connect_res {
Err(e)
Expand Down Expand Up @@ -322,24 +363,14 @@ impl MiniAgent {
},
Ok(()) | Err(_) => continue,
},
// If there's some error in the background tasks, we can't send data
result = &mut trace_flusher_handle => {
error!("Trace flusher task died: {:?}", result);
return Err("Trace flusher task terminated unexpectedly".into());
},
result = &mut stats_flusher_handle => {
error!("Stats flusher task died: {:?}", result);
return Err("Stats flusher task terminated unexpectedly".into());
},
};

// Hyper http parser handles buffering pipe data
let conn = hyper_util::rt::TokioIo::new(conn);
let server = server.clone();
let service = service.clone();
joinset.spawn(async move {
if let Err(e) = server.serve_connection(conn, service).await {
error!("Connection error: {e}");
error!("Named pipe connection error: {e}");
}
});
}
Expand Down
Loading