
feat(log-agent): add datadog-log-agent crate and wire into serverless-compat #95

Draft
litianningdatadog wants to merge 30 commits into main from
tianning.li/SVLS-8573-datadog-log-agent

Conversation

@litianningdatadog litianningdatadog commented Mar 10, 2026

https://datadoghq.atlassian.net/browse/SVLS-8573

Summary

  • Adds a new datadog-log-agent library crate providing generic log aggregation and flushing to the Datadog Logs API
  • Wires the log agent into the datadog-serverless-compat binary so serverless runtimes (Lambda, Azure Functions, etc.) can ship logs
  • Integrates LogFlusher into datadog-serverless-compat's flush pipeline alongside traces and metrics

Key components

datadog-log-agent crate:

  • LogEntry — flat serde struct with standard Datadog fields + #[serde(flatten)] attributes map for runtime-specific enrichment (e.g. lambda.arn, azure.resource_id)
  • LogAggregator — size/count-bounded batch collector (≤1000 entries, ≤5 MB per batch)
  • AggregatorService / AggregatorHandle — channel-based async service; callers insert via handle, flusher drains via get_batches()
  • LogFlusher — ships batches to /api/v2/logs with zstd compression, configurable retry (3 attempts), OPW mode, and additional endpoints
  • LogFlusherConfig::from_env() — reads all config from environment variables with sensible defaults
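The flattening behavior described for LogEntry can be illustrated with a minimal std-only sketch. The real crate uses serde's `#[serde(flatten)]`; the field set and hand-rolled serialization below are simplified assumptions for illustration only:

```rust
use std::collections::BTreeMap;

/// Minimal stand-in for the crate's LogEntry: fixed Datadog fields plus a
/// free-form attribute map that is flattened into the same JSON object.
pub struct LogEntry {
    pub message: String,
    pub timestamp: i64,
    pub attributes: BTreeMap<String, String>,
}

impl LogEntry {
    /// Hand-rolled serialization mimicking `#[serde(flatten)]`: attribute
    /// keys land beside the standard fields, not nested under "attributes".
    pub fn to_json(&self) -> String {
        let mut fields = vec![
            format!("\"message\":\"{}\"", self.message),
            format!("\"timestamp\":{}", self.timestamp),
        ];
        for (k, v) in &self.attributes {
            fields.push(format!("\"{}\":\"{}\"", k, v));
        }
        format!("{{{}}}", fields.join(","))
    }
}

fn main() {
    let mut attrs = BTreeMap::new();
    attrs.insert("lambda.arn".to_string(), "arn:aws:lambda:...".to_string());
    let entry = LogEntry { message: "hello".into(), timestamp: 1, attributes: attrs };
    println!("{}", entry.to_json());
}
```

With serde's real `flatten`, runtime-specific keys like `lambda.arn` or `azure.resource_id` appear at the top level of the serialized object exactly as sketched here.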

datadog-serverless-compat integration:

  • start_log_agent() spawns the aggregator and wires a LogFlusher into the flush pipeline
  • Disabled by default — requires DD_LOGS_ENABLED=true
  • Supports OPW mode via DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED / DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL
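The DD_LOGS_ENABLED gate described above can be sketched in a few lines; the helper name is hypothetical and the real parsing may differ:

```rust
use std::env;

/// Sketch of the opt-in gate: the log agent only starts when
/// DD_LOGS_ENABLED is explicitly set to "true" (case-insensitive).
fn logs_enabled() -> bool {
    env::var("DD_LOGS_ENABLED")
        .map(|v| v.eq_ignore_ascii_case("true"))
        .unwrap_or(false) // unset means disabled by default
}

fn main() {
    env::remove_var("DD_LOGS_ENABLED");
    assert!(!logs_enabled()); // disabled by default

    env::set_var("DD_LOGS_ENABLED", "true");
    assert!(logs_enabled());
}
```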

Test plan

  • Unit tests: cargo test -p datadog-log-agent (39 tests across aggregator, flusher, config, log_entry)
  • Integration tests: cargo test -p datadog-log-agent --test integration_test (covers full pipeline, batching, retries, OPW mode)
  • Local smoke test: ./scripts/test-log-intake.sh
  • Release build: cargo build -p datadog-serverless-compat --release

litianningdatadog and others added 12 commits March 9, 2026 15:28
…errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- add generic LogEntry with flatten attributes for runtime enrichment
- add features table and fix zstd compression level docs
…ection

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n tally

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l pattern

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nd OPW mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…inary

- Clean public re-exports in datadog-log-agent lib.rs
- Add datadog-log-agent dependency to datadog-serverless-compat
- Wire log agent startup in main.rs following DogStatsD pattern
- Respect DD_LOGS_ENABLED env var (default: true)
- Use FIPS-compliant HTTP client via create_reqwest_client_builder
- Flush logs on same interval as DogStatsD metrics
- Add integration test verifying full pipeline compiles and runs
- Update CLAUDE.md with log-agent architecture and env vars

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…key, use crate re-exports

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@litianningdatadog litianningdatadog requested review from a team as code owners March 10, 2026 19:06
@litianningdatadog litianningdatadog requested review from Lewis-E and duncanista and removed request for a team March 10, 2026 19:06
@duncanista duncanista requested a review from Copilot March 10, 2026 19:08

impl LogEntry {
/// Create a minimal log entry with only the required fields.
pub fn new(message: impl Into<String>, timestamp: i64) -> Self {
`new` shouldn't be used while ignoring fields; this is not standard Rust practice.

I'd suggest using `from_...`

@litianningdatadog litianningdatadog marked this pull request as draft March 10, 2026 19:10
I would rename this to something closer to what it actually is: since this is an Intake Log format, the name should reference that.

/// Stores pre-serialized JSON strings in a FIFO queue. A batch is "full"
/// when it reaches `MAX_BATCH_ENTRIES` entries or `MAX_CONTENT_BYTES` of
/// uncompressed content.
pub struct LogAggregator {
No need for the `Log` suffix on structs; we are already in the logs project.

Comment on lines +47 to +100
impl LogFlusherConfig {
/// Build a config from environment variables, falling back to sensible defaults.
///
/// | Variable | Default |
/// |---|---|
/// | `DD_API_KEY` | `""` |
/// | `DD_SITE` | `datadoghq.com` |
/// | `DD_LOGS_CONFIG_USE_COMPRESSION` | `true` |
/// | `DD_LOGS_CONFIG_COMPRESSION_LEVEL` | `3` |
/// | `DD_FLUSH_TIMEOUT` | `5` (seconds) |
/// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED` | `false` |
/// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL` | (none) |
pub fn from_env() -> Self {
let api_key = std::env::var("DD_API_KEY").unwrap_or_default();
let site = std::env::var("DD_SITE").unwrap_or_else(|_| DEFAULT_SITE.to_string());

let use_compression = std::env::var("DD_LOGS_CONFIG_USE_COMPRESSION")
.map(|v| v.to_lowercase() != "false")
.unwrap_or(true);

let compression_level = std::env::var("DD_LOGS_CONFIG_COMPRESSION_LEVEL")
.ok()
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(DEFAULT_COMPRESSION_LEVEL);

let flush_timeout_secs = std::env::var("DD_FLUSH_TIMEOUT")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(DEFAULT_FLUSH_TIMEOUT_SECS);

let opw_enabled = std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED")
.map(|v| v.to_lowercase() == "true")
.unwrap_or(false);

let mode = if opw_enabled {
let url =
std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL").unwrap_or_default();
if url.is_empty() {
tracing::warn!("OPW mode enabled but DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL is not set — log flush will fail");
}
FlusherMode::ObservabilityPipelinesWorker { url }
} else {
FlusherMode::Datadog
};

Self {
api_key,
site,
mode,
additional_endpoints: Vec::new(),
use_compression,
compression_level,
flush_timeout: Duration::from_secs(flush_timeout_secs),
}
I'm not sure this is ideal; it should be delegated to the consumer.

- disable log agent by default — require explicit DD_LOGS_ENABLED=true
- log the actual error when reqwest client build() fails in start_log_agent
- fail fast in start_log_agent when OPW URL is empty
- apply rustfmt to integration test formatting.
@litianningdatadog litianningdatadog force-pushed the tianning.li/SVLS-8573-datadog-log-agent branch from 7652f6f to eda2718 Compare March 10, 2026 19:13

/// Controls where and how logs are shipped.
#[derive(Debug, Clone)]
pub enum FlusherMode {
This doesn't describe a flushing mode; it describes the destination the data is sent to.

use crate::log_entry::LogEntry;

#[derive(Debug)]
enum LogAggregatorCommand {
Same goes for this name: no need for the `Log` prefix here either.

Copilot AI left a comment


Pull request overview

This PR introduces a new datadog-log-agent crate to batch/flush logs to Datadog (or OPW) and wires a LogFlusher into the existing datadog-serverless-compat periodic flush loop.

Changes:

  • Added datadog-log-agent crate (log entry model, batch aggregator service, flusher, env-based config) plus integration tests.
  • Integrated log flushing into datadog-serverless-compat behind DD_LOGS_ENABLED=true.
  • Updated workspace/build metadata (Cargo workspace exclude, lockfile, gitignore).

Reviewed changes

Copilot reviewed 16 out of 18 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
crates/datadog-serverless-compat/src/main.rs Adds log-agent startup + periodic flush; includes new tests in binary crate.
crates/datadog-serverless-compat/Cargo.toml Adds dependency on datadog-log-agent and dev-deps for new tests.
crates/datadog-log-agent/Cargo.toml New crate manifest with reqwest/serde/tokio/tracing deps and dev-deps.
crates/datadog-log-agent/src/lib.rs New crate root with module exports and re-exports.
crates/datadog-log-agent/src/log_entry.rs Defines LogEntry serde model with flattened runtime attributes + unit tests.
crates/datadog-log-agent/src/constants.rs Adds batching and default config constants (entry count/byte limits, defaults).
crates/datadog-log-agent/src/errors.rs Adds error types for aggregation/flush (and creation).
crates/datadog-log-agent/src/config.rs Adds LogFlusherConfig + from_env() and FlusherMode (Datadog vs OPW).
crates/datadog-log-agent/src/aggregator/mod.rs Aggregator module wiring and public re-exports.
crates/datadog-log-agent/src/aggregator/core.rs Implements in-memory batch aggregation and JSON-array batch building.
crates/datadog-log-agent/src/aggregator/service.rs Adds async command-driven aggregator service + handle API.
crates/datadog-log-agent/src/flusher.rs Implements batch shipping with retry, optional compression, OPW behavior + tests.
crates/datadog-log-agent/tests/integration_test.rs End-to-end integration tests using mockito OPW endpoint.
Cargo.toml Excludes crates/.claude from workspace members.
Cargo.lock Adds datadog-log-agent and serverless-compat dep updates.
.gitignore Ignores /CLAUDE.md.


Comment on lines +96 to +100
// Account for the comma separator added before every entry after the first
let separator = if count == 0 { 0 } else { 1 };
if count > 0 && bytes_in_batch + separator + msg_len > MAX_CONTENT_BYTES {
break;
}
Copilot AI Mar 10, 2026

The MAX_CONTENT_BYTES check doesn’t account for JSON framing bytes (the surrounding [/] and comma separators). Because of that, the produced batch can exceed MAX_CONTENT_BYTES by a few bytes even when the check passes. Consider including the brackets/separators in the byte accounting to keep the 5MB cap strict.
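Copilot's suggestion can be sketched by counting the brackets and comma separators toward the cap. Constants and entry shapes below are illustrative, not the crate's (MAX_CONTENT_BYTES is shrunk for demonstration):

```rust
/// Re-accounting of the batch-size check: 2 bytes for "[" and "]" plus
/// one comma per separator count toward the cap, so the serialized batch
/// can never exceed MAX_CONTENT_BYTES.
const MAX_CONTENT_BYTES: usize = 32;

fn build_batch(entries: &[&str]) -> (String, usize) {
    let mut batch = String::from("[");
    let mut taken = 0;
    for e in entries {
        let sep = if taken == 0 { 0 } else { 1 }; // comma before non-first entries
        // batch.len() - 1 is the content so far (minus the opening bracket);
        // + 2 re-adds both surrounding brackets to the projected total.
        if batch.len() - 1 + sep + e.len() + 2 > MAX_CONTENT_BYTES {
            break;
        }
        if taken > 0 {
            batch.push(',');
        }
        batch.push_str(e);
        taken += 1;
    }
    batch.push(']');
    (batch, taken)
}

fn main() {
    let entries = ["{\"m\":1}", "{\"m\":2}", "{\"m\":3}", "{\"m\":4}"];
    let (batch, taken) = build_batch(&entries);
    // The fourth entry would push the framed batch past the cap, so only
    // three are taken and the final length stays within bounds.
    assert_eq!(taken, 3);
    assert!(batch.len() <= MAX_CONTENT_BYTES);
    println!("{taken} entries, {} bytes: {batch}", batch.len());
}
```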

Comment on lines +348 to +352
let (service, handle): (LogAggregatorService, LogAggregatorHandle) =
LogAggregatorService::new();
tokio::spawn(service.run());

let client = create_reqwest_client_builder()
Copilot AI Mar 10, 2026

start_log_agent() spawns the aggregator and then only returns a LogFlusher, keeping the LogAggregatorHandle private inside the flusher. In datadog-serverless-compat there are no other references to the handle, so nothing can ever call insert_batch() and the periodic flush() will remain a no-op. Consider returning/exposing the LogAggregatorHandle (or wiring it into the component that receives runtime logs) so producers can enqueue LogEntrys.

Comment on lines +424 to +431
/// start_log_agent must reject OPW mode with an empty URL.
#[test]
fn test_opw_empty_url_is_detected() {
let config = LogFlusherConfig {
api_key: "key".to_string(),
site: "datadoghq.com".to_string(),
mode: FlusherMode::ObservabilityPipelinesWorker { url: String::new() },
additional_endpoints: Vec::new(),
Copilot AI Mar 10, 2026

test_opw_empty_url_is_detected is currently a tautology: it constructs a config with url: String::new() and then asserts the URL is empty via matches!, which will always pass and doesn't validate any production behavior. Consider rewriting this test to call start_log_agent() (or LogFlusherConfig::from_env() + validation) and assert that OPW-enabled + empty URL is rejected.
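One way to make the test behavior-based, sketched with a hypothetical `validate_mode` helper (this helper is not an API from the PR; it illustrates the validation the review asks for):

```rust
/// Hypothetical validation helper of the kind the review suggests:
/// OPW mode with an empty URL is rejected instead of merely observed.
#[derive(Debug, Clone)]
enum FlusherMode {
    Datadog,
    ObservabilityPipelinesWorker { url: String },
}

fn validate_mode(mode: &FlusherMode) -> Result<(), String> {
    match mode {
        FlusherMode::ObservabilityPipelinesWorker { url } if url.is_empty() => {
            Err("OPW mode enabled but no URL configured".to_string())
        }
        _ => Ok(()),
    }
}

fn main() {
    // The rewritten test asserts on production behavior, not on its own fixture.
    assert!(validate_mode(&FlusherMode::ObservabilityPipelinesWorker { url: String::new() }).is_err());
    assert!(validate_mode(&FlusherMode::Datadog).is_ok());
}
```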

Comment on lines +40 to +42
/// zstd compression level 1–21 (ignored when `use_compression` is false).
pub compression_level: i32,

Copilot AI Mar 10, 2026

compression_level is documented as "1–21", but zstd (and your constants comment) allow negative levels, and the env parsing accepts any i32. Please align the docs with actual accepted values, or clamp/validate to the documented range to avoid surprising configuration.
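A clamp along the lines suggested might look like this; the 1..=22 bounds follow zstd's documented regular compression levels, though the crate may choose different bounds (e.g. to admit negative "fast" levels):

```rust
/// Keep any i32 parsed from the environment inside zstd's regular
/// level range. Bounds here are illustrative, not the crate's.
fn clamp_compression_level(requested: i32) -> i32 {
    requested.clamp(1, 22)
}

fn main() {
    assert_eq!(clamp_compression_level(3), 3);   // the documented default passes through
    assert_eq!(clamp_compression_level(0), 1);   // too low: clamped up
    assert_eq!(clamp_compression_level(99), 22); // too high: clamped down
}
```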

Comment on lines +444 to +451

/// Verify the LogFlusher struct is constructible from within this crate — compile-time test.
#[allow(dead_code)]
fn _assert_log_flusher_constructible() {
let _ = std::mem::size_of::<LogFlusher>();
let _ = std::mem::size_of::<LogFlusherConfig>();
let _ = std::mem::size_of::<FlusherMode>();
}
Copilot AI Mar 10, 2026

_assert_log_flusher_constructible is labeled as a compile-time test but only checks size_of::<...>(), which provides little value and can create maintenance noise. If the goal is to enforce API visibility/constructibility across crates, consider relying on cargo check + the real integration tests in datadog-log-agent/tests/ instead, or replace this with a behavior-based test.

Suggested change
/// Verify the LogFlusher struct is constructible from within this crate — compile-time test.
#[allow(dead_code)]
fn _assert_log_flusher_constructible() {
let _ = std::mem::size_of::<LogFlusher>();
let _ = std::mem::size_of::<LogFlusherConfig>();
let _ = std::mem::size_of::<FlusherMode>();
}

# SPDX-License-Identifier: Apache-2.0

[package]
name = "datadog-log-agent"
I would rename this to datadog-logs-agent.

The singular form doesn't match the naming standard used across Datadog.

litianningdatadog and others added 17 commits March 10, 2026 16:07
Cover the full HTTP→LogServer→AggregatorService→LogFlusher→backend
pipeline, concurrent client ingestion, and error recovery after a
malformed request.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All 4xx responses were previously short-circuited as permanent failures,
causing rate-limited (429) and timed-out (408) batches to be silently
dropped.  These are transient conditions that should go through the
existing retry loop.

TODO: parse Retry-After header on 429 to add proper backoff.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
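The classification described in this commit can be sketched as a small predicate. Treating 5xx as retryable is an assumption here (the commit only discusses 4xx handling), and the helper name is hypothetical:

```rust
/// Sketch of the retry classification: 408 and 429 are transient and go
/// through the retry loop; other 4xx responses are permanent failures.
fn is_retryable(status: u16) -> bool {
    match status {
        408 | 429 => true,  // request timeout / rate-limited: retry
        500..=599 => true,  // server errors: assumed retryable
        _ => false,         // other 4xx (400, 403, 413, ...): drop
    }
}

fn main() {
    assert!(is_retryable(429));
    assert!(is_retryable(408));
    assert!(is_retryable(503));
    assert!(!is_retryable(403)); // bad API key will not improve on retry
}
```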
Previously the boolean returned by LogFlusher::flush() was silently
discarded, giving operators no signal when logs were being dropped.
Now logs a warning on each failed flush cycle.

TODO: expose as a statsd counter/gauge for durable telemetry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eader instead of size_hint

size_hint().upper() returns None for chunked/streaming bodies; coercing
that to u64::MAX caused every request without a Content-Length header to
be rejected with 413 before any body bytes were read.

Replace the pre-read guard with a direct Content-Length header parse:
reject early only when the header is present and exceeds MAX_BODY_BYTES,
and fall through to the post-read bytes.len() check otherwise.

Adds a regression test that sends a raw Transfer-Encoding: chunked
request (no Content-Length) via TcpStream and asserts 200 + correct
aggregator insertion.
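The guard described above can be sketched with a plain header value instead of a real HTTP type (MAX_BODY_BYTES and the function shape are illustrative):

```rust
/// Reject early only when a Content-Length header is present AND exceeds
/// the cap; chunked/streaming bodies (no header) fall through to a
/// post-read bytes.len() check.
const MAX_BODY_BYTES: usize = 1024;

fn precheck(content_length: Option<&str>) -> Result<(), u16> {
    if let Some(len) = content_length.and_then(|v| v.parse::<usize>().ok()) {
        if len > MAX_BODY_BYTES {
            return Err(413); // Payload Too Large, before reading any body bytes
        }
    }
    Ok(()) // absent or unparsable header: defer to the post-read length check
}

fn main() {
    assert_eq!(precheck(Some("2048")), Err(413)); // declared too large: early reject
    assert_eq!(precheck(Some("10")), Ok(()));
    assert_eq!(precheck(None), Ok(())); // chunked request: no early 413
}
```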
…t bind-failure gap

- Extract DEFAULT_LOG_INTAKE_PORT = 10517 constant (was hardcoded 8080)
- Add TODO explaining that LogServer::serve binds inside the spawned task,
  so a port-conflict failure is silently swallowed while the caller still
  returns Some(...) and logs "log agent started"
- Replace sequential for-loop over additional_endpoints with join_all()
- Add futures crate dependency for join_all
- Add unit tests: one verifying all endpoints receive the batch, one
  using Barrier(2) to prove concurrent in-flight dispatch

Rationale: LogFlusherConfig documented additional_endpoints as shipped
"in parallel" but the implementation was sequential — this aligns the
implementation with the documented contract

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
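The fan-out/join shape of join_all can be sketched with std threads as an analogue; the real code is async over futures, and `ship_to_all` plus its send stub are hypothetical:

```rust
use std::thread;

/// std-thread analogue of the join_all fan-out: every endpoint receives
/// the batch concurrently, then all results are joined.
fn ship_to_all(endpoints: &[&str], batch: &str) -> Vec<(String, bool)> {
    thread::scope(|s| {
        let handles: Vec<_> = endpoints
            .iter()
            .map(|ep| {
                s.spawn(move || {
                    // Placeholder for the real HTTP send to `ep`.
                    (ep.to_string(), !batch.is_empty())
                })
            })
            .collect();
        // Join preserves endpoint order even though sends ran concurrently.
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

fn main() {
    let results = ship_to_all(&["https://a.example", "https://b.example"], "[{}]");
    assert_eq!(results.len(), 2);
    assert!(results.iter().all(|(_, ok)| *ok));
}
```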
- Replace sequential for-batch loop with join_all over all batches
- Each batch now ships to primary and extras concurrently in parallel
- Collect per-batch primary results via join_all then fold with .all()

Rationale: multiple batches were flushed one at a time; concurrent
dispatch reduces total flush latency when the aggregator produces
more than one batch

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Call resp.bytes().await after receiving a response to consume the body
- Ensures the TCP connection is returned to the pool instead of lingering
  in CLOSE_WAIT, which would exhaust the connection pool under high flush
  frequency

Rationale: reqwest reuses connections only after the response body is
fully consumed; skipping this keeps connections open unnecessarily

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add LogsAdditionalEndpoint {api_key, url, is_reliable} matching the
  bottlecap/datadog-agent wire format (Host+Port deserialized to url)
- Add parse_additional_endpoints() and read DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS
  in LogFlusherConfig::from_env()
- Update ship_batch to accept explicit api_key param so each additional
  endpoint uses its own key instead of the primary key
- Re-export LogsAdditionalEndpoint from crate root
- Update all test fixtures to use the new struct

Rationale: aligns with the datadog-lambda-extension bottlecap model where
each additional endpoint authenticates independently with its own API key
- Replace ? propagation with match in ship_batch compression block
- On compress error, warn and send raw bytes without Content-Encoding header
- Avoids dropping the batch entirely due to a transient encoder failure

Rationale: compression failures are rare (OOM, corrupted encoder state)
and silently dropping the batch is worse than sending it uncompressed
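The fallback path can be sketched like this, with `try_compress` standing in for the real zstd encoder (the failure flag exists only to exercise the error branch):

```rust
/// Stand-in for the zstd encoder; `fail` simulates a transient error.
fn try_compress(payload: &[u8], fail: bool) -> Result<Vec<u8>, String> {
    if fail {
        Err("encoder error".into())
    } else {
        Ok(payload.iter().rev().copied().collect()) // fake "compression"
    }
}

/// On compress error: warn and send the raw bytes with no
/// Content-Encoding header, instead of dropping the batch.
/// Returns (body, content_encoding_header).
fn prepare_body(payload: &[u8], fail: bool) -> (Vec<u8>, Option<&'static str>) {
    match try_compress(payload, fail) {
        Ok(compressed) => (compressed, Some("zstd")),
        Err(e) => {
            eprintln!("compression failed ({e}); sending uncompressed");
            (payload.to_vec(), None)
        }
    }
}

fn main() {
    let (body, enc) = prepare_body(b"batch", true);
    assert_eq!(body, b"batch"); // batch survives the encoder failure
    assert_eq!(enc, None);      // no Content-Encoding on the fallback path
}
```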
…passback

- Change flush() -> bool to flush(Vec<RequestBuilder>) -> Vec<RequestBuilder>
- send_with_retry returns Err(builder) on transient exhaustion instead of FlushError
- serverless-compat flush loop stores and redrives failed builders each cycle
- Additional endpoint failures remain best-effort (not tracked for retry)
- Add tests: cross-invocation redrive succeeds, additional endpoint failures excluded

Rationale: aligns with bottlecap FlushingService retry pattern; batches that
hit transient intake errors survive across Lambda invocations instead of being
silently dropped after MAX_FLUSH_ATTEMPTS

This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
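The redrive contract described in this commit can be sketched with `String` standing in for `reqwest::RequestBuilder` and a closure standing in for the send:

```rust
/// flush() takes last cycle's failed requests plus this cycle's fresh
/// ones, attempts each, and returns the new failures so they survive
/// into the next invocation instead of being dropped.
fn flush(
    redrive: Vec<String>,
    fresh: Vec<String>,
    send_ok: impl Fn(&str) -> bool,
) -> Vec<String> {
    redrive
        .into_iter()
        .chain(fresh)
        .filter(|req| !send_ok(req)) // keep only the requests that failed
        .collect()
}

fn main() {
    // Cycle 1: intake is down, both batches fail and are carried over.
    let failed = flush(Vec::new(), vec!["b1".into(), "b2".into()], |_| false);
    assert_eq!(failed, vec!["b1".to_string(), "b2".to_string()]);

    // Cycle 2: intake recovers, the redriven batches succeed.
    let failed = flush(failed, Vec::new(), |_| true);
    assert!(failed.is_empty());
}
```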