Skip to content

Latest commit

 

History

History
614 lines (455 loc) · 13.5 KB

File metadata and controls

614 lines (455 loc) · 13.5 KB

API Reference

Complete reference for the workflow engine API.

Table of Contents


Workflow Macro

The #[workflow] attribute macro transforms sequential async-style code into a state machine.

Signature

#[workflow]
fn workflow_name(init: InitType) -> Result<ResultType> {
    // Sequential code with await points
}

Requirements

  • Function name: Becomes the workflow handler name (e.g., BuffWorkflow from fn buff)
  • Init parameter: Must implement Serialize + Deserialize + Clone
  • Result type: Must be Result<T> where T implements Serialize + Deserialize
  • Body: May contain await points (timer!, signal!, spawn!, procedure!, select!)

Example

use workflow_core::prelude::*;
use workflow_macros::{workflow, Timer, Signal};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuffInit { pub duration_secs: u64 }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuffResult { pub stacks: u32 }

#[workflow]
fn buff(init: BuffInit) -> Result<BuffResult> {
    let mut stacks: u32 = 1;
    timer!(BuffTimer::Expire, init.duration_secs.secs()).await;
    Ok(BuffResult { stacks })
}

Registration

Register workflows in the install! macro:

install! {
    "buff" => BuffWorkflow,
    "countdown" => CountdownWorkflow,
}

Await Point Macros

timer!

Wait for a timer to fire.

timer!(TimerEnum::Variant, duration).await;
Parameter Type Description
Timer variant impl Timer Type-safe timer identifier
Duration Duration Time to wait (use .secs(), .millis())

Example:

timer!(MyTimer::Tick, 5.secs()).await;
timer!(MyTimer::Cooldown, 500.millis()).await;

signal!

Wait for a signal (only valid inside select!).

select! {
    signal!(SignalEnum::Variant) => { /* handler */ },
    signal!(SignalEnum::WithPayload(binding)) => { /* use binding */ },
}.await;

Payload binding: Signals with payloads automatically deserialize and bind the value.

#[derive(Signal)]
enum MySignal {
    Simple,           // No payload
    WithData(u32),    // Payload bound to variable
}

select! {
    signal!(MySignal::Simple) => { /* no data */ },
    signal!(MySignal::WithData(n)) => {
        // n is already a u32!
        println!("Received: {}", n);
    },
}.await;

spawn!

Spawn a child workflow and wait for its result.

let result: ResultType = spawn!("workflow_name", init_data).await;
Parameter Type Description
Workflow name &str Registered workflow type name
Init data impl Serialize Initialization data for child

Result binding: The result must be bound with an explicit type annotation.

let child_result: ChildResult = spawn!("child", ChildInit { value: 42 }).await;

procedure!

Call a procedure and wait for the result.

let result: ResultType = procedure!("procedure_name", args).await;
Parameter Type Description
Procedure name &str Name of the procedure to call
Args impl Serialize Arguments for the procedure

select!

Wait for the first of multiple events.

select! {
    timer!(Timer::Variant, duration) => { /* timer handler */ },
    signal!(Signal::Variant) => { /* signal handler */ },
    signal!(Signal::WithPayload(n)) => { /* payload handler */ },
}.await;

Rules:

  • At least one arm required
  • At most one timer arm allowed
  • Multiple signal arms allowed
  • Timer arm cannot have payload binding
  • Each arm body can use break, continue, or return

Timer and Signal Derives

#[derive(Timer)]

Generate a type-safe timer enum.

#[derive(Timer)]
pub enum MyTimer {
    Tick,
    Expire,
    Cooldown,
}

Generated methods:

Method Returns Description
name(&self) &'static str Wire name ("tick", "expire", "cooldown")
from_name(name: &str) Option<Self> Parse from wire name
all_names() &'static [&'static str] All valid timer names

Requirements:

  • All variants must be unit variants (no fields)
  • Names are converted to snake_case for wire format (e.g., StartPatrol"start_patrol")

#[derive(Signal)]

Generate a type-safe signal enum with automatic payload deserialization.

#[derive(Signal)]
pub enum MySignal {
    Pause,                    // No payload
    SetValue(i32),           // Single value payload
    Configure { max: u32 },  // Struct payload
}

Generated methods:

Method Returns Description
name(&self) &'static str Wire name
from_name_and_payload(name, payload) Result<Self, SignalError> Parse with auto-deserialization
all_names() &'static [&'static str] All valid signal names

Payload deserialization:

  • Unit variants expect empty payload
  • Tuple variants deserialize the inner type from JSON
  • Struct variants deserialize from JSON object

SignalError

pub enum SignalError {
    UnknownSignal { signal: String, expected: &'static [&'static str] },
    DeserializationFailed { signal: String, error: String },
    UnexpectedPayload { signal: String },
}

Signal Naming Convention

Signal variant names are converted to snake_case for the wire format:

Enum Variant Wire Name
EnemyKilled "enemy_killed"
ThreatDetected "threat_detected"
StandDown "stand_down"

Use the snake_case name when calling workflow_signal or workflow_broadcast_signal.


Workflow View Structs

The #[workflow] macro automatically generates a View struct for querying workflow state.

Generated Types

For a workflow like:

#[workflow]
fn quest(init: QuestInit) -> Result<QuestResult> {
    let mut objectives_done: u32 = 0;
    let mut abandoned: bool = false;
    // ...
}

The macro generates:

/// Auto-generated view of workflow state
pub struct QuestWorkflowView {
    pub init: QuestInit,
    pub objectives_done: u32,
    pub abandoned: bool,
}

impl QuestWorkflow {
    /// Parse state_data into a typed view
    pub fn view(state_data: &[u8]) -> Result<QuestWorkflowView, serde_json::Error>;
}

Usage (Rust)

// Find a workflow
let workflow = ctx.db.workflow().id().find(&workflow_id)?;

// Parse state into typed view
let view = QuestWorkflow::view(&workflow.state_data)?;

// Access fields
if view.objectives_done >= view.init.objective_count {
    complete_quest(workflow_id);
}

Usage (TypeScript)

const workflow = conn.db.workflow.id.find(workflowId);
const state = JSON.parse(new TextDecoder().decode(workflow.stateData));

// Access view fields (init + mutable variables)
console.log(`Progress: ${state.objectives_done}/${state.init.objective_count}`);

Note: The state JSON includes internal fields (phase, loop counters), but serde ignores unknown fields when deserializing into the View struct.


DurationExt

Extension trait for ergonomic duration creation.

use workflow_core::prelude::*;

let d1 = 30.secs();      // 30 seconds
let d2 = 500.millis();   // 500 milliseconds
let d3 = 5.minutes();    // 5 minutes

Implemented for u64, u32, and i32.


Reducers

workflow_start

Create and start a new workflow.

workflow_start(
    ctx: &ReducerContext,
    workflow_type: String,
    entity_id: Option<u64>,
    correlation_id: Option<String>,
    initial_data: Vec<u8>,
) -> Result<(), String>
Parameter Description
workflow_type Registered workflow type name
entity_id Optional entity to attach workflow to
correlation_id Optional grouping ID
initial_data JSON-serialized initialization data

CLI Example:

spacetime call my-module workflow_start '["buff", 42, null, "{\"duration_secs\":30}"]'

workflow_signal

Send a signal to a running workflow.

workflow_signal(
    ctx: &ReducerContext,
    workflow_id: u64,
    signal_name: String,
    payload: Vec<u8>,
) -> Result<(), String>
Parameter Description
workflow_id Target workflow ID
signal_name Signal name (snake_case)
payload JSON-serialized payload (empty [] for unit signals)

CLI Example:

spacetime call my-module workflow_signal '[1, "stack", "5"]'
spacetime call my-module workflow_signal '[1, "dispel", "[]"]'

workflow_cancel

Cancel a running workflow.

workflow_cancel(
    ctx: &ReducerContext,
    workflow_id: u64,
    reason: String,
) -> Result<(), String>

Tables

workflow

Main workflow state table.

Column Type Description
id u64 Primary key
workflow_type String Registered type name
entity_id Option<u64> Attached entity
correlation_id Option<String> Grouping ID
status WorkflowStatus Current status
current_step String Human-readable step name
state_data Vec<u8> Serialized state
parent_id Option<u64> Parent workflow ID
created_at Timestamp Creation time
updated_at Timestamp Last update time
error_message Option<String> Error if failed

Indexes: entity_id, correlation_id, workflow_type, status

WorkflowStatus

pub enum WorkflowStatus {
    Running,
    Suspended,
    Completed,
    Failed,
    Cancelled,
}

workflow_timer

Scheduled timer table.

Column Type Description
scheduled_id u64 Primary key
scheduled_at ScheduleAt When to fire
workflow_id u64 Parent workflow
timer_name String Timer name

last_workflow_id

Stores ID of last created workflow.

Column Type Description
key u8 Always 0
workflow_id u64 Last created ID

Common Queries

-- Find all workflows for an entity
SELECT * FROM workflow WHERE entity_id = 42;

-- Find active workflows by type
SELECT * FROM workflow
WHERE workflow_type = 'patrol'
AND status IN ('Running', 'Suspended');

-- Find pending timers
SELECT * FROM workflow_timer
ORDER BY scheduled_at;

Mutable Variables

Variables that persist across await points must be declared with explicit type annotations.

Rules

  1. Explicit type required: let mut count: u32 = 0;
  2. Must be mutable: Only let mut variables are tracked
  3. Serializable types: Must implement Serialize + Deserialize

Example

#[workflow]
fn example(init: ExampleInit) -> Result<ExampleResult> {
    // These are persisted across await points
    let mut count: u32 = 0;
    let mut name: String = init.name.clone();
    let mut items: Vec<u32> = vec![];

    timer!(MyTimer::Tick, 1.secs()).await;

    // Variables still have their values after await
    count += 1;
    items.push(count);

    timer!(MyTimer::Tick, 1.secs()).await;

    Ok(ExampleResult { count, items })
}

Limitations

  • Cannot track references or closures
  • Complex types must be Clone + Serialize + Deserialize
  • Type inference not supported (must be explicit)

Control Flow

Loops

// Infinite loop with break
loop {
    select! {
        timer!(MyTimer::Tick, 1.secs()) => {
            count += 1;
            if count >= 10 { break }
            continue
        },
        signal!(MySignal::Stop) => break,
    }.await;
}

// Counter-based for loop
for i in 0..init.count as usize {
    let result: ChildResult = spawn!("child", ChildInit { index: i }).await;
    total += result.value;
}

Conditionals

// If/else with await in branches
if init.fast_mode {
    timer!(MyTimer::Fast, 1.secs()).await;
    value += 10;
} else {
    timer!(MyTimer::Slow, 5.secs()).await;
    value += 50;
}

Early Return

#[workflow]
fn example(init: ExampleInit) -> Result<ExampleResult> {
    if init.skip {
        return Ok(ExampleResult { skipped: true });
    }

    // ... rest of workflow
}

Break and Continue

Inside select! arms:

  • break — Exit the enclosing loop
  • continue — Re-enter the loop (re-register timers/signals)
  • return — Exit the workflow with a result
loop {
    select! {
        timer!(MyTimer::Tick, 1.secs()) => {
            count -= 1;
            if count == 0 {
                break  // Exit loop, continue to code after
            }
            continue  // Stay in loop
        },
        signal!(MySignal::Abort) => {
            return Ok(ExampleResult { aborted: true })  // Exit workflow
        },
    }.await;
}

// Code after loop runs after break
Ok(ExampleResult { final_count: count })

Prelude

Import common types:

use workflow_core::prelude::*;

// Imports:
// - Result (anyhow)
// - Serialize, Deserialize (serde)
// - DurationExt trait (.secs(), .millis(), .minutes())
// - Timer, Signal traits
// - SignalError

Also import macros separately:

use workflow_macros::{workflow, Timer, Signal, install};

Next Steps

Deployment — Production setup and operations → Troubleshooting — Common issues and solutions