flowctl is a pipeline orchestrator that coordinates data flow between components. It manages sources (data producers), processors (transformers), and sinks (consumers) through an embedded control plane, enabling you to build robust data pipelines with minimal boilerplate.
- Unix-style CLI interface
- Pluggable processor architecture
- Support for multiple sink types
- YAML-based configuration
- Structured logging with Uber's Zap
- Built-in health monitoring
- Prometheus metrics
- Docker Compose deployment
- DAG-based processor chaining with buffered channels
- Flexible pipeline topologies with fan-out/fan-in support
- Secure communication with TLS and mutual TLS support
# Clone the repository
git clone https://github.com/withobsrvr/flowctl.git
cd flowctl
# Build the binary
make build
# Install dependencies
make deps

Get your first Stellar pipeline running:
# 1. Build flowctl
git clone https://github.com/withobsrvr/flowctl.git && cd flowctl && make build
# 2. Create a pipeline interactively
./bin/flowctl init
# 3. Run it (components auto-download!)
./bin/flowctl run stellar-pipeline.yaml

Components are automatically downloaded on first run.
Non-interactively, with DuckDB as the destination:

./bin/flowctl init --non-interactive --network testnet --destination duckdb
./bin/flowctl run stellar-pipeline.yaml
# Query your data
duckdb stellar-pipeline.duckdb "SELECT event_type, COUNT(*) FROM contract_events GROUP BY event_type"

To use PostgreSQL as the destination instead:

# Start PostgreSQL (if not already running)
docker run --name flowctl-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres:16
docker exec flowctl-postgres createdb -U postgres stellar_events
# Create and run pipeline
./bin/flowctl init --non-interactive --network testnet --destination postgres
./bin/flowctl run stellar-pipeline.yaml
# Query your data
docker exec flowctl-postgres psql -U postgres -d stellar_events \
  -c "SELECT event_type, COUNT(*) FROM contract_events GROUP BY event_type"

See also: Quickstart Examples | flowctl init Reference
The `flowctl init` command creates pipeline configurations through an interactive wizard or via flags.
./bin/flowctl init

Prompts for:

- Network: `testnet` or `mainnet`
- Destination: `postgres`, `duckdb`, or `csv`
Use flags for automation:
./bin/flowctl init --non-interactive --network testnet --destination duckdb
./bin/flowctl init --non-interactive --network mainnet --destination postgres -o my-pipeline.yaml

| Flag | Values | Description |
|---|---|---|
| `--network` | `testnet`, `mainnet` | Stellar network to connect to |
| `--destination` | `postgres`, `duckdb`, `csv` | Where to write data |
| `--output`, `-o` | filename | Output file (default: `stellar-pipeline.yaml`) |
| `--non-interactive` | - | Skip interactive prompts |
| Destination | Prerequisite |
|---|---|
| `duckdb` | None - embedded database, just works |
| `postgres` | PostgreSQL running on `localhost:5432` with a database named `stellar_events` |
| `csv` | Write access to the `./data` directory |
When you run the pipeline, flowctl automatically downloads components from Docker Hub:
| Component | Type | Description |
|---|---|---|
| `stellar-live-source@v1.0.0` | Source | Streams Stellar ledger data |
| `contract-events-processor@v1.0.0` | Processor | Extracts Soroban contract events |
| `duckdb-consumer@v1.0.0` | Sink | Writes to an embedded DuckDB database |
| `postgres-consumer@v1.0.0` | Sink | Writes to PostgreSQL with JSONB |
Components are cached in `~/.flowctl/components/`.
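To inspect the cache, or clear it to force a fresh download (using deletion as a reset mechanism is an assumption, not documented behavior):

```bash
# See which component versions are cached
ls -la ~/.flowctl/components/

# Remove the cache so components are re-downloaded on the next run (assumption)
rm -rf ~/.flowctl/components/
```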
Full reference: docs/init-command.md
flowctl is a pipeline orchestrator, not a data processor itself. It coordinates separate component binaries (sources, processors, sinks) and manages the data flow between them via an embedded control plane.
┌─────────────────────────────────────────────────────────────┐
│ flowctl (Orchestrator) │
│ • Control Plane (gRPC API) │
│ • Component Registry & Health Monitoring │
│ • Stream Management │
└─────────────────────────────┬───────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Source │───▶│ Processor │───▶│ Sink │
│ │ │ │ │ │
│ (Produces │ │ (Transforms │ │ (Consumes │
│ data) │ │ data) │ │ data) │
└──────────────┘ └──────────────┘ └──────────────┘
Components are separate programs that you build or install independently:
- Sources: Produce data (e.g., Stellar ledger streams, Kafka consumers)
- Processors: Transform data (e.g., extract contract events, filter transactions)
- Sinks: Consume data (e.g., PostgreSQL, webhooks, file storage)
To build your own components, use the flowctl-sdk which provides:
- Component registration and health checks
- Automatic heartbeat management
- gRPC streaming infrastructure
- Standardized configuration patterns
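SDK-built components read their control-plane wiring from environment variables (`ENABLE_FLOWCTL`, `FLOWCTL_ENDPOINT`; see the troubleshooting section below), so you can exercise one outside a pipeline with a sketch like this (the binary path is a placeholder):

```bash
# Run a component standalone, registering against a local control plane
ENABLE_FLOWCTL=true \
FLOWCTL_ENDPOINT=127.0.0.1:8080 \
PORT=:50051 \
HEALTH_PORT=8088 \
/path/to/my-component
```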
Learn more: Getting Started Guide | Architecture Details
Pipelines are configured using Kubernetes-style YAML files. Here's a minimal example:
apiVersion: flowctl/v1
kind: Pipeline
metadata:
  name: hello-world-pipeline
  description: A simple pipeline demonstrating flowctl basics
spec:
  driver: process  # Run components as local processes
  sources:
    - id: data-generator
      command: ["./bin/data-generator"]
      env:
        INTERVAL_MS: "1000"
        OUTPUT_FORMAT: "json"
  processors:
    - id: data-transformer
      command: ["./bin/data-transformer"]
      env:
        TRANSFORM_TYPE: "uppercase"
  sinks:
    - id: console-logger
      command: ["./bin/console-logger"]
      env:
        LOG_FORMAT: "pretty"

Key configuration concepts:
- `apiVersion: flowctl/v1` - Standard pipeline format
- `spec.driver` - Execution environment (`process`, `docker`, `kubernetes`, or `nomad`)
- Component `command` - Full path to the component binary
- Component `env` - Environment variables for configuration
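Processors and sinks declare which upstream components feed them via an `inputs` field (it appears in the troubleshooting examples later in this README); a minimal sketch of the wiring:

```yaml
processors:
  - id: data-transformer
    command: ["./bin/data-transformer"]
    inputs: ["data-generator"]   # must match the source's id
sinks:
  - id: console-logger
    command: ["./bin/console-logger"]
    inputs: ["data-transformer"] # consume the processor's output
```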
For more complex examples, see:
- `examples/minimal.yaml` - Minimal working pipeline
- `examples/docker-pipeline.yaml` - Pipeline with Docker deployment
- `examples/dag-pipeline.yaml` - DAG-based pipeline with complex topology
- Real-world demo - Complete Stellar contract events pipeline with PostgreSQL
flowctl uses structured logging powered by Uber's Zap library. You can control the log level using:
- Configuration file: Set the `log_level` field in your YAML configuration
- Command line: Use the `--log-level` flag (e.g., `--log-level=debug`)
Available log levels:
- `debug`: Detailed information for debugging
- `info`: General operational information (default)
- `warn`: Warning conditions
- `error`: Error conditions
Example:
./bin/flowctl apply -f examples/minimal.yaml --log-level=debug

flowctl supports translating pipeline configurations to different deployment formats:
# Translate a pipeline to Docker Compose
./bin/flowctl translate -f examples/docker-pipeline.yaml -o docker-compose
# Save the output to a file
./bin/flowctl translate -f examples/docker-pipeline.yaml -o docker-compose --to-file docker-compose.yml
# Add a resource prefix for naming consistency
./bin/flowctl translate -f examples/docker-pipeline.yaml -o docker-compose --prefix myapp
# Specify a container registry
./bin/flowctl translate -f examples/docker-pipeline.yaml -o docker-compose --registry ghcr.io/myorg
# Generate local execution script
./bin/flowctl translate -f examples/local-pipeline.yaml -o local --to-file run_pipeline.sh

Supported output formats:
- `docker-compose`: Docker Compose YAML
- `local`: Local execution configuration (Docker Compose or bash script)
flowctl supports a Directed Acyclic Graph (DAG) based processing pipeline, which allows complex topologies beyond simple linear chains. This enables:
- Parallel processing of data streams
- Fan-out/fan-in patterns
- Buffered channels for flow control
- Strongly typed event connections
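As an illustrative sketch (component ids and commands are placeholders), a fan-out/fan-in topology can be expressed by pointing multiple processors at one source, and one sink at multiple processors, via `inputs`:

```yaml
sources:
  - id: ledger-source
    command: ["./bin/ledger-source"]
processors:
  - id: payments-filter            # fan-out: both processors read the same source
    command: ["./bin/payments-filter"]
    inputs: ["ledger-source"]
  - id: events-extractor
    command: ["./bin/events-extractor"]
    inputs: ["ledger-source"]
sinks:
  - id: combined-sink              # fan-in: one sink consumes both processors
    command: ["./bin/combined-sink"]
    inputs: ["payments-filter", "events-extractor"]
```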
For more information, see DAG-Based Processing.
flowctl supports secure communication between components using Transport Layer Security (TLS):
- Server-side TLS for encrypted communication
- Mutual TLS (mTLS) where both client and server authenticate each other
- Certificate validation options, including CA certificate support
- TLS skip verification for development environments (not recommended for production)
For server configuration:
# Start a server with TLS
flowctl server --port 8080 --tls-cert server.crt --tls-key server.key
# Start a server with mutual TLS
flowctl server --port 8080 --tls-cert server.crt --tls-key server.key --tls-ca-cert ca.crt

For client configuration, add a `tls` section to your YAML configuration:
tls:
  mode: "enabled"                 # Options: disabled, enabled, mutual
  cert_file: "client.crt"
  key_file: "client.key"
  ca_file: "ca.crt"               # Required for mutual TLS
  skip_verify: false
  server_name: "flowctl-server.example.com"  # For SNI verification

For more information, see TLS Configuration.
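For local development only, a throwaway CA and server certificate can be generated with openssl (a minimal sketch; the filenames match the server example above, and production deployments should use properly issued certificates):

```bash
# Create a self-signed development CA
openssl req -x509 -newkey rsa:4096 -nodes -days 30 \
  -keyout ca.key -out ca.crt -subj "/CN=flowctl-dev-ca"

# Create a server key and a certificate signed by that CA
openssl req -newkey rsa:4096 -nodes \
  -keyout server.key -out server.csr -subj "/CN=localhost"
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key \
  -CAcreateserial -days 30 -out server.crt
```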
When using the local output format, flowctl generates a configuration for running your pipeline locally. By default, it creates a Docker Compose configuration with profiles, but you can also use the legacy bash script generator if needed.
The Docker Compose-based local generator creates:
- A Docker Compose configuration file with profile support
- An environment file with all required variables
- Proper dependency ordering between components
- Health check monitoring
- Volume management for persistent data and logs
# Generate Docker Compose configuration for local execution
./bin/flowctl translate -f examples/local-pipeline.yaml -o local --to-file docker-compose.yaml
# Start the pipeline
docker compose --profile local up -d
# View logs
docker compose logs -f
# Stop the pipeline
docker compose down

See Local Execution with Docker Compose for more details.
For compatibility with existing workflows, you can still use the bash script generator:
# Set environment variable to use bash script generator
export FLOWCTL_LOCAL_GENERATOR_TYPE=bash
# Generate local execution script
./bin/flowctl translate -f examples/local-pipeline.yaml -o local --to-file run_pipeline.sh
# Make the script executable
chmod +x run_pipeline.sh
# Run the pipeline
./run_pipeline.sh

The bash script generator creates:
- A bash script to start and manage pipeline components
- An environment file with all required variables
- Proper dependency ordering between components
- Health check monitoring
- Process supervision with automatic restart
Discover and inspect processors registered with the control plane at runtime.
Note: These commands require a running pipeline with an active control plane.
# List all healthy processors
./bin/flowctl processors list
# Include unhealthy processors
./bin/flowctl processors list --include-unhealthy
# Output as JSON
./bin/flowctl processors list -o json
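The JSON output can be piped through standard tools; `jq .` pretty-prints regardless of the exact schema, which isn't documented here:

```bash
# Pretty-print the processor list
./bin/flowctl processors list -o json | jq .
```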
# Find processors that accept stellar.ledger.v1 events
./bin/flowctl processors find --input stellar.ledger.v1
# Find processors that produce token.transfer events
./bin/flowctl processors find --output stellar.token.transfer.v1
# Find processors for a specific network
./bin/flowctl processors find --metadata network=testnet

# Show full details for a processor
./bin/flowctl processors show ttp-processor-v1

Full reference: docs/processor-discovery.md
Common troubleshooting scenarios follow, grouped by symptom.

Symptoms: Pipeline starts but component fails immediately
Solutions:
# 1. Check binary exists and is executable
ls -la /path/to/component
chmod +x /path/to/component
# 2. Verify absolute path is used (required for driver: process)
# ✅ Good: /home/user/bin/my-component
# ❌ Bad: ./bin/my-component
# 3. Test component standalone
/path/to/component
# 4. Check component logs in flowctl output
./bin/flowctl run pipeline.yaml 2>&1 | grep "component-id"

Symptoms: Component starts but doesn't appear in the registry and reports no health checks
Solutions:
# 1. Verify ENABLE_FLOWCTL is set
# In pipeline YAML:
env:
  ENABLE_FLOWCTL: "true"
  FLOWCTL_ENDPOINT: "127.0.0.1:8080"
# 2. Check control plane started
# Look for log: "Starting control plane on 127.0.0.1:8080"
# 3. Check component built with flowctl-sdk
# Components must use SDK for registration:
# github.com/withObsrvr/flowctl-sdk/pkg/{source,processor,consumer}

Symptoms: Components start but no data appears in the sink
Solutions:
# 1. Verify inputs are correctly specified
processors:
  - id: my-processor
    inputs: ["correct-source-id"]  # Must match source id
# 2. Check event types match
# Source OutputType must match Processor InputType
# Check component code or logs
# 3. Enable debug logging
./bin/flowctl run pipeline.yaml --log-level=debug
# 4. Check for processing errors
./bin/flowctl run pipeline.yaml 2>&1 | grep -i error

Symptoms: "bind: address already in use"
Solutions:
# 1. Check what's using the port
lsof -i :50051
netstat -tulpn | grep 50051
# 2. Use unique ports for each component
sources:
  - id: source-1
    env:
      PORT: ":50051"
      HEALTH_PORT: "8088"
processors:
  - id: processor-1
    env:
      PORT: ":50052"   # Different port
      HEALTH_PORT: "8089"
# 3. Kill conflicting process if needed
kill <PID>

Symptoms: "validation failed" or schema errors
Solutions:
# 1. Validate configuration
./bin/flowctl run --dry-run pipeline.yaml
# 2. Check required fields
# - apiVersion: flowctl/v1
# - kind: Pipeline
# - metadata.name
# - spec.driver
# - At least one source and one sink
# 3. Verify YAML syntax
# Use yamllint or online YAML validator
# 4. Check for common mistakes:
# - Singular "source" instead of "sources" (array)
# - Wrong apiVersion (should be flowctl/v1)
# - Missing inputs on processors/sinks

Symptoms: Component starts then immediately exits
Solutions:
# 1. Check environment variables are set
# Components may require config via env vars
# 2. Test component standalone with env vars
ENABLE_FLOWCTL=false \
PORT=:50051 \
HEALTH_PORT=8088 \
CONFIG_KEY=value \
/path/to/component
# 3. Check component logs for errors
# Look for initialization failures, missing dependencies
# 4. Verify dependencies are available
# Database connections, API endpoints, file paths, etc.

If you encounter permission errors when running flowctl sandbox commands, the tool will provide platform-specific guidance. Common solutions include:
Linux:
- Run with sudo: `sudo flowctl sandbox start ...`
- Add your user to the docker group: `sudo usermod -aG docker $USER` (then log out and back in)
- Check if the Docker service is running: `sudo systemctl status docker`
macOS:
- Ensure Docker Desktop is running
- Check Docker Desktop permissions in System Preferences
- Try restarting Docker Desktop
NixOS:
- See detailed setup guide: docs/nixos-docker-setup.md
- Quick fix: Run with sudo
- Permanent fix: Add user to docker group in configuration.nix
If flowctl cannot find Docker or nerdctl:
- Install Docker: https://docs.docker.com/get-docker/
- Or install nerdctl: https://github.com/containerd/nerdctl
- flowctl uses your system-installed container runtime by default
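To confirm which runtime is available on your PATH (a quick sanity check; `nerdctl` applies only if you use containerd):

```bash
# Which runtime binaries are installed?
command -v docker nerdctl

# Is the Docker daemon reachable?
docker info >/dev/null 2>&1 && echo "docker OK" || echo "docker not reachable"
```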
# Run with verbose output
./bin/flowctl run pipeline.yaml --log-level=debug
# Save logs to file
./bin/flowctl run pipeline.yaml --log-level=debug 2>&1 | tee pipeline.log

# While pipeline is running, check health endpoints
# (if component has HEALTH_PORT configured)
curl http://localhost:8088/health # Source
curl http://localhost:8089/health # Processor
curl http://localhost:8090/health # Sink
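A small sketch to poll all three endpoints in a loop (ports per the example above; Ctrl-C to stop):

```bash
# Poll each component's health endpoint every 5 seconds
while true; do
  for port in 8088 8089 8090; do
    status=$(curl -fsS "http://localhost:${port}/health" || echo "unreachable")
    echo "port ${port}: ${status}"
  done
  sleep 5
done
```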
# Control plane runs on 127.0.0.1:8080 by default
# Check registered components (requires gRPC client like grpcurl)
# Or check logs for registration messages
./bin/flowctl run pipeline.yaml 2>&1 | grep "Component registered"
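If the control plane exposes gRPC server reflection (an assumption, not confirmed here), grpcurl can enumerate its services directly:

```bash
# List services exposed by the control plane (requires server reflection)
grpcurl -plaintext 127.0.0.1:8080 list
```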
Related documentation:

- Getting Started: examples/getting-started/README.md
- Configuration Guide: docs/configuration.md
- Building Components: docs/building-components.md
If you're still having trouble:
- Check existing issues: https://github.com/withobsrvr/flowctl/issues
- Create a new issue with:
  - flowctl version (`./bin/flowctl version`)
  - Pipeline YAML (redact sensitive info)
  - Full error output
  - Steps to reproduce
For sandbox-specific issues, refer to docs/sandbox.md for detailed troubleshooting steps.
# Build for current platform
make build
# Build for multiple platforms
make build-all

# Run tests
make test

# Run an example pipeline
make run-example

Documentation:

- Getting Started Guide - Complete beginner's guide to flowctl
- Examples Overview - Navigate all examples by use case
- Configuration Guide - Complete schema reference and examples
- Building Components - How to build sources, processors, and sinks
- Migration Guide - Migrate from old config format to flowctl/v1
- Architecture - System design, data flow, and component lifecycle
- Performance Tuning - Optimization strategies and scaling
- Contract Events Pipeline - Complete Stellar → PostgreSQL pipeline (< 5 min to run)
License: MPL-2.0

To contribute:
1. Fork the repository
2. Create your feature branch
3. Commit your changes
4. Push to the branch
5. Create a Pull Request