Skip to content

Commit b376e8e

Browse files
brianstrauchclaude
andcommitted
Convert continue_as_new chat to workflow update
`user_says` → `turn`, now a `@workflow.update` returning the assistant's reply directly so callers no longer need a separate `messages` query to discover what the agent said. The run loop drains in-flight handlers via `workflow.all_handlers_finished` before continue-as-new. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d8565fc commit b376e8e

4 files changed

Lines changed: 45 additions & 39 deletions

File tree

strands_plugin/continue_as_new/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ A chat-style workflow accumulates history with every turn and will eventually hi
44

55
## What This Sample Demonstrates
66

7-
- Maintaining a multi-turn chat over signals and queries
7+
- Driving a multi-turn chat with **updates**, so each caller gets the assistant's reply back from the same call
88
- Seeding a new `TemporalAgent` with prior `agent.messages`
99
- Using `workflow.info().is_continue_as_new_suggested()` + `workflow.continue_as_new(...)` to keep the workflow alive indefinitely
10+
- Draining in-flight update handlers with `workflow.all_handlers_finished` before continue-as-new
1011

1112
## Running the Sample
1213

@@ -18,12 +19,12 @@ uv run strands_plugin/continue_as_new/run_worker.py
1819
uv run strands_plugin/continue_as_new/run_workflow.py
1920
```
2021

21-
The starter sends a couple of `user_says` signals, queries the message history, then signals `end_chat`. In a real chatbot, a UI would drive the signals and the workflow would run indefinitely, continuing-as-new whenever history gets large.
22+
The starter calls the `turn` update for each user message and prints the assistant's reply, then signals `end_chat`. In a real chatbot, a UI would drive the updates and the workflow would run indefinitely, continuing-as-new whenever history gets large.
2223

2324
## Files
2425

2526
| File | Description |
2627
|------|-------------|
27-
| `workflow.py` | `ChatInput`, `ChatWorkflow` with `user_says` / `end_chat` signals and `messages` query |
28+
| `workflow.py` | `ChatInput`, `ChatWorkflow` with `turn` update, `end_chat` signal, and `messages` query |
2829
| `run_worker.py` | Registers `StrandsPlugin`, starts the worker |
2930
| `run_workflow.py` | Starts the chat, sends a few turns, ends it |

strands_plugin/continue_as_new/run_workflow.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@ async def main() -> None:
2222
task_queue="strands-chat",
2323
)
2424

25-
await handle.signal(ChatWorkflow.user_says, "Hi! What is durable execution?")
26-
await asyncio.sleep(2)
27-
await handle.signal(ChatWorkflow.user_says, "Give me a one-sentence summary.")
28-
await asyncio.sleep(2)
29-
30-
messages = await handle.query(ChatWorkflow.messages)
31-
print(f"Conversation so far ({len(messages)} messages):")
32-
for message in messages:
33-
print(f" {message['role']}: {message['content']}")
25+
for prompt in [
26+
"Hi! What is durable execution?",
27+
"Give me a one-sentence summary.",
28+
]:
29+
reply = await handle.execute_update(ChatWorkflow.turn, prompt)
30+
print(f"user: {prompt}")
31+
print(f"assistant: {reply}\n")
3432

3533
await handle.signal(ChatWorkflow.end_chat)
3634
await handle.result()
Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
"""Chat-style workflow that continues-as-new before history grows too large.
22
3-
Every turn appends to ``agent.messages``. Once Temporal suggests
4-
continue-as-new, the workflow hands the accumulated messages off to a fresh
5-
run, which seeds a new ``TemporalAgent`` with them and keeps the chat going.
3+
Each user turn arrives as a Temporal **update**, so the caller gets the
4+
assistant's reply back from the same call. Once Temporal suggests
5+
continue-as-new, the workflow drains any in-flight update handlers and hands
6+
``agent.messages`` off to a fresh run.
67
"""
78

9+
import asyncio
810
from dataclasses import dataclass, field
911
from datetime import timedelta
1012

@@ -21,36 +23,41 @@ class ChatInput:
2123
@workflow.defn
2224
class ChatWorkflow:
2325
def __init__(self) -> None:
24-
self._pending: list[str] = []
2526
self._done = False
26-
self._messages: Messages = []
27-
28-
@workflow.signal
29-
def user_says(self, prompt: str) -> None:
30-
self._pending.append(prompt)
27+
self._lock = asyncio.Lock()
28+
self._agent: TemporalAgent | None = None
29+
30+
@workflow.update
31+
async def turn(self, prompt: str) -> str:
32+
# Updates can arrive before ``run`` has constructed the agent.
33+
await workflow.wait_condition(lambda: self._agent is not None)
34+
# Serialize turns so concurrent updates can't interleave on ``agent.messages``.
35+
async with self._lock:
36+
assert self._agent is not None
37+
result = await self._agent.invoke_async(prompt)
38+
return str(result).strip()
3139

3240
@workflow.signal
3341
def end_chat(self) -> None:
3442
self._done = True
3543

3644
@workflow.query
3745
def messages(self) -> Messages:
38-
return list(self._messages)
46+
return list(self._agent.messages) if self._agent else []
3947

4048
@workflow.run
4149
async def run(self, input: ChatInput) -> None:
42-
agent = TemporalAgent(
50+
self._agent = TemporalAgent(
4351
start_to_close_timeout=timedelta(seconds=60),
4452
messages=list(input.messages),
4553
)
46-
self._messages = agent.messages
47-
48-
while True:
49-
await workflow.wait_condition(lambda: bool(self._pending) or self._done)
50-
if self._done:
51-
return
52-
prompt = self._pending.pop(0)
53-
await agent.invoke_async(prompt)
54-
self._messages = agent.messages
55-
if workflow.info().is_continue_as_new_suggested():
56-
workflow.continue_as_new(ChatInput(messages=agent.messages))
54+
55+
await workflow.wait_condition(
56+
lambda: self._done or workflow.info().is_continue_as_new_suggested()
57+
)
58+
59+
# Let any in-flight ``turn`` updates finish before we exit or hand off.
60+
await workflow.wait_condition(workflow.all_handlers_finished)
61+
62+
if not self._done:
63+
workflow.continue_as_new(ChatInput(messages=self._agent.messages))

tests/strands_plugin/continue_as_new_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ async def test_continue_as_new_chat(
3333
id=f"strands-chat-{uuid.uuid4()}",
3434
task_queue=task_queue,
3535
)
36-
await handle.signal(ChatWorkflow.user_says, "Hello")
37-
await handle.signal(ChatWorkflow.user_says, "How are you?")
36+
reply1 = await handle.execute_update(ChatWorkflow.turn, "Hello")
37+
reply2 = await handle.execute_update(ChatWorkflow.turn, "How are you?")
3838

39-
messages: list = []
40-
while len(messages) < 4:
41-
messages = await handle.query(ChatWorkflow.messages)
39+
assert reply1 == "First reply."
40+
assert reply2 == "Second reply."
4241

42+
messages = await handle.query(ChatWorkflow.messages)
4343
await handle.signal(ChatWorkflow.end_chat)
4444
await handle.result()
4545

0 commit comments

Comments
 (0)