Commit 91233b0

jssmith and claude committed
samples: workflow_stream: add truncating-ticker scenario
Adds a fourth scenario for long-running workflows that need to bound their event log: the workflow publishes events at a fixed cadence and calls self.stream.truncate(...) periodically to keep only the most recent entries.

The runner subscribes twice, fast and slow, to make the trade visible: the fast subscriber sees every offset in order; the slow one falls behind a truncation, has its iterator transparently jump forward to the new base offset, and shows the offset gap that intermediate events fell into.

This is the model for high-volume long-running streams: bounded log size, slow consumers may miss intermediate events but always see the most recent state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b607117 commit 91233b0
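
The bounded-log behaviour the commit message describes can be modelled without a Temporal server. The sketch below is a toy in-memory stand-in, not the `temporalio.contrib.workflow_stream` implementation: `BoundedLog` and its methods are hypothetical names, and it assumes `truncate(k)` discards every entry whose offset is below `k`.

```python
from dataclasses import dataclass, field


@dataclass
class BoundedLog:
    """Toy truncatable log (hypothetical; not the temporalio API)."""

    base_offset: int = 0  # offset of the oldest retained entry
    entries: list = field(default_factory=list)

    def publish(self, event) -> int:
        """Append an event and return the offset it was assigned."""
        self.entries.append(event)
        return self.base_offset + len(self.entries) - 1

    def truncate(self, up_to: int) -> None:
        """Drop every retained entry whose offset is below `up_to`."""
        drop = max(0, min(up_to - self.base_offset, len(self.entries)))
        del self.entries[:drop]
        self.base_offset += drop

    def read(self, offset: int):
        """Return (actual_offset, event), jumping forward past truncated offsets.

        Assumes `offset` is not beyond the newest published entry.
        """
        actual = max(offset, self.base_offset)
        return actual, self.entries[actual - self.base_offset]


log = BoundedLog()
for n in range(10):
    log.publish(n)

slow_position = 2  # a slow reader that wants offset 2 next
log.truncate(7)    # only offsets 7, 8 and 9 remain
offset, event = log.read(slow_position)
print(f"asked for {slow_position}, got offset={offset} event={event}")
```

The reader asked for offset 2 and was handed offset 7, so offsets 2 through 6 are the gap that a slow consumer never sees.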

5 files changed

Lines changed: 176 additions & 2 deletions

workflow_stream/README.md

Lines changed: 16 additions & 1 deletion
@@ -54,7 +54,20 @@ This directory has two scenarios sharing one Worker.
   backend service or scheduled job pushing events into a workflow it
   didn't itself start.

-`run_worker.py` registers all three workflows and the activity.
+**Scenario 4 - bounded log via `truncate()`:**
+
+* `workflows/ticker_workflow.py`: a long-running workflow that
+  publishes events at a fixed cadence and calls
+  `self.stream.truncate(...)` periodically to bound log growth, keeping
+  only the most recent N entries.
+* `run_truncating_ticker.py`: runs a fast subscriber and a slow
+  subscriber side by side. The fast one keeps up and sees every offset
+  in order; the slow one sleeps between iterations, falls behind a
+  truncation, and silently jumps forward to the new base offset. The
+  output makes the trade visible: bounded log size in exchange for
+  intermediate events being invisible to slow consumers.
+
+`run_worker.py` registers all four workflows and the activity.

 ## Run it

@@ -68,6 +81,8 @@ uv run workflow_stream/run_publisher.py
 uv run workflow_stream/run_reconnecting_subscriber.py
 # or
 uv run workflow_stream/run_external_publisher.py
+# or
+uv run workflow_stream/run_truncating_ticker.py
 ```

 Expected output on the basic publisher side:

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
+"""Truncating ticker: bounded log + slow vs. fast subscribers.
+
+The ``TickerWorkflow`` publishes ``count`` events at a fixed interval,
+calling ``self.stream.truncate(...)`` periodically to bound log
+growth. This script subscribes twice (once fast, once slow) and
+prints both side-by-side so the trade is visible:
+
+* The fast subscriber keeps up and sees every published offset in
+  order.
+* The slow subscriber sleeps between iterations. When a truncation
+  runs past its position, the iterator silently jumps forward to the
+  new base offset; the slow subscriber's offsets jump too, and
+  intermediate events are not visible to it.
+
+This is the bounded-log model: log size is capped, slow consumers may
+miss intermediate events, but they always see the most recent state.
+For long-running workflows pushing high event volumes this is usually
+the right trade; pair with set-semantic events where each event
+carries enough state to make missing the prior ones recoverable.
+
+Run the worker first (``uv run workflow_stream/run_worker.py``), then::
+
+    uv run workflow_stream/run_truncating_ticker.py
+"""
+
+from __future__ import annotations
+
+import asyncio
+import uuid
+
+from temporalio.client import Client
+from temporalio.contrib.workflow_stream import WorkflowStreamClient
+
+from workflow_stream.shared import (
+    TASK_QUEUE,
+    TOPIC_TICK,
+    TickerInput,
+    TickEvent,
+)
+from workflow_stream.workflows.ticker_workflow import TickerWorkflow
+
+
+SLOW_SUBSCRIBER_DELAY_S = 1.5
+
+
+async def main() -> None:
+    client = await Client.connect("localhost:7233")
+
+    workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}"
+    handle = await client.start_workflow(
+        TickerWorkflow.run,
+        TickerInput(
+            count=20,
+            keep_last=3,
+            truncate_every=5,
+            interval_ms=400,
+        ),
+        id=workflow_id,
+        task_queue=TASK_QUEUE,
+    )
+
+    stream = WorkflowStreamClient.create(client, workflow_id)
+
+    async def fast_subscriber() -> None:
+        async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
+            print(f"[fast] offset={item.offset:3d} n={item.data.n}")
+
+    async def slow_subscriber() -> None:
+        async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
+            print(f"[SLOW] offset={item.offset:3d} n={item.data.n}")
+            await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S)
+
+    # Both iterators exit normally when the workflow completes. No
+    # terminal sentinel is needed; see the doc's "When the Workflow
+    # run completes" note.
+    await asyncio.gather(fast_subscriber(), slow_subscriber())
+
+    result = await handle.result()
+    print(f"\nworkflow result: {result}")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())

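
Because the iterator jumps forward silently, a subscriber that wants to know what it missed has to compare consecutive offsets itself. A minimal sketch of that bookkeeping follows. `find_gaps` is a hypothetical helper, not part of the sample or of `temporalio`, and it assumes offsets on the subscribed topic are contiguous integers like the `item.offset` values printed above. The offsets fed to it are made up for illustration, not captured output.

```python
def find_gaps(offsets):
    """Return (first_missed, last_missed) ranges between consecutive offsets."""
    gaps = []
    previous = None
    for offset in offsets:
        # A jump of more than one means some offsets were never delivered.
        if previous is not None and offset > previous + 1:
            gaps.append((previous + 1, offset - 1))
        previous = offset
    return gaps


# Illustrative offsets a slow subscriber might observe (made up).
seen_by_slow = [0, 1, 2, 7, 8, 12]
gaps = find_gaps(seen_by_slow)
for first, last in gaps:
    print(f"missed offsets {first}..{last}")
```

In a real subscriber the same comparison could be done inline inside the `async for` loop by remembering the previous `item.offset`.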

workflow_stream/run_worker.py

Lines changed: 2 additions & 1 deletion
@@ -11,6 +11,7 @@
 from workflow_stream.workflows.hub_workflow import HubWorkflow
 from workflow_stream.workflows.order_workflow import OrderWorkflow
 from workflow_stream.workflows.pipeline_workflow import PipelineWorkflow
+from workflow_stream.workflows.ticker_workflow import TickerWorkflow


 async def main() -> None:
@@ -19,7 +20,7 @@ async def main() -> None:
     worker = Worker(
         client,
         task_queue=TASK_QUEUE,
-        workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow],
+        workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow],
         activities=[charge_card],
     )
     await worker.run()

workflow_stream/shared.py

Lines changed: 16 additions & 0 deletions
@@ -14,6 +14,7 @@
 TOPIC_STATUS = "status"
 TOPIC_PROGRESS = "progress"
 TOPIC_NEWS = "news"
+TOPIC_TICK = "tick"


 @dataclass
@@ -58,6 +59,21 @@ class NewsEvent:
     headline: str


+@dataclass
+class TickerInput:
+    count: int = 20
+    keep_last: int = 3
+    truncate_every: int = 5
+    interval_ms: int = 400
+    # Carries stream state across continue-as-new. None on a fresh start.
+    stream_state: WorkflowStreamState | None = None
+
+
+@dataclass
+class TickEvent:
+    n: int
+
+
 T = TypeVar("T")



Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+from __future__ import annotations
+
+from datetime import timedelta
+
+from temporalio import workflow
+from temporalio.contrib.workflow_stream import WorkflowStream
+
+from workflow_stream.shared import (
+    TOPIC_TICK,
+    TickEvent,
+    TickerInput,
+)
+
+
+@workflow.defn
+class TickerWorkflow:
+    """Long-running ticker that bounds its event log via ``truncate``.
+
+    Long-running workflows that publish high volumes of events would
+    otherwise grow their event log unboundedly. This workflow shows
+    the truncation pattern: every ``truncate_every`` events, drop
+    everything except the last ``keep_last`` entries by calling
+    ``self.stream.truncate(safe_offset)``.
+
+    Subscribers that fall behind a truncation jump forward to the new
+    base offset transparently (the iterator handles the
+    ``TruncatedOffset`` error internally), so consumers stay live but
+    may not see every intermediate event. That is the trade: bounded
+    log size in exchange for best-effort delivery to slow
+    consumers.
+
+    To compute the truncation offset the workflow tracks its own
+    published count. ``WorkflowStream`` does not expose a workflow-side
+    head-offset accessor, but the running count plus the carried
+    ``base_offset`` (in continue-as-new chains) is sufficient.
+    """
+
+    @workflow.init
+    def __init__(self, input: TickerInput) -> None:
+        self.stream = WorkflowStream(prior_state=input.stream_state)
+        # Running count of events published by THIS run. To compute a
+        # global offset, add the prior_state's base_offset (omitted
+        # here; this sample doesn't continue-as-new).
+        self._published = 0
+
+    @workflow.run
+    async def run(self, input: TickerInput) -> str:
+        for n in range(input.count):
+            self.stream.publish(TOPIC_TICK, TickEvent(n=n))
+            self._published += 1
+            await workflow.sleep(timedelta(milliseconds=input.interval_ms))
+            if (
+                self._published % input.truncate_every == 0
+                and self._published > input.keep_last
+            ):
+                # Drop everything except the last `keep_last` entries.
+                truncate_to = self._published - input.keep_last
+                self.stream.truncate(truncate_to)
+        return f"ticker emitted {self._published} events"
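
The truncation condition in `run` is easy to check by hand. The sketch below replays the same arithmetic outside a workflow. `truncation_schedule` is a hypothetical helper written for illustration, and it assumes, as the sample's comment implies, that `truncate(k)` keeps offsets `k` and above.

```python
def truncation_schedule(count, keep_last, truncate_every):
    """Return (published_count, truncate_to) for each truncate call.

    Mirrors the condition in TickerWorkflow.run: truncate after every
    `truncate_every` publishes, once more than `keep_last` events exist.
    """
    schedule = []
    for published in range(1, count + 1):
        if published % truncate_every == 0 and published > keep_last:
            schedule.append((published, published - keep_last))
    return schedule


# Inputs used by run_truncating_ticker.py.
schedule = truncation_schedule(count=20, keep_last=3, truncate_every=5)
for published, truncate_to in schedule:
    print(f"after {published} events: truncate({truncate_to})")
```

With the runner's inputs, truncation happens after 5, 10, 15 and 20 events, at offsets 2, 7, 12 and 17 respectively, so between truncations the log grows to at most `keep_last + truncate_every` entries.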
