Your code is the graph. A zero-dependency, async orchestration library for building event-driven AI pipelines.
Building production AI agents often leads to a mess of nested asyncio loops, manual state management, and brittle error handling. justpipe provides a lightweight, Pythonic abstraction that lets you focus on logic while it handles the orchestration.
- Zero Dependencies: Pure Python. Fast, lightweight, and easy to drop into any project.
- Async-First: Built from the ground up for modern Python concurrency.
- Developer Experience: "The code is the graph." No complex YAML or UI builders-your decorators define the topology.
- Production Ready: Built-in support for retries, backpressure, structured observability, and lineage tracking.
# As a CLI tool (recommended)
pipx install justpipe
# Or with uv
uv tool install justpipe
# Or as a library dependency
pip install justpipe
# With retry support (via tenacity)
pip install "justpipe[retry]"Define a type-safe pipeline that fetches search results, generates a prompt, and streams an LLM response.
import asyncio
from dataclasses import dataclass, field
from justpipe import Pipe, EventType
@dataclass
class AIState:
query: str
context: list[str] = field(default_factory=list)
response: str = ""
pipe = Pipe(AIState)
@pipe.step()
async def call_llm(state: AIState):
"""A streaming step yielding tokens from an LLM."""
# Simulate a streaming LLM response
for chunk in ["Based ", "on ", "your ", "data..."]:
yield chunk
@pipe.step(to=call_llm)
async def generate_prompt(state: AIState):
"""Format the prompt using gathered context."""
state.response = f"Context: {state.context}\nQuery: {state.query}"
@pipe.step(to=generate_prompt)
async def search_vectors(state: AIState):
"""Simulate a RAG vector search."""
state.context.append("Vector DB result: Python is fun.")
async def main():
state = AIState(query="Tell me about Python")
# pipe.run() returns an async generator of events
async for event in pipe.run(state):
if event.type == EventType.TOKEN:
print(event.payload, end="", flush=True)
print("\nPipeline Complete!")
if __name__ == "__main__":
asyncio.run(main())In justpipe, decorators do the heavy lifting. They register your functions into a Directed Acyclic Graph (DAG) during import time. The to= parameter wires the flow.
@pipe.step(to="respond") # Forward reference via string
async def greet(state):
state.message = "Hello"
@pipe.step()
async def respond(state):
yield f"{state.message}, World!"- Type-Safe DI: Automatically injects
state,context, or even specific items from map workers based on type hints or parameter names. - Mutable State: Designed around Python
dataclasses. No complex state reduction-just mutate your state object directly. - Context-Aware Routing: Use
@pipe.switchfor dynamic branching or@pipe.mapfor massive fan-out parallelism.
Run steps concurrently by linking a single step to multiple targets. barrier_timeout prevents hangs if a branch fails.
@pipe.step(barrier_timeout=5.0)
async def combine(state):
# Runs only after BOTH fetch_a and fetch_b complete
state.result = state.a + state.b
@pipe.step(to=combine)
async def fetch_a(state):
state.a = await fetch_api_a()
@pipe.step(to=combine)
async def fetch_b(state):
state.b = await fetch_api_b()Process lists in parallel. justpipe manages the workers and results for you.
@pipe.step("worker")
async def process_item(item: int, state):
# 'item' is automatically injected from the list below
print(f"Processing {item}")
@pipe.map(each="worker", max_concurrency=5)
async def create_batch(state):
# Spawns a 'worker' step for every item returned
return [1, 2, 3, 4, 5]Route execution dynamically based on return values.
@pipe.switch(to={
"ok": "process_success",
"error": "handle_failure"
})
async def validate(state) -> str:
return "ok" if state.is_valid else "error"Compose complex workflows by nesting pipelines as single steps, allowing for modular and reusable logic.
Pause execution to wait for external input or human approval, then resume exactly where it left off.
justpipe comes with a powerful CLI for debugging and performance analysis. Query runs, visualize timelines, and compare executions with ease.
# List recent pipeline runs
justpipe list
# Show detailed events for a specific run
justpipe show <run-id>
# Generate an interactive HTML timeline
justpipe timeline <run-id> --format html
# Compare two runs to detect performance regressions
justpipe compare <run-id-1> <run-id-2>- Built for AI: Native support for token streaming and complex retry logic makes it ideal for LLM orchestration.
- Developer First: No YAML, no complex UI—just Python functions and decorators.
- Observability by Default: Every run is traceable with built-in lineage tracking and a rich event system.
- Lightweight: Zero core dependencies. Perfect for edge deployments or simple microservices.
A workflow engine is only as good as its test suite. justpipe is built with a rigorous testing philosophy, ensuring your pipelines behave predictably under load and failure.
Our suite follows a strict separation of concerns:
- Unit Tests: Isolated testing of internal components using mocks and fakes.
- Integration Tests: Verification of interactions across storage boundaries and module interfaces.
- Functional Tests: End-to-end validation of pipeline contracts, event ordering, and concurrency.
- Fuzzing: Randomized property-based testing to uncover edge cases in graph execution.
Use the TestPipe harness to write expressive tests for your own workflows:
from justpipe import TestPipe
async def test_my_workflow():
with TestPipe(pipe) as tester:
# Mock a streaming LLM call
async def mock_llm(state):
yield "Mocked answer"
tester.mock("call_llm", side_effect=mock_llm)
# Execute the pipeline
result = await tester.run(AIState(query="test"))
# Precise assertions
assert result.was_called("search_vectors")
assert "Mocked answer" in result.tokensSee your logic come to life. Generate beautiful Mermaid diagrams directly from your pipe instance.
print(pipe.graph())graph TD
Start(["▶ Start"])
subgraph parallel_n3[Parallel]
direction LR
n8["Search Knowledge Graph"]
n9(["Search Vectors ⚡"])
n10(["Search Web ⚡"])
end
n1["Build Context"]
n3["Embed Query"]
n4(["Format Output ⚡"])
n5(["Generate Response ⚡"])
n6["Parse Query"]
n7["Rank Results"]
End(["■ End"])
n4 --> End
Start --> n6
n1 --> n5
n3 --> n8
n3 --> n9
n3 --> n10
n5 --> n4
n6 --> n3
n7 --> n1
n8 --> n7
n9 --> n7
n10 --> n7
subgraph utilities[Utilities]
direction TB
n0(["Analytics Logger ⚡"]):::isolated
n2["Cache Manager"]:::isolated
end
class n1,n3,n6,n7,n8 step;
class n10,n4,n5,n9 streaming;
class n0,n2 isolated;
class Start,End startEnd;
%% Styling
classDef default fill:#f8f9fa,stroke:#dee2e6,stroke-width:1px;
classDef step fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1;
classDef streaming fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#e65100;
classDef map fill:#e8f5e9,stroke:#388e3c,stroke-width:2px,color:#1b5e20;
classDef switch fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#4a148c;
classDef sub fill:#f1f8e9,stroke:#558b2f,stroke-width:2px,color:#33691e;
classDef isolated fill:#fce4ec,stroke:#c2185b,stroke-width:2px,stroke-dasharray: 5 5,color:#880e4f;
classDef startEnd fill:#e8f5e9,stroke:#388e3c,stroke-width:3px,color:#1b5e20;
justpipe uses uv for dependency management.
# Install development dependencies
uv sync --all-extras --dev
# Run full suite
uv run pytestMIT