Skip to content

Commit c107687

Browse files
committed
samples: workflow_streams: surface multiple truncation jumps in ticker
The truncating-ticker demo is meant to make the bounded-log trade visible: fast subscriber sees every event, slow subscriber loses intermediate ones to truncation. The previous parameters (truncate_every=5, keep_last=3, interval_ms=400, slow_delay=1.5s) produced at most one tiny jump near the end of the run — easy to miss. Tighter parameters (truncate_every=2, keep_last=1, interval_ms=200, count=30) keep the workflow log at one or two entries between truncations. That shrinks the slow subscriber's per-poll batch, so it re-polls more often, and most polls land after a truncation that has passed its position. The result is several visible jumps over the demo, not a single batched one at the end. Switch the output to two lanes (fast on the left, slow on the right with explicit "↪ jumped offset=N → M (K dropped)" markers) so the divergence reads at a glance instead of being lost in interleaved single-stream output. Also extend the docstring to call out the opposite trade — never truncating means slow consumers eventually catch up at the cost of unbounded workflow history — so readers know when this pattern is the wrong fit.
1 parent 553bfdb commit c107687

1 file changed

Lines changed: 67 additions & 26 deletions

File tree

workflow_streams/run_truncating_ticker.py

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@
33
The ``TickerWorkflow`` publishes ``count`` events at a fixed interval,
44
calling ``self.stream.truncate(...)`` periodically to bound log
55
growth. This script subscribes twice — once fast, once slow — and
6-
prints both side-by-side so the trade is visible:
7-
8-
* The fast subscriber keeps up and sees every published offset in
9-
order.
10-
* The slow subscriber sleeps between iterations. When a truncation
11-
runs past its position, the iterator silently jumps forward to the
12-
new base offset — the slow subscriber's offsets jump too, and
13-
intermediate events are not visible to it.
14-
15-
This is the bounded-log model: log size is capped, slow consumers may
16-
miss intermediate events, but they always see the most recent state.
17-
For long-running workflows pushing high event volumes this is usually
18-
the right trade — pair with set-semantic events where each event
19-
carries enough state to make missing the prior ones recoverable.
6+
prints them in two lanes so the trade is visible at a glance:
7+
8+
* **Fast lane** (left). Keeps up. Sees every published offset.
9+
* **Slow lane** (right). Sleeps between iterations. When a truncation
10+
has dropped its position by the time it polls again, the iterator
11+
silently jumps forward to the new base offset; the slow lane prints
12+
a ``↪ jumped N → M (K dropped)`` marker for each gap and resumes
13+
at the new offset.
14+
15+
``truncate()`` is unilateral: the workflow does not know who is
16+
subscribed and does not wait for them. The implicit alternative —
17+
never truncating — keeps every event around forever, lets slow
18+
consumers eventually catch up without losses, and pays for it in
19+
unbounded workflow history. The truncation model is the opposite
20+
trade: bounded log, at-best-effort delivery to slow consumers, no
21+
backpressure on the publisher. Pair it with set-semantic events where
22+
each event carries enough state to make missing the prior ones
23+
recoverable. (If you actually need lossless delivery to slow
24+
consumers, the workflow has to coordinate acknowledgements
25+
explicitly — that is a different sample.)
2026
2127
Run the worker first (``uv run workflow_streams/run_worker.py``), then::
2228
@@ -39,9 +45,37 @@
3945
)
4046
from workflow_streams.workflows.ticker_workflow import TickerWorkflow
4147

42-
48+
# Aggressive truncation so the log stays at most KEEP_LAST entries
49+
# right after each truncation, which keeps the slow subscriber's
50+
# per-poll batch tiny. Small batches + a slow per-event sleep mean the
51+
# slow subscriber re-polls often, and most of those polls land after a
52+
# truncation that has passed its position — so it sees several jumps
53+
# during the run rather than one batched at the end.
54+
TICKER_COUNT = 30
55+
INTERVAL_MS = 200
56+
TRUNCATE_EVERY = 2
57+
KEEP_LAST = 1
4358
SLOW_SUBSCRIBER_DELAY_S = 1.5
44-
TICKER_COUNT = 20
59+
60+
LANE_WIDTH = 32
61+
SEP = "│"
62+
63+
64+
def emit_fast(message: str) -> None:
65+
print(f"{message:<{LANE_WIDTH}} {SEP}", flush=True)
66+
67+
68+
def emit_slow(message: str) -> None:
69+
print(f"{' ' * LANE_WIDTH} {SEP} {message}", flush=True)
70+
71+
72+
def emit_header() -> None:
73+
rule = "─" * LANE_WIDTH
74+
print(
75+
f"{'fast (every event)':<{LANE_WIDTH}} {SEP} "
76+
f"slow (sleeps {SLOW_SUBSCRIBER_DELAY_S}s between events)"
77+
)
78+
print(f"{rule} {SEP} {rule}")
4579

4680

4781
async def main() -> None:
@@ -52,37 +86,44 @@ async def main() -> None:
5286
TickerWorkflow.run,
5387
TickerInput(
5488
count=TICKER_COUNT,
55-
keep_last=3,
56-
truncate_every=5,
57-
interval_ms=400,
89+
keep_last=KEEP_LAST,
90+
truncate_every=TRUNCATE_EVERY,
91+
interval_ms=INTERVAL_MS,
5892
),
5993
id=workflow_id,
6094
task_queue=TASK_QUEUE,
6195
)
62-
6396
stream = WorkflowStreamClient.create(client, workflow_id)
6497
last_n = TICKER_COUNT - 1
6598

66-
# Both subscribers break on the final tick (n == last_n). ``keep_last``
67-
# ensures that offset survives the last truncation so even the slow
68-
# consumer reaches it.
99+
emit_header()
100+
69101
async def fast_subscriber() -> None:
70102
async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
71-
print(f"[fast] offset={item.offset:3d} n={item.data.n}")
103+
emit_fast(f"offset={item.offset:>3} n={item.data.n}")
72104
if item.data.n == last_n:
73105
return
74106

75107
async def slow_subscriber() -> None:
108+
last_offset = -1
76109
async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
77-
print(f"[SLOW] offset={item.offset:3d} n={item.data.n}")
110+
if last_offset >= 0 and item.offset > last_offset + 1:
111+
gap = item.offset - last_offset - 1
112+
emit_slow(
113+
f"↪ jumped offset={last_offset}{item.offset} "
114+
f"({gap} dropped)"
115+
)
116+
emit_slow(f"offset={item.offset:>3} n={item.data.n}")
117+
last_offset = item.offset
78118
if item.data.n == last_n:
79119
return
80120
await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S)
81121

82122
await asyncio.gather(fast_subscriber(), slow_subscriber())
83123

84124
result = await handle.result()
85-
print(f"\nworkflow result: {result}")
125+
print()
126+
print(f"workflow result: {result}")
86127

87128

88129
if __name__ == "__main__":

0 commit comments

Comments
 (0)