Skip to content

Commit 6294691

Browse files
committed
workflow_streams: deliver terminal events + fix run_publisher subscribe shape
End-to-end runs of the four workflow_streams scenarios surfaced two sample-side issues, both fixed here.

run_publisher's consumer asserted ``isinstance(item.data, Payload)`` and called ``payload_converter.from_payload(item.data, T)``. The contrib's ``subscribe()`` defaults to converter-decoded data, not raw payloads, so this assertion fired on the first run. Switch to ``result_type=RawValue`` (the documented escape hatch for heterogeneous topics) and read ``item.data.payload``.

Items published in the same workflow task that returns from ``@workflow.run`` were not delivered to subscribers — the in-memory log dies with the workflow and the next subscriber poll lands on a completed workflow. Fix: each scenario now uses an in-band terminator that subscribers break on, and each workflow holds the run open with ``await workflow.sleep(timedelta(milliseconds=500))`` so that the final publish is fetched before the workflow exits:

- OrderWorkflow / PipelineWorkflow: the workflow's own ``StatusEvent(kind="complete")`` / ``StageEvent(stage="complete")`` is the terminator (consumers already broke on it).
- HubWorkflow: the *publisher* in run_external_publisher emits a sentinel ``NewsEvent(headline="__done__")`` immediately before signaling close; the consumer breaks on the sentinel.
- TickerWorkflow: the final tick (n == count - 1) is the terminator; ``keep_last`` guarantees that offset survives the last truncation, so even slow consumers reach it.

Because subscribers stop polling on the terminator, by the time ``workflow.run`` returns there are no in-flight poll handlers — no ``UnfinishedUpdateHandlersWarning`` from the SDK and no need for ``detach_pollers()`` / ``wait_condition(all_handlers_finished)`` in the workflow exit path.

Two consecutive end-to-end runs of all four scenarios pass cleanly against ``temporal server start-dev --headless``.
1 parent 5d67b9e commit 6294691

7 files changed

Lines changed: 53 additions & 13 deletions

File tree

workflow_streams/run_external_publisher.py

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,13 @@
4444
"regulator opens probe",
4545
]
4646

47+
# In-band terminator the publisher emits before signaling close. The
48+
# subscriber recognizes this value and stops polling — without an
49+
# explicit terminator the consumer would have to rely on the workflow
50+
# returning to break the iterator, which means racing the last item
51+
# delivery against workflow completion.
52+
DONE_HEADLINE = "__done__"
53+
4754

4855
async def main() -> None:
4956
client = await Client.connect("localhost:7233")
@@ -69,9 +76,10 @@ async def publish_news() -> None:
6976
news.publish(NewsEvent(headline=headline))
7077
print(f"[publisher] sent: {headline}")
7178
await asyncio.sleep(0.5)
79+
news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True)
7280
await producer.flush()
73-
# Tell the hub it can stop. The workflow's run() returns, and
74-
# any in-flight subscribers see their async-for loop exit.
81+
# Tell the hub it can stop. The subscriber exits its async-for
82+
# loop when it reads the sentinel published above.
7583
await handle.signal(HubWorkflow.close)
7684
print("[publisher] signaled close")
7785

@@ -80,6 +88,8 @@ async def consume_news() -> None:
8088
async for item in consumer.subscribe(
8189
[TOPIC_NEWS], result_type=NewsEvent
8290
):
91+
if item.data.headline == DONE_HEADLINE:
92+
return
8393
print(f"[subscriber] offset={item.offset}: {item.data.headline}")
8494

8595
await asyncio.gather(publish_news(), consume_news())

workflow_streams/run_publisher.py

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,8 @@
33
import asyncio
44
import uuid
55

6-
from temporalio.api.common.v1 import Payload
76
from temporalio.client import Client
7+
from temporalio.common import RawValue
88
from temporalio.contrib.workflow_streams import WorkflowStreamClient
99

1010
from workflow_streams.shared import (
@@ -35,17 +35,19 @@ async def main() -> None:
3535

3636
async def consume() -> None:
3737
# Single iterator over both topics — avoids a cancellation race
38-
# between two concurrent subscribers. result_type is left unset
39-
# so we can dispatch heterogeneous events on item.topic.
40-
async for item in stream.subscribe([TOPIC_STATUS, TOPIC_PROGRESS]):
41-
assert isinstance(item.data, Payload)
38+
# between two concurrent subscribers. result_type=RawValue
39+
# delivers the underlying Payload so we can dispatch
40+
# heterogeneous events on item.topic.
41+
async for item in stream.subscribe(
42+
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
43+
):
4244
if item.topic == TOPIC_STATUS:
43-
evt = converter.from_payload(item.data, StatusEvent)
45+
evt = converter.from_payload(item.data.payload, StatusEvent)
4446
print(f"[status] {evt.kind}: order={evt.order_id}")
4547
if evt.kind == "complete":
4648
return
4749
elif item.topic == TOPIC_PROGRESS:
48-
progress = converter.from_payload(item.data, ProgressEvent)
50+
progress = converter.from_payload(item.data.payload, ProgressEvent)
4951
print(f"[progress] {progress.message}")
5052

5153
result = await race_with_workflow(consume(), handle)

workflow_streams/run_truncating_ticker.py

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141

4242

4343
SLOW_SUBSCRIBER_DELAY_S = 1.5
44+
TICKER_COUNT = 20
4445

4546

4647
async def main() -> None:
@@ -50,7 +51,7 @@ async def main() -> None:
5051
handle = await client.start_workflow(
5152
TickerWorkflow.run,
5253
TickerInput(
53-
count=20,
54+
count=TICKER_COUNT,
5455
keep_last=3,
5556
truncate_every=5,
5657
interval_ms=400,
@@ -60,19 +61,24 @@ async def main() -> None:
6061
)
6162

6263
stream = WorkflowStreamClient.create(client, workflow_id)
64+
last_n = TICKER_COUNT - 1
6365

66+
# Both subscribers break on the final tick (n == last_n). ``keep_last``
67+
# ensures that offset survives the last truncation so even the slow
68+
# consumer reaches it.
6469
async def fast_subscriber() -> None:
6570
async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
6671
print(f"[fast] offset={item.offset:3d} n={item.data.n}")
72+
if item.data.n == last_n:
73+
return
6774

6875
async def slow_subscriber() -> None:
6976
async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent):
7077
print(f"[SLOW] offset={item.offset:3d} n={item.data.n}")
78+
if item.data.n == last_n:
79+
return
7180
await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S)
7281

73-
# Both iterators exit normally when the workflow completes. No
74-
# terminal sentinel is needed — see the doc's "When the Workflow
75-
# run completes" note.
7682
await asyncio.gather(fast_subscriber(), slow_subscriber())
7783

7884
result = await handle.result()

workflow_streams/workflows/hub_workflow.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
from datetime import timedelta
4+
35
from temporalio import workflow
46
from temporalio.contrib.workflow_streams import WorkflowStream
57

@@ -26,6 +28,11 @@ def __init__(self, input: HubInput) -> None:
2628
@workflow.run
2729
async def run(self, input: HubInput) -> str:
2830
await workflow.wait_condition(lambda: self._closed)
31+
# The publisher publishes its own terminator into the stream
32+
# before signaling close (see run_external_publisher.py).
33+
# Hold the run open briefly so subscribers' final poll
34+
# delivers any items still in the log.
35+
await workflow.sleep(timedelta(milliseconds=500))
2936
return f"hub {input.hub_id} closed"
3037

3138
@workflow.signal

workflow_streams/workflows/order_workflow.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -50,4 +50,9 @@ async def run(self, input: OrderInput) -> str:
5050
self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id))
5151
self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}"))
5252
self.status.publish(StatusEvent(kind="complete", order_id=input.order_id))
53+
# The "complete" status event above is the in-band terminator
54+
# subscribers break on (see run_publisher.py). Hold the run
55+
# open briefly so subscribers' next poll delivers it before
56+
# this task returns and the in-memory log is gone.
57+
await workflow.sleep(timedelta(milliseconds=500))
5358
return charge_id

workflow_streams/workflows/pipeline_workflow.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,4 +41,8 @@ async def run(self, input: PipelineInput) -> str:
4141
self.status.publish(StageEvent(stage=stage))
4242
if stage != "complete":
4343
await workflow.sleep(timedelta(seconds=2))
44+
# The "complete" stage above is the in-band terminator
45+
# subscribers break on. Hold the run open briefly so the final
46+
# poll delivers it.
47+
await workflow.sleep(timedelta(milliseconds=500))
4448
return f"pipeline {input.pipeline_id} done"

workflow_streams/workflows/ticker_workflow.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,4 +57,10 @@ async def run(self, input: TickerInput) -> str:
5757
# Drop everything except the last `keep_last` entries.
5858
truncate_to = self._published - input.keep_last
5959
self.stream.truncate(truncate_to)
60+
# The final tick (n == count - 1) is the in-band terminator
61+
# subscribers break on. ``keep_last`` guarantees that final
62+
# offset survives the last truncation so even slow consumers
63+
# eventually see it. Hold the run open briefly so the final
64+
# poll delivers it.
65+
await workflow.sleep(timedelta(milliseconds=500))
6066
return f"ticker emitted {self._published} events"

0 commit comments

Comments
 (0)