Skip to content

Commit 3b54795

Browse files
committed
Add activity_from_node sample demonstrating run_in_workflow
Shows how to call Temporal activities directly from a LangGraph node using the run_in_workflow=True metadata option.
1 parent 6e320fa commit 3b54795

8 files changed

Lines changed: 318 additions & 0 deletions

File tree

langgraph_samples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Each directory contains a complete example with its own README for detailed inst
5454
| Sample | Description |
5555
|--------|-------------|
5656
| [hello_world](./hello_world/) | Simple starter example demonstrating basic plugin setup and graph registration |
57+
| [activity_from_node](./activity_from_node/) | Calling Temporal activities from a graph node using run_in_workflow |
5758
| [react_agent](./react_agent/) | ReAct agent pattern with tool calling and multi-step reasoning |
5859
| [approval_workflow](./approval_workflow/) | Human-in-the-loop with interrupt/resume for approval workflows |
5960
| [supervisor](./supervisor/) | Multi-agent supervisor pattern coordinating specialized agents |
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Activity from Node
2+
3+
Demonstrates calling Temporal activities directly from a LangGraph node using the `run_in_workflow` feature.
4+
5+
## What This Sample Demonstrates
6+
7+
- **run_in_workflow nodes**: Using `temporal_node_metadata(run_in_workflow=True)` to run a node in the workflow context
8+
- **Activity orchestration**: Calling multiple Temporal activities from within a graph node
9+
- **Mixed execution modes**: Combining run_in_workflow nodes with regular activity nodes
10+
- **Sandbox enforcement**: Node code is sandboxed to ensure deterministic execution
11+
12+
## How It Works
13+
14+
1. **Orchestrator Node**: Runs directly in the workflow (not as an activity) with `run_in_workflow=True`
15+
- Calls `validate_data` activity to validate input
16+
- Calls `enrich_data` activity to enrich valid data
17+
- Implements orchestration logic (conditional activity calls)
18+
19+
2. **Finalize Node**: Runs as a regular Temporal activity (default behavior)
20+
- Processes the enriched data
21+
22+
3. **Activities**: Standard Temporal activities called from the orchestrator
23+
- `validate_data`: Validates input data
24+
- `enrich_data`: Enriches data with additional information
25+
26+
## When to Use run_in_workflow
27+
28+
Use `run_in_workflow=True` when your node needs to:
29+
- Call Temporal activities, child workflows, or other Temporal operations
30+
- Use workflow features like timers, signals, or queries
31+
- Implement complex orchestration logic with multiple activity calls
32+
33+
**Important**: Code in run_in_workflow nodes is sandboxed to ensure deterministic execution. Non-deterministic operations (like `random.randint()`) will be blocked.
34+
35+
## Running the Example
36+
37+
First, start the worker:
38+
```bash
39+
uv run langgraph_samples/activity_from_node/run_worker.py
40+
```
41+
42+
Then, in a separate terminal, run the workflow:
43+
```bash
44+
uv run langgraph_samples/activity_from_node/run_workflow.py
45+
```
46+
47+
## Expected Output
48+
49+
```
50+
Result: {'data': 'Hello from LangGraph', 'validated': True, 'enriched_data': 'Hello from LangGraph [enriched at activity]', 'final_result': 'Processed: Hello from LangGraph [enriched at activity]'}
51+
```
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Activity from Node sample."""
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Activity from Node - Activity Definitions.
2+
3+
Activities that are called from a run_in_workflow node.
4+
"""
5+
6+
from temporalio import activity
7+
8+
9+
@activity.defn
10+
async def validate_data(data: str) -> bool:
11+
"""Validate the input data.
12+
13+
In a real application, this could:
14+
- Check data format and schema
15+
- Verify required fields
16+
- Call external validation services
17+
"""
18+
activity.logger.info(f"Validating data: {data}")
19+
20+
# Simple validation - check if data is non-empty
21+
is_valid = bool(data and data.strip())
22+
23+
activity.logger.info(f"Validation result: {is_valid}")
24+
return is_valid
25+
26+
27+
@activity.defn
28+
async def enrich_data(data: str) -> str:
29+
"""Enrich the input data with additional information.
30+
31+
In a real application, this could:
32+
- Call external APIs for data enrichment
33+
- Lookup data from databases
34+
- Apply transformations
35+
"""
36+
activity.logger.info(f"Enriching data: {data}")
37+
38+
# Simple enrichment - add metadata
39+
enriched = f"{data} [enriched at activity]"
40+
41+
activity.logger.info(f"Enriched data: {enriched}")
42+
return enriched
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Activity from Node - Graph Definition.
2+
3+
This module defines a graph where a node runs in the workflow context
4+
and calls Temporal activities directly.
5+
"""
6+
7+
from datetime import timedelta
8+
from typing import Any
9+
10+
from langgraph.graph import END, START, StateGraph
11+
from typing_extensions import TypedDict
12+
13+
14+
# =============================================================================
15+
# State Definition
16+
# =============================================================================
17+
18+
19+
class ProcessingState(TypedDict, total=False):
20+
"""State for the processing graph."""
21+
22+
data: str
23+
validated: bool
24+
enriched_data: str
25+
final_result: str
26+
27+
28+
# =============================================================================
29+
# Node Functions
30+
# =============================================================================
31+
32+
33+
async def orchestrator_node(state: ProcessingState) -> ProcessingState:
34+
"""Node that orchestrates multiple activity calls from the workflow.
35+
36+
This node runs directly in the workflow (run_in_workflow=True) so it can:
37+
- Call multiple Temporal activities
38+
- Use workflow features like timers, signals, queries
39+
- Implement complex orchestration logic
40+
41+
The node is sandboxed, ensuring deterministic code.
42+
"""
43+
from temporalio import workflow
44+
45+
data = state.get("data", "")
46+
47+
# Call validation activity
48+
is_valid = await workflow.execute_activity(
49+
"validate_data",
50+
data,
51+
start_to_close_timeout=timedelta(seconds=30),
52+
)
53+
54+
if not is_valid:
55+
return {"validated": False, "final_result": "Validation failed"}
56+
57+
# Call enrichment activity
58+
enriched = await workflow.execute_activity(
59+
"enrich_data",
60+
data,
61+
start_to_close_timeout=timedelta(seconds=30),
62+
)
63+
64+
return {"validated": True, "enriched_data": enriched}
65+
66+
67+
def finalize_node(state: ProcessingState) -> ProcessingState:
68+
"""Final processing node - runs as a regular activity.
69+
70+
This demonstrates mixing run_in_workflow nodes with regular activity nodes.
71+
"""
72+
if not state.get("validated"):
73+
return state
74+
75+
enriched = state.get("enriched_data", "")
76+
return {"final_result": f"Processed: {enriched}"}
77+
78+
79+
# =============================================================================
80+
# Graph Builder
81+
# =============================================================================
82+
83+
84+
def build_activity_from_node_graph() -> Any:
85+
"""Build a graph with a node that calls activities from the workflow.
86+
87+
The orchestrator node uses run_in_workflow=True to execute directly
88+
in the workflow context, allowing it to call Temporal activities.
89+
"""
90+
from temporalio.contrib.langgraph import temporal_node_metadata
91+
92+
graph = StateGraph(ProcessingState)
93+
94+
# Orchestrator runs in workflow to call activities
95+
graph.add_node(
96+
"orchestrator",
97+
orchestrator_node,
98+
metadata=temporal_node_metadata(run_in_workflow=True),
99+
)
100+
101+
# Finalize runs as a regular activity
102+
graph.add_node("finalize", finalize_node)
103+
104+
graph.add_edge(START, "orchestrator")
105+
graph.add_edge("orchestrator", "finalize")
106+
graph.add_edge("finalize", END)
107+
108+
return graph.compile()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Worker for the Activity from Node example.
2+
3+
Starts a Temporal worker that can execute ActivityFromNodeWorkflow.
4+
"""
5+
6+
import asyncio
7+
8+
from temporalio.client import Client
9+
from temporalio.contrib.langgraph import LangGraphPlugin
10+
from temporalio.envconfig import ClientConfig
11+
from temporalio.worker import Worker
12+
13+
from langgraph_samples.activity_from_node.activities import enrich_data, validate_data
14+
from langgraph_samples.activity_from_node.graph import build_activity_from_node_graph
15+
from langgraph_samples.activity_from_node.workflow import ActivityFromNodeWorkflow
16+
17+
18+
async def main() -> None:
19+
# Create the plugin with our graph registered
20+
plugin = LangGraphPlugin(
21+
graphs={"activity_from_node_graph": build_activity_from_node_graph},
22+
)
23+
24+
# Connect to Temporal with the plugin
25+
config = ClientConfig.load_client_connect_config()
26+
config.setdefault("target_host", "localhost:7233")
27+
client = await Client.connect(**config, plugins=[plugin])
28+
29+
# Create and run the worker
30+
# Note: We register our custom activities alongside LangGraph's auto-registered ones
31+
worker = Worker(
32+
client,
33+
task_queue="langgraph-activity-from-node",
34+
workflows=[ActivityFromNodeWorkflow],
35+
activities=[validate_data, enrich_data],
36+
)
37+
38+
print("Worker started. Ctrl+C to exit.")
39+
await worker.run()
40+
41+
42+
if __name__ == "__main__":
43+
asyncio.run(main())
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""Run the Activity from Node workflow.
2+
3+
Starts a workflow execution and waits for the result.
4+
"""
5+
6+
import asyncio
7+
import uuid
8+
9+
from temporalio.client import Client
10+
from temporalio.envconfig import ClientConfig
11+
12+
from langgraph_samples.activity_from_node.workflow import ActivityFromNodeWorkflow
13+
14+
15+
async def main() -> None:
16+
# Connect to Temporal
17+
config = ClientConfig.load_client_connect_config()
18+
config.setdefault("target_host", "localhost:7233")
19+
client = await Client.connect(**config)
20+
21+
# Run the workflow
22+
result = await client.execute_workflow(
23+
ActivityFromNodeWorkflow.run,
24+
"Hello from LangGraph",
25+
id=f"activity-from-node-{uuid.uuid4()}",
26+
task_queue="langgraph-activity-from-node",
27+
)
28+
29+
print(f"Result: {result}")
30+
31+
32+
if __name__ == "__main__":
33+
asyncio.run(main())
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Activity from Node - Workflow Definition.
2+
3+
Demonstrates calling Temporal activities from a LangGraph node.
4+
5+
Note: This module only contains the workflow definition. The graph and
6+
activities are defined separately and imported only by the worker.
7+
"""
8+
9+
from typing import Any
10+
11+
from temporalio import workflow
12+
from temporalio.contrib.langgraph import compile
13+
14+
15+
@workflow.defn
16+
class ActivityFromNodeWorkflow:
17+
"""Workflow that runs a graph with activity-calling nodes.
18+
19+
This demonstrates:
20+
- Using run_in_workflow=True for nodes that need workflow context
21+
- Calling Temporal activities from within graph nodes
22+
- Mixing run_in_workflow nodes with regular activity nodes
23+
"""
24+
25+
@workflow.run
26+
async def run(self, data: str) -> dict[str, Any]:
27+
"""Run the processing graph.
28+
29+
Args:
30+
data: The input data to process.
31+
32+
Returns:
33+
The final state with processing results.
34+
"""
35+
app = compile("activity_from_node_graph")
36+
37+
result = await app.ainvoke({"data": data})
38+
39+
return result

0 commit comments

Comments
 (0)