Skip to content

Commit 78062b4

Browse files
jssmithclaude
andcommitted
samples: rename workflow_stream → workflow_streams; migrate to topic handles
- Directory and module path renamed to plural to match sdk-python `temporalio.contrib.workflow_streams` rename. - Workflow-side: bind a typed topic handle in `@workflow.init` and call `topic.publish(value)` — the removed `WorkflowStream.publish` form is gone. Same change applied to the activity and external-publisher. - Activity: `WorkflowStreamClient.from_activity()` → `from_within_activity()`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 91233b0 commit 78062b4

17 files changed

Lines changed: 96 additions & 100 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7979
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
8080
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
8181
* [prometheus](prometheus) - Configure Prometheus metrics on clients/workers.
82-
* [workflow_stream](workflow_stream) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_stream`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
82+
* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental — requires the [`contrib/pubsub` branch](https://github.com/temporalio/sdk-python/tree/contrib/pubsub) of sdk-python.**
8383
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
8484
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
8585
* [sentry](sentry) - Report errors to Sentry.

workflow_stream/activities/payment_activity.py

Lines changed: 0 additions & 41 deletions
This file was deleted.
Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# Workflow Streams
22

33
> **Experimental.** These samples target the
4-
> `temporalio.contrib.workflow_stream` module on the
4+
> `temporalio.contrib.workflow_streams` module on the
55
> [`contrib/pubsub` branch of sdk-python][branch], which is not yet
66
> released. To run them locally, install sdk-python from that branch
77
> (e.g. `uv pip install -e <path-to-sdk-python>` after checking out the
88
> branch).
99
1010
[branch]: https://github.com/temporalio/sdk-python/tree/contrib/pubsub
1111

12-
`temporalio.contrib.workflow_stream` lets a workflow host a durable,
12+
`temporalio.contrib.workflow_streams` lets a workflow host a durable,
1313
offset-addressed event channel. The workflow holds an append-only log;
1414
external clients (activities, starters, BFFs) publish to topics via
1515
signals and subscribe via long-poll updates. This packages the
@@ -24,7 +24,7 @@ This directory has two scenarios sharing one Worker.
2424
`WorkflowStream` and publishes status events as it processes an order.
2525
* `activities/payment_activity.py` — an activity that publishes
2626
intermediate progress to the stream via
27-
`WorkflowStreamClient.from_activity()`.
27+
`WorkflowStreamClient.from_within_activity()`.
2828
* `run_publisher.py` — starts the workflow, subscribes to both topics,
2929
decodes each by `item.topic`, and prints events as they arrive.
3030

@@ -73,16 +73,16 @@ This directory has two scenarios sharing one Worker.
7373

7474
```bash
7575
# Terminal 1: worker
76-
uv run workflow_stream/run_worker.py
76+
uv run workflow_streams/run_worker.py
7777

7878
# Terminal 2: pick a scenario
79-
uv run workflow_stream/run_publisher.py
79+
uv run workflow_streams/run_publisher.py
8080
# or
81-
uv run workflow_stream/run_reconnecting_subscriber.py
81+
uv run workflow_streams/run_reconnecting_subscriber.py
8282
# or
83-
uv run workflow_stream/run_external_publisher.py
83+
uv run workflow_streams/run_external_publisher.py
8484
# or
85-
uv run workflow_stream/run_truncating_ticker.py
85+
uv run workflow_streams/run_truncating_ticker.py
8686
```
8787

8888
Expected output on the basic publisher side:
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from datetime import timedelta
5+
6+
from temporalio import activity
7+
from temporalio.contrib.workflow_streams import WorkflowStreamClient
8+
9+
from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent
10+
11+
12+
@activity.defn
13+
async def charge_card(order_id: str) -> str:
14+
"""Pretend to charge a card, publishing progress to the parent workflow.
15+
16+
`WorkflowStreamClient.from_within_activity()` reads the parent
17+
workflow id and the Temporal client from the activity context, so
18+
this activity can push events back without any wiring.
19+
20+
Caveat: each call to ``from_within_activity()`` creates a fresh
21+
client with a random ``publisher_id``, so dedup does not protect
22+
against an activity retry republishing the same events. For
23+
activities that must be exactly-once on the stream side, derive a
24+
stable ``publisher_id`` from ``activity.info().activity_id`` (this
25+
is invariant across attempts of the same scheduled activity). The
26+
current ``WorkflowStreamClient`` API does not yet expose
27+
``publisher_id`` on its constructors; this sample accepts
28+
at-most-once-per-attempt semantics.
29+
"""
30+
client = WorkflowStreamClient.from_within_activity(
31+
batch_interval=timedelta(milliseconds=200)
32+
)
33+
async with client:
34+
progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent)
35+
progress.publish(ProgressEvent(message="charging card..."))
36+
await asyncio.sleep(1.0)
37+
progress.publish(
38+
ProgressEvent(message="card charged"),
39+
force_flush=True,
40+
)
41+
return f"charge-{order_id}"

workflow_stream/run_external_publisher.py renamed to workflow_streams/run_external_publisher.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
``HubWorkflow.close``, the workflow's run finishes, and the
1515
subscriber's iterator exits normally.
1616
17-
Run the worker first (``uv run workflow_stream/run_worker.py``), then::
17+
Run the worker first (``uv run workflow_streams/run_worker.py``), then::
1818
19-
uv run workflow_stream/run_external_publisher.py
19+
uv run workflow_streams/run_external_publisher.py
2020
"""
2121

2222
from __future__ import annotations
@@ -25,15 +25,15 @@
2525
import uuid
2626

2727
from temporalio.client import Client
28-
from temporalio.contrib.workflow_stream import WorkflowStreamClient
28+
from temporalio.contrib.workflow_streams import WorkflowStreamClient
2929

30-
from workflow_stream.shared import (
30+
from workflow_streams.shared import (
3131
TASK_QUEUE,
3232
TOPIC_NEWS,
3333
HubInput,
3434
NewsEvent,
3535
)
36-
from workflow_stream.workflows.hub_workflow import HubWorkflow
36+
from workflow_streams.workflows.hub_workflow import HubWorkflow
3737

3838

3939
HEADLINES = [
@@ -64,8 +64,9 @@ async def publish_news() -> None:
6464
# we know the events landed before the workflow shuts down.
6565
producer = WorkflowStreamClient.create(client, workflow_id)
6666
async with producer:
67+
news = producer.topic(TOPIC_NEWS, type=NewsEvent)
6768
for headline in HEADLINES:
68-
producer.publish(TOPIC_NEWS, NewsEvent(headline=headline))
69+
news.publish(NewsEvent(headline=headline))
6970
print(f"[publisher] sent: {headline}")
7071
await asyncio.sleep(0.5)
7172
await producer.flush()
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55

66
from temporalio.api.common.v1 import Payload
77
from temporalio.client import Client
8-
from temporalio.contrib.workflow_stream import WorkflowStreamClient
8+
from temporalio.contrib.workflow_streams import WorkflowStreamClient
99

10-
from workflow_stream.shared import (
10+
from workflow_streams.shared import (
1111
TASK_QUEUE,
1212
TOPIC_PROGRESS,
1313
TOPIC_STATUS,
@@ -16,7 +16,7 @@
1616
StatusEvent,
1717
race_with_workflow,
1818
)
19-
from workflow_stream.workflows.order_workflow import OrderWorkflow
19+
from workflow_streams.workflows.order_workflow import OrderWorkflow
2020

2121

2222
async def main() -> None:

workflow_stream/run_reconnecting_subscriber.py renamed to workflow_streams/run_reconnecting_subscriber.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
the demo short. The same code shape works across actual process
1111
restarts because the resume offset is persisted to disk between phases.
1212
13-
Run the worker first (``uv run workflow_stream/run_worker.py``), then::
13+
Run the worker first (``uv run workflow_streams/run_worker.py``), then::
1414
15-
uv run workflow_stream/run_reconnecting_subscriber.py
15+
uv run workflow_streams/run_reconnecting_subscriber.py
1616
"""
1717

1818
from __future__ import annotations
@@ -23,15 +23,15 @@
2323
from pathlib import Path
2424

2525
from temporalio.client import Client
26-
from temporalio.contrib.workflow_stream import WorkflowStreamClient
26+
from temporalio.contrib.workflow_streams import WorkflowStreamClient
2727

28-
from workflow_stream.shared import (
28+
from workflow_streams.shared import (
2929
TASK_QUEUE,
3030
TOPIC_STATUS,
3131
PipelineInput,
3232
StageEvent,
3333
)
34-
from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow
34+
from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow
3535

3636
# Number of events read in phase 1 before simulating a disconnect.
3737
# Picked small enough that the workflow is still running after.
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
the right trade — pair with set-semantic events where each event
1919
carries enough state to make missing the prior ones recoverable.
2020
21-
Run the worker first (``uv run workflow_stream/run_worker.py``), then::
21+
Run the worker first (``uv run workflow_streams/run_worker.py``), then::
2222
23-
uv run workflow_stream/run_truncating_ticker.py
23+
uv run workflow_streams/run_truncating_ticker.py
2424
"""
2525

2626
from __future__ import annotations
@@ -29,15 +29,15 @@
2929
import uuid
3030

3131
from temporalio.client import Client
32-
from temporalio.contrib.workflow_stream import WorkflowStreamClient
32+
from temporalio.contrib.workflow_streams import WorkflowStreamClient
3333

34-
from workflow_stream.shared import (
34+
from workflow_streams.shared import (
3535
TASK_QUEUE,
3636
TOPIC_TICK,
3737
TickerInput,
3838
TickEvent,
3939
)
40-
from workflow_stream.workflows.ticker_workflow import TickerWorkflow
40+
from workflow_streams.workflows.ticker_workflow import TickerWorkflow
4141

4242

4343
SLOW_SUBSCRIBER_DELAY_S = 1.5

0 commit comments

Comments
 (0)