Skip to content

Commit 553bfdb

Browse files
committed
samples: workflow_streams: drop temp-file resume offset; add stats column
The reconnecting-subscriber demo previously persisted its resume offset to a temp file between phases. Inside one process that's theatrical: the disconnect/reconnect shape comes from creating a fresh Client + WorkflowStreamClient with from_offset=N, not from where N happens to be stored. Replace the file with a local int and a comment about durable storage in production (a DB row keyed by user_id/run_id, etc.). Restructure output around a stats column so the demo conveys what's happening to the stream at all times, not just between phases. A background poller calls WorkflowStreamClient.get_offset() throughout and emits a heartbeat line once a second; every emit prints current proc/avail/pend in a left column followed by the phase or event message. Watching pend grow during the disconnect window and shrink again as phase 2 catches up is the demo's core point.
1 parent d5cc2fe commit 553bfdb

2 files changed

Lines changed: 134 additions & 69 deletions

File tree

workflow_streams/README.md

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,22 +130,27 @@ Expected output on the basic publisher side:
130130
workflow result: charge-order-1
131131
```
132132

133-
Expected output on the reconnecting subscriber side (note the offsets
134-
are continuous across the disconnect — no events lost, none duplicated):
133+
Expected output on the reconnecting subscriber side. Each line carries
134+
a stats column on the left (`proc`, `avail`, `pend`) and a phase /
135+
event message on the right; a background poller emits a `·` heartbeat
136+
once a second. Offsets are continuous across the disconnect — no
137+
events lost, none duplicated:
135138

136139
```
137-
[phase 1] connecting and reading first few events
138-
offset= 0 stage=validating
139-
offset= 1 stage=loading data
140-
[phase 1] persisted resume offset=2 -> /tmp/...; disconnecting
141-
142-
[phase 2] reconnecting and resuming from persisted offset
143-
offset= 2 stage=transforming
144-
offset= 3 stage=writing output
145-
offset= 4 stage=verifying
146-
offset= 5 stage=complete
147-
148-
workflow result: pipeline workflow-stream-pipeline-... done
140+
proc= 0 avail= 0 pend= 0 │ started workflow-stream-pipeline-...
141+
proc= 0 avail= 1 pend= 1 │ [phase 1] connecting
142+
proc= 1 avail= 1 pend= 0 │ offset= 0 stage=validating
143+
proc= 2 avail= 2 pend= 0 │ offset= 1 stage=loading data
144+
proc= 2 avail= 2 pend= 0 │ [phase 1] disconnecting
145+
proc= 2 avail= 3 pend= 1 │ ·
146+
proc= 2 avail= 3 pend= 1 │ ·
147+
proc= 2 avail= 4 pend= 2 │ ·
148+
proc= 2 avail= 4 pend= 2 │ [phase 2] reconnecting
149+
proc= 3 avail= 4 pend= 1 │ offset= 2 stage=transforming
150+
proc= 4 avail= 4 pend= 0 │ offset= 3 stage=writing output
151+
proc= 5 avail= 5 pend= 0 │ offset= 4 stage=verifying
152+
proc= 6 avail= 6 pend= 0 │ offset= 5 stage=complete
153+
proc= 6 avail= 6 pend= 0 │ workflow result: pipeline ... done
149154
```
150155

151156
## Notes

workflow_streams/run_reconnecting_subscriber.py

Lines changed: 115 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Reconnecting subscriber: persist offset, disconnect, resume.
1+
"""Reconnecting subscriber: read a few events, disconnect, resume.
22
33
Demonstrates the central Workflow Streams use case: a consumer can
44
disappear mid-stream — page refresh, server restart, laptop closed —
@@ -8,7 +8,15 @@
88
99
The script runs the pattern in two phases inside one process to keep
1010
the demo short. The same code shape works across actual process
11-
restarts because the resume offset is persisted to disk between phases.
11+
restarts because the resume offset is durable in the workflow, not in
12+
the consumer.
13+
14+
Output is one line per emit, with current stream stats in a left column
15+
and a phase / event message in a right column. A background poller
16+
calls ``WorkflowStreamClient.get_offset()`` for the whole demo and
17+
emits a heartbeat line once a second so you can watch ``pending``
18+
(``available - processed``) grow while the consumer is disconnected
19+
and shrink as phase 2 catches up.
1220
1321
Run the worker first (``uv run workflow_streams/run_worker.py``), then::
1422
@@ -18,9 +26,8 @@
1826
from __future__ import annotations
1927

2028
import asyncio
21-
import tempfile
2229
import uuid
23-
from pathlib import Path
30+
from dataclasses import dataclass
2431

2532
from temporalio.client import Client
2633
from temporalio.contrib.workflow_streams import WorkflowStreamClient
@@ -37,6 +44,36 @@
3744
# Picked small enough that the workflow is still running after.
3845
PHASE_1_EVENTS = 2
3946

47+
# How long to stay disconnected.
48+
DISCONNECT_SECONDS = 3.0
49+
50+
# Background poller cadence. The poller refreshes state.available this
51+
# often and emits a heartbeat line once per HEARTBEAT_SECONDS.
52+
POLL_INTERVAL_SECONDS = 0.25
53+
HEARTBEAT_SECONDS = 1.0
54+
55+
# Width of the stats column. Picked to fit the longest stats string.
56+
LEFT_WIDTH = 30
57+
58+
59+
@dataclass
60+
class State:
61+
processed: int = 0
62+
available: int = 0
63+
64+
@property
65+
def pending(self) -> int:
66+
return max(0, self.available - self.processed)
67+
68+
69+
def emit(state: State, message: str) -> None:
70+
left = (
71+
f"proc={state.processed:>2} "
72+
f"avail={state.available:>2} "
73+
f"pend={state.pending:>2}"
74+
)
75+
print(f"{left:<{LEFT_WIDTH}}{message}", flush=True)
76+
4077

4178
async def main() -> None:
4279
client = await Client.connect("localhost:7233")
@@ -49,58 +86,81 @@ async def main() -> None:
4986
task_queue=TASK_QUEUE,
5087
)
5188

52-
# Where the consumer remembers its position. In a real BFF or UI
53-
# backend this would be a database row keyed by (user_id, run_id);
54-
# a temp file keeps the sample self-contained.
55-
offset_path = Path(tempfile.gettempdir()) / f"{workflow_id}.offset"
56-
57-
# ---- Phase 1: connect, read a couple of events, persist offset, disconnect.
58-
print("[phase 1] connecting and reading first few events")
89+
# In a real BFF the resume offset lives in durable storage keyed by
90+
# (user_id, run_id) — a database row, a Redis key, etc. For an
91+
# in-process demo a State.processed attribute works the same way.
92+
state = State()
5993
stream = WorkflowStreamClient.create(client, workflow_id)
60-
seen = 0
61-
next_offset = 0
62-
async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent):
63-
print(f" offset={item.offset:2d} stage={item.data.stage}")
64-
# Persist *one past* the offset just consumed. On resume we want
65-
# the *next* unseen event, not the one we already showed.
66-
next_offset = item.offset + 1
67-
offset_path.write_text(str(next_offset))
68-
seen += 1
69-
if seen >= PHASE_1_EVENTS:
70-
break
71-
72-
print(
73-
f"[phase 1] persisted resume offset={next_offset} -> {offset_path}; disconnecting\n"
74-
)
75-
# The async for loop exits the subscribe() iterator. Any background
76-
# poll Update is cancelled. The workflow keeps running in the
77-
# background, accumulating events into its log.
78-
await asyncio.sleep(3) # let the workflow publish more in our absence
79-
80-
# ---- Phase 2: reconnect, read persisted offset, resume from there.
81-
print("[phase 2] reconnecting and resuming from persisted offset")
82-
resume_from = int(offset_path.read_text())
83-
# A brand-new client and stream object — same shape as a different
84-
# process picking up where the first one left off.
85-
client2 = await Client.connect("localhost:7233")
86-
stream2 = WorkflowStreamClient.create(client2, workflow_id)
87-
async for item in stream2.subscribe(
88-
[TOPIC_STATUS],
89-
from_offset=resume_from,
90-
result_type=StageEvent,
91-
):
92-
print(f" offset={item.offset:2d} stage={item.data.stage}")
93-
# Continue persisting after each event so a second crash here
94-
# would also resume cleanly.
95-
offset_path.write_text(str(item.offset + 1))
96-
if item.data.stage == "complete":
97-
break
98-
99-
result = await handle.result()
100-
print(f"\nworkflow result: {result}")
101-
# Clean up the offset file; in a real consumer you'd retain it as
102-
# long as the user might reconnect.
103-
offset_path.unlink(missing_ok=True)
94+
emit(state, f"started {workflow_id}")
95+
96+
stop = asyncio.Event()
97+
98+
async def poller() -> None:
99+
"""Refresh state.available; emit a heartbeat line once a second."""
100+
loop = asyncio.get_running_loop()
101+
last_emit = loop.time()
102+
while not stop.is_set():
103+
try:
104+
state.available = await stream.get_offset()
105+
except Exception:
106+
pass
107+
now = loop.time()
108+
if now - last_emit >= HEARTBEAT_SECONDS:
109+
emit(state, "·")
110+
last_emit = now
111+
try:
112+
await asyncio.wait_for(
113+
stop.wait(), timeout=POLL_INTERVAL_SECONDS
114+
)
115+
except asyncio.TimeoutError:
116+
pass
117+
118+
poller_task = asyncio.create_task(poller())
119+
try:
120+
# ---- Phase 1: connect, read a couple of events, "disconnect".
121+
emit(state, "[phase 1] connecting")
122+
seen = 0
123+
async for item in stream.subscribe(
124+
[TOPIC_STATUS], result_type=StageEvent
125+
):
126+
# Remember *one past* the offset just consumed: on resume we
127+
# want the next unseen event, not the one we already showed.
128+
state.processed = item.offset + 1
129+
emit(state, f" offset={item.offset:2d} stage={item.data.stage}")
130+
seen += 1
131+
if seen >= PHASE_1_EVENTS:
132+
break
133+
emit(state, "[phase 1] disconnecting")
134+
135+
# ---- Disconnect window: nobody reads. The workflow keeps
136+
# publishing — `pend` grows on the heartbeat lines as the offset
137+
# advances past `processed`.
138+
await asyncio.sleep(DISCONNECT_SECONDS)
139+
140+
# ---- Phase 2: brand-new client + stream, resume from saved
141+
# offset. Same shape as a different process picking up where the
142+
# first one left off.
143+
emit(state, "[phase 2] reconnecting")
144+
client2 = await Client.connect("localhost:7233")
145+
stream2 = WorkflowStreamClient.create(client2, workflow_id)
146+
async for item in stream2.subscribe(
147+
[TOPIC_STATUS],
148+
from_offset=state.processed,
149+
result_type=StageEvent,
150+
):
151+
state.processed = item.offset + 1
152+
emit(state, f" offset={item.offset:2d} stage={item.data.stage}")
153+
if item.data.stage == "complete":
154+
break
155+
156+
result = await handle.result()
157+
emit(state, f"workflow result: {result}")
158+
finally:
159+
stop.set()
160+
try:
161+
await poller_task
162+
except asyncio.CancelledError:
163+
pass
104164

105165

106166
if __name__ == "__main__":

0 commit comments

Comments
 (0)