
Chetas-Patil/workflowx


workflowx (salvador)

Cloud-native workflow runtime for Go. Persist tasks, run ordered steps reliably, and swap the storage and queue layer without touching workflow logic.



Why

Most teams reach for a heavyweight orchestrator (Temporal, Airflow, Cadence) when all they need is durable, ordered, observable execution of a few business workflows. salvador is the small library you can drop into a Go service to get:

  • Durability: every task is persisted before it executes, so a process crash never loses a submitted task.
  • Ordered semantics: steps run in a deterministic order; a failure stops the chain and records the exact step that broke.
  • Pluggable backends: Redis support ships in the box, and a single Store interface lets you back the engine with PostgreSQL, DynamoDB, NATS JetStream, or an in-memory store for tests.
  • Predictable concurrency: the number of workers per workflow is a single tuning knob.

System Architecture

flowchart LR
    P[Producer / API handler] -- Submit(workflow, body) --> E[Engine]
    E -- Save task --> S[(Store)]
    E -- Enqueue task ID --> Q[(Queue)]
    subgraph Workers
      W1[Worker 1]
      W2[Worker 2]
      Wn[Worker N]
    end
    Q -- Dequeue --> W1
    Q -- Dequeue --> W2
    Q -- Dequeue --> Wn
    W1 -- Run steps --> S
    W2 -- Run steps --> S
    Wn -- Run steps --> S
    S -. Default impl .- R[(Redis)]
    Q -. Default impl .- R

The Store interface owns both task persistence and the pending queue. Production deployments typically point both at the same Redis cluster, but the interface lets you separate them (e.g. Postgres for durability, Kafka for queuing) without changing the engine.
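One way to split persistence from queuing is to compose two narrower interfaces into a type that satisfies Store. This is a minimal sketch: `TaskRepo`, `TaskQueue`, and `SplitStore` are hypothetical names, and `Task` is a trimmed stand-in for `salvador.Task` so the example compiles on its own.

```go
package main

import "context"

// Task is a trimmed, hypothetical stand-in for salvador.Task.
type Task struct {
	ID   string
	Name string
}

// Store mirrors the interface documented under "Custom Store".
type Store interface {
	Save(ctx context.Context, task *Task) error
	Get(ctx context.Context, id string) (*Task, error)
	Enqueue(ctx context.Context, workflowName string, taskID string) error
	Dequeue(ctx context.Context, workflowName string) (string, error)
}

// TaskRepo is the persistence half (for example, backed by Postgres).
type TaskRepo interface {
	Save(ctx context.Context, task *Task) error
	Get(ctx context.Context, id string) (*Task, error)
}

// TaskQueue is the queueing half (for example, backed by Kafka or NATS).
type TaskQueue interface {
	Enqueue(ctx context.Context, workflowName string, taskID string) error
	Dequeue(ctx context.Context, workflowName string) (string, error)
}

// SplitStore satisfies Store by embedding one implementation of each half;
// the engine only ever sees the combined Store.
type SplitStore struct {
	TaskRepo
	TaskQueue
}

// Compile-time check that the composition is a valid Store.
var _ Store = SplitStore{}
```

Because the engine calls Save before Enqueue, a split backend cannot make the two writes atomic: if the queue is unavailable after Save succeeds, the task would presumably remain pending with no queue entry. A periodic sweep for stale pending tasks is one way to cover that gap.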

Task Lifecycle

sequenceDiagram
    participant API as API / Producer
    participant Engine
    participant Store as Store (Redis)
    participant Worker

    API->>Engine: Submit("process-order", body)
    Engine->>Store: Save(task, status=pending)
    Engine->>Store: Enqueue(workflow, taskID)
    Engine-->>API: taskID

    loop pollInterval (1s default)
        Worker->>Store: Dequeue(workflow)
        Store-->>Worker: taskID
        Worker->>Store: Get(taskID)
        Worker->>Store: Save(status=running)
        loop step in workflow.steps[task.Step:]
            Worker->>Worker: step.fn(ctx, task)
            alt step error
                Worker->>Store: Save(status=failed, error)
                Note right of Worker: subsequent steps skipped
            else step ok
                Worker->>Store: Save(step+=1)
            end
        end
        Worker->>Store: Save(status=completed)
    end

The worker saves the task only between steps, so if the process crashes in the middle of a step, that step runs again on recovery. Step functions should therefore be idempotent.

Quick Start

go get github.com/workflowx/salvador

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/redis/go-redis/v9"
    "github.com/workflowx/salvador"
)

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    store := salvador.NewRedisStore(rdb, salvador.WithKeyPrefix("orders:"))

    wf := salvador.NewWorkflow("process-order").
        Step("validate", validate).
        Step("charge-payment", charge).
        Step("ship", ship)

    engine := salvador.NewEngine(store, salvador.WithConcurrency(8))
    engine.Register(wf)

    ctx := context.Background()
    body, _ := json.Marshal(map[string]any{"item": "widget", "quantity": 5})
    taskID, err := engine.Submit(ctx, "process-order", body)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("submitted:", taskID)

    if err := engine.Start(ctx); err != nil {
        log.Fatal(err)
    }
}

func validate(ctx context.Context, t *salvador.Task) error { /* ... */ return nil }
func charge(ctx context.Context, t *salvador.Task) error    { /* ... */ return nil }
func ship(ctx context.Context, t *salvador.Task) error      { /* ... */ return nil }

Concepts

Task

type Task struct {
    ID        string            // UUID assigned at Submit
    Name      string            // workflow name
    Body      []byte            // arbitrary, opaque to the engine
    Status    TaskStatus        // pending | running | completed | failed
    Step      int               // number of completed steps; index of the next step to run (resume index)
    Error     string            // populated when Status == failed
    CreatedAt time.Time
    UpdatedAt time.Time
    Meta      map[string]string // user-controlled labels
}

Workflow

wf := salvador.NewWorkflow("process-order").
    Step("validate", validate).
    Step("charge-payment", charge).
    Step("ship", ship)

Steps are appended in order. Each step is a StepFunc:

type StepFunc func(ctx context.Context, task *Task) error

A step returning nil advances the task; returning an error fails it and skips the rest.

Engine

engine := salvador.NewEngine(store,
    salvador.WithPollInterval(2 * time.Second), // default: 1s
    salvador.WithConcurrency(8),                // default: 1 (workers per workflow)
    salvador.WithLogger(myLogger),              // default: log.Default()
)

engine.Start(ctx) blocks until ctx is cancelled; cancel the context to drain the workers and return.

Custom Store

type Store interface {
    Save(ctx context.Context, task *Task) error
    Get(ctx context.Context, id string) (*Task, error)
    Enqueue(ctx context.Context, workflowName string, taskID string) error
    Dequeue(ctx context.Context, workflowName string) (string, error)
}

A memStore implementation lives in engine_test.go and is the simplest reference for new backends.

Operational Notes

Status & Recovery

Inspect a task at any time:

task, err := engine.GetTask(ctx, taskID)
if err != nil {
    log.Fatal(err)
}
if task.Status == salvador.TaskStatusFailed {
    log.Printf("failed at step %d: %s", task.Step, task.Error)
}

The engine resumes from task.Step on the next dequeue, so a crash mid-step replays that step on recovery.

Concurrency Model

WithConcurrency(N) spawns N workers per registered workflow, so each process runs N * len(workflows) worker goroutines. Each worker independently polls the queue every pollInterval. Use a longer interval for low-throughput workloads to reduce Redis load.

Backpressure

The default Redis store uses LPUSH / RPOP, which yields FIFO order. There is no per-workflow rate limit; throttle throughput by lowering WithConcurrency. For hard rate limits, wrap the step function with a golang.org/x/time/rate limiter.

Security Considerations

  • Redis is not authenticated by default. Always run with requirepass and TLS in production. The library does not enforce this for you.
  • Task.Body is opaque bytes. Do not put cleartext PII or secrets in it; the JSON-encoded task is persisted in Redis. Encrypt sensitive payloads client-side before Submit.
  • Step functions execute arbitrary code. Treat workflow registration as code execution: only register workflows from trusted sources, especially if you ever load step definitions dynamically.
  • No retries. Failed tasks stay failed. Wrap step functions with your own retry policy (e.g. cenkalti/backoff) and decide explicitly which errors are transient.

Scalability

| Lever      | Knob               | Notes |
|------------|--------------------|-------|
| Vertical   | WithConcurrency    | Workers per workflow per process. |
| Horizontal | Multiple processes | RPOP is atomic, so each task ID is handed to exactly one worker; it is safe to run N processes against one Redis. |
| Storage    | WithKeyPrefix      | Isolate environments / tenants on a shared Redis. |
| Polling    | WithPollInterval   | Trade latency for Redis QPS. |
| Backend    | Custom Store       | Swap Redis for Postgres / NATS / Kafka without touching workflows. |

The default Redis store keeps every task forever — pair it with a TTL/cleanup job when running at high volume.

Roadmap

The current release is a minimal, durable runtime. Open work tracked in Issues:

  • Built-in retry / backoff policy on StepFunc
  • Compensation hooks (saga pattern) for rollback semantics
  • OpenTelemetry tracing on step boundaries
  • Dead-letter queue for terminally failed tasks
  • Postgres-backed Store reference implementation

Development

make test                  # go test ./... -race
make lint                  # golangci-lint run
make security              # gosec ./...

# or, plain
go test ./... -race -cover

Continuous integration runs the same checks on every push and PR; see .github/workflows.

License

MIT — see LICENSE.

