Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/sources/opentelemetry/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,13 @@ impl Service {
mut events: Vec<Event>,
log_name: &'static str,
) -> Result<(), Status> {
let count = events.len();
// When using OTLP decoding, count individual items within the batch
// to maintain consistency with other Vector sources
let count = if self.deserializer.is_some() {
super::count_otlp_items(&events)
} else {
events.len()
};
let byte_size = events.estimated_json_encoded_size_of();
self.events_received.emit(CountByteSize(count, byte_size));

Expand Down
4 changes: 3 additions & 1 deletion src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,10 @@ fn parse_with_deserializer(
.map(|r| r.into_vec())
.map_err(emit_decode_error)?;

// Count individual items within OTLP batches for consistency with other sources
let count = super::count_otlp_items(&events);
events_received.emit(CountByteSize(
events.len(),
count,
events.estimated_json_encoded_size_of(),
));

Expand Down
65 changes: 65 additions & 0 deletions src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,68 @@ mod grpc;
mod http;
mod reply;
mod status;

use vector_lib::{
event::Event,
opentelemetry::proto::{
RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
},
};
use vrl::value::Value;

fn count_items_inner(resource: &Value, array_id: &str, inner_id: &str) -> usize {
let Some(resource_array) = resource.as_array() else {
return 0;
};

resource_array
.iter()
.map(|r| {
r.get(array_id)
.and_then(|s| s.as_array())
.map(|scope_array| {
scope_array
.iter()
.map(|sl| {
sl.get(inner_id)
.and_then(|lr| lr.as_array())
.map(|arr| arr.len())
.unwrap_or(0)
})
.sum::<usize>()
})
.unwrap_or(0)
})
.sum()
}

/// Counts individual log records, metrics, or spans within OTLP batch events.
/// When use_otlp_decoding is enabled, events contain entire OTLP batches, but
/// we want to count the individual items for metric consistency with other sources.
/// This iterates through the Value structure, which is less efficient than
/// counting from the typed protobuf request, but avoids decoding twice.
pub(crate) fn count_otlp_items(events: &[Event]) -> usize {
events
.iter()
.map(|event| match event {
Event::Log(log) => {
if let Some(resource_logs) = log.get(RESOURCE_LOGS_JSON_FIELD) {
count_items_inner(resource_logs, "scopeLogs", "logRecords")
} else if let Some(resource_metrics) = log.get(RESOURCE_METRICS_JSON_FIELD) {
count_items_inner(resource_metrics, "scopeMetrics", "metrics")
} else {
0
}
}
Event::Trace(trace) => {
// Count spans in resourceSpans
if let Some(resource_spans) = trace.get(RESOURCE_SPANS_JSON_FIELD) {
count_items_inner(resource_spans, "scopeSpans", "spans")
} else {
0
}
}
_ => 0, // unreachable
})
.sum()
}
12 changes: 12 additions & 0 deletions tests/e2e/opentelemetry/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,15 @@ fn vector_sink_otel_sink_logs_match() {
"Collector and Vector log requests should match"
);
}

#[test]
fn vector_component_received_events_total_counts_individual_log_records() {
// This test verifies that when use_otlp_decoding is enabled, the
// component_received_events_total metric counts individual log records
// within OTLP batches, not the number of batch requests.
// This ensures consistency with other Vector sources and with the same
// OpenTelemetry source when use_otlp_decoding is disabled.
use crate::opentelemetry::assert_component_received_events_total;

assert_component_received_events_total("logs", EXPECTED_LOG_COUNT);
}
12 changes: 12 additions & 0 deletions tests/e2e/opentelemetry/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,15 @@ fn vector_sink_otel_sink_metrics_match() {
"Collector and Vector metric requests should match"
);
}

#[test]
fn vector_component_received_events_total_counts_individual_metrics() {
// This test verifies that when use_otlp_decoding is enabled, the
// component_received_events_total metric counts individual metrics
// within OTLP batches, not the number of batch requests.
// This ensures consistency with other Vector sources and with the same
// OpenTelemetry source when use_otlp_decoding is disabled.
use crate::opentelemetry::assert_component_received_events_total;

assert_component_received_events_total("metrics", EXPECTED_METRIC_COUNT);
}
54 changes: 54 additions & 0 deletions tests/e2e/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,57 @@ pub fn assert_service_name_with<ResourceT, F>(
}
}
}

/// Verifies that the component_received_events_total internal metric counts
/// individual log records/metrics/spans, not batch requests.
/// This ensures consistency when use_otlp_decoding is enabled.
pub fn assert_component_received_events_total(data_type: &str, expected_count: usize) {
let metrics_content = read_file_helper(data_type, "vector-internal-metrics-sink.log")
.expect("Failed to read internal metrics file");

// Parse the metrics file to find component_received_events_total
let mut found_metric = false;
let mut total_events = 0u64;

for line in metrics_content.lines() {
let line = line.trim();
if line.is_empty() {
continue;
}

// Parse the JSON metric
let metric: serde_json::Value = serde_json::from_str(line)
.unwrap_or_else(|e| panic!("Failed to parse metrics JSON: {e}"));

// Look for component_received_events_total metric
if let Some(name) = metric.get("name").and_then(|v| v.as_str())
&& name == "component_received_events_total"
{
// Check if this is for our opentelemetry source
if let Some(tags) = metric.get("tags")
&& let Some(component_id) = tags.get("component_id").and_then(|v| v.as_str())
&& component_id == "source0"
{
found_metric = true;
// Get the counter value
if let Some(counter) = metric.get("counter")
&& let Some(value) = counter.get("value").and_then(|v| v.as_f64())
{
total_events = value as u64;
}
}
}
}

assert!(
found_metric,
"Could not find component_received_events_total metric for source0 in internal metrics"
);

// Verify that the metric counts individual items, not batch requests
assert_eq!(
total_events, expected_count as u64,
"component_received_events_total should count individual items ({expected_count}), \
not batch requests. Found: {total_events}"
);
}
Loading