Skip to content

Commit 149803f

Browse files
committed
Add LangGraph plugin samples
Seven samples demonstrating the Temporal LangGraph plugin across both the Graph API and Functional API: - Human-in-the-loop: interrupt() + Temporal signals for chatbot approval - Continue-as-new: task result caching across workflow boundaries - ReAct agent: tool-calling loop with conditional edges / while loop - Control flow (Functional API only): parallel, for-loop, if/else Related SDK PR: temporalio/sdk-python#1448
1 parent ce5d8dd commit 149803f

34 files changed

Lines changed: 1156 additions & 0 deletions

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7272
* [gevent_async](gevent_async) - Combine gevent and Temporal.
7373
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
7474
* [langchain](langchain) - Orchestrate workflows for LangChain.
75+
* [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API).
7576
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
7677
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7778
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.

langgraph_plugin/README.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# LangGraph Plugin Samples
2+
3+
These samples demonstrate the [Temporal LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448), which runs LangGraph workflows as durable Temporal workflows. Each LangGraph graph node (Graph API) or `@task` (Functional API) executes as a Temporal activity with automatic retries, timeouts, and crash recovery.
4+
5+
Samples are organized by API style:
6+
7+
- **Graph API** (`graph_api/`) -- Define workflows as `StateGraph` with nodes and edges.
8+
- **Functional API** (`functional_api/`) -- Define workflows with `@task` and `@entrypoint` decorators for an imperative programming style.
9+
10+
## Samples
11+
12+
| Sample | Graph API | Functional API | Description |
13+
|--------|:---------:|:--------------:|-------------|
14+
| **Human-in-the-loop** | [graph_api/human_in_the_loop](graph_api/human_in_the_loop) | [functional_api/human_in_the_loop](functional_api/human_in_the_loop) | Chatbot that uses `interrupt()` to pause for human approval, Temporal signals to receive feedback, and queries to expose the pending draft. |
15+
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
16+
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
17+
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
18+
19+
## Prerequisites
20+
21+
1. Install dependencies:
22+
23+
```bash
24+
uv sync --group langgraph
25+
```
26+
27+
2. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):
28+
29+
```bash
30+
temporal server start-dev
31+
```
32+
33+
## Running a Sample
34+
35+
Each sample has two scripts -- start the worker first, then the workflow starter in a separate terminal.
36+
37+
```bash
38+
# Terminal 1: start the worker
39+
uv run langgraph_plugin/<api>/<sample>/run_worker.py
40+
41+
# Terminal 2: start the workflow
42+
uv run langgraph_plugin/<api>/<sample>/run_workflow.py
43+
```
44+
45+
For example, to run the Graph API human-in-the-loop chatbot:
46+
47+
```bash
48+
# Terminal 1
49+
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py
50+
51+
# Terminal 2
52+
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
53+
```
54+
55+
## Key Features Demonstrated
56+
57+
- **Durable execution** -- Every graph node / `@task` runs as a Temporal activity with configurable timeouts and retry policies.
58+
- **Human-in-the-loop** -- LangGraph's `interrupt()` pauses the graph; Temporal signals deliver human input; queries expose pending state to UIs.
59+
- **Continue-as-new with caching** -- `get_cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
60+
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
61+
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.
62+
63+
## Related
64+
65+
- [SDK PR: LangGraph plugin](https://github.com/temporalio/sdk-python/pull/1448)

langgraph_plugin/__init__.py

Whitespace-only changes.

langgraph_plugin/functional_api/__init__.py

Whitespace-only changes.

langgraph_plugin/functional_api/continue_as_new/__init__.py

Whitespace-only changes.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Worker for the continue-as-new pipeline (Functional API)."""
2+
3+
import asyncio
4+
5+
from temporalio.client import Client
6+
from temporalio.contrib.langgraph import LangGraphPlugin
7+
from temporalio.worker import Worker
8+
9+
from langgraph_plugin.functional_api.continue_as_new.workflow import (
10+
PipelineFunctionalWorkflow,
11+
activity_options,
12+
all_tasks,
13+
pipeline_entrypoint,
14+
)
15+
16+
17+
async def main() -> None:
18+
client = await Client.connect("localhost:7233")
19+
plugin = LangGraphPlugin(
20+
entrypoints={"pipeline": pipeline_entrypoint},
21+
tasks=all_tasks,
22+
activity_options=activity_options,
23+
)
24+
25+
worker = Worker(
26+
client,
27+
task_queue="langgraph-pipeline-functional",
28+
workflows=[PipelineFunctionalWorkflow],
29+
plugins=[plugin],
30+
)
31+
print("Worker started. Ctrl+C to exit.")
32+
await worker.run()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Start the continue-as-new pipeline workflow (Functional API)."""
2+
3+
import asyncio
4+
from datetime import timedelta
5+
6+
from temporalio.client import Client
7+
8+
from langgraph_plugin.functional_api.continue_as_new.workflow import (
9+
PipelineFunctionalWorkflow,
10+
PipelineInput,
11+
)
12+
13+
14+
async def main() -> None:
15+
client = await Client.connect("localhost:7233")
16+
17+
result = await client.execute_workflow(
18+
PipelineFunctionalWorkflow.run,
19+
PipelineInput(data=10),
20+
id="pipeline-functional-workflow",
21+
task_queue="langgraph-pipeline-functional",
22+
execution_timeout=timedelta(seconds=60),
23+
)
24+
25+
# 10*2=20 -> 20+50=70 -> 70*3=210
26+
print(f"Pipeline result: {result}")
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""Continue-as-new with caching using the LangGraph Functional API with Temporal.
2+
3+
Same pattern as the Graph API version, but using @task and @entrypoint decorators.
4+
"""
5+
6+
from dataclasses import dataclass
7+
from datetime import timedelta
8+
from typing import Any
9+
10+
from langgraph.func import entrypoint as lg_entrypoint
11+
from langgraph.func import task
12+
from temporalio import workflow
13+
from temporalio.contrib.langgraph import entrypoint, get_cache
14+
15+
16+
@task
17+
def extract(data: int) -> int:
18+
"""Stage 1: Extract -- simulate data extraction by doubling the input."""
19+
return data * 2
20+
21+
22+
@task
23+
def transform(data: int) -> int:
24+
"""Stage 2: Transform -- simulate transformation by adding 50."""
25+
return data + 50
26+
27+
28+
@task
29+
def load(data: int) -> int:
30+
"""Stage 3: Load -- simulate loading by tripling the result."""
31+
return data * 3
32+
33+
34+
@lg_entrypoint()
35+
async def pipeline_entrypoint(data: int) -> dict:
36+
"""Run the 3-stage pipeline: extract -> transform -> load."""
37+
extracted = await extract(data)
38+
transformed = await transform(extracted)
39+
loaded = await load(transformed)
40+
return {"result": loaded}
41+
42+
43+
all_tasks = [extract, transform, load]
44+
45+
activity_options = {
46+
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
47+
for t in all_tasks
48+
}
49+
50+
51+
@dataclass
52+
class PipelineInput:
53+
data: int
54+
cache: dict[str, Any] | None = None
55+
phase: int = 1
56+
57+
58+
@workflow.defn
59+
class PipelineFunctionalWorkflow:
60+
"""Runs the pipeline, continuing-as-new after each phase.
61+
62+
Input 10: 10*2=20 -> 20+50=70 -> 70*3=210
63+
Each task executes once; phases 2 and 3 use cached results.
64+
"""
65+
66+
@workflow.run
67+
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
68+
result = await entrypoint(
69+
"pipeline", cache=input_data.cache
70+
).ainvoke(input_data.data)
71+
72+
if input_data.phase < 3:
73+
workflow.continue_as_new(
74+
PipelineInput(
75+
data=input_data.data,
76+
cache=get_cache(),
77+
phase=input_data.phase + 1,
78+
)
79+
)
80+
81+
return result

langgraph_plugin/functional_api/control_flow/__init__.py

Whitespace-only changes.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
"""Worker for the control flow pipeline (Functional API)."""
2+
3+
import asyncio
4+
5+
from temporalio.client import Client
6+
from temporalio.contrib.langgraph import LangGraphPlugin
7+
from temporalio.worker import Worker
8+
9+
from langgraph_plugin.functional_api.control_flow.workflow import (
10+
ControlFlowWorkflow,
11+
activity_options,
12+
all_tasks,
13+
control_flow_pipeline,
14+
)
15+
16+
17+
async def main() -> None:
18+
client = await Client.connect("localhost:7233")
19+
plugin = LangGraphPlugin(
20+
entrypoints={"control_flow": control_flow_pipeline},
21+
tasks=all_tasks,
22+
activity_options=activity_options,
23+
)
24+
25+
worker = Worker(
26+
client,
27+
task_queue="langgraph-control-flow",
28+
workflows=[ControlFlowWorkflow],
29+
plugins=[plugin],
30+
)
31+
print("Worker started. Ctrl+C to exit.")
32+
await worker.run()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())

0 commit comments

Comments
 (0)