@AGENTS.md
This is the Observo fork of Vector. The line above pulls in upstream's AGENTS.md, which covers generic Vector dev: make fmt, make check-clippy, cargo vdev, integration tests, PR conventions, project layout, YAML config defaults, license checks. Don't duplicate those here.
This file is the Observo-specific overlay: how the fork diverges, the proprietary components in lib/observo/, and the non-obvious architectural details that aren't in AGENTS.md.
AGENTS.md describes sources/transforms/sinks at a high level. The performance- and correctness-critical details below are unique to this file.
Sources -> async channels -> Transforms -> async channels -> Sinks
Connected via a Topology (src/topology/) that supports hot-reload. Channels carry EventArray batches (an enum: Logs(Vec<LogEvent>) / Metrics(Vec<Metric>) / Traces(Vec<TraceEvent>)), not individual Events. This is the central perf decision — homogeneous Vec layout, one tag check per batch, bulk-serializable. Don't introduce per-event channels in hot paths.
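Here is a minimal std-only sketch of the one-tag-check-per-batch point. The fields of `LogEvent`, `Metric`, and `TraceEvent` and the `sum_metrics` helper are hypothetical simplifications, not Vector's real types. Only the shape of `EventArray` follows the description above: one homogeneous `Vec` per variant.

```rust
/// Hypothetical, heavily simplified event payloads.
#[derive(Clone, Debug, PartialEq)]
pub struct LogEvent {
    pub message: String,
}

#[derive(Clone, Debug, PartialEq)]
pub struct Metric {
    pub name: String,
    pub value: f64,
}

#[derive(Clone, Debug, PartialEq)]
pub struct TraceEvent {
    pub span_id: u64,
}

/// One variant per event kind, each holding a homogeneous Vec.
#[derive(Clone, Debug, PartialEq)]
pub enum EventArray {
    Logs(Vec<LogEvent>),
    Metrics(Vec<Metric>),
    Traces(Vec<TraceEvent>),
}

impl EventArray {
    pub fn len(&self) -> usize {
        match self {
            EventArray::Logs(logs) => logs.len(),
            EventArray::Metrics(metrics) => metrics.len(),
            EventArray::Traces(traces) => traces.len(),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Matches on the variant once per batch. The inner loop then walks a plain
/// `Vec<Metric>` with no per-event tag checks.
pub fn sum_metrics(batch: &EventArray) -> f64 {
    match batch {
        EventArray::Metrics(metrics) => metrics.iter().map(|metric| metric.value).sum(),
        EventArray::Logs(_) | EventArray::Traces(_) => 0.0,
    }
}
```

A per-event channel would instead pay the enum match (and a channel operation) for every single event.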
- `Event`: enum wrapping `LogEvent`, `Metric`, `TraceEvent`
- `EventArray`: the channeled type; one variant per `Event` variant
- `SourceConfig` / `SinkConfig` / `TransformConfig`: traits every component config implements (the returned `Source` / `VectorSink` / `Transform` are runtime values)
- `Source`: type alias for `BoxFuture<'static, Result<(), ()>>`; sources are one-shot futures, not long-lived trait objects
- `Transform`: enum `Function` / `Synchronous` / `Task` (each wraps a different trait object; not unified under one trait)
- `VectorSink`: enum `Sink(Box<dyn Sink<EventArray>>)` / `Stream(Box<dyn StreamSink<EventArray>>)`
- `Fanout`: custom 1→N broadcast at `lib/vector-core/src/fanout.rs` with `Add` / `Remove` / `Pause` / `Replace` control messages for hot-reload (not `tokio::sync::broadcast`)
- `TopologyController`: top-level lifecycle (start, reload, stop)
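The control-message idea behind `Fanout` can be sketched with std channels. This is a simplified, synchronous analogue built on several assumptions: string IDs stand in for `ComponentKey`, `SyncSender` stands in for `BufferSender`, and a `Vec<String>` stands in for the batch. The real `Fanout` is async and differs in detail. What carries over is that topology changes arrive as messages on a side channel. In this sketch, a paused output also stalls sending until a `Replace` arrives.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, SyncSender};

/// Stand-in for EventArray in this sketch.
pub type Batch = Vec<String>;

/// Simplified control plane: string IDs instead of ComponentKey.
pub enum ControlMessage {
    Add(String, SyncSender<Batch>),
    Remove(String),
    Pause(String),
    Replace(String, SyncSender<Batch>),
}

pub struct Fanout {
    /// `None` marks a paused output: still registered, but with no live sender.
    outputs: HashMap<String, Option<SyncSender<Batch>>>,
    /// Control messages travel on their own channel, separate from data.
    control_rx: Receiver<ControlMessage>,
}

impl Fanout {
    pub fn new(control_rx: Receiver<ControlMessage>) -> Self {
        Self { outputs: HashMap::new(), control_rx }
    }

    fn apply(&mut self, message: ControlMessage) {
        match message {
            ControlMessage::Add(id, tx) | ControlMessage::Replace(id, tx) => {
                self.outputs.insert(id, Some(tx));
            }
            ControlMessage::Remove(id) => {
                self.outputs.remove(&id);
            }
            ControlMessage::Pause(id) => {
                if let Some(slot) = self.outputs.get_mut(&id) {
                    *slot = None; // drops the old sender
                }
            }
        }
    }

    /// Applies pending control messages and waits for replacements while any
    /// output is paused. It then sends a clone of the batch to every output.
    pub fn send(&mut self, batch: &Batch) -> Result<(), String> {
        while let Ok(message) = self.control_rx.try_recv() {
            self.apply(message);
        }
        while self.outputs.values().any(Option::is_none) {
            let message = self
                .control_rx
                .recv()
                .map_err(|_| "control channel closed while an output was paused".to_string())?;
            self.apply(message);
        }
        for (id, tx) in &self.outputs {
            if let Some(tx) = tx {
                // Blocking send on a bounded channel: a full output applies backpressure.
                tx.send(batch.clone())
                    .map_err(|_| format!("output {id} disconnected"))?;
            }
        }
        Ok(())
    }
}
```

Sending control messages separately from data lets a reload swap a downstream sink without tearing down the upstream component.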
- `src/topology/builder.rs`: builds component tasks from config (`build_sources`, `build_transforms`, `build_sinks`; the transform `Runner` and its `run_inline` / `run_concurrently` modes). Also home of Observo's `CHECKPT_STORE` global.
- `src/topology/running.rs`: `RunningTopology`, `spawn_*` functions, reload state machine
- `src/topology/task.rs`: `Task` wrapper (`BoxFuture` + `ComponentKey` + typetag)
- `lib/vector-core/src/fanout.rs`: the trickiest concurrency primitive
- `lib/vector-core/src/event/mod.rs` + `event/array.rs`: event model
- `lib/vector-buffers/src/topology/channel/sender.rs`: pluggable buffer backends
All components use #[configurable_component] + #[typetag::serde] + inventory for compile-time self-registration:
```rust
#[configurable_component(source("my_source", "Description"))]
#[derive(Clone, Debug)]
pub struct MySourceConfig { /* fields */ }

#[async_trait]
#[typetag::serde(name = "my_source")]
impl SourceConfig for MySourceConfig {
    async fn build(&self, cx: SourceContext) -> Result<Source> { /* ... */ }
    fn outputs(&self, _: LogNamespace) -> Vec<SourceOutput> { /* ... */ }
}
```

Each component lives behind a feature flag: `sources-{name}`, `sinks-{name}`, `transforms-{name}`.
Transform traits:
- `FunctionTransform`: simple event-by-event; cloned per worker when run concurrently
- `SyncTransform`: broader; can write to multiple named outputs
- `TaskTransform<EventArray>`: stateful, stream-to-stream; acts as a coordination barrier (cannot be parallelized)
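Below is a rough std-only analogue of the three shapes. The trait signatures are simplified assumptions. Vector's real traits use its own output-buffer types and async `Stream`s, whereas here an `Iterator` stands in for the stream. `Uppercase`, `RouteErrors`, and `Dedupe` are hypothetical examples. What the sketch illustrates is the shape of each kind:

- The function transform is `Clone`, so it can be copied per worker.
- The sync transform writes to named outputs.
- The task transform owns the whole stream and its state, which is why it cannot be parallelized.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical event type for this sketch.
pub type Event = String;

/// Event-by-event; `Clone` so a runtime could give each worker its own copy.
pub trait FunctionTransform: Clone {
    fn transform(&mut self, output: &mut Vec<Event>, event: Event);
}

/// Can route each event to one of several named outputs.
pub trait SyncTransform {
    fn transform(&mut self, event: Event, outputs: &mut HashMap<String, Vec<Event>>);
}

/// Owns the whole input stream (an Iterator stands in for an async Stream),
/// so its state spans all events and it cannot be split across workers.
pub trait TaskTransform {
    fn transform(
        self: Box<Self>,
        input: Box<dyn Iterator<Item = Event>>,
    ) -> Box<dyn Iterator<Item = Event>>;
}

#[derive(Clone)]
pub struct Uppercase;

impl FunctionTransform for Uppercase {
    fn transform(&mut self, output: &mut Vec<Event>, event: Event) {
        output.push(event.to_uppercase());
    }
}

pub struct RouteErrors;

impl SyncTransform for RouteErrors {
    fn transform(&mut self, event: Event, outputs: &mut HashMap<String, Vec<Event>>) {
        let name = if event.starts_with("ERROR") { "errors" } else { "default" };
        outputs.entry(name.to_string()).or_default().push(event);
    }
}

/// Stateful: drops any event already seen earlier in the stream.
pub struct Dedupe;

impl TaskTransform for Dedupe {
    fn transform(
        self: Box<Self>,
        input: Box<dyn Iterator<Item = Event>>,
    ) -> Box<dyn Iterator<Item = Event>> {
        let mut seen = HashSet::new();
        Box::new(input.filter(move |event| seen.insert(event.clone())))
    }
}
```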
- Sinks: stream-based (`StreamSink`) or `futures::Sink<EventArray>`, typically composed with Tower middleware in `src/sinks/util/` for batching, retries, and rate-limiting.
Backpressure is structural: every channel is bounded. Each sink picks a `WhenFull` policy (`lib/vector-buffers/src/lib.rs`):
- `Block` (default): backpressure propagates all the way up to the source
- `DropNewest`: sheds load (intentionally breaks backpressure)
- `Overflow`: multi-stage buffers (memory → disk)
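The difference between `Block` and `DropNewest` comes down to a blocking `send` versus a non-blocking `try_send` on a bounded channel. Below is a minimal sketch using `std::sync::mpsc::sync_channel`. `Stage`, `SendOutcome`, the channel-backed "next stage" used for `Overflow`, and the blocking fallback when no next stage exists are all hypothetical simplifications. A real overflow stage would typically be a disk buffer.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Mirrors the three policy names; everything else here is simplified.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WhenFull {
    Block,
    DropNewest,
    Overflow,
}

#[derive(Debug, PartialEq)]
pub enum SendOutcome {
    Sent,
    Dropped,
    Closed,
}

/// One buffer stage: a bounded channel plus its full-buffer policy.
pub struct Stage<T> {
    pub tx: SyncSender<T>,
    pub when_full: WhenFull,
    /// Next stage (e.g. disk after memory); only consulted by `Overflow`.
    pub next: Option<Box<Stage<T>>>,
}

impl<T> Stage<T> {
    pub fn send(&self, item: T) -> SendOutcome {
        match self.when_full {
            // Blocks until there is room, so backpressure reaches the caller.
            WhenFull::Block => match self.tx.send(item) {
                Ok(()) => SendOutcome::Sent,
                Err(_) => SendOutcome::Closed,
            },
            // Never blocks: a full buffer sheds the incoming item.
            WhenFull::DropNewest => match self.tx.try_send(item) {
                Ok(()) => SendOutcome::Sent,
                Err(TrySendError::Full(_)) => SendOutcome::Dropped,
                Err(TrySendError::Disconnected(_)) => SendOutcome::Closed,
            },
            // A full buffer spills into the next stage; without one, this
            // sketch falls back to blocking.
            WhenFull::Overflow => match self.tx.try_send(item) {
                Ok(()) => SendOutcome::Sent,
                Err(TrySendError::Full(item)) => match &self.next {
                    Some(next) => next.send(item),
                    None => match self.tx.send(item) {
                        Ok(()) => SendOutcome::Sent,
                        Err(_) => SendOutcome::Closed,
                    },
                },
                Err(TrySendError::Disconnected(_)) => SendOutcome::Closed,
            },
        }
    }
}
```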
- One tokio runtime, multi-threaded (worker count = CPU count by default)
- Per source: 2 tasks (the source future + a "pump" task draining `SourceSender` into `Fanout`)
- Per transform / sink: 1 task each
- Synchronous transforms can opt into `run_concurrently` (clones the transform, spawns sub-tasks via `FuturesOrdered`); task transforms cannot
- Use `tokio::select! { biased; ... }` when shutdown must trump other branches
- Bounded channels for data (`BufferSender`, `LimitedSender`); unbounded `mpsc` only for the control plane (e.g., fanout `ControlMessage`, `abort_tx`)
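The ordering guarantee behind `run_concurrently` can be illustrated without tokio:

1. Clone the transform once per in-flight batch.
2. Run the clones in parallel.
3. Collect results in submission order, which is what `FuturesOrdered` provides in the async code.

This sketch uses `std::thread::scope` instead of tasks. `AddSuffix` and this `run_concurrently` function are hypothetical and are not Vector's implementation.

```rust
use std::thread;

/// Hypothetical per-event transform; must be `Clone` so each worker gets a copy.
#[derive(Clone)]
pub struct AddSuffix {
    pub suffix: String,
}

impl AddSuffix {
    pub fn transform_batch(&mut self, batch: Vec<String>) -> Vec<String> {
        batch
            .into_iter()
            .map(|event| format!("{event}{}", self.suffix))
            .collect()
    }
}

/// Runs one clone per batch in parallel, then joins handles in submission
/// order so output order matches input order (the `FuturesOrdered` guarantee).
pub fn run_concurrently(
    transform: &AddSuffix,
    batches: Vec<Vec<String>>,
) -> thread::Result<Vec<Vec<String>>> {
    thread::scope(|scope| {
        let handles: Vec<_> = batches
            .into_iter()
            .map(|batch| {
                let mut worker = transform.clone();
                scope.spawn(move || worker.transform_batch(batch))
            })
            .collect();
        // A later batch may finish first; joining in order restores the sequence.
        handles.into_iter().map(|handle| handle.join()).collect()
    })
}
```

A task transform has no equivalent, because its state lives in a single stream that cannot be cloned across workers.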
Observo proprietary code lives in lib/observo/private/ — a git submodule (URL in .gitmodules). Public wrapper crates under lib/observo/{name}/ re-export from the private tree; build manifests (Cargo.toml) stay in the public tree.
After cloning, initialize the submodule: git submodule update --init
For IDE / rust-analyzer setup, see the private submodule's own tooling.
Observo crates do not register components with inventory themselves. They expose engine primitives; the integration glue lives in src/sources/, src/sinks/, src/transforms/ gated behind feature flags (e.g., #[cfg(feature = "sources-scol")] pub mod scol;).
- `observo`: aggregates all Observo features (lext, scol, lv3, chkpts, stcp, wef, vrl, gcs, azs, ssa)
- `observo-test`: enables test-scenario features for scol, lv3, ssa
- `observo-lext`, `observo-lv3`, `observo-chkpts`, `observo-ssa`: individual feature flags
Makefile behavior: By default, Observo crates are excluded from builds via EXCLUDE_WORKSPACES. Setting FEATURES=observo automatically clears this exclusion.
Build awareness: Default cargo check / cargo build / make test do not build Observo crates and do not mirror what CI tests. Before pushing changes that touch component-config glue, mod.rs files under src/sources/ / src/sinks/ / src/transforms/, or anything used by Observo crates, run:
```sh
FEATURES=observo cargo check                  # catch breakages early
FEATURES=observo make test                    # Observo unit tests
FEATURES=observo,observo-test make test       # + Observo test scenarios
```

Adding `use crate::sources::scol` (or any Observo-touching path) without a `#[cfg(feature = "...")]` gate will break the default build.
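Here is a minimal sketch of the gating pattern, using the `sources-scol` feature named above. The module body and the `observo_sources` helper are hypothetical. Every reference to the gated module carries the same `#[cfg]`, so the default build compiles even though `scol` does not exist in it.

```rust
// Gated module: absent from the default build. The body is a placeholder.
#[cfg(feature = "sources-scol")]
pub mod scol {
    pub const NAME: &str = "scol";
}

// An ungated `use self::scol::NAME;` would fail in the default build.
// Gating the import exactly like the module keeps both builds compiling.
#[cfg(feature = "sources-scol")]
use self::scol::NAME as SCOL_NAME;

/// Lists the Observo sources compiled into this build.
pub fn observo_sources() -> Vec<&'static str> {
    #[allow(unused_mut)]
    let mut names = Vec::new();
    #[cfg(feature = "sources-scol")]
    {
        names.push(SCOL_NAME);
    }
    names
}
```

In a default build (feature off), `observo_sources()` returns an empty list. Building with the feature enabled adds `"scol"`.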
| Crate | Purpose |
|---|---|
| azs | Azure Storage sink |
| chkpts | Checkpointing / state |
| gcs | GCS source/sink |
| lext | Lua extensions |
| lv3 | Lua v3 transform |
| obvrl | Custom VRL functions |
| sauth | Auth framework |
| scol | Streaming collection/transform |
| ssa | Streaming aggregation |
| stcp | TCP source |
| wef | WEF format handler |
This repo depends on Observo-Inc/vrl.git (custom fork), not upstream VRL. The rev is pinned in root Cargo.toml. For local VRL iteration, see vdev README for git server setup (git://172.17.0.1/vrl). Observo VRL extensions (MessagePack, XML, etc.) live in lib/observo/obvrl and are wired in via the observo-vrl feature on vector-vrl-functions.
Observo introduces one deliberate global singleton, `CHECKPT_STORE` in `src/topology/builder.rs`, which exposes a checkpoint store to sources via `SourceContext`. It survives topology reloads (in-place `reload()` rather than rebuild). This is an intentional exception to Vector's "no shared state" stance: sources need persistent checkpoints across reloads. Don't add hot-path locking; checkpoint access should be per-batch, not per-event.
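Below is a sketch of the per-batch access pattern. `CHECKPOINTS`, `commit_batch`, and `resume_offset` are hypothetical stand-ins. This file does not describe the real store's type or API, only that it is a process-wide global that outlives reloads and should be touched once per batch.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Hypothetical process-wide store: source ID -> last committed offset.
/// Being a global, it is untouched when the topology is reloaded.
static CHECKPOINTS: OnceLock<Mutex<HashMap<String, u64>>> = OnceLock::new();

fn store() -> &'static Mutex<HashMap<String, u64>> {
    CHECKPOINTS.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Commits one checkpoint for a whole batch of `(offset, line)` records:
/// one lock acquisition per batch, none per event.
pub fn commit_batch(source_id: &str, batch: &[(u64, String)]) {
    // Per-event work happens before this call and never touches the store.
    let Some((last_offset, _)) = batch.last() else {
        return;
    };
    // Recover from poisoning instead of panicking.
    let mut checkpoints = store().lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    checkpoints.insert(source_id.to_string(), *last_offset);
}

/// Where a (re)built source resumes; `None` if nothing was committed yet.
pub fn resume_offset(source_id: &str) -> Option<u64> {
    let checkpoints = store().lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    checkpoints.get(source_id).copied()
}
```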
- Logging: use `tracing` with key-value style: `warn!(message = "Failed.", %error);`, not `warn!("Failed: {}", err);`
- Error variable naming: always spell out `error`, never `e` or `err`
- Display over Debug: prefer `%error` over `?error` in tracing macros
- Error handling: use the `snafu` crate for structured errors. Never panic in regular code paths.
- Rust version: toolchain 1.83, MSRV 1.81
- Default `cargo test` and `make test` skip Observo crates (excluded via `EXCLUDE_WORKSPACES`). Run `FEATURES=observo make test` to include them.
- `cargo-nextest` retries 3× before reporting failure (config in `.config/nextest.toml`, 30s slow-test threshold, no fail-fast), so flaky tests slip through. Watch the test summary for "flaky retries."
- `#[tokio::test]` defaults to single-threaded; use `#[tokio::test(flavor = "multi_thread")]` when concurrency is required for the test logic.
- Config generation test: `crate::test_util::test_generate_config::<MyConfig>()`
- Test utilities in `src/test_util/`: `start_topology()`, `random_events_with_stream()`, compliance assertions
- For transform-only tests, fake source/sink helpers exist as `UnitTestStreamSourceConfig` / `UnitTestStreamSinkConfig` (`src/config/unit_test/`)
- `.github/workflows/observo.test.yml`: triggers the test workflow in the external `dataplane-build` repo on PRs and master pushes
- `.github/workflows/observo.release.yml`: triggers the publish workflow on `test.*` or `release.*` tags
- Actual build/test execution happens in the separate `Observo-Inc/dataplane-build` repo (not in this repo's GitHub Actions)
AGENTS.md at the repo root is a verbatim copy of vectordotdev/vector's file. To refresh it:
```sh
# One-time: register upstream (already done locally; check `git remote -v`)
git remote add upstream https://github.com/vectordotdev/vector.git

# Resync
git fetch upstream master
git checkout upstream/master -- AGENTS.md
git diff --staged AGENTS.md   # review before committing
```

If upstream's AGENTS.md introduces commands or conventions that conflict with our fork (e.g., new linting requirements, removed make targets), update this file's overlay accordingly rather than diverging AGENTS.md.