feat(log-agent): add datadog-log-agent crate and wire into serverless-compat#95
litianningdatadog wants to merge 30 commits into main from
Conversation
…errors Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- add generic LogEntry with flatten attributes for runtime enrichment - add features table and fix zstd compression level docs
…ection Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n tally Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l pattern Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nd OPW mode Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…inary - Clean public re-exports in datadog-log-agent lib.rs - Add datadog-log-agent dependency to datadog-serverless-compat - Wire log agent startup in main.rs following DogStatsD pattern - Respect DD_LOGS_ENABLED env var (default: true) - Use FIPS-compliant HTTP client via create_reqwest_client_builder - Flush logs on same interval as DogStatsD metrics - Add integration test verifying full pipeline compiles and runs - Update CLAUDE.md with log-agent architecture and env vars Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…key, use crate re-exports Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…atching, retries, and OPW mode
```rust
impl LogEntry {
    /// Create a minimal log entry with only the required fields.
    pub fn new(message: impl Into<String>, timestamp: i64) -> Self {
```
`new` shouldn't be used while ignoring fields; this is not standard Rust practice.
I'd suggest using `from_...` instead.
I would rename this to something more akin to what it is: since this is the Intake Log format, it should be named with reference to it.
```rust
/// Stores pre-serialized JSON strings in a FIFO queue. A batch is "full"
/// when it reaches `MAX_BATCH_ENTRIES` entries or `MAX_CONTENT_BYTES` of
/// uncompressed content.
pub struct LogAggregator {
```
No need to add the Log suffix to structs; we are already in the logs project.
```rust
impl LogFlusherConfig {
    /// Build a config from environment variables, falling back to sensible defaults.
    ///
    /// | Variable | Default |
    /// |---|---|
    /// | `DD_API_KEY` | `""` |
    /// | `DD_SITE` | `datadoghq.com` |
    /// | `DD_LOGS_CONFIG_USE_COMPRESSION` | `true` |
    /// | `DD_LOGS_CONFIG_COMPRESSION_LEVEL` | `3` |
    /// | `DD_FLUSH_TIMEOUT` | `5` (seconds) |
    /// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED` | `false` |
    /// | `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL` | (none) |
    pub fn from_env() -> Self {
        let api_key = std::env::var("DD_API_KEY").unwrap_or_default();
        let site = std::env::var("DD_SITE").unwrap_or_else(|_| DEFAULT_SITE.to_string());

        let use_compression = std::env::var("DD_LOGS_CONFIG_USE_COMPRESSION")
            .map(|v| v.to_lowercase() != "false")
            .unwrap_or(true);

        let compression_level = std::env::var("DD_LOGS_CONFIG_COMPRESSION_LEVEL")
            .ok()
            .and_then(|v| v.parse::<i32>().ok())
            .unwrap_or(DEFAULT_COMPRESSION_LEVEL);

        let flush_timeout_secs = std::env::var("DD_FLUSH_TIMEOUT")
            .ok()
            .and_then(|v| v.parse::<u64>().ok())
            .unwrap_or(DEFAULT_FLUSH_TIMEOUT_SECS);

        let opw_enabled = std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED")
            .map(|v| v.to_lowercase() == "true")
            .unwrap_or(false);

        let mode = if opw_enabled {
            let url =
                std::env::var("DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL").unwrap_or_default();
            if url.is_empty() {
                tracing::warn!("OPW mode enabled but DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL is not set — log flush will fail");
            }
            FlusherMode::ObservabilityPipelinesWorker { url }
        } else {
            FlusherMode::Datadog
        };

        Self {
            api_key,
            site,
            mode,
            additional_endpoints: Vec::new(),
            use_compression,
            compression_level,
            flush_timeout: Duration::from_secs(flush_timeout_secs),
        }
    }
}
```
Not sure this is ideal; it should be delegated to the consumer.
- disable log agent by default — require explicit DD_LOGS_ENABLED=true - log the actual error when reqwest client build() fails in start_log_agent - fail fast in start_log_agent when OPW URL is empty - apply rustfmt to integration test formatting.
7652f6f to eda2718
```rust
/// Controls where and how logs are shipped.
#[derive(Debug, Clone)]
pub enum FlusherMode {
```
This doesn't reflect a flushing mode but rather the destination the data is sent to.
```rust
use crate::log_entry::LogEntry;

#[derive(Debug)]
enum LogAggregatorCommand {
```
Same goes for the naming here: no need for the Log prefix.
Pull request overview
This PR introduces a new datadog-log-agent crate to batch/flush logs to Datadog (or OPW) and wires a LogFlusher into the existing datadog-serverless-compat periodic flush loop.
Changes:
- Added `datadog-log-agent` crate (log entry model, batch aggregator service, flusher, env-based config) plus integration tests.
- Integrated log flushing into `datadog-serverless-compat` behind `DD_LOGS_ENABLED=true`.
- Updated workspace/build metadata (Cargo workspace exclude, lockfile, gitignore).
Reviewed changes
Copilot reviewed 16 out of 18 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| crates/datadog-serverless-compat/src/main.rs | Adds log-agent startup + periodic flush; includes new tests in binary crate. |
| crates/datadog-serverless-compat/Cargo.toml | Adds dependency on datadog-log-agent and dev-deps for new tests. |
| crates/datadog-log-agent/Cargo.toml | New crate manifest with reqwest/serde/tokio/tracing deps and dev-deps. |
| crates/datadog-log-agent/src/lib.rs | New crate root with module exports and re-exports. |
| crates/datadog-log-agent/src/log_entry.rs | Defines LogEntry serde model with flattened runtime attributes + unit tests. |
| crates/datadog-log-agent/src/constants.rs | Adds batching and default config constants (entry count/byte limits, defaults). |
| crates/datadog-log-agent/src/errors.rs | Adds error types for aggregation/flush (and creation). |
| crates/datadog-log-agent/src/config.rs | Adds LogFlusherConfig + from_env() and FlusherMode (Datadog vs OPW). |
| crates/datadog-log-agent/src/aggregator/mod.rs | Aggregator module wiring and public re-exports. |
| crates/datadog-log-agent/src/aggregator/core.rs | Implements in-memory batch aggregation and JSON-array batch building. |
| crates/datadog-log-agent/src/aggregator/service.rs | Adds async command-driven aggregator service + handle API. |
| crates/datadog-log-agent/src/flusher.rs | Implements batch shipping with retry, optional compression, OPW behavior + tests. |
| crates/datadog-log-agent/tests/integration_test.rs | End-to-end integration tests using mockito OPW endpoint. |
| Cargo.toml | Excludes crates/.claude from workspace members. |
| Cargo.lock | Adds datadog-log-agent and serverless-compat dep updates. |
| .gitignore | Ignores /CLAUDE.md. |
```rust
// Account for the comma separator added before every entry after the first
let separator = if count == 0 { 0 } else { 1 };
if count > 0 && bytes_in_batch + separator + msg_len > MAX_CONTENT_BYTES {
    break;
}
```
The MAX_CONTENT_BYTES check doesn't account for JSON framing bytes (the surrounding `[`/`]` brackets and comma separators). Because of that, the produced batch can exceed MAX_CONTENT_BYTES by a few bytes even when the check passes. Consider including the brackets/separators in the byte accounting to keep the 5 MB cap strict.
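A minimal sketch of the stricter accounting this comment asks for, counting the two brackets and the per-entry comma against the budget (`MAX_CONTENT_BYTES` matches the diff's constant; `fits` and `FRAMING_BRACKETS` are hypothetical names, not the PR's API):

```rust
// Hypothetical limits mirroring the PR's constants.
const MAX_CONTENT_BYTES: usize = 5 * 1024 * 1024;
const FRAMING_BRACKETS: usize = 2; // the surrounding '[' and ']'

/// Returns true if an entry of `msg_len` bytes still fits in the batch,
/// counting the brackets and the comma separator in the budget.
fn fits(count: usize, bytes_in_batch: usize, msg_len: usize) -> bool {
    let separator = if count == 0 { 0 } else { 1 };
    bytes_in_batch + separator + msg_len + FRAMING_BRACKETS <= MAX_CONTENT_BYTES
}

fn main() {
    // An entry that lands exactly on the cap once '[' and ']' are counted
    // is accepted; one byte more is rejected, keeping the cap strict.
    assert!(fits(0, 0, MAX_CONTENT_BYTES - 2));
    assert!(!fits(0, 0, MAX_CONTENT_BYTES - 1));
    // With an entry already present, the comma separator also counts.
    assert!(fits(1, MAX_CONTENT_BYTES - 5, 2));
    assert!(!fits(1, MAX_CONTENT_BYTES - 4, 2));
    println!("ok");
}
```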
```rust
let (service, handle): (LogAggregatorService, LogAggregatorHandle) =
    LogAggregatorService::new();
tokio::spawn(service.run());

let client = create_reqwest_client_builder()
```
start_log_agent() spawns the aggregator and then only returns a LogFlusher, keeping the LogAggregatorHandle private inside the flusher. In datadog-serverless-compat there are no other references to the handle, so nothing can ever call insert_batch() and the periodic flush() will remain a no-op. Consider returning/exposing the LogAggregatorHandle (or wiring it into the component that receives runtime logs) so producers can enqueue `LogEntry` values.
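One way to expose the producer side, sketched here with a plain `std::sync::mpsc` channel standing in for the PR's aggregator service (every type and the `start_log_agent` signature below are stand-ins for illustration, not the real API):

```rust
use std::sync::mpsc;

struct LogEntry(String);
struct AggregatorHandle(mpsc::Sender<LogEntry>);
struct Flusher(mpsc::Receiver<LogEntry>);

impl AggregatorHandle {
    // Producers enqueue entries through the handle.
    fn insert(&self, e: LogEntry) {
        let _ = self.0.send(e);
    }
}

impl Flusher {
    // The flush loop drains whatever has been enqueued so far.
    fn drain(&self) -> Vec<String> {
        self.0.try_iter().map(|e| e.0).collect()
    }
}

/// Returning the handle alongside the flusher lets callers wire producers in.
fn start_log_agent() -> (Flusher, AggregatorHandle) {
    let (tx, rx) = mpsc::channel();
    (Flusher(rx), AggregatorHandle(tx))
}

fn main() {
    let (flusher, handle) = start_log_agent();
    handle.insert(LogEntry("hello".into()));
    assert_eq!(flusher.drain(), vec!["hello".to_string()]);
    println!("ok");
}
```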
```rust
/// start_log_agent must reject OPW mode with an empty URL.
#[test]
fn test_opw_empty_url_is_detected() {
    let config = LogFlusherConfig {
        api_key: "key".to_string(),
        site: "datadoghq.com".to_string(),
        mode: FlusherMode::ObservabilityPipelinesWorker { url: String::new() },
        additional_endpoints: Vec::new(),
```
test_opw_empty_url_is_detected is currently a tautology: it constructs a config with url: String::new() and then asserts the URL is empty via matches!, which will always pass and doesn't validate any production behavior. Consider rewriting this test to call start_log_agent() (or LogFlusherConfig::from_env() + validation) and assert that OPW-enabled + empty URL is rejected.
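A behavior-based replacement could validate the config up front and assert the rejection, roughly like this (the `validate` function and the simplified enum are assumptions for illustration, not the PR's actual API):

```rust
#[derive(Debug)]
enum FlusherMode {
    Datadog,
    ObservabilityPipelinesWorker { url: String },
}

/// Reject OPW mode when no URL is configured; everything else is accepted.
fn validate(mode: &FlusherMode) -> Result<(), String> {
    match mode {
        FlusherMode::ObservabilityPipelinesWorker { url } if url.is_empty() => {
            Err("OPW mode enabled but no URL configured".into())
        }
        _ => Ok(()),
    }
}

fn main() {
    // The empty-URL case is rejected, so the test exercises real behavior
    // instead of restating its own fixture.
    assert!(validate(&FlusherMode::ObservabilityPipelinesWorker { url: String::new() }).is_err());
    assert!(validate(&FlusherMode::ObservabilityPipelinesWorker { url: "http://opw:8282".into() }).is_ok());
    assert!(validate(&FlusherMode::Datadog).is_ok());
    println!("ok");
}
```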
```rust
/// zstd compression level 1–21 (ignored when `use_compression` is false).
pub compression_level: i32,
```
compression_level is documented as "1–21", but zstd (and your constants comment) allow negative levels, and the env parsing accepts any i32. Please align the docs with actual accepted values, or clamp/validate to the documented range to avoid surprising configuration.
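A sketch of the clamping option, assuming a zstd level range of roughly -7 to 22 (the exact bounds are an assumption here; verify them against the zstd crate in use before adopting them):

```rust
// Assumed bounds and default; check the zstd crate's documented range.
const MIN_ZSTD_LEVEL: i32 = -7;
const MAX_ZSTD_LEVEL: i32 = 22;
const DEFAULT_COMPRESSION_LEVEL: i32 = 3;

/// Parse the env value, clamping any out-of-range level into the valid
/// window instead of passing an arbitrary i32 to the encoder.
fn parse_compression_level(raw: Option<&str>) -> i32 {
    raw.and_then(|v| v.parse::<i32>().ok())
        .map(|lvl| lvl.clamp(MIN_ZSTD_LEVEL, MAX_ZSTD_LEVEL))
        .unwrap_or(DEFAULT_COMPRESSION_LEVEL)
}

fn main() {
    assert_eq!(parse_compression_level(None), 3);        // unset: default
    assert_eq!(parse_compression_level(Some("5")), 5);   // in range: unchanged
    assert_eq!(parse_compression_level(Some("9000")), 22);  // clamped down
    assert_eq!(parse_compression_level(Some("-100")), -7);  // clamped up
    println!("ok");
}
```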
```rust
/// Verify the LogFlusher struct is constructible from within this crate — compile-time test.
#[allow(dead_code)]
fn _assert_log_flusher_constructible() {
    let _ = std::mem::size_of::<LogFlusher>();
    let _ = std::mem::size_of::<LogFlusherConfig>();
    let _ = std::mem::size_of::<FlusherMode>();
}
```
_assert_log_flusher_constructible is labeled as a compile-time test but only checks size_of::<...>(), which provides little value and can create maintenance noise. If the goal is to enforce API visibility/constructibility across crates, consider relying on cargo check + the real integration tests in datadog-log-agent/tests/ instead, or replace this with a behavior-based test.
```toml
# SPDX-License-Identifier: Apache-2.0

[package]
name = "datadog-log-agent"
```
I would rename this to datadog-logs-agent; the singular doesn't reflect the standard across Datadog.
Cover the full HTTP→LogServer→AggregatorService→LogFlusher→backend pipeline, concurrent client ingestion, and error recovery after a malformed request. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All 4xx responses were previously short-circuited as permanent failures, causing rate-limited (429) and timed-out (408) batches to be silently dropped. These are transient conditions that should go through the existing retry loop. TODO: parse Retry-After header on 429 to add proper backoff. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously the boolean returned by LogFlusher::flush() was silently discarded, giving operators no signal when logs were being dropped. Now logs a warning on each failed flush cycle. TODO: expose as a statsd counter/gauge for durable telemetry. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eader instead of size_hint size_hint().upper() returns None for chunked/streaming bodies; coercing that to u64::MAX caused every request without a Content-Length header to be rejected with 413 before any body bytes were read. Replace the pre-read guard with a direct Content-Length header parse: reject early only when the header is present and exceeds MAX_BODY_BYTES, and fall through to the post-read bytes.len() check otherwise. Adds a regression test that sends a raw Transfer-Encoding: chunked request (no Content-Length) via TcpStream and asserts 200 + correct aggregator insertion.
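The guard this commit describes reduces to a small pure function: reject pre-read only when a parseable Content-Length exceeds the cap, and otherwise fall through to the post-read length check (`MAX_BODY_BYTES` and `precheck` are illustrative names, not the PR's actual identifiers):

```rust
// Assumed cap for illustration; the PR's real constant may differ.
const MAX_BODY_BYTES: u64 = 1024 * 1024;

/// Pre-read guard: Err(413) only when Content-Length is present, parseable,
/// and over the cap. Absent or unparseable headers (e.g. chunked bodies)
/// fall through so the post-read bytes.len() check can decide.
fn precheck(content_length: Option<&str>) -> Result<(), u16> {
    match content_length.and_then(|v| v.parse::<u64>().ok()) {
        Some(len) if len > MAX_BODY_BYTES => Err(413),
        _ => Ok(()),
    }
}

fn main() {
    assert_eq!(precheck(None), Ok(()));              // chunked: no early reject
    assert_eq!(precheck(Some("512")), Ok(()));       // small declared body
    assert_eq!(precheck(Some("2097152")), Err(413)); // 2 MiB > 1 MiB cap
    println!("ok");
}
```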
…t bind-failure gap - Extract DEFAULT_LOG_INTAKE_PORT = 10517 constant (was hardcoded 8080) - Add TODO explaining that LogServer::serve binds inside the spawned task, so a port-conflict failure is silently swallowed while the caller still returns Some(...) and logs "log agent started"
- Replace sequential for-loop over additional_endpoints with join_all() - Add futures crate dependency for join_all - Add unit tests: one verifying all endpoints receive the batch, one using Barrier(2) to prove concurrent in-flight dispatch Rationale: LogFlusherConfig documented additional_endpoints as shipped "in parallel" but the implementation was sequential — this aligns the implementation with the documented contract This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Replace sequential for-batch loop with join_all over all batches - Each batch now ships to primary and extras concurrently - Collect per-batch primary results via join_all then fold with .all() Rationale: multiple batches were flushed one at a time; concurrent dispatch reduces total flush latency when the aggregator produces more than one batch This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Call resp.bytes().await after receiving a response to consume the body - Ensures the TCP connection is returned to the pool instead of lingering in CLOSE_WAIT, which would exhaust the connection pool under high flush frequency Rationale: reqwest reuses connections only after the response body is fully consumed; skipping this keeps connections open unnecessarily This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
- Add LogsAdditionalEndpoint {api_key, url, is_reliable} matching the
bottlecap/datadog-agent wire format (Host+Port deserialized to url)
- Add parse_additional_endpoints() and read DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS
in LogFlusherConfig::from_env()
- Update ship_batch to accept explicit api_key param so each additional
endpoint uses its own key instead of the primary key
- Re-export LogsAdditionalEndpoint from crate root
- Update all test fixtures to use the new struct
Rationale: aligns with the datadog-lambda-extension bottlecap model where
each additional endpoint authenticates independently with its own API key
- Replace ? propagation with match in ship_batch compression block - On compress error, warn and send raw bytes without Content-Encoding header - Avoids dropping the batch entirely due to a transient encoder failure Rationale: compression failures are rare (OOM, corrupted encoder state) and silently dropping the batch is worse than sending it uncompressed
…passback - Change flush() -> bool to flush(Vec<RequestBuilder>) -> Vec<RequestBuilder> - send_with_retry returns Err(builder) on transient exhaustion instead of FlushError - serverless-compat flush loop stores and redrives failed builders each cycle - Additional endpoint failures remain best-effort (not tracked for retry) - Add tests: cross-invocation redrive succeeds, additional endpoint failures excluded Rationale: aligns with bottlecap FlushingService retry pattern; batches that hit transient intake errors survive across Lambda invocations instead of being silently dropped after MAX_FLUSH_ATTEMPTS This commit made by [/dd:git:commit:quick](https://github.com/DataDog/claude-marketplace/tree/main/dd/commands/git/commit/quick.md)
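The redrive pattern this commit describes can be sketched generically: the flush attempts every pending item once and returns the failures so the caller retries them next cycle instead of dropping them (`Batch` and the stand-in `send` are hypothetical, not the PR's types):

```rust
struct Batch {
    payload: String,
    attempts: u32,
}

// Stand-in transport: pretend the intake accepts a batch on its first retry.
fn send(batch: &Batch) -> bool {
    batch.attempts >= 1
}

/// Attempts every batch once; returns the ones that failed so the caller can
/// redrive them on the next flush cycle instead of dropping them.
fn flush(batches: Vec<Batch>) -> Vec<Batch> {
    batches
        .into_iter()
        .filter_map(|mut b| {
            if send(&b) {
                None
            } else {
                b.attempts += 1;
                Some(b)
            }
        })
        .collect()
}

fn main() {
    let pending = vec![Batch { payload: "logs".into(), attempts: 0 }];
    let failed = flush(pending); // first cycle: transport rejects
    assert_eq!(failed.len(), 1);
    let failed = flush(failed);  // second cycle: redriven and accepted
    assert!(failed.is_empty());
    println!("ok");
}
```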
https://datadoghq.atlassian.net/browse/SVLS-8573
Summary
- `datadog-log-agent` library crate providing generic log aggregation and flushing to the Datadog Logs API
- Wired into the `datadog-serverless-compat` binary so serverless runtimes (Lambda, Azure Functions, etc.) can ship logs
- `LogFlusher` integrated into `datadog-serverless-compat`'s flush pipeline alongside traces and metrics

Key components
`datadog-log-agent` crate:
- `LogEntry`: flat serde struct with standard Datadog fields plus a `#[serde(flatten)]` attributes map for runtime-specific enrichment (e.g. `lambda.arn`, `azure.resource_id`)
- `LogAggregator`: size/count-bounded batch collector (≤1000 entries, ≤5 MB per batch)
- `AggregatorService` / `AggregatorHandle`: channel-based async service; callers insert via the handle, the flusher drains via `get_batches()`
- `LogFlusher`: ships batches to `/api/v2/logs` with zstd compression, configurable retry (3 attempts), OPW mode, and additional endpoints
- `LogFlusherConfig::from_env()`: reads all config from environment variables with sensible defaults

`datadog-serverless-compat` integration:
- `start_log_agent()` spawns the aggregator and wires a `LogFlusher` into the flush pipeline
- Enabled via `DD_LOGS_ENABLED=true`
- OPW mode via `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED` / `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL`

Test plan
- `cargo test -p datadog-log-agent` (39 tests across aggregator, flusher, config, log_entry)
- `cargo test -p datadog-log-agent --test integration_test` (covers full pipeline, batching, retries, OPW mode)
- `./scripts/test-log-intake.sh`
- `cargo build -p datadog-serverless-compat --release`