|
| 1 | +--- |
| 2 | +hide: |
| 3 | + - toc |
| 4 | + - navigation |
| 5 | +--- |
| 6 | + |
| 7 | +# Getting Started |
| 8 | + |
| 9 | +Duron is a Python library that makes async workflows replayable. You can pause, resume, or rerun async functions without redoing completed steps. This guide will walk you through the core concepts and get you building your first durable workflow. |
| 10 | + |
| 11 | +## Installation |
| 12 | + |
| 13 | +Duron requires **Python 3.10 or higher**. |
| 14 | + |
| 15 | +Install via pip: |
| 16 | + |
| 17 | +```bash |
| 18 | +pip install duron |
| 19 | +``` |
| 20 | + |
| 21 | +Or if you're using [uv](https://docs.astral.sh/uv/): |
| 22 | + |
| 23 | +```bash |
| 24 | +uv add duron |
| 25 | +``` |
| 26 | + |
| 27 | +## Core Concepts |
| 28 | + |
| 29 | +Duron introduces two fundamental building blocks for creating replayable workflows: |
| 30 | + |
| 31 | +### 1. Durable Functions (`@duron.durable`) |
| 32 | + |
| 33 | +Durable functions are the orchestrators of your workflow. They define the control flow and coordinate multiple operations. Key characteristics: |
| 34 | + |
| 35 | +- **Always take [`Context`][duron.Context] as the first parameter** - This is your handle to run effects and create streams/signals |
| 36 | +- **Deterministic** - The same inputs always produce the same execution path |
| 37 | +- **Replayable** - When resumed, Duron replays logged results to restore state without re-executing completed steps |
| 38 | +- **No side effects** - All I/O must go through effects |
| 39 | + |
| 40 | +```python |
| 41 | +@duron.durable |
| 42 | +async def my_workflow(ctx: duron.Context, arg: str) -> str: |
| 43 | + # Orchestration logic here |
| 44 | + result = await ctx.run(some_effect, arg) |
| 45 | + return result |
| 46 | +``` |
| 47 | + |
| 48 | +### 2. Effect Functions (`@duron.effect`) |
| 49 | + |
| 50 | +Effects wrap any code that interacts with the outside world. This includes: |
| 51 | + |
| 52 | +- API calls |
| 53 | +- Database queries |
| 54 | +- File I/O |
| 55 | +- Random number generation |
| 56 | +- Any non-deterministic operation |
| 57 | + |
| 58 | +Duron records each effect's return value so it runs **once per unique input**, even across restarts. |
| 59 | + |
| 60 | +```python |
| 61 | +@duron.effect |
| 62 | +async def fetch_data(url: str) -> dict: |
| 63 | + # This will only execute once per unique URL |
| 64 | + async with httpx.AsyncClient() as client: |
| 65 | + response = await client.get(url) |
| 66 | + return response.json() |
| 67 | +``` |
| 68 | + |
| 69 | +## Your First Workflow |
| 70 | + |
| 71 | +Let's build a simple greeting workflow that demonstrates the core concepts: |
| 72 | + |
| 73 | +```python |
| 74 | +import asyncio |
| 75 | +import random |
| 76 | +from pathlib import Path |
| 77 | + |
| 78 | +import duron |
| 79 | +from duron.contrib.storage import FileLogStorage |
| 80 | + |
| 81 | + |
| 82 | +@duron.effect |
| 83 | +async def work(name: str) -> str: |
| 84 | + print("⚡ Preparing to greet...") |
| 85 | + await asyncio.sleep(2) # Simulate I/O |
| 86 | + print("⚡ Greeting...") |
| 87 | + return f"Hello, {name}!" |
| 88 | + |
| 89 | + |
| 90 | +@duron.effect |
| 91 | +async def generate_lucky_number() -> int: |
| 92 | + print("⚡ Generating lucky number...") |
| 93 | + await asyncio.sleep(1) # Simulate I/O |
| 94 | + return random.randint(1, 100) |
| 95 | + |
| 96 | + |
| 97 | +@duron.durable |
| 98 | +async def greeting_flow(ctx: duron.Context, name: str) -> str: |
| 99 | + # Run both effects in parallel |
| 100 | + message, lucky_number = await asyncio.gather( |
| 101 | + ctx.run(work, name), |
| 102 | + ctx.run(generate_lucky_number) |
| 103 | + ) |
| 104 | + return f"{message} Your lucky number is {lucky_number}." |
| 105 | + |
| 106 | + |
| 107 | +async def main(): |
| 108 | + # Create a file-based log storage |
| 109 | + storage = FileLogStorage(Path("log.jsonl")) |
| 110 | + |
| 111 | + # Invoke the workflow |
| 112 | + async with greeting_flow.invoke(storage) as job: |
| 113 | + await job.start("Alice") |
| 114 | + result = await job.wait() |
| 115 | + |
| 116 | + print(result) |
| 117 | + |
| 118 | + |
| 119 | +if __name__ == "__main__": |
| 120 | + asyncio.run(main()) |
| 121 | +``` |
| 122 | + |
| 123 | +Save this as `hello.py` and run it: |
| 124 | + |
| 125 | +```bash |
| 126 | +python hello.py |
| 127 | +``` |
| 128 | + |
| 129 | +You'll see output like: |
| 130 | + |
| 131 | +``` |
| 132 | +⚡ Preparing to greet... |
| 133 | +⚡ Generating lucky number... |
| 134 | +⚡ Greeting... |
| 135 | +Hello, Alice! Your lucky number is 42. |
| 136 | +``` |
| 137 | + |
| 138 | +## Understanding Replay |
| 139 | + |
| 140 | +The magic of Duron is in its replay behavior. Run the same script again: |
| 141 | + |
| 142 | +```bash |
| 143 | +python hello.py |
| 144 | +``` |
| 145 | + |
| 146 | +**Notice**: No "⚡" output! Duron replayed the results from `log.jsonl` without re-executing the effects. The workflow completes instantly, but produces the **exact same result**. |
| 147 | + |
| 148 | +This is powerful for: |
| 149 | + |
| 150 | +- **Crash recovery** - If your process crashes mid-workflow, resume from the last checkpoint |
| 151 | +- **Development** - Test workflow logic without hitting external services repeatedly |
| 152 | +- **Debugging** - Reproduce exact execution paths |
| 153 | +- **Cost savings** - Don't re-run expensive API calls |
| 154 | + |
| 155 | +### Forcing a Fresh Run |
| 156 | + |
| 157 | +To start fresh, delete the log file: |
| 158 | + |
| 159 | +```bash |
| 160 | +rm log.jsonl |
| 161 | +python hello.py |
| 162 | +``` |
| 163 | + |
| 164 | +Now you'll see the effects execute again (and potentially get a different lucky number). |
| 165 | + |
| 166 | +## Storage Backends |
| 167 | + |
| 168 | +Duron is storage-agnostic. It ships with two built-in options: |
| 169 | + |
| 170 | +### File Storage (Recommended for Development) |
| 171 | + |
| 172 | +```python |
| 173 | +from pathlib import Path |
| 174 | +from duron.contrib.storage import FileLogStorage |
| 175 | + |
| 176 | +storage = FileLogStorage(Path("logs/workflow.jsonl")) |
| 177 | +``` |
| 178 | + |
| 179 | +Stores logs as JSON Lines in a file. Great for: |
| 180 | + |
| 181 | +- Local development |
| 182 | +- Single-machine workflows |
| 183 | +- Debugging (logs are human-readable) |
| 184 | + |
| 185 | +### Memory Storage (Testing Only) |
| 186 | + |
| 187 | +```python |
| 188 | +from duron.contrib.storage import MemoryLogStorage |
| 189 | + |
| 190 | +storage = MemoryLogStorage() |
| 191 | +``` |
| 192 | + |
| 193 | +Stores logs in memory. Use for: |
| 194 | + |
| 195 | +- Unit tests |
| 196 | +- Temporary workflows |
| 197 | +- Benchmarking |
| 198 | + |
| 199 | +**Note**: Memory storage is lost when the process exits. |
| 200 | + |
| 201 | +### Custom Storage |
| 202 | + |
| 203 | +Implement the `LogStorage` protocol for your own backend: |
| 204 | + |
| 205 | +```python |
| 206 | +from duron.log import LogStorage |
| 207 | + |
| 208 | +class MyStorage(LogStorage): |
| 209 | + async def read_log(self, lease_id: str) -> list[Entry]: |
| 210 | + # Read from your storage (database, S3, etc.) |
| 211 | + ... |
| 212 | + |
| 213 | + async def append_log(self, lease_id: str, entries: list[Entry]) -> None: |
| 214 | + # Append to your storage |
| 215 | + ... |
| 216 | + |
| 217 | + # ... implement other methods |
| 218 | +``` |
| 219 | + |
| 220 | +## Advanced Features |
| 221 | + |
| 222 | +### Streams |
| 223 | + |
| 224 | +Streams allow workflows to produce and consume values over time. Perfect for: |
| 225 | + |
| 226 | +- Multi-step agent interactions |
| 227 | +- Progress reporting |
| 228 | +- Event-driven workflows |
| 229 | + |
| 230 | +```python |
| 231 | +from duron import Provided, Stream, StreamWriter |
| 232 | + |
| 233 | +@duron.durable |
| 234 | +async def producer( |
| 235 | + ctx: duron.Context, |
| 236 | + output: StreamWriter[str] = Provided |
| 237 | +) -> None: |
| 238 | + for i in range(5): |
| 239 | + await output.send(f"Message {i}") |
| 240 | + await asyncio.sleep(1) |
| 241 | + |
| 242 | +async def main(): |
| 243 | + async with producer.invoke(storage) as job: |
| 244 | + stream: Stream[str] = job.open_stream("output", "r") |
| 245 | + |
| 246 | + await job.start() |
| 247 | + |
| 248 | + async for message in stream: |
| 249 | + print(f"Received: {message}") |
| 250 | + |
| 251 | + await job.wait() |
| 252 | +``` |
| 253 | + |
| 254 | +### Signals |
| 255 | + |
| 256 | +Signals enable external interruption of long-running operations: |
| 257 | + |
| 258 | +```python |
| 259 | +from duron import Signal, SignalInterrupt, Provided |
| 260 | + |
| 261 | +@duron.durable |
| 262 | +async def interruptible_task( |
| 263 | + ctx: duron.Context, |
| 264 | + signal: Signal[None] = Provided |
| 265 | +) -> str: |
| 266 | + try: |
| 267 | + async with signal: |
| 268 | + await ctx.run(long_running_effect) |
| 269 | + return "Completed" |
| 270 | + except SignalInterrupt: |
| 271 | + return "Interrupted by user" |
| 272 | + |
| 273 | +async def main(): |
| 274 | + async with interruptible_task.invoke(storage) as job: |
| 275 | + signal_writer = job.open_stream("signal", "w") |
| 276 | + |
| 277 | + await job.start() |
| 278 | + |
| 279 | + # Later... send interrupt signal |
| 280 | + await signal_writer.send(None) |
| 281 | + |
| 282 | + result = await job.wait() |
| 283 | + print(result) # "Interrupted by user" |
| 284 | +``` |
| 285 | + |
| 286 | +### Tracing |
| 287 | + |
| 288 | +Enable tracing to understand workflow execution: |
| 289 | + |
| 290 | +```python |
| 291 | +from duron.tracing import create_tracer, setup_tracing |
| 292 | + |
| 293 | +setup_tracing() # Configure logging |
| 294 | + |
| 295 | +async def main(): |
| 296 | + async with greeting_flow.invoke( |
| 297 | + storage, |
| 298 | + tracer=create_tracer("session-123") |
| 299 | + ) as job: |
| 300 | + await job.start("Alice") |
| 301 | + result = await job.wait() |
| 302 | +``` |
| 303 | + |
| 304 | +Traces are logged to your storage backend for analysis. Upload the jsonl to [Trace UI](https://brian14708.github.io/duron/trace-ui/) for visualization. |
0 commit comments