Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion orb-dogd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ publish = false
testing = []

[dependencies]
dogstatsd.workspace = true
eyre.workspace = true
flume.workspace = true
thiserror.workspace = true
Expand Down
191 changes: 142 additions & 49 deletions orb-dogd/src/dd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use dogstatsd::Client;
use flume::Sender;
use flume::TrySendError;
use std::os::fd::OwnedFd;
use std::os::unix::net::UnixDatagram;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::{fs, path::Path, time::Duration};
use tracing::warn;
Expand All @@ -10,10 +12,12 @@ use super::{MetricEmitter, MetricError};

/// Handle used to emit metrics to the local dogstatsd daemon.
///
/// Metrics are not written to the socket by the caller; they are queued on a
/// bounded channel and sent by a background thread.
pub struct DogstatsdClient {
/// Sending half of the bounded metric channel drained by the background
/// emit thread.
tx: Sender<Metric>,
/// Shared slot for the connected dsd socket; `wait_for_connection` blocks
/// on it until a socket has been published.
connection: Arc<Connection>,
}

/// Filesystem path of the dogstatsd Unix datagram socket.
const DOGSTATSD_SOCKET_PATH: &str = "/run/datadog/dsd.socket";
/// Sleep between attempts while the background thread waits for the daemon
/// socket to become reachable.
const DOGSTATSD_BACKOFF: Duration = Duration::from_secs(3);
/// Capacity of the bounded channel between callers and the emit thread.
const CHANNEL_CAPACITY: usize = 512;

#[derive(Debug, Clone, PartialEq)]
pub(crate) enum Metric {
Expand Down Expand Up @@ -56,32 +60,58 @@ pub(crate) enum Metric {
},
}

/// Rendezvous point between the background connect loop and callers that
/// need the connected dsd socket.
///
/// The slot starts out empty. Once the background thread has a connected
/// socket it stores it via [`Connection::publish`], which also wakes every
/// thread parked on the condition variable (see `wait_for_connection`).
struct Connection {
    /// `None` until a connected socket has been published.
    socket: Mutex<Option<UnixDatagram>>,
    /// Signalled each time `socket` is filled.
    cv: Condvar,
}

impl Connection {
    /// Create an empty slot with no socket published yet.
    fn new() -> Self {
        let socket = Mutex::new(None);
        let cv = Condvar::new();
        Connection { socket, cv }
    }

    /// Store `socket` in the slot and wake all waiters.
    ///
    /// # Panics
    ///
    /// Panics if the mutex was poisoned by a panicking holder.
    fn publish(&self, socket: UnixDatagram) {
        let mut slot = self.socket.lock().expect("connection mutex poisoned");
        *slot = Some(socket);
        // Notify while still holding the lock; the guard is released when
        // `slot` goes out of scope at the end of this function.
        self.cv.notify_all();
    }
}

impl Default for DogstatsdClient {
fn default() -> Self {
Self::new()
}
}

impl DogstatsdClient {
/// Connect to the local statsd collector.
/// Connect to the local statsd collector at `/run/datadog/dsd.socket`.
///
/// Fails if the underlying socket cannot be bound.
/// Returns immediately. A background thread opens the socket with retry
/// and then drains the metric channel. The channel is bounded; once it
/// fills (typically when the daemon is unreachable for an extended
/// period), new emissions are dropped via `try_send` rather than
/// blocking the caller.
pub fn new() -> Self {
let (tx, rx) = flume::bounded(512);
let (tx, rx) = flume::bounded(CHANNEL_CAPACITY);
let connection = Arc::new(Connection::new());

let conn = Arc::clone(&connection);
thread::spawn(move || {
let client = loop {
let socket = loop {
let err_msg =
if fs::exists(Path::new(DOGSTATSD_SOCKET_PATH)).unwrap_or(false) {
info!("datadog-agent socket found, using it for IPC");

let opts = dogstatsd::OptionsBuilder::new()
.socket_path(Some(DOGSTATSD_SOCKET_PATH.to_string()))
.build();

match Client::new(opts) {
Ok(client) => break client,
Err(e) => format!("failed to create DD client {e}"),
match try_connect(DOGSTATSD_SOCKET_PATH) {
Ok(socket) => {
info!("datadog-agent socket found, using it for IPC");
break socket;
}
Err(e) => format!("failed to connect to DD daemon: {e}"),
}
} else {
format!("{DOGSTATSD_SOCKET_PATH} not found")
Expand All @@ -95,39 +125,64 @@ impl DogstatsdClient {
thread::sleep(DOGSTATSD_BACKOFF);
};

while let Ok(metric) = rx.recv() {
match metric {
Metric::Count { stat, val, tags } => {
if let Err(e) = client.count(stat, val, tags) {
warn!("emitting metric failed with: {e}");
}
}
Metric::Gauge { stat, val, tags } => {
if let Err(e) = client.gauge(stat, val.to_string(), tags) {
warn!("emitting metric failed with: {e}");
}
}
Metric::Histogram { stat, val, tags } => {
if let Err(e) = client.histogram(stat, val.to_string(), tags) {
warn!("emitting metric failed with: {e}");
}
}
Metric::Distribution { stat, val, tags } => {
if let Err(e) = client.distribution(stat, val.to_string(), tags)
{
warn!("emitting metric failed with: {e}");
}
}
Metric::Timing { stat, val, tags } => {
if let Err(e) = client.timing(stat, val, tags) {
warn!("emitting metric failed with: {e}");
}
}
}
// Publish a clone of the connected socket so external callers
// (e.g. orb-core's ProcessInitializer) can dup the FD and pass
// it to sandboxed subprocesses, which cannot connect from
// inside a separate network namespace.
match socket.try_clone() {
Ok(shared) => conn.publish(shared),
Err(e) => error!("failed to clone dsd socket for sharing: {e}"),
}

emit_loop(rx, socket);
});

Self { tx }
Self { tx, connection }
}

/// Build a client that emits over an already-connected Unix datagram
/// socket inherited from a parent process. Used by sandboxed
/// subprocesses (network namespace) that cannot `connect()` to the dsd
/// socket themselves. No retry; emit failures are logged.
pub fn from_unix_datagram(socket: UnixDatagram) -> Self {
let (tx, rx) = flume::bounded(CHANNEL_CAPACITY);
let connection = Arc::new(Connection::new());

thread::spawn(move || {
emit_loop(rx, socket);
});

Self { tx, connection }
}

/// Block until the bg thread has connected to the dsd daemon, then
/// return a dup'd `OwnedFd` of the connected socket. The returned FD
/// can be passed across `fork()` so a subprocess in a separate network
/// namespace can still emit metrics via [`Self::from_unix_datagram`].
///
/// Returns `None` if the timeout elapses without a connection.
pub fn wait_for_connection(&self, timeout: Duration) -> Option<OwnedFd> {
let guard = self
.connection
.socket
.lock()
.expect("connection mutex poisoned");
let (guard, result) = self
.connection
.cv
.wait_timeout_while(guard, timeout, |s| s.is_none())
.expect("connection mutex poisoned");
if result.timed_out() {
return None;
}
let socket = guard.as_ref()?;
match socket.try_clone() {
Ok(cloned) => Some(OwnedFd::from(cloned)),
Err(e) => {
error!("failed to clone dsd socket for export: {e}");
None
}
}
}

fn emit(&self, metric: Metric) -> Result<(), MetricError> {
Expand All @@ -139,6 +194,48 @@ impl DogstatsdClient {
}
}

/// Open an unbound Unix datagram socket and connect it to `path`.
///
/// Returns the connected socket, or the I/O error from either creating the
/// socket or connecting it.
fn try_connect(path: &str) -> std::io::Result<UnixDatagram> {
    UnixDatagram::unbound().and_then(|socket| {
        socket.connect(path)?;
        Ok(socket)
    })
}

/// Drain `rx` and write each metric to `socket` as one datagram.
///
/// Blocks until every sender for the channel has been dropped. A failed
/// send is logged as a warning and the loop moves on to the next metric.
fn emit_loop(rx: flume::Receiver<Metric>, socket: UnixDatagram) {
    for line in rx.iter().map(|metric| format_metric(&metric)) {
        if let Err(e) = socket.send(line.as_bytes()) {
            warn!("emitting metric failed with: {e}");
        }
    }
}

fn format_metric(metric: &Metric) -> String {
match metric {
Metric::Count { stat, val, tags } => format_payload(stat, val, "c", tags),
Metric::Gauge { stat, val, tags } => format_payload(stat, val, "g", tags),
Metric::Histogram { stat, val, tags } => format_payload(stat, val, "h", tags),
Metric::Distribution { stat, val, tags } => format_payload(stat, val, "d", tags),
Metric::Timing { stat, val, tags } => format_payload(stat, val, "ms", tags),
}
}

// Builds `<stat>:<val>|<ty>`, followed by `|#<tag>,<tag>,...` when at least
// one tag is present.
//
// No escaping is applied: a tag containing `,` or `|` will be misparsed by
// the daemon. This matches the prior behavior with the `dogstatsd-rs` crate,
// which also does no escaping, so callers must keep tag strings free of
// those characters.
fn format_payload<V: std::fmt::Display>(
    stat: &str,
    val: V,
    ty: &str,
    tags: &[String],
) -> String {
    let mut line = format!("{stat}:{val}|{ty}");
    if !tags.is_empty() {
        line.push_str("|#");
        line.push_str(&tags.join(","));
    }
    line
}

impl MetricEmitter for DogstatsdClient {
fn count<S, I>(&self, stat: S, val: i64, tags: I) -> Result<(), MetricError>
where
Expand Down Expand Up @@ -215,10 +312,6 @@ impl MetricEmitter for DogstatsdClient {
val,
tags: tags.into_iter().map(Into::into).collect(),
};
self.tx
.send(metric)
.map_err(|_| eyre::eyre!("metrics worker has died"))?;

Ok(())
self.emit(metric)
}
}
1 change: 0 additions & 1 deletion orb-dogd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

mod dd;
pub use dd::DogstatsdClient;
pub use dogstatsd::DogstatsdError;

#[cfg(any(test, feature = "testing"))]
pub mod test;
Expand Down
Loading