Skip to content

Commit d8565fc

Browse files
brianstrauchclaude
andcommitted
Add Strands plugin samples
Demonstrates the Temporal Strands plugin: hello world, tools (in-workflow, custom activity, strands_tools), HITL, hooks, MCP, structured output, streaming, interrupt, and continue-as-new. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6168ebc commit d8565fc

62 files changed

Lines changed: 2274 additions & 17 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ Some examples require extra dependencies. See each sample's directory for specif
8888
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
8989
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
9090
* [sentry](sentry) - Report errors to Sentry.
91+
* [strands_plugin](strands_plugin) - Run Strands Agents as durable Temporal workflows (model calls, tools, MCP, HITL).
9192
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
9293
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
9394
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.

pyproject.toml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ openai-agents = [
5959
]
6060
pydantic-converter = ["pydantic>=2.10.6,<3"]
6161
sentry = ["sentry-sdk>=2.13.0"]
62+
strands = [
63+
"strands-agents>=1.39.0",
64+
"strands-agents-tools>=0.5.2",
65+
"mcp>=1.0.0",
66+
"boto3>=1.34.92,<2",
67+
"temporalio[strands,pydantic]>=1.27.0",
68+
]
6269
trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
6370
cloud-export-to-parquet = [
6471
"pandas>=2.3.3,<3 ; python_version >= '3.10' and python_version < '4.0'",
@@ -67,6 +74,12 @@ cloud-export-to-parquet = [
6774
"pyarrow>=19.0.1",
6875
]
6976

77+
# Temporary: the strands extra of temporalio is shipping in an upcoming release.
78+
# Point at the strands branch of sdk-python until it's published.
79+
# Remove this section once `temporalio[strands]` is on PyPI.
80+
[tool.uv.sources]
81+
temporalio = { git = "https://github.com/temporalio/sdk-python.git", branch = "strands" }
82+
7083
[tool.hatch.metadata]
7184
allow-direct-references = true
7285

@@ -105,6 +118,7 @@ packages = [
105118
"schedules",
106119
"sentry",
107120
"sleep_for_days",
121+
"strands_plugin",
108122
"tests",
109123
"trio_async",
110124
"updatable_timer",

strands_plugin/README.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Strands Agents Samples
2+
3+
These samples demonstrate the [Temporal Strands plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands), which runs [Strands Agents](https://strandsagents.com/) inside Temporal Workflows. Model invocations, tool calls, and MCP tool calls all execute as Temporal Activities, so you get durable execution, Temporal-managed retries, and timeouts.
4+
5+
## Samples
6+
7+
| Sample | Description |
8+
|--------|-------------|
9+
| [hello_world](hello_world) | Minimal `TemporalAgent` invocation. Start here. |
10+
| [tools](tools) | Three tool patterns side by side: in-workflow `@tool`, custom `@activity.defn` wrapped via `activity_as_tool`, and a `strands_tools` tool wrapped as a Temporal activity. |
11+
| [human_in_the_loop](human_in_the_loop) | Pause a tool call on `BeforeToolCallEvent.interrupt()`, resume via Temporal signal. The canonical Strands HITL pattern. |
12+
| [tool_interrupt](tool_interrupt) | Raise `InterruptException` from a Temporal activity to surface a HITL prompt across the activity boundary. Plugin-specific feature. |
13+
| [hooks](hooks) | `HookProvider` with both an in-workflow callback and an `activity_as_hook` callback for I/O. |
14+
| [mcp](mcp) | Connect to an MCP server (`FastMCP` echo) via `TemporalMCPClient`. |
15+
| [structured_output](structured_output) | Pydantic-typed agent output via `structured_output_model`. |
16+
| [streaming](streaming) | Forward model chunks to an external subscriber via `streaming_topic` + `WorkflowStream`. |
17+
| [continue_as_new](continue_as_new) | Chat-style workflow that hands off `agent.messages` when history grows large. |
18+
19+
## Prerequisites
20+
21+
1. Install dependencies:
22+
23+
```bash
24+
uv sync --group strands
25+
```
26+
27+
> The `strands` extra of `temporalio` is shipping in an upcoming release. Until then, install the SDK from the strands branch:
28+
>
29+
> ```bash
30+
> uv pip install -e ../sdk-python --extra strands --extra pydantic
31+
> ```
32+
33+
2. Configure AWS credentials. The samples use the plugin's default `BedrockModel()`, which picks up the standard AWS SDK credential chain. Make sure the credentials grant access to a Bedrock model in your selected region (e.g., `us-west-2`).
34+
35+
```bash
36+
export AWS_REGION=us-west-2
37+
# plus AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or an SSO profile
38+
```
39+
40+
You can pick a specific model by passing it to `BedrockModel(model_id="...")` in each sample's worker.
41+
42+
3. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):
43+
44+
```bash
45+
temporal server start-dev
46+
```
47+
48+
## Running a Sample
49+
50+
Each sample has two scripts. Start the Worker first, then the Workflow starter in a separate terminal:
51+
52+
```bash
53+
# Terminal 1: start the Worker
54+
uv run strands_plugin/<sample>/run_worker.py
55+
56+
# Terminal 2: start the Workflow
57+
uv run strands_plugin/<sample>/run_workflow.py
58+
```
59+
60+
For example, to run the tools sample:
61+
62+
```bash
63+
# Terminal 1
64+
uv run strands_plugin/tools/run_worker.py
65+
66+
# Terminal 2
67+
uv run strands_plugin/tools/run_workflow.py
68+
```
69+
70+
## Key Features Demonstrated
71+
72+
- **Durable model invocation** — every model call runs in an `invoke_model` activity with configurable timeouts and retries.
73+
- **Three ways to define tools** — pure Strands `@tool`, custom Temporal activities, and ecosystem `strands_tools` wrapped as activities.
74+
- **Human-in-the-loop** — both hook-based (`BeforeToolCallEvent.interrupt()`) and tool-body (`raise InterruptException`) styles.
75+
- **Hook system** — deterministic in-workflow callbacks plus I/O callbacks dispatched via `activity_as_hook`.
76+
- **MCP integration** — connect to MCP servers at worker startup; tool calls dispatched through per-server activities.
77+
- **Structured output** — Pydantic-typed agent results via the plugin's `pydantic_data_converter`.
78+
- **Streaming** — forward model chunks live to external subscribers.
79+
- **Long-lived chats** — hand off `agent.messages` via `continue-as-new` to stay under Temporal's history limit.
80+
81+
## Related
82+
83+
- [Temporal Strands plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/strands)
84+
- [Strands Agents](https://strandsagents.com/)

strands_plugin/__init__.py

Whitespace-only changes.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Continue-as-new
2+
3+
A chat-style workflow accumulates history with every turn and will eventually hit Temporal's per-workflow history limit. `workflow.info().is_continue_as_new_suggested()` flips `True` once the server decides history has grown large enough; this sample checks it after each turn and hands off to a fresh run with `agent.messages` as input.
4+
5+
## What This Sample Demonstrates
6+
7+
- Maintaining a multi-turn chat over signals and queries
8+
- Seeding a new `TemporalAgent` with prior `agent.messages`
9+
- Using `workflow.info().is_continue_as_new_suggested()` + `workflow.continue_as_new(...)` to keep the workflow alive indefinitely
10+
11+
## Running the Sample
12+
13+
```bash
14+
# Terminal 1
15+
uv run strands_plugin/continue_as_new/run_worker.py
16+
17+
# Terminal 2
18+
uv run strands_plugin/continue_as_new/run_workflow.py
19+
```
20+
21+
The starter sends a couple of `user_says` signals, queries the message history, then signals `end_chat`. In a real chatbot, a UI would drive the signals and the workflow would run indefinitely, continuing-as-new whenever history gets large.
22+
23+
## Files
24+
25+
| File | Description |
26+
|------|-------------|
27+
| `workflow.py` | `ChatInput`, `ChatWorkflow` with `user_says` / `end_chat` signals and `messages` query |
28+
| `run_worker.py` | Registers `StrandsPlugin`, starts the worker |
29+
| `run_workflow.py` | Starts the chat, sends a few turns, ends it |

strands_plugin/continue_as_new/__init__.py

Whitespace-only changes.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
"""Worker for the chat continue-as-new sample."""
2+
3+
import asyncio
4+
import os
5+
6+
from temporalio.client import Client
7+
from temporalio.contrib.strands import StrandsPlugin
8+
from temporalio.worker import Worker
9+
10+
from strands_plugin.continue_as_new.workflow import ChatWorkflow
11+
12+
13+
async def main() -> None:
14+
plugin = StrandsPlugin()
15+
client = await Client.connect(
16+
os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
17+
plugins=[plugin],
18+
)
19+
20+
worker = Worker(
21+
client,
22+
task_queue="strands-chat",
23+
workflows=[ChatWorkflow],
24+
)
25+
print("Worker started. Ctrl+C to exit.")
26+
await worker.run()
27+
28+
29+
if __name__ == "__main__":
30+
asyncio.run(main())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Start the chat workflow, send a few turns, then end it."""
2+
3+
import asyncio
4+
import os
5+
6+
from temporalio.client import Client
7+
from temporalio.contrib.strands import StrandsPlugin
8+
9+
from strands_plugin.continue_as_new.workflow import ChatInput, ChatWorkflow
10+
11+
12+
async def main() -> None:
13+
client = await Client.connect(
14+
os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
15+
plugins=[StrandsPlugin()],
16+
)
17+
18+
handle = await client.start_workflow(
19+
ChatWorkflow.run,
20+
ChatInput(),
21+
id="strands-chat",
22+
task_queue="strands-chat",
23+
)
24+
25+
await handle.signal(ChatWorkflow.user_says, "Hi! What is durable execution?")
26+
await asyncio.sleep(2)
27+
await handle.signal(ChatWorkflow.user_says, "Give me a one-sentence summary.")
28+
await asyncio.sleep(2)
29+
30+
messages = await handle.query(ChatWorkflow.messages)
31+
print(f"Conversation so far ({len(messages)} messages):")
32+
for message in messages:
33+
print(f" {message['role']}: {message['content']}")
34+
35+
await handle.signal(ChatWorkflow.end_chat)
36+
await handle.result()
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Chat-style workflow that continues-as-new before history grows too large.
2+
3+
Every turn appends to ``agent.messages``. Once Temporal suggests
4+
continue-as-new, the workflow hands the accumulated messages off to a fresh
5+
run, which seeds a new ``TemporalAgent`` with them and keeps the chat going.
6+
"""
7+
8+
from dataclasses import dataclass, field
9+
from datetime import timedelta
10+
11+
from strands.types.content import Messages
12+
from temporalio import workflow
13+
from temporalio.contrib.strands import TemporalAgent
14+
15+
16+
@dataclass
17+
class ChatInput:
18+
messages: Messages = field(default_factory=list)
19+
20+
21+
@workflow.defn
22+
class ChatWorkflow:
23+
def __init__(self) -> None:
24+
self._pending: list[str] = []
25+
self._done = False
26+
self._messages: Messages = []
27+
28+
@workflow.signal
29+
def user_says(self, prompt: str) -> None:
30+
self._pending.append(prompt)
31+
32+
@workflow.signal
33+
def end_chat(self) -> None:
34+
self._done = True
35+
36+
@workflow.query
37+
def messages(self) -> Messages:
38+
return list(self._messages)
39+
40+
@workflow.run
41+
async def run(self, input: ChatInput) -> None:
42+
agent = TemporalAgent(
43+
start_to_close_timeout=timedelta(seconds=60),
44+
messages=list(input.messages),
45+
)
46+
self._messages = agent.messages
47+
48+
while True:
49+
await workflow.wait_condition(lambda: bool(self._pending) or self._done)
50+
if self._done:
51+
return
52+
prompt = self._pending.pop(0)
53+
await agent.invoke_async(prompt)
54+
self._messages = agent.messages
55+
if workflow.info().is_continue_as_new_suggested():
56+
workflow.continue_as_new(ChatInput(messages=agent.messages))
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Hello World
2+
3+
The simplest Strands + Temporal sample: one `TemporalAgent` invoked once. Every model call runs as an `invoke_model` Temporal activity, so it gets durable retries, timeouts, and crash recovery for free.
4+
5+
## What This Sample Demonstrates
6+
7+
- Wiring `StrandsPlugin` onto the client and worker
8+
- Constructing a `TemporalAgent` with no explicit model (defaults to `BedrockModel()`)
9+
- Invoking the agent from a `@workflow.defn`
10+
11+
## Running the Sample
12+
13+
Prerequisites: `uv sync --group strands`, AWS credentials with Bedrock access, and a running Temporal dev server (`temporal server start-dev`).
14+
15+
```bash
16+
# Terminal 1
17+
uv run strands_plugin/hello_world/run_worker.py
18+
19+
# Terminal 2
20+
uv run strands_plugin/hello_world/run_workflow.py
21+
```
22+
23+
## Files
24+
25+
| File | Description |
26+
|------|-------------|
27+
| `workflow.py` | `HelloWorldWorkflow` with a single `TemporalAgent` |
28+
| `run_worker.py` | Registers `StrandsPlugin`, starts the worker |
29+
| `run_workflow.py` | Executes the workflow and prints the result |

0 commit comments

Comments
 (0)