Skip to content

Commit b607117

Browse files
jssmithclaude
andcommitted
samples: workflow_stream: add external-publisher scenario
Adds a third scenario covering the third publisher shape: a backend service or scheduled job pushing events into a workflow it didn't itself start. The earlier scenarios publish either from inside the workflow or from one of its activities; this one uses WorkflowStreamClient.create() externally. HubWorkflow is a passive stream host — it does no work of its own and just waits to be told to close, fitting the event-bus pattern. The runner publishes a series of news headlines, runs a subscriber task alongside, signals close, and exits when both tasks complete. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent faac49f commit b607117

5 files changed

Lines changed: 159 additions & 2 deletions

File tree

workflow_stream/README.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,21 @@ This directory has two scenarios sharing one Worker.
4040
disappear (page refresh, server restart, laptop closed) and resume
4141
later without missing events or seeing duplicates.
4242

43-
`run_worker.py` registers both workflows and the activity.
43+
**Scenario 3 — external (non-Activity) publisher:**
44+
45+
* `workflows/hub_workflow.py` — a passive workflow that does no work
46+
of its own; it exists only to host a `WorkflowStream` and shut down
47+
when signaled.
48+
* `run_external_publisher.py` — starts the hub, then publishes events
49+
into it from a plain Python coroutine using
50+
`WorkflowStreamClient.create(client, workflow_id)`. A subscriber
51+
task runs alongside; when the publisher is done it signals
52+
`HubWorkflow.close`, the workflow's run finishes, and the
53+
subscriber's iterator exits normally. This is the shape that fits a
54+
backend service or scheduled job pushing events into a workflow it
55+
didn't itself start.
56+
57+
`run_worker.py` registers all three workflows and the activity.
4458

4559
## Run it
4660

@@ -52,6 +66,8 @@ uv run workflow_stream/run_worker.py
5266
uv run workflow_stream/run_publisher.py
5367
# or
5468
uv run workflow_stream/run_reconnecting_subscriber.py
69+
# or
70+
uv run workflow_stream/run_external_publisher.py
5571
```
5672

5773
Expected output on the basic publisher side:
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""External publisher: a non-Activity process pushes events into a workflow.
2+
3+
The two earlier scenarios publish from inside the workflow itself
4+
(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs
5+
(``charge_card``). This scenario shows the third shape: a backend
6+
service, scheduled job, or anything else with a Temporal ``Client``
7+
publishing into a *running* workflow it didn't start. Same factory as
8+
the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for
9+
publishing instead.
10+
11+
The script starts a ``HubWorkflow`` (which does no work of its own —
12+
it exists only to host the stream), then runs a publisher and a
13+
subscriber concurrently. When the publisher is done it signals
14+
``HubWorkflow.close``, the workflow's run finishes, and the
15+
subscriber's iterator exits normally.
16+
17+
Run the worker first (``uv run workflow_stream/run_worker.py``), then::
18+
19+
uv run workflow_stream/run_external_publisher.py
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import asyncio
25+
import uuid
26+
27+
from temporalio.client import Client
28+
from temporalio.contrib.workflow_stream import WorkflowStreamClient
29+
30+
from workflow_stream.shared import (
31+
TASK_QUEUE,
32+
TOPIC_NEWS,
33+
HubInput,
34+
NewsEvent,
35+
)
36+
from workflow_stream.workflows.hub_workflow import HubWorkflow
37+
38+
39+
HEADLINES = [
40+
"rates held",
41+
"merger announced",
42+
"outage resolved",
43+
"earnings beat",
44+
"regulator opens probe",
45+
]
46+
47+
48+
async def main() -> None:
49+
client = await Client.connect("localhost:7233")
50+
51+
workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}"
52+
handle = await client.start_workflow(
53+
HubWorkflow.run,
54+
HubInput(hub_id=workflow_id),
55+
id=workflow_id,
56+
task_queue=TASK_QUEUE,
57+
)
58+
59+
async def publish_news() -> None:
60+
# WorkflowStreamClient.create takes a Temporal client and a
61+
# workflow id — the same factory used elsewhere for subscribing.
62+
# The async context manager batches publishes and flushes on
63+
# exit; we additionally call flush() before signaling close so
64+
# we know the events landed before the workflow shuts down.
65+
producer = WorkflowStreamClient.create(client, workflow_id)
66+
async with producer:
67+
for headline in HEADLINES:
68+
producer.publish(TOPIC_NEWS, NewsEvent(headline=headline))
69+
print(f"[publisher] sent: {headline}")
70+
await asyncio.sleep(0.5)
71+
await producer.flush()
72+
# Tell the hub it can stop. The workflow's run() returns, and
73+
# any in-flight subscribers see their async-for loop exit.
74+
await handle.signal(HubWorkflow.close)
75+
print("[publisher] signaled close")
76+
77+
async def consume_news() -> None:
78+
consumer = WorkflowStreamClient.create(client, workflow_id)
79+
async for item in consumer.subscribe(
80+
[TOPIC_NEWS], result_type=NewsEvent
81+
):
82+
print(f"[subscriber] offset={item.offset}: {item.data.headline}")
83+
84+
await asyncio.gather(publish_news(), consume_news())
85+
86+
result = await handle.result()
87+
print(f"\nworkflow result: {result}")
88+
89+
90+
if __name__ == "__main__":
91+
asyncio.run(main())

workflow_stream/run_worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from workflow_stream.activities.payment_activity import charge_card
1010
from workflow_stream.shared import TASK_QUEUE
11+
from workflow_stream.workflows.hub_workflow import HubWorkflow
1112
from workflow_stream.workflows.order_workflow import OrderWorkflow
1213
from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow
1314

@@ -18,7 +19,7 @@ async def main() -> None:
1819
worker = Worker(
1920
client,
2021
task_queue=TASK_QUEUE,
21-
workflows=[OrderWorkflow, PipelineWorkflow],
22+
workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow],
2223
activities=[charge_card],
2324
)
2425
await worker.run()

workflow_stream/shared.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# Topics published by the workflow / activity.
1414
TOPIC_STATUS = "status"
1515
TOPIC_PROGRESS = "progress"
16+
TOPIC_NEWS = "news"
1617

1718

1819
@dataclass
@@ -45,6 +46,18 @@ class StageEvent:
4546
stage: str
4647

4748

49+
@dataclass
50+
class HubInput:
51+
hub_id: str
52+
# Carries stream state across continue-as-new. None on a fresh start.
53+
stream_state: WorkflowStreamState | None = None
54+
55+
56+
@dataclass
57+
class NewsEvent:
58+
headline: str
59+
60+
4861
T = TypeVar("T")
4962

5063

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from __future__ import annotations
2+
3+
from temporalio import workflow
4+
from temporalio.contrib.workflow_stream import WorkflowStream
5+
6+
from workflow_stream.shared import HubInput
7+
8+
9+
@workflow.defn
10+
class HubWorkflow:
11+
"""Passive stream host: starts up, waits, closes when told.
12+
13+
Unlike OrderWorkflow or PipelineWorkflow, this workflow does no
14+
work of its own — it exists only to host a ``WorkflowStream`` that
15+
external publishers push events into and external subscribers read
16+
from. The shape that fits a backend service or "event bus" pattern,
17+
where the workflow owns durable state but the events come from
18+
outside.
19+
"""
20+
21+
@workflow.init
22+
def __init__(self, input: HubInput) -> None:
23+
self.stream = WorkflowStream(prior_state=input.stream_state)
24+
self._closed = False
25+
26+
@workflow.run
27+
async def run(self, input: HubInput) -> str:
28+
await workflow.wait_condition(lambda: self._closed)
29+
return f"hub {input.hub_id} closed"
30+
31+
@workflow.signal
32+
def close(self) -> None:
33+
# Custom signal handler that does not read stream state, so the
34+
# synchronous-handler race documented in the README does not
35+
# apply.
36+
self._closed = True

0 commit comments

Comments
 (0)