Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ mod health_metrics;
mod pausable_worker;
#[allow(missing_docs)]
pub mod stats_exporter;
pub mod telemetry;
pub(crate) mod telemetry;
#[allow(missing_docs)]
pub mod trace_exporter;
24 changes: 20 additions & 4 deletions libdd-data-pipeline/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ pub enum MetricKind {
ApiResponses,
/// trace_chunks_sent metric
ChunksSent,
/// trace_chunks_dropped metric
ChunksDropped,
/// trace_chunks_dropped metric (reason: p0_drop)
ChunksDroppedP0,
/// trace_chunks_dropped metric (reason: serialization_error)
ChunksDroppedSerializationError,
/// trace_chunks_dropped metric (reason: send_failure)
ChunksDroppedSendFailure,
}

/// Constants for metric names
Expand Down Expand Up @@ -96,13 +100,25 @@ const METRICS: &[Metric] = &[
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
tags: &[tag!["src_library", "libdatadog"]],
tags: &[tag!["src_library", "libdatadog"], tag!["reason", "p0_drop"]],
},
Metric {
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
tags: &[tag!["src_library", "libdatadog"]],
tags: &[
tag!["src_library", "libdatadog"],
tag!["reason", "serialization_error"],
],
},
Metric {
name: CHUNKS_DROPPED_STR,
metric_type: MetricType::Count,
namespace: MetricNamespace::Tracers,
tags: &[
tag!["src_library", "libdatadog"],
tag!["reason", "send_failure"],
],
},
];

Expand Down
192 changes: 150 additions & 42 deletions libdd-data-pipeline/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ pub struct SendPayloadTelemetry {
errors_status_code: u64,
bytes_sent: u64,
chunks_sent: u64,
chunks_dropped: u64,
chunks_dropped_p0: u64,
chunks_dropped_serialization_error: u64,
chunks_dropped_send_failure: u64,
responses_count_per_code: HashMap<u16, u64>,
}

Expand All @@ -160,16 +162,31 @@ impl From<&SendDataResult> for SendPayloadTelemetry {
errors_status_code: value.errors_status_code,
bytes_sent: value.bytes_sent,
chunks_sent: value.chunks_sent,
chunks_dropped: value.chunks_dropped,
chunks_dropped_send_failure: value.chunks_dropped,
responses_count_per_code: value.responses_count_per_code.clone(),
..Default::default()
}
}
}

impl SendPayloadTelemetry {
/// Create a [`SendPayloadTelemetry`] from a [`SendWithRetryResult`].
pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self {
let mut telemetry = Self::default();
///
/// # Arguments
/// * `value` - The result of sending traces with retry
/// * `bytes_sent` - The number of bytes in the payload
/// * `chunks` - The number of trace chunks in the payload
/// * `chunks_dropped_p0` - The number of P0 trace chunks dropped due to sampling
pub fn from_retry_result(
value: &SendWithRetryResult,
bytes_sent: u64,
chunks: u64,
chunks_dropped_p0: u64,
) -> Self {
let mut telemetry = Self {
chunks_dropped_p0,
..Default::default()
};
match value {
Ok((response, attempts)) => {
telemetry.chunks_sent = chunks;
Expand All @@ -179,29 +196,30 @@ impl SendPayloadTelemetry {
.insert(response.status().into(), 1);
telemetry.requests_count = *attempts as u64;
}
Err(err) => {
telemetry.chunks_dropped = chunks;
match err {
SendWithRetryError::Http(response, attempts) => {
telemetry.errors_status_code = 1;
telemetry
.responses_count_per_code
.insert(response.status().into(), 1);
telemetry.requests_count = *attempts as u64;
}
SendWithRetryError::Timeout(attempts) => {
telemetry.errors_timeout = 1;
telemetry.requests_count = *attempts as u64;
}
SendWithRetryError::Network(_, attempts) => {
telemetry.errors_network = 1;
telemetry.requests_count = *attempts as u64;
}
SendWithRetryError::Build(attempts) => {
telemetry.requests_count = *attempts as u64;
}
Err(err) => match err {
SendWithRetryError::Http(response, attempts) => {
telemetry.chunks_dropped_send_failure = chunks;
telemetry.errors_status_code = 1;
telemetry
.responses_count_per_code
.insert(response.status().into(), 1);
telemetry.requests_count = *attempts as u64;
}
}
SendWithRetryError::Timeout(attempts) => {
telemetry.chunks_dropped_send_failure = chunks;
telemetry.errors_timeout = 1;
telemetry.requests_count = *attempts as u64;
}
SendWithRetryError::Network(_, attempts) => {
telemetry.chunks_dropped_send_failure = chunks;
telemetry.errors_network = 1;
telemetry.requests_count = *attempts as u64;
}
SendWithRetryError::Build(attempts) => {
telemetry.chunks_dropped_serialization_error = chunks;
telemetry.requests_count = *attempts as u64;
}
},
};
telemetry
}
Expand Down Expand Up @@ -243,10 +261,24 @@ impl TelemetryClient {
self.worker
.add_point(data.chunks_sent as f64, key, vec![])?;
}
if data.chunks_dropped > 0 {
let key = self.metrics.get(metrics::MetricKind::ChunksDropped);
if data.chunks_dropped_p0 > 0 {
let key = self.metrics.get(metrics::MetricKind::ChunksDroppedP0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines seem to miss coverage

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, added some more tests.

self.worker
.add_point(data.chunks_dropped_p0 as f64, key, vec![])?;
}
if data.chunks_dropped_serialization_error > 0 {
let key = self
.metrics
.get(metrics::MetricKind::ChunksDroppedSerializationError);
self.worker
.add_point(data.chunks_dropped as f64, key, vec![])?;
.add_point(data.chunks_dropped_serialization_error as f64, key, vec![])?;
}
if data.chunks_dropped_send_failure > 0 {
let key = self
.metrics
.get(metrics::MetricKind::ChunksDroppedSendFailure);
self.worker
.add_point(data.chunks_dropped_send_failure as f64, key, vec![])?;
}
if !data.responses_count_per_code.is_empty() {
let key = self.metrics.get(metrics::MetricKind::ApiResponses);
Expand Down Expand Up @@ -550,8 +582,8 @@ mod tests {

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn chunks_dropped_test() {
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count"#).unwrap();
async fn chunks_dropped_send_failure_test() {
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:send_failure"\],"common":true,"type":"count"#).unwrap();
let server = MockServer::start_async().await;

let telemetry_srv = server
Expand All @@ -562,7 +594,65 @@ mod tests {
.await;

let data = SendPayloadTelemetry {
chunks_dropped: 1,
chunks_dropped_send_failure: 1,
..Default::default()
};

let client = get_test_client(&server.url("/")).await;

client.start().await;
let _ = client.send(&data);
client.shutdown().await;
while telemetry_srv.calls_async().await == 0 {
sleep(Duration::from_millis(10)).await;
}
telemetry_srv.assert_calls_async(1).await;
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn chunks_dropped_p0_test() {
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:p0_drop"\],"common":true,"type":"count"#).unwrap();
let server = MockServer::start_async().await;

let telemetry_srv = server
.mock_async(|when, then| {
when.method(POST).body_matches(payload);
then.status(200).body("");
})
.await;

let data = SendPayloadTelemetry {
chunks_dropped_p0: 1,
..Default::default()
};

let client = get_test_client(&server.url("/")).await;

client.start().await;
let _ = client.send(&data);
client.shutdown().await;
while telemetry_srv.calls_async().await == 0 {
sleep(Duration::from_millis(10)).await;
}
telemetry_srv.assert_calls_async(1).await;
}

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn chunks_dropped_serialization_error_test() {
let payload = Regex::new(r#""metric":"trace_chunks_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog","reason:serialization_error"\],"common":true,"type":"count"#).unwrap();
let server = MockServer::start_async().await;

let telemetry_srv = server
.mock_async(|when, then| {
when.method(POST).body_matches(payload);
then.status(200).body("");
})
.await;

let data = SendPayloadTelemetry {
chunks_dropped_serialization_error: 1,
..Default::default()
};

Expand All @@ -580,13 +670,30 @@ mod tests {
#[test]
fn telemetry_from_ok_response_test() {
let result = Ok((Response::default(), 3));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 0);
assert_eq!(
telemetry,
SendPayloadTelemetry {
bytes_sent: 4,
chunks_sent: 5,
requests_count: 3,
responses_count_per_code: HashMap::from([(200, 1)]),
..Default::default()
}
)
}

#[test]
fn telemetry_from_ok_response_with_p0_drops_test() {
let result = Ok((Response::default(), 3));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5, 10);
assert_eq!(
telemetry,
SendPayloadTelemetry {
bytes_sent: 4,
chunks_sent: 5,
requests_count: 3,
chunks_dropped_p0: 10,
responses_count_per_code: HashMap::from([(200, 1)]),
..Default::default()
}
Expand All @@ -598,11 +705,11 @@ mod tests {
let mut error_response = Response::default();
*error_response.status_mut() = StatusCode::BAD_REQUEST;
let result = Err(SendWithRetryError::Http(error_response, 5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
assert_eq!(
telemetry,
SendPayloadTelemetry {
chunks_dropped: 2,
chunks_dropped_send_failure: 2,
requests_count: 5,
errors_status_code: 1,
responses_count_per_code: HashMap::from([(400, 1)]),
Expand All @@ -621,11 +728,11 @@ mod tests {
.unwrap_err();

let result = Err(SendWithRetryError::Network(hyper_error, 5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
assert_eq!(
telemetry,
SendPayloadTelemetry {
chunks_dropped: 2,
chunks_dropped_send_failure: 2,
requests_count: 5,
errors_network: 1,
..Default::default()
Expand All @@ -636,11 +743,11 @@ mod tests {
#[test]
fn telemetry_from_timeout_error_test() {
let result = Err(SendWithRetryError::Timeout(5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
assert_eq!(
telemetry,
SendPayloadTelemetry {
chunks_dropped: 2,
chunks_dropped_send_failure: 2,
requests_count: 5,
errors_timeout: 1,
..Default::default()
Expand All @@ -652,11 +759,11 @@ mod tests {
#[tokio::test]
async fn telemetry_from_build_error_test() {
let result = Err(SendWithRetryError::Build(5));
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2);
let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2, 0);
assert_eq!(
telemetry,
SendPayloadTelemetry {
chunks_dropped: 2,
chunks_dropped_serialization_error: 2,
requests_count: 5,
..Default::default()
}
Expand Down Expand Up @@ -684,8 +791,9 @@ mod tests {
errors_status_code: 3,
bytes_sent: 4,
chunks_sent: 5,
chunks_dropped: 6,
chunks_dropped_send_failure: 6,
responses_count_per_code: HashMap::from([(200, 3)]),
..Default::default()
};

assert_eq!(SendPayloadTelemetry::from(&result), expected_telemetry)
Expand Down
5 changes: 4 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ impl TraceExporter {
mp_payload: Vec<u8>,
headers: HashMap<&'static str, String>,
chunks: usize,
chunks_dropped_p0: usize,
) -> Result<AgentResponse, TraceExporterError> {
let strategy = RetryStrategy::default();
let payload_len = mp_payload.len();
Expand Down Expand Up @@ -679,6 +680,7 @@ impl TraceExporter {
&result,
payload_len as u64,
chunks as u64,
chunks_dropped_p0 as u64,
)) {
error!(?e, "Error sending telemetry");
}
Expand All @@ -694,7 +696,7 @@ impl TraceExporter {
let mut header_tags: TracerHeaderTags = self.metadata.borrow().into();

// Process stats computation
stats::process_traces_for_stats(
let dropped_p0_stats = stats::process_traces_for_stats(
&mut traces,
&mut header_tags,
&self.client_side_stats,
Expand All @@ -717,6 +719,7 @@ impl TraceExporter {
prepared.data,
prepared.headers,
prepared.chunk_count,
dropped_p0_stats.dropped_p0_traces,
)
.await
}
Expand Down
Loading
Loading