Complete reference for the workflow engine API.
- Workflow Macro
- Await Point Macros
- Timer and Signal Derives
- DurationExt
- Reducers
- Tables
- Mutable Variables
- Control Flow
The #[workflow] attribute macro transforms sequential async-style code into a state machine.
#[workflow]
fn workflow_name(init: InitType) -> Result<ResultType> {
// Sequential code with await points
}
- Function name: Becomes the workflow handler name (e.g., BuffWorkflow from fn buff)
- Init parameter: Must implement Serialize + Deserialize + Clone
- Result type: Must be Result<T> where T implements Serialize + Deserialize
- Body: May contain await points (timer!, signal!, spawn!, procedure!, select!)
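To make the state-machine transformation more concrete, here is a hand-written sketch of roughly what a one-timer workflow could look like once its await point becomes an explicit phase. The names BuffPhase, BuffState, StepOutcome and step are hypothetical. This reference does not show the code the real #[workflow] macro emits, and that code will differ.

```rust
// Hypothetical hand-written equivalent of a one-timer workflow.
// None of these names come from the workflow engine itself.

/// Where the workflow is currently parked.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BuffPhase {
    Start,
    WaitingForExpire,
    Done,
}

/// Everything that must survive across an await point.
#[derive(Debug, Clone, PartialEq)]
struct BuffState {
    phase: BuffPhase,
    duration_secs: u64, // copied from the init data
    stacks: u32,        // a tracked `let mut` variable
}

/// What the engine should do after one step.
#[derive(Debug, PartialEq)]
enum StepOutcome {
    /// Schedule a timer and suspend until it fires.
    WaitTimer { name: &'static str, secs: u64 },
    /// The workflow finished with this many stacks.
    Finished { stacks: u32 },
}

/// Advance the workflow by one step; called on start and on each wake-up.
fn step(state: &mut BuffState) -> StepOutcome {
    match state.phase {
        BuffPhase::Start => {
            // Code before the first await point runs here.
            state.stacks = 1;
            state.phase = BuffPhase::WaitingForExpire;
            StepOutcome::WaitTimer { name: "expire", secs: state.duration_secs }
        }
        BuffPhase::WaitingForExpire | BuffPhase::Done => {
            // Code after the await point runs once the timer has fired.
            state.phase = BuffPhase::Done;
            StepOutcome::Finished { stacks: state.stacks }
        }
    }
}
```

Each call to step runs the code between two await points, which is why the state has to live in a struct rather than on the stack.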
use workflow_core::prelude::*;
use workflow_macros::{workflow, Timer, Signal};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuffInit { pub duration_secs: u64 }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuffResult { pub stacks: u32 }
// Timer enum used by the workflow below
#[derive(Timer)]
pub enum BuffTimer { Expire }
#[workflow]
fn buff(init: BuffInit) -> Result<BuffResult> {
let mut stacks: u32 = 1;
timer!(BuffTimer::Expire, init.duration_secs.secs()).await;
Ok(BuffResult { stacks })
}
Register workflows in the install! macro:
install! {
"buff" => BuffWorkflow,
"countdown" => CountdownWorkflow,
}
Wait for a timer to fire.
timer!(TimerEnum::Variant, duration).await;
| Parameter | Type | Description |
|---|---|---|
| Timer variant | impl Timer | Type-safe timer identifier |
| Duration | Duration | Time to wait (use .secs(), .millis()) |
Example:
timer!(MyTimer::Tick, 5.secs()).await;
timer!(MyTimer::Cooldown, 500.millis()).await;
Wait for a signal (only valid inside select!).
select! {
signal!(SignalEnum::Variant) => { /* handler */ },
signal!(SignalEnum::WithPayload(binding)) => { /* use binding */ },
}.await;
Payload binding: Signals with payloads automatically deserialize and bind the value.
#[derive(Signal)]
enum MySignal {
Simple, // No payload
WithData(u32), // Payload bound to variable
}
select! {
signal!(MySignal::Simple) => { /* no data */ },
signal!(MySignal::WithData(n)) => {
// n is already a u32!
println!("Received: {}", n);
},
}.await;
Spawn a child workflow and wait for its result.
let result: ResultType = spawn!("workflow_name", init_data).await;
| Parameter | Type | Description |
|---|---|---|
| Workflow name | &str | Registered workflow type name |
| Init data | impl Serialize | Initialization data for child |
Result binding: The result must be bound with an explicit type annotation.
let child_result: ChildResult = spawn!("child", ChildInit { value: 42 }).await;
Call a procedure and wait for the result.
let result: ResultType = procedure!("procedure_name", args).await;
| Parameter | Type | Description |
|---|---|---|
| Procedure name | &str | Name of the procedure to call |
| Args | impl Serialize | Arguments for the procedure |
Wait for the first of multiple events.
select! {
timer!(Timer::Variant, duration) => { /* timer handler */ },
signal!(Signal::Variant) => { /* signal handler */ },
signal!(Signal::WithPayload(n)) => { /* payload handler */ },
}.await;
Rules:
- At least one arm required
- At most one timer arm allowed
- Multiple signal arms allowed
- Timer arm cannot have payload binding
- Each arm body can use break, continue, or return
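The arm rules above can be written down as a small check. This is only an illustrative model: ArmKind and validate_arms are hypothetical names, and the real select! macro presumably enforces these rules at compile time in its own way.

```rust
// Hypothetical model of select! arms, used only to illustrate the rules.
// A timer arm has no payload field at all, so "timer arm cannot have a
// payload binding" is enforced by the shape of the type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ArmKind {
    Timer,
    Signal { binds_payload: bool },
}

/// Check a list of arms against the documented rules.
fn validate_arms(arms: &[ArmKind]) -> Result<(), String> {
    // Rule: at least one arm is required.
    if arms.is_empty() {
        return Err("select! needs at least one arm".to_string());
    }
    // Rule: at most one timer arm; any number of signal arms is fine.
    let timer_arms = arms.iter().filter(|a| **a == ArmKind::Timer).count();
    if timer_arms > 1 {
        return Err(format!(
            "select! allows at most one timer arm, found {}",
            timer_arms
        ));
    }
    Ok(())
}
```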
Generate a type-safe timer enum.
#[derive(Timer)]
pub enum MyTimer {
Tick,
Expire,
Cooldown,
}
Generated methods:
| Method | Returns | Description |
|---|---|---|
| name(&self) | &'static str | Wire name ("tick", "expire", "cooldown") |
| from_name(name: &str) | Option<Self> | Parse from wire name |
| all_names() | &'static [&'static str] | All valid timer names |
Requirements:
- All variants must be unit variants (no fields)
- Names are converted to snake_case for wire format (e.g., StartPatrol → "start_patrol")
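As a rough picture of what the derive provides, the following hand-written enum implements the three documented methods by hand. PatrolTimer is a made-up example type. Only the method names and return types are taken from the table of generated methods, and the real macro output may be structured differently.

```rust
// Hand-written stand-in for what #[derive(Timer)] is documented to provide.
// PatrolTimer is a hypothetical example enum.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PatrolTimer {
    StartPatrol,
    Cooldown,
}

impl PatrolTimer {
    /// Wire name: the variant name in snake_case.
    pub fn name(&self) -> &'static str {
        match self {
            PatrolTimer::StartPatrol => "start_patrol",
            PatrolTimer::Cooldown => "cooldown",
        }
    }

    /// Parse a wire name back into a variant; unknown names give None.
    pub fn from_name(name: &str) -> Option<Self> {
        match name {
            "start_patrol" => Some(PatrolTimer::StartPatrol),
            "cooldown" => Some(PatrolTimer::Cooldown),
            _ => None,
        }
    }

    /// Every wire name this enum accepts.
    pub fn all_names() -> &'static [&'static str] {
        &["start_patrol", "cooldown"]
    }
}
```

Note that from_name matches the wire name, not the Rust variant name, so "StartPatrol" is rejected.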
Generate a type-safe signal enum with automatic payload deserialization.
#[derive(Signal)]
pub enum MySignal {
Pause, // No payload
SetValue(i32), // Single value payload
Configure { max: u32 }, // Struct payload
}
Generated methods:
| Method | Returns | Description |
|---|---|---|
| name(&self) | &'static str | Wire name |
| from_name_and_payload(name, payload) | Result<Self, SignalError> | Parse with auto-deserialization |
| all_names() | &'static [&'static str] | All valid signal names |
Payload deserialization:
- Unit variants expect empty payload
- Tuple variants deserialize the inner type from JSON
- Struct variants deserialize from JSON object
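The payload rules can be illustrated with a simplified parser. This sketch makes several assumptions: DemoSignal, DemoSignalError and parse_signal are hypothetical, the payload is taken as a string rather than bytes, only an empty string counts as an empty payload, and str::parse stands in for real JSON deserialization so that the example needs no external crates.

```rust
// Simplified stand-in for the documented payload rules. It avoids a JSON
// library: a bare integer such as "5" is parsed with str::parse, which is
// only an approximation of real JSON deserialization.
#[derive(Debug, PartialEq)]
pub enum DemoSignal {
    Pause,         // unit variant: expects an empty payload
    SetValue(i32), // tuple variant: payload holds the inner value
}

// Hypothetical error type, loosely modelled on SignalError.
#[derive(Debug, PartialEq)]
pub enum DemoSignalError {
    UnknownSignal(String),
    DeserializationFailed(String),
    UnexpectedPayload(String),
}

pub fn parse_signal(name: &str, payload: &str) -> Result<DemoSignal, DemoSignalError> {
    let payload = payload.trim();
    match name {
        "pause" => {
            // Unit variants must not carry data.
            if payload.is_empty() {
                Ok(DemoSignal::Pause)
            } else {
                Err(DemoSignalError::UnexpectedPayload(name.to_string()))
            }
        }
        // Tuple variants decode their single inner value from the payload.
        "set_value" => payload
            .parse::<i32>()
            .map(DemoSignal::SetValue)
            .map_err(|e| DemoSignalError::DeserializationFailed(e.to_string())),
        other => Err(DemoSignalError::UnknownSignal(other.to_string())),
    }
}
```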
pub enum SignalError {
UnknownSignal { signal: String, expected: &'static [&'static str] },
DeserializationFailed { signal: String, error: String },
UnexpectedPayload { signal: String },
}
Signal variant names are converted to snake_case for the wire format:
| Enum Variant | Wire Name |
|---|---|
| EnemyKilled | "enemy_killed" |
| ThreatDetected | "threat_detected" |
| StandDown | "stand_down" |
Use the snake_case name when calling workflow_signal or workflow_broadcast_signal.
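When building signal names by hand, it can help to see the conversion spelled out. The function below is a minimal sketch of a PascalCase to snake_case conversion that reproduces the examples in the table. to_snake_case is a hypothetical helper, not part of the engine, and the derive macro may treat edge cases such as acronyms or digits differently.

```rust
/// Convert a PascalCase variant name to snake_case.
/// Sketch only: assumes ASCII names and does not special-case acronyms
/// (for example "HTTPError" becomes "h_t_t_p_error").
fn to_snake_case(variant: &str) -> String {
    let mut out = String::with_capacity(variant.len() + 4);
    for (i, ch) in variant.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            // Every uppercase letter except the first starts a new word.
            if i > 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}
```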
The #[workflow] macro automatically generates a View struct for querying workflow state.
For a workflow like:
#[workflow]
fn quest(init: QuestInit) -> Result<QuestResult> {
let mut objectives_done: u32 = 0;
let mut abandoned: bool = false;
// ...
}
The macro generates:
/// Auto-generated view of workflow state
pub struct QuestWorkflowView {
pub init: QuestInit,
pub objectives_done: u32,
pub abandoned: bool,
}
impl QuestWorkflow {
/// Parse state_data into a typed view
pub fn view(state_data: &[u8]) -> Result<QuestWorkflowView, serde_json::Error>;
}
// Find a workflow
let workflow = ctx.db.workflow().id().find(&workflow_id)?;
// Parse state into typed view
let view = QuestWorkflow::view(&workflow.state_data)?;
// Access fields
if view.objectives_done >= view.init.objective_count {
complete_quest(workflow_id);
}
const workflow = conn.db.workflow.id.find(workflowId);
const state = JSON.parse(new TextDecoder().decode(workflow.stateData));
// Access view fields (init + mutable variables)
console.log(`Progress: ${state.objectives_done}/${state.init.objective_count}`);
Note: The state JSON includes internal fields (phase, loop counters), but serde ignores unknown fields when deserializing into the View struct.
Extension trait for ergonomic duration creation.
use workflow_core::prelude::*;
let d1 = 30.secs(); // 30 seconds
let d2 = 500.millis(); // 500 milliseconds
let d3 = 5.minutes(); // 5 minutes
Implemented for u64, u32, and i32.
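For readers unfamiliar with extension traits, this is a minimal sketch of how such a trait could be built on std::time::Duration. DurationExtSketch is a hypothetical name. The real DurationExt lives in workflow_core, may use a different duration type, and also covers u32 and i32, which are omitted here.

```rust
use std::time::Duration;

// Hypothetical re-creation of an extension trait like DurationExt.
// Implemented only for u64 to keep the sketch short.
pub trait DurationExtSketch {
    fn secs(self) -> Duration;
    fn millis(self) -> Duration;
    fn minutes(self) -> Duration;
}

impl DurationExtSketch for u64 {
    fn secs(self) -> Duration {
        Duration::from_secs(self)
    }

    fn millis(self) -> Duration {
        Duration::from_millis(self)
    }

    fn minutes(self) -> Duration {
        // saturating_mul avoids an overflow panic for absurdly large inputs.
        Duration::from_secs(self.saturating_mul(60))
    }
}
```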
Create and start a new workflow.
workflow_start(
ctx: &ReducerContext,
workflow_type: String,
entity_id: Option<u64>,
correlation_id: Option<String>,
initial_data: Vec<u8>,
) -> Result<(), String>
| Parameter | Description |
|---|---|
| workflow_type | Registered workflow type name |
| entity_id | Optional entity to attach workflow to |
| correlation_id | Optional grouping ID |
| initial_data | JSON-serialized initialization data |
CLI Example:
spacetime call my-module workflow_start '["buff", 42, null, "{\"duration_secs\":30}"]'
Send a signal to a running workflow.
workflow_signal(
ctx: &ReducerContext,
workflow_id: u64,
signal_name: String,
payload: Vec<u8>,
) -> Result<(), String>
| Parameter | Description |
|---|---|
| workflow_id | Target workflow ID |
| signal_name | Signal name (snake_case) |
| payload | JSON-serialized payload (empty [] for unit signals) |
CLI Example:
spacetime call my-module workflow_signal '[1, "stack", "5"]'
spacetime call my-module workflow_signal '[1, "dispel", "[]"]'
Cancel a running workflow.
workflow_cancel(
ctx: &ReducerContext,
workflow_id: u64,
reason: String,
) -> Result<(), String>
Main workflow state table.
| Column | Type | Description |
|---|---|---|
| id | u64 | Primary key |
| workflow_type | String | Registered type name |
| entity_id | Option<u64> | Attached entity |
| correlation_id | Option<String> | Grouping ID |
| status | WorkflowStatus | Current status |
| current_step | String | Human-readable step name |
| state_data | Vec<u8> | Serialized state |
| parent_id | Option<u64> | Parent workflow ID |
| created_at | Timestamp | Creation time |
| updated_at | Timestamp | Last update time |
| error_message | Option<String> | Error if failed |
Indexes: entity_id, correlation_id, workflow_type, status
pub enum WorkflowStatus {
Running,
Suspended,
Completed,
Failed,
Cancelled,
}
Scheduled timer table.
| Column | Type | Description |
|---|---|---|
| scheduled_id | u64 | Primary key |
| scheduled_at | ScheduleAt | When to fire |
| workflow_id | u64 | Parent workflow |
| timer_name | String | Timer name |
Stores ID of last created workflow.
| Column | Type | Description |
|---|---|---|
| key | u8 | Always 0 |
| workflow_id | u64 | Last created ID |
-- Find all workflows for an entity
SELECT * FROM workflow WHERE entity_id = 42;
-- Find active workflows by type
SELECT * FROM workflow
WHERE workflow_type = 'patrol'
AND status IN ('Running', 'Suspended');
-- Find pending timers
SELECT * FROM workflow_timer
ORDER BY scheduled_at;
Variables that persist across await points must be declared with explicit type annotations.
- Explicit type required: let mut count: u32 = 0;
- Must be mutable: Only let mut variables are tracked
- Serializable types: Must implement Serialize + Deserialize
#[workflow]
fn example(init: ExampleInit) -> Result<ExampleResult> {
// These are persisted across await points
let mut count: u32 = 0;
let mut name: String = init.name.clone();
let mut items: Vec<u32> = vec![];
timer!(MyTimer::Tick, 1.secs()).await;
// Variables still have their values after await
count += 1;
items.push(count);
timer!(MyTimer::Tick, 1.secs()).await;
Ok(ExampleResult { count, items })
}
- Cannot track references or closures
- Complex types must be Clone + Serialize + Deserialize
- Type inference not supported (must be explicit)
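One way to see why these constraints exist is to picture the tracked variables gathered into a single struct that is saved before the workflow suspends and restored when it wakes up. The sketch below is purely illustrative: ExampleVars, save and restore are hypothetical, and the text encoding is invented for the example (the reference only states that real state is serialized via Serialize and Deserialize).

```rust
// Hypothetical illustration: tracked variables collected into one struct.
// Each field needs a known, owned type so it can be written out and read
// back, which references and closures cannot offer.
#[derive(Debug, Clone, PartialEq)]
struct ExampleVars {
    count: u32,
    items: Vec<u32>,
}

/// Encode the variables as "count;item,item,..." (made-up format).
fn save(vars: &ExampleVars) -> String {
    let items: Vec<String> = vars.items.iter().map(|n| n.to_string()).collect();
    format!("{};{}", vars.count, items.join(","))
}

/// Decode the format produced by save; returns None on malformed input.
fn restore(data: &str) -> Option<ExampleVars> {
    let (count, items) = data.split_once(';')?;
    let count = count.parse::<u32>().ok()?;
    let items = if items.is_empty() {
        Vec::new()
    } else {
        items
            .split(',')
            .map(|s| s.parse::<u32>().ok())
            .collect::<Option<Vec<u32>>>()?
    };
    Some(ExampleVars { count, items })
}
```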
// Infinite loop with break
loop {
select! {
timer!(MyTimer::Tick, 1.secs()) => {
count += 1;
if count >= 10 { break }
continue
},
signal!(MySignal::Stop) => break,
}.await;
}
// Counter-based for loop
for i in 0..init.count as usize {
let result: ChildResult = spawn!("child", ChildInit { index: i }).await;
total += result.value;
}
// If/else with await in branches
if init.fast_mode {
timer!(MyTimer::Fast, 1.secs()).await;
value += 10;
} else {
timer!(MyTimer::Slow, 5.secs()).await;
value += 50;
}
#[workflow]
fn example(init: ExampleInit) -> Result<ExampleResult> {
if init.skip {
return Ok(ExampleResult { skipped: true });
}
// ... rest of workflow
}
Inside select! arms:
- break: Exit the enclosing loop
- continue: Re-enter the loop (re-register timers/signals)
- return: Exit the workflow with a result
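The three outcomes can be modelled in plain Rust by having each arm report what should happen next. This is a hypothetical model for intuition only: ArmOutcome, Event, RunResult, on_event and run are invented names, events come from a fixed slice instead of real timers and signals, and the macro's actual expansion is not documented here.

```rust
// Hypothetical model of break / continue / return inside select! arms.
#[derive(Debug, PartialEq)]
enum RunResult {
    Completed { final_count: u32 },
    Aborted,
}

#[derive(Debug, PartialEq)]
enum ArmOutcome {
    Break,             // leave the loop and run the code after it
    Continue,          // go around the loop and wait for the next event
    Return(RunResult), // finish the workflow immediately
}

#[derive(Debug, Clone, Copy)]
enum Event {
    Tick,
    Abort,
}

/// Body of the arms: decide what happens for one event.
fn on_event(event: Event, count: &mut u32) -> ArmOutcome {
    match event {
        Event::Tick => {
            *count = count.saturating_sub(1);
            if *count == 0 {
                ArmOutcome::Break
            } else {
                ArmOutcome::Continue
            }
        }
        Event::Abort => ArmOutcome::Return(RunResult::Aborted),
    }
}

/// Drive the loop over a fixed list of events.
fn run(events: &[Event], mut count: u32) -> RunResult {
    for &event in events {
        match on_event(event, &mut count) {
            ArmOutcome::Continue => continue,
            ArmOutcome::Break => break,
            ArmOutcome::Return(result) => return result,
        }
    }
    // Code after the loop: reached after a break or when events run out.
    RunResult::Completed { final_count: count }
}
```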
loop {
select! {
timer!(MyTimer::Tick, 1.secs()) => {
count -= 1;
if count == 0 {
break // Exit loop, continue to code after
}
continue // Stay in loop
},
signal!(MySignal::Abort) => {
return Ok(ExampleResult { aborted: true }) // Exit workflow
},
}.await;
}
// Code after loop runs after break
Ok(ExampleResult { final_count: count })
Import common types:
use workflow_core::prelude::*;
// Imports:
// - Result (anyhow)
// - Serialize, Deserialize (serde)
// - DurationExt trait (.secs(), .millis(), .minutes())
// - Timer, Signal traits
// - SignalError
Also import macros separately:
use workflow_macros::{workflow, Timer, Signal, install};