POLKU

The path your events take.

Messages in  ───>  POLKU  ───>  Messages out
                (transform)
                (filter)
                (route)

What is POLKU?

POLKU is a programmable protocol hub. Messages come in, get transformed, and go out to multiple destinations. Routing logic is Rust code, not config files.

                     ┌────────────────────────────────────────────────────┐
                     │                      POLKU                         │
                     │                                                    │
   Your App ────────>│  Ingestors ──> Middleware ──> Buffer ──> Emitters  │────────> gRPC
                     │      │              │                       │      │
   Agents ──────────>│      │         [transform]                  │      │────────> Webhook
                     │      │         [filter]                     │      │
   Webhooks ────────>│      │         [route]                      │      │────────> Stdout
                     │      │         [validate]                   │      │
                     │      v         [aggregate]                  v      │
                     │  ┌────────┐                           ┌────────┐  │
                     │  │ Plugin │  <── gRPC ──>             │ Plugin │  │
                     │  │ (any   │                           │ (any   │  │
                     │  │  lang) │                           │  lang) │  │
                     │  └────────┘                           └────────┘  │
                     └────────────────────────────────────────────────────┘

Why POLKU?

  • No YAML - Routing logic is code, not config files
  • Any language - Write plugins in Python, Go, Rust, whatever (gRPC interface)
  • Tiny - 10-20MB memory, no external dependencies
  • Fast - 178k+ events/sec streaming, ~1.3k/sec unary
  • Observable - 40+ Prometheus metrics, pipeline pressure gauge, health endpoints

Installation

From Source

git clone https://github.com/false-systems/polku
cd polku

# Build the gateway binary
cargo build --release

# Binary at ./target/release/polku-gateway

Docker

docker build -t polku-gateway .
docker run -p 50051:50051 -p 9090:9090 polku-gateway

As a Library

Add to your Cargo.toml:

[dependencies]
polku-runtime = { git = "https://github.com/false-systems/polku" }

Usage

Standalone Gateway

Run the binary — it listens for gRPC on port 50051 and serves Prometheus metrics on port 9090:

# Debug mode: prints messages to stdout
./polku-gateway

# Router mode: forward to downstream gRPC endpoints
POLKU_EMIT_GRPC_ENDPOINTS=host1:50051,host2:50051 ./polku-gateway

# With JSON structured logging
POLKU_LOG_FORMAT=json POLKU_LOG_LEVEL=debug ./polku-gateway

Send a message:

grpcurl -plaintext -d '{
  "source": "my-app",
  "cluster": "dev",
  "payload": {
    "events": {
      "events": [{
        "id": "1",
        "timestamp_unix_ns": 0,
        "source": "my-app",
        "event_type": "user.signup",
        "payload": "aGVsbG8="
      }]
    }
  }
}' localhost:50051 polku.v1.Gateway/SendEvent
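
The payload field carries base64-encoded bytes ("aGVsbG8=" decodes to "hello"): grpcurl follows the proto3 JSON mapping, which renders bytes fields as base64.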

As a Rust Library

Define your pipeline in your own crate — POLKU handles tracing, metrics, gRPC, and shutdown:

use polku_runtime::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    polku_runtime::run(|hub| async move {
        Ok(hub
            .ingestor(JsonIngestor::new())
            .middleware(Filter::new(|msg: &Message| {
                msg.message_type.starts_with("user.")
            }))
            .middleware(Validator::json())
            .emitter(StdoutEmitter::pretty()))
    }).await
}

Your closure receives a Hub pre-configured from environment variables (buffer capacity, batch size, flush interval). You add your ingestors, middleware, and emitters — the runtime does the rest.
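
For example, launching with POLKU_BUFFER_CAPACITY=50000 POLKU_BATCH_SIZE=500 retunes the pipeline without a rebuild; the full list is in the Configuration section.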

For more control over ports or to disable gRPC:

use polku_runtime::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    RuntimeBuilder::new()
        .grpc_addr("0.0.0.0:50052".parse()?)
        .metrics_port(9091)
        .configure(|hub| async move {
            let grpc = GrpcEmitter::with_endpoints(vec![
                "downstream:50051".into(),
            ]).await?;

            Ok(hub
                .ingestor(JsonIngestor::new())
                .emitter(grpc))
        }).await
}

Advanced: direct Hub wiring without the runtime

If you need full control and want to manage tracing, metrics, and the gRPC server yourself:

use polku_gateway::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (_, hub_sender, runner) = Hub::new()
        .buffer_capacity(50_000)
        .batch_size(500)
        .flush_interval_ms(50)
        .middleware(Filter::new(|msg: &Message| msg.message_type.starts_with("user.")))
        .middleware(Validator::json())
        .emitter(StdoutEmitter::pretty())
        .build();

    tokio::spawn(runner.run());

    let registry = Arc::new(PluginRegistry::new());
    let service = polku_gateway::server::GatewayService::with_hub(
        registry, hub_sender,
    );

    tonic::transport::Server::builder()
        .add_service(service.into_server())
        .serve("[::1]:50051".parse()?)
        .await?;

    Ok(())
}
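
Here the spawned runner task drives the pipeline itself (buffer draining and emission), while GatewayService only enqueues incoming events through hub_sender.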

Kubernetes

POLKU runs as a single pod. Minimal deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: polku
spec:
  replicas: 1
  selector:
    matchLabels:
      app: polku
  template:
    metadata:
      labels:
        app: polku
    spec:
      containers:
        - name: polku
          image: polku-gateway:latest
          ports:
            - containerPort: 50051  # gRPC
            - containerPort: 9090   # Metrics
          env:
            - name: POLKU_GRPC_ADDR
              value: "0.0.0.0:50051"
            - name: POLKU_METRICS_ADDR
              value: "0.0.0.0:9090"
---
apiVersion: v1
kind: Service
metadata:
  name: polku
spec:
  selector:
    app: polku
  ports:
    - name: grpc
      port: 50051
    - name: metrics
      port: 9090

The Pipeline

┌─────────────────────────────────────────────────────────────────────┐
│                          MESSAGE FLOW                                │
│                                                                      │
│   INGEST            MIDDLEWARE            BUFFER            EMIT     │
│  ┌───────┐        ┌───────────┐        ┌───────────┐     ┌───────┐ │
│  │ bytes │───────>│  Filter   │───────>│   Ring    │────>│ Fan   │ │
│  │   |   │        │  Enrich   │        │  Buffer   │     │ Out   │ │
│  │Message│        │  Route    │        │  (async)  │     │       │ │
│  └───────┘        │  Validate │        │  Tiered   │     └───────┘ │
│                   │  Aggregate│        └───────────┘               │
│                   └───────────┘                                     │
└─────────────────────────────────────────────────────────────────────┘
  1. Ingestors - Parse bytes into Messages (protobuf, JSON, NDJSON, custom via gRPC plugin)
  2. Middleware - Transform, filter, enrich, validate, route, aggregate, sample, rate-limit, deduplicate
  3. Buffer - Decouple ingestion from emission (backpressure). Standard ring buffer or tiered with zstd compression.
  4. Emitters - Send to destinations (gRPC, webhooks, stdout, custom via gRPC plugin). Optional resilience wrappers: retry, circuit breaker, failure capture.

The core type flowing through the pipeline is Message — protocol-agnostic, zero-copy:

pub struct Message {
    pub id: MessageId,                        // 16-byte ULID (Copy)
    pub timestamp: i64,                       // Unix nanos
    pub source: InternedStr,                  // Interned, O(1) clone
    pub message_type: InternedStr,            // Interned, O(1) clone
    pub metadata: Option<Box<HashMap<...>>>,  // Lazy-allocated
    pub payload: Bytes,                       // Zero-copy (Arc-based)
    pub route_to: SmallVec<[String; 2]>,      // Inline for 0-2 targets
}

The proto Event type only exists at gRPC boundaries (wire format). Inside the pipeline, everything is Message.
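
Middleware closures operate on these fields directly. A small sketch (Filter::new and starts_with appear in the Usage examples above; is_empty comes from the bytes crate):

// Keep user-domain events that actually carry a payload.
let only_user_events = Filter::new(|msg: &Message| {
    msg.message_type.starts_with("user.") && !msg.payload.is_empty()
});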


Built-in Components

Ingestors

Name                 Sources                   Description
PassthroughIngestor  passthrough, polku        Decodes protobuf Event into Message
JsonIngestor         json, json-lines, ndjson  JSON objects, arrays, or newline-delimited
ExternalIngestor     configurable              Delegates to any gRPC plugin

Middleware

Name          What it does
Filter        Drop messages that don't match a predicate
Transform     Apply a function to each message
Router        Content-based routing (sets route_to targets)
RateLimiter   Global token-bucket rate limiting
Throttle      Per-source rate limiting with LRU eviction
Sampler       Probabilistic sampling (lock-free xorshift64)
Deduplicator  Drop duplicate IDs within a TTL window
Enricher      Add metadata from async lookups (DB, API)
Validator     Validate messages (built-ins: JSON, non-empty, max-size)
Aggregator    Batch N messages into 1 (strategies: JSON array, concat, first, last)
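
Assuming middleware run in the order they're added (the Message Flow diagram suggests a linear chain), a common pattern is cheap drops first, heavier work last. A sketch: Filter and Validator are used as in Usage, while the Sampler and Deduplicator constructors here are illustrative guesses, not confirmed API.

// Hypothetical composition: shed load early, validate late.
let hub = hub
    .middleware(Sampler::new(0.10))        // hypothetical ctor: keep ~10% of traffic
    .middleware(Deduplicator::new(60))     // hypothetical ctor: 60s TTL window
    .middleware(Filter::new(|m: &Message| m.message_type.starts_with("user.")))
    .middleware(Validator::json());        // shown in Usage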

Emitters

Name             Description
StdoutEmitter    Print to console (compact or pretty format)
GrpcEmitter      Send to downstream gRPC endpoints with load balancing
WebhookEmitter   HTTP POST JSON to any URL
ExternalEmitter  Delegate to any gRPC plugin

Resilience Wrappers

Composable layers that wrap any emitter:

Wrapper                What it does
RetryEmitter           Exponential backoff with jitter
CircuitBreakerEmitter  Closed/Open/Half-Open state machine
FailureCaptureEmitter  Buffer failed messages for debugging

Compose with the builder:

let emitter = ResilientEmitter::wrap(my_emitter)
    .with_default_retry()
    .with_default_circuit_breaker()
    .with_failure_capture(buffer)
    .build();
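
The wrapped result is itself an emitter, so it can be passed to .emitter(...) like any built-in.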

FALSE Protocol

POLKU has first-class support for the FALSE Protocol (Functional AI-native Semantic Events). The polku-fp crate provides plugins for ingesting, routing, correlating, and aggregating Occurrences.

[dependencies]
polku-fp = { git = "https://github.com/false-systems/polku" }

Occurrence Pipeline

use polku_fp::*;

// `grpc_downstream` is any emitter built beforehand,
// e.g. GrpcEmitter::with_endpoints(...) from the Usage section.
polku_runtime::run(|hub| async move {
    Ok(hub
        .ingestor(OccurrenceIngestor::new())
        .middleware(OccurrenceCorrelator::new().ttl_secs(300))
        .middleware(OccurrenceAggregator::new(5).ttl_secs(600))
        .middleware(OccurrenceRouter::new()
            .route("ci.", "sykli")
            .route("kernel.", "tapio"))
        .emitter(grpc_downstream))
}).await
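
Routes match in declaration order with first match wins (see the table below), so put more specific prefixes before general ones.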

FP Components

Component             What it does
OccurrenceIngestor    Decode Occurrence JSON (single or array) into Messages
OccurrenceRouter      Route by message_type prefix (first match wins)
OccurrenceCorrelator  Track correlation groups, enrich with polku.correlation_key metadata
OccurrenceAggregator  Batch correlated messages into summaries with History linking
Probe                 Self-aware pipeline observer — emits Occurrences about pressure, health, circuit state

History & Causal Chains

Occurrences carry a History block for causal chain tracking:

{
  "type": "gateway.pipeline.pressure_recovered",
  "history": {
    "previous_ids": ["01ALERT_ID"],
    "lifecycle": "resolved"
  },
  "context": {
    "correlation_keys": ["probe.pressure_high.polku-prod"]
  }
}

The Probe automatically links recovery events to their triggering alerts via shared correlation keys and history.previous_ids. The Aggregator marks batch summaries with lifecycle: "aggregated" and links to all constituent IDs.


Write a Plugin

Plugins communicate over gRPC. Write them in any language.

Python: Slack Emitter

# Receives Message data as Event over gRPC
from concurrent.futures import ThreadPoolExecutor

import grpc
import requests

class SlackEmitter:
    def Info(self, request, context):
        return PluginInfo(
            name="slack-emitter",
            type=EMITTER,
            emitter_name="slack",
        )

    def Emit(self, request, context):
        for event in request.events:
            message = f"*{event.event_type}* from `{event.source}`"
            requests.post(SLACK_WEBHOOK, json={"text": message})
        return EmitResponse(success_count=len(request.events))

server = grpc.server(ThreadPoolExecutor())
# Register SlackEmitter with the servicer generated from plugin.proto here
server.add_insecure_port('[::]:9001')
server.start()
server.wait_for_termination()  # block so the plugin keeps serving

Go: CSV Ingestor

func (p *CSVIngestor) Ingest(ctx context.Context, req *IngestRequest) (*IngestResponse, error) {
    reader := csv.NewReader(bytes.NewReader(req.Data))
    headers, err := reader.Read()
    if err != nil {
        return nil, err
    }

    var events []*Event
    for {
        row, err := reader.Read()
        if err != nil {
            break // io.EOF ends the loop; csv.Reader also rejects ragged rows
        }

        // One metadata entry per column, keyed by its header
        metadata := make(map[string]string)
        for i, h := range headers {
            metadata[h] = row[i]
        }

        events = append(events, &Event{
            Id:        uuid.New().String(),
            EventType: "csv.row",
            Metadata:  metadata,
        })
    }
    return &IngestResponse{Events: events}, nil
}

Wire up in Rust:

Hub::new()
    .ingestor(ExternalIngestor::new("csv", "http://localhost:9002"))
    .emitter(Arc::new(ExternalEmitter::new("http://localhost:9001")))
    .build();
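
The addresses line up with the plugin examples above: the Go CSV ingestor would serve on port 9002 and the Python Slack emitter listens on 9001.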

Configuration

Environment Variables

Variable                   Default         Description
POLKU_GRPC_ADDR            [::1]:50051     gRPC listen address
POLKU_METRICS_ADDR         127.0.0.1:9090  Prometheus metrics address
POLKU_BUFFER_CAPACITY      100000          Ring buffer capacity
POLKU_BATCH_SIZE           1000            Batch size for emitters
POLKU_FLUSH_INTERVAL_MS    100             Flush interval (ms)
POLKU_LOG_LEVEL            info            Log level
POLKU_LOG_FORMAT           pretty          Log format (pretty or json)
POLKU_EMIT_GRPC_ENDPOINTS  -               Comma-separated gRPC downstream endpoints
POLKU_EMIT_GRPC_LAZY       false           Lazy gRPC connections

See Usage for programmatic Rust examples.


Observability

Prometheus Metrics (port 9090)

POLKU exposes 40+ metrics at GET /metrics:

Category        Key Metrics
Throughput      polku_events_received_total, polku_events_forwarded_total, polku_events_per_second
Buffer          polku_buffer_size, polku_buffer_capacity, polku_buffer_overflow_total
Latency         polku_processing_latency, polku_flush_duration_seconds, polku_middleware_duration_seconds
Emitter health  polku_emitter_health, polku_circuit_breaker_state, polku_emitter_throughput
Load balancing  polku_grpc_endpoint_fill_ratio, polku_grpc_endpoint_health, polku_grpc_failover_total
Pipeline        polku_pipeline_pressure (composite: 0.0 idle → 1.0 overloaded)
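
The pressure gauge works well as a single alerting signal: for example, a Prometheus rule that fires when polku_pipeline_pressure stays above 0.8 for a few minutes catches sustained overload before polku_buffer_overflow_total starts climbing.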

Health Endpoints

  • GET /health - JSON health summary with pipeline pressure and per-component status
  • GET /pipeline - JSON pipeline manifest (topology, components, configuration)

Processing Trace

Set metadata["polku.trace"] = "true" on a message to get per-middleware timing:

[{"mw":"filter","action":"passed","us":12},{"mw":"router","action":"routed","targets":["kafka"]}]

Stored in metadata["_polku.trace"] after pipeline processing.


Performance

Metric                Value
Memory                10-20 MB
Streaming throughput  178k+ events/sec
Unary throughput      ~1.3k events/sec
Plugin overhead       ~100-500us per gRPC call

Streaming (StreamEvents) is the high-throughput path. For bulk ingestion, use batched streaming instead of unary SendEvent.
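
At the measured rates that is a difference of roughly two orders of magnitude, which matches the cost model: unary pays a full gRPC request/response round trip per event, while streaming amortizes it across the whole stream.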

The pipeline is zero-copy end-to-end: Bytes payloads flow without allocation, InternedStr fields clone in O(1), and MessageId is Copy.
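
The Bytes half of that claim is easy to demonstrate in isolation; a standalone sketch against the bytes crate (not POLKU-specific):

use bytes::Bytes;

fn main() {
    let payload = Bytes::from(vec![0u8; 1024]);
    // Cloning bumps a reference count; the 1 KiB buffer is shared, not copied.
    // This is what makes fanning one Message out to several emitters cheap.
    let a = payload.clone();
    let b = payload.clone();
    assert_eq!(a.len() + b.len(), 2048);
}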


Project Structure

polku/
├── core/                    # Shared types crate
│   └── src/
│       ├── message.rs       # Message, MessageId, Routes, Metadata
│       ├── emit.rs          # Emitter trait
│       ├── error.rs         # PluginError
│       └── intern.rs        # InternedStr
│
├── gateway/                 # Gateway library + binary
│   └── src/
│       ├── main.rs           # Standalone entry point
│       ├── config.rs         # Env var configuration
│       ├── server.rs         # gRPC server (polku.v1.Gateway)
│       ├── hub/              # Hub builder + runner
│       ├── ingest/           # Ingestors (3 built-in)
│       ├── middleware/       # Middleware (10 types)
│       ├── emit/             # Emitters (4 + resilience wrappers)
│       ├── metrics.rs        # Prometheus metric definitions
│       ├── metrics_server.rs # /metrics, /health, /pipeline
│       ├── buffer.rs         # RingBuffer
│       ├── buffer_tiered.rs  # TieredBuffer (zstd compression)
│       └── manifest.rs       # Pipeline self-description
│
├── false-protocol/          # FALSE Protocol types (standalone)
│   └── src/
│       └── lib.rs           # Occurrence, Severity, Outcome, History, Context, Error, Reasoning
│
├── fp/                      # FALSE Protocol plugins for POLKU
│   └── src/
│       ├── ingest.rs        # OccurrenceIngestor — JSON → Message
│       ├── middleware.rs    # OccurrenceRouter — prefix-based routing
│       ├── correlator.rs    # OccurrenceCorrelator — correlation key enrichment
│       ├── aggregator.rs    # OccurrenceAggregator — batch by correlation key
│       └── probe.rs         # Probe — self-aware pipeline observer
│
├── runtime/                 # Injectable pipeline interface
│   └── src/
│       ├── lib.rs           # run(), RuntimeBuilder
│       └── prelude.rs       # Re-exports for pipeline authors
│
├── ci/                      # CI utilities
├── test-plugins/            # Test plugin implementations
│   └── receiver/            # gRPC receiver for testing
├── tests/e2e/               # End-to-end tests (Kind k8s)
└── docs/                    # Documentation

Proto definitions live in a separate repository at proto/polku/v1/:

  • gateway.proto - Client-facing gRPC service (SendEvent, StreamEvents, Health)
  • plugin.proto - Plugin gRPC interface (Ingestor/Emitter plugins)
  • event.proto - Wire-format Event type (gRPC boundaries only)

Naming

Polku (Finnish) = "path"

The path your messages take through the system.


License

Apache 2.0
