Commit 81bf605

samples: workflow_streams: rename chat -> llm in scenario 5

"Chat" implies multi-turn conversation. The new scenario is a one-shot LLM completion stream, not a chat. Rename to make the scope clear:

- chat_shared.py -> llm_shared.py
- workflows/chat_workflow.py -> workflows/llm_workflow.py
- activities/chat_activity.py -> activities/llm_activity.py
- run_chat.py -> run_llm.py
- run_chat_worker.py -> run_llm_worker.py
- ChatInput / ChatWorkflow -> LLMInput / LLMWorkflow
- CHAT_TASK_QUEUE -> LLM_TASK_QUEUE ("workflow-stream-chat-task-queue" -> "workflow-stream-llm-task-queue")
- chat-stream extra -> llm-stream
- workflow id prefix workflow-stream-chat-... -> workflow-stream-llm-...

The activity's `stream_completion` defn name and the topic constants (`delta`, `complete`, `retry`) stay the same; those already describe what they do without the "chat" framing.

README, docstrings, and run instructions updated to match.
1 parent 0b4cbc8 commit 81bf605
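
A rename this broad is easy to leave half-finished. As a minimal sketch (not part of the commit), the helper below scans a source tree for the old identifiers listed in the commit message. The function name `find_stale_names` and the default suffix list are assumptions made for illustration.

```python
from pathlib import Path

# Old names taken from the commit message's rename list.
OLD_NAMES = (
    "chat_shared",
    "chat_workflow",
    "chat_activity",
    "run_chat",
    "ChatInput",
    "ChatWorkflow",
    "CHAT_TASK_QUEUE",
    "workflow-stream-chat",
    "chat-stream",
)


def find_stale_names(root: Path, suffixes=(".py", ".md", ".toml")):
    """Return (path, line number, old name) for every leftover reference.

    Hypothetical helper: the name and the suffix list are illustrative.
    """
    hits = []
    for path in sorted(root.rglob("*")):
        if not path.is_file() or path.suffix not in suffixes:
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        for lineno, line in enumerate(text.splitlines(), start=1):
            for name in OLD_NAMES:
                if name in line:
                    hits.append((path, lineno, name))
    return hits
```

Calling `find_stale_names(Path("workflow_streams"))` after the rename should return an empty list if nothing was missed. Plain substring matching is deliberately crude: `openai`'s `chat.completions` call does not match any entry, but a prose mention of an old name in a changelog would.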

7 files changed

Lines changed: 50 additions & 50 deletions

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ openai-agents = [
 pydantic-converter = ["pydantic>=2.10.6,<3"]
 sentry = ["sentry-sdk>=2.13.0"]
 trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
-chat-stream = ["openai>=1.0"]
+llm-stream = ["openai>=1.0"]
 cloud-export-to-parquet = [
     "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'",
     "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'",

workflow_streams/README.md

Lines changed: 11 additions & 11 deletions
@@ -71,49 +71,49 @@ it's separate).
 
 ## Scenario 5 — LLM streaming
 
-* `workflows/chat_workflow.py` — a workflow that hosts a
+* `workflows/llm_workflow.py` — a workflow that hosts a
   `WorkflowStream` and runs `stream_completion` as a single activity.
   The workflow itself does no streaming; the activity owns the
   non-deterministic OpenAI call.
-* `activities/chat_activity.py` — calls
+* `activities/llm_activity.py` — calls
   `openai.AsyncOpenAI().chat.completions.create(stream=True)`,
   publishes each token chunk as a `TextDelta` on the `delta` topic,
   the final accumulated text on the `complete` topic, and a
   `RetryEvent` on the `retry` topic when running on attempt > 1.
-* `run_chat.py` — subscribes to all three topics, renders deltas to
+* `run_llm.py` — subscribes to all three topics, renders deltas to
   the terminal as they arrive, and on a `retry` event uses ANSI
   escapes to rewind the printed output before the retried attempt
   starts re-publishing.
-* `run_chat_worker.py` — separate worker on its own task queue
-  (`workflow-stream-chat-task-queue`), registering only `ChatWorkflow`
+* `run_llm_worker.py` — separate worker on its own task queue
+  (`workflow-stream-llm-task-queue`), registering only `LLMWorkflow`
   and `stream_completion`. This isolates the `openai` dependency and
   the `OPENAI_API_KEY` requirement to this one scenario.
 
 This scenario is split out for two reasons. First, it needs an extra
 dependency (`openai`) and a secret (`OPENAI_API_KEY`) — putting it on
 the main worker would force every other scenario to set up an OpenAI
-key. Second, killing the chat worker mid-stream is the easiest way to
+key. Second, killing the LLM worker mid-stream is the easiest way to
 demonstrate retry handling, and you don't want the same `Ctrl-C` to
 interrupt the other four scenarios' worker.
 
 Setup:
 
 ```bash
-uv sync --group chat-stream
+uv sync --group llm-stream
 export OPENAI_API_KEY=...
 ```
 
 Run:
 
 ```bash
-# Terminal 1: chat worker (its own task queue)
-uv run workflow_streams/run_chat_worker.py
+# Terminal 1: LLM worker (its own task queue)
+uv run workflow_streams/run_llm_worker.py
 
 # Terminal 2:
-uv run workflow_streams/run_chat.py
+uv run workflow_streams/run_llm.py
 ```
 
-To trigger the retry path, kill the chat worker in Terminal 1
+To trigger the retry path, kill the LLM worker in Terminal 1
 (`Ctrl-C`) while output is streaming, then start it again. The
 activity's next attempt sends a `RetryEvent` first; the consumer
 clears its on-screen output via ANSI escapes and re-renders from
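
The README hunk above says the consumer rewinds its printed output with ANSI escapes, but the rendering code is not part of this diff. The following is a minimal sketch of one way to do that, not the sample's actual implementation. `RewindableOutput`, `rows_below_start`, and `rewind_sequence` are hypothetical names, and the sketch assumes every character is one column wide and that the terminal wraps at a fixed width.

```python
import shutil
import sys

CSI = "\x1b["  # ANSI Control Sequence Introducer


def rows_below_start(text: str, width: int) -> int:
    """Count how many rows the cursor moved down while printing `text`.

    Assumes one column per character and soft wrapping at `width`.
    """
    rows = 0
    for i, line in enumerate(text.split("\n")):
        if i > 0:
            rows += 1  # an explicit newline moves down one row
        if line:
            rows += (len(line) - 1) // width  # rows added by soft wrapping
    return rows


def rewind_sequence(text: str, width: int) -> str:
    """Build the escape sequence that erases `text` from the screen."""
    rows = rows_below_start(text, width)
    move_up = f"{CSI}{rows}A" if rows else ""  # CSI n A: cursor up n rows
    # "\r" returns to column 0; CSI J erases from the cursor to end of screen.
    return f"\r{move_up}{CSI}J"


class RewindableOutput:
    """Writes text to a stream and can erase everything it has written."""

    def __init__(self, stream=sys.stdout, width=None):
        self.stream = stream
        self.width = width or shutil.get_terminal_size().columns
        self.rendered = ""

    def write(self, chunk: str) -> None:
        self.stream.write(chunk)
        self.stream.flush()
        self.rendered += chunk

    def rewind(self) -> None:
        self.stream.write(rewind_sequence(self.rendered, self.width))
        self.stream.flush()
        self.rendered = ""
```

A consumer would call `write()` for each delta and `rewind()` when a retry event arrives. The row count goes stale if the terminal is resized mid-stream or if the output scrolls past the top of the screen, so this is best treated as a demo-grade technique.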

workflow_streams/activities/chat_activity.py renamed to workflow_streams/activities/llm_activity.py

Lines changed: 4 additions & 4 deletions
@@ -6,20 +6,20 @@
 from temporalio import activity
 from temporalio.contrib.workflow_streams import WorkflowStreamClient
 
-from workflow_streams.chat_shared import (
+from workflow_streams.llm_shared import (
     TOPIC_COMPLETE,
     TOPIC_DELTA,
     TOPIC_RETRY,
-    ChatInput,
+    LLMInput,
     RetryEvent,
     TextComplete,
     TextDelta,
 )
 
 
 @activity.defn
-async def stream_completion(input: ChatInput) -> str:
-    """Stream a chat completion to the parent workflow's stream.
+async def stream_completion(input: LLMInput) -> str:
+    """Stream an LLM completion to the parent workflow's stream.
 
     Activity-as-publisher: each delta from the OpenAI streaming API is
     pushed to the workflow's stream as a ``TextDelta`` event on the
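
The body of `stream_completion` is not shown in this hunk. As a rough sketch of the activity-as-publisher flow the docstring and README describe, the code below forwards chunks to a generic `publish` callback instead of the real `WorkflowStreamClient`, whose API is not visible in this diff. The dataclass field names (`text`, `attempt`), the `publish` signature, and the function name `publish_completion` are assumptions; the three topic strings come from the commit message.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable

TOPIC_DELTA = "delta"
TOPIC_COMPLETE = "complete"
TOPIC_RETRY = "retry"


# Field names are assumptions; the diff only shows the type names.
@dataclass
class TextDelta:
    text: str


@dataclass
class TextComplete:
    text: str


@dataclass
class RetryEvent:
    attempt: int


# Hypothetical stand-in for whatever the real stream client exposes.
Publish = Callable[[str, object], Awaitable[None]]


async def publish_completion(
    chunks: AsyncIterator[str], attempt: int, publish: Publish
) -> str:
    """Forward each chunk to `publish` and return the accumulated text."""
    if attempt > 1:
        # Tell consumers to discard whatever the failed attempt rendered.
        await publish(TOPIC_RETRY, RetryEvent(attempt=attempt))
    parts: list[str] = []
    async for chunk in chunks:
        parts.append(chunk)
        await publish(TOPIC_DELTA, TextDelta(text=chunk))
    full_text = "".join(parts)
    await publish(TOPIC_COMPLETE, TextComplete(text=full_text))
    return full_text
```

In a real activity the attempt number would come from `activity.info().attempt`, and `chunks` would be the OpenAI streaming response. Publishing the retry event before any delta is what lets the consumer reset before new text arrives.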
workflow_streams/chat_shared.py renamed to workflow_streams/llm_shared.py

Lines changed: 6 additions & 6 deletions
@@ -1,8 +1,8 @@
 """Types and constants for the LLM-streaming scenario.
 
 Kept separate from ``shared.py`` because the other scenarios don't
-use these — and the chat scenario runs on its own worker and task
-queue so the ``openai`` dependency stays out of everyone else's path.
+use these — and this scenario runs on its own worker and task queue
+so the ``openai`` dependency stays out of everyone else's path.
 """
 
 from __future__ import annotations
@@ -11,9 +11,9 @@
 
 from temporalio.contrib.workflow_streams import WorkflowStreamState
 
-# Scenario 5 (LLM streaming) runs on its own worker so the openai
-# dependency only matters for that scenario.
-CHAT_TASK_QUEUE = "workflow-stream-chat-task-queue"
+# Scenario 5 runs on its own worker so the openai dependency only
+# matters for that scenario.
+LLM_TASK_QUEUE = "workflow-stream-llm-task-queue"
 
 # Topics published by the activity.
 TOPIC_DELTA = "delta"
@@ -22,7 +22,7 @@
 
 
 @dataclass
-class ChatInput:
+class LLMInput:
     prompt: str
     model: str = "gpt-5-mini"
     # Carries stream state across continue-as-new. None on a fresh start.
workflow_streams/run_chat.py renamed to workflow_streams/run_llm.py

Lines changed: 16 additions & 16 deletions
@@ -1,24 +1,24 @@
 """Stream LLM output to the terminal, handling retries.
 
-Starts a ``ChatWorkflow``, subscribes to its delta / complete / retry
+Starts an ``LLMWorkflow``, subscribes to its delta / complete / retry
 topics, and renders the model's output to stdout as it arrives. On a
 ``RETRY`` event (the activity is on attempt > 1), the consumer rewinds
 its rendered output with ANSI escapes and starts fresh — so a killed
 worker doesn't leave a half-finished response stuck on screen
 followed by the retried attempt's full output.
 
-Requires ``OPENAI_API_KEY`` in the environment and the ``chat-stream``
+Requires ``OPENAI_API_KEY`` in the environment and the ``llm-stream``
 extra::
 
-    uv sync --group chat-stream
+    uv sync --group llm-stream
     export OPENAI_API_KEY=...
 
-Run the chat worker first (``uv run workflow_streams/run_chat_worker.py``),
+Run the LLM worker first (``uv run workflow_streams/run_llm_worker.py``),
 then::
 
-    uv run workflow_streams/run_chat.py
+    uv run workflow_streams/run_llm.py
 
-To see retry handling in action, kill the chat worker mid-stream
+To see retry handling in action, kill the LLM worker mid-stream
 (Ctrl-C in its terminal) and start it again. The consumer will clear
 its accumulated output on the ``RETRY`` event and re-render the
 retried attempt's output from scratch.
@@ -34,17 +34,17 @@
 from temporalio.common import RawValue
 from temporalio.contrib.workflow_streams import WorkflowStreamClient
 
-from workflow_streams.chat_shared import (
-    CHAT_TASK_QUEUE,
+from workflow_streams.llm_shared import (
+    LLM_TASK_QUEUE,
     TOPIC_COMPLETE,
     TOPIC_DELTA,
     TOPIC_RETRY,
-    ChatInput,
+    LLMInput,
     RetryEvent,
     TextComplete,
     TextDelta,
 )
-from workflow_streams.workflows.chat_workflow import ChatWorkflow
+from workflow_streams.workflows.llm_workflow import LLMWorkflow
 
 # Long enough that you can comfortably kill the worker mid-stream and
 # watch the retry render. Adjust to taste.
@@ -71,20 +71,20 @@ async def main() -> None:
     client = await Client.connect("localhost:7233")
     converter = client.data_converter.payload_converter
 
-    workflow_id = f"workflow-stream-chat-{uuid.uuid4().hex[:8]}"
-    chat_input = ChatInput(prompt=DEFAULT_PROMPT)
+    workflow_id = f"workflow-stream-llm-{uuid.uuid4().hex[:8]}"
+    llm_input = LLMInput(prompt=DEFAULT_PROMPT)
     handle = await client.start_workflow(
-        ChatWorkflow.run,
-        chat_input,
+        LLMWorkflow.run,
+        llm_input,
         id=workflow_id,
-        task_queue=CHAT_TASK_QUEUE,
+        task_queue=LLM_TASK_QUEUE,
     )
 
     # Print a header so the user sees something immediately. The
     # response will start streaming below it once the first delta
     # arrives — until then this is the only line on screen.
     print(
-        f"[chat {workflow_id}] streaming response from {chat_input.model}, "
+        f"[llm {workflow_id}] streaming response from {llm_input.model}, "
         f"awaiting first token..."
     )
     print()
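
The subscription loop that follows `print()` is outside this hunk. As a hedged illustration of the reset-on-retry behaviour the module docstring describes, the sketch below models the consumer as a pure state update per event. `RenderState`, `apply_event`, and the dict payload shapes (`{"text": ...}`, `{"attempt": ...}`) are assumptions for illustration, not the sample's decoded event types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RenderState:
    """What the consumer believes is currently on screen."""

    text: str = ""
    attempt: int = 1
    done: bool = False


def apply_event(state: RenderState, topic: str, payload: dict) -> RenderState:
    """Return the next state after one event from the stream."""
    if topic == "retry":
        # A new attempt re-publishes from the start, so drop prior text.
        return RenderState(text="", attempt=payload["attempt"], done=False)
    if topic == "delta":
        return RenderState(state.text + payload["text"], state.attempt, False)
    if topic == "complete":
        # Treat the final accumulated text as authoritative.
        return RenderState(payload["text"], state.attempt, True)
    raise ValueError(f"unexpected topic: {topic!r}")
```

Keeping the update free of I/O makes the retry path testable without a terminal or a running worker; the code that draws to the screen can then react to the difference between the old and new state.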
workflow_streams/run_chat_worker.py renamed to workflow_streams/run_llm_worker.py

Lines changed: 6 additions & 6 deletions
@@ -5,7 +5,7 @@
 scenario. Different task queue too — the other four samples won't
 route work to this worker.
 
-Kill this worker mid-stream while ``run_chat.py`` is running to
+Kill this worker mid-stream while ``run_llm.py`` is running to
 trigger a retry: Temporal restarts the activity on the next worker
 to come up, the activity publishes a ``RetryEvent`` on its second
 attempt, and the consumer resets its rendered output.
@@ -19,18 +19,18 @@
 from temporalio.client import Client
 from temporalio.worker import Worker
 
-from workflow_streams.activities.chat_activity import stream_completion
-from workflow_streams.chat_shared import CHAT_TASK_QUEUE
-from workflow_streams.workflows.chat_workflow import ChatWorkflow
+from workflow_streams.activities.llm_activity import stream_completion
+from workflow_streams.llm_shared import LLM_TASK_QUEUE
+from workflow_streams.workflows.llm_workflow import LLMWorkflow
 
 
 async def main() -> None:
     logging.basicConfig(level=logging.INFO)
     client = await Client.connect("localhost:7233")
     worker = Worker(
         client,
-        task_queue=CHAT_TASK_QUEUE,
-        workflows=[ChatWorkflow],
+        task_queue=LLM_TASK_QUEUE,
+        workflows=[LLMWorkflow],
         activities=[stream_completion],
     )
     await worker.run()

workflow_streams/workflows/chat_workflow.py renamed to workflow_streams/workflows/llm_workflow.py

Lines changed: 6 additions & 6 deletions
@@ -6,14 +6,14 @@
 from temporalio.common import RetryPolicy
 from temporalio.contrib.workflow_streams import WorkflowStream
 
-from workflow_streams.chat_shared import ChatInput
+from workflow_streams.llm_shared import LLMInput
 
 with workflow.unsafe.imports_passed_through():
-    from workflow_streams.activities.chat_activity import stream_completion
+    from workflow_streams.activities.llm_activity import stream_completion
 
 
 @workflow.defn
-class ChatWorkflow:
+class LLMWorkflow:
     """Wrapper for an LLM-streaming activity.
 
     The workflow does no streaming of its own; it hosts the
@@ -26,18 +26,18 @@ class ChatWorkflow:
     retries it (up to ``max_attempts``); the retried attempt
     re-publishes from the start, so the consumer must reset on the
     activity's ``RETRY`` event. See
-    `activities/chat_activity.py` and `run_chat.py`.
+    `activities/llm_activity.py` and `run_llm.py`.
     """
 
     @workflow.init
-    def __init__(self, input: ChatInput) -> None:
+    def __init__(self, input: LLMInput) -> None:
         # Construct the stream from `@workflow.init` so the
         # publish-Signal handler is registered before any external
         # publisher (the activity, here) tries to publish.
         self.stream = WorkflowStream(prior_state=input.stream_state)
 
     @workflow.run
-    async def run(self, input: ChatInput) -> str:
+    async def run(self, input: LLMInput) -> str:
         result = await workflow.execute_activity(
             stream_completion,
             input,