Skip to content

Commit 44d944b

Browse files
committed
samples: workflow_streams: drop race_with_workflow helper
The helper wrapped the consumer in an asyncio.gather that cancelled the subscriber when the workflow result settled — defensive logic for a case the SDK already handles. WorkflowStreamClient.subscribe() exits cleanly on every workflow terminal state (return, continue-as-new, failure) via its AcceptedUpdateCompletedWorkflow, WorkflowUpdateRPCTimeoutOrCancelledError, and NOT_FOUND branches in sdk-python. The async-for loop ends naturally when the workflow terminates without a publish, so we don't need a separate task to race against handle.result(). Replace the helper with the obvious shape in both runners: async for item in stream.subscribe(...): ... if item.is_terminator: break result = await handle.result() # raises on workflow failure Either path reaches handle.result(): an explicit break on the in-band terminator (workflow still running, hold-open lets the poll deliver the event), or the iterator naturally exhausting when the workflow has already terminated. handle.result() then either returns or raises the workflow's failure — covering the LLM "activity exhausted retries" case that prompted the helper to be added in the first place. Smoke tested: uv run workflow_streams/run_publisher.py uv run workflow_streams/run_llm.py
1 parent c8663e5 commit 44d944b

3 files changed

Lines changed: 53 additions & 111 deletions

File tree

workflow_streams/run_llm.py

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
TextComplete,
4545
TextDelta,
4646
)
47-
from workflow_streams.shared import race_with_workflow
4847
from workflow_streams.workflows.llm_workflow import LLMWorkflow
4948

5049
# Long enough that you can comfortably kill the worker mid-stream and
@@ -94,39 +93,38 @@ async def main() -> None:
9493

9594
stream = WorkflowStreamClient.create(client, workflow_id)
9695

97-
async def consume() -> None:
98-
# Subscribe to all three topics on a single iterator.
99-
# result_type=RawValue lets us dispatch on item.topic and
100-
# decode against the right dataclass per topic.
101-
async for item in stream.subscribe(
102-
[TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE],
103-
result_type=RawValue,
104-
):
105-
if item.topic == TOPIC_RETRY:
106-
evt = converter.from_payload(item.data.payload, RetryEvent)
107-
sys.stdout.write(ANSI_RESTORE_AND_CLEAR)
108-
sys.stdout.flush()
109-
print(f"[retry attempt {evt.attempt}] resetting output")
110-
print()
111-
sys.stdout.write(ANSI_SAVE)
112-
sys.stdout.flush()
113-
elif item.topic == TOPIC_DELTA:
114-
delta = converter.from_payload(item.data.payload, TextDelta)
115-
sys.stdout.write(delta.text)
116-
sys.stdout.flush()
117-
elif item.topic == TOPIC_COMPLETE:
118-
# The full text is also in the payload (and returned
119-
# by the workflow), but the consumer has already
120-
# rendered it incrementally. Just terminate the line.
121-
converter.from_payload(item.data.payload, TextComplete)
122-
print()
123-
return
124-
125-
# Race the subscriber against the workflow result so that if the
126-
# activity exhausts its retries the workflow's failure surfaces
127-
# here rather than leaving the subscriber blocked on a `complete`
128-
# that will never arrive.
129-
result = await race_with_workflow(consume(), handle)
96+
# result_type=RawValue lets us dispatch on item.topic and decode
97+
# against the right dataclass per topic. The loop ends either on
98+
# the `complete` terminator (break) or because the iterator
99+
# naturally exhausts when the workflow reaches a terminal state
100+
# without one (activity exhausted retries, etc.). Either way the
101+
# handle.result() below either returns the full text or raises
102+
# the workflow's failure.
103+
async for item in stream.subscribe(
104+
[TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE],
105+
result_type=RawValue,
106+
):
107+
if item.topic == TOPIC_RETRY:
108+
evt = converter.from_payload(item.data.payload, RetryEvent)
109+
sys.stdout.write(ANSI_RESTORE_AND_CLEAR)
110+
sys.stdout.flush()
111+
print(f"[retry attempt {evt.attempt}] resetting output")
112+
print()
113+
sys.stdout.write(ANSI_SAVE)
114+
sys.stdout.flush()
115+
elif item.topic == TOPIC_DELTA:
116+
delta = converter.from_payload(item.data.payload, TextDelta)
117+
sys.stdout.write(delta.text)
118+
sys.stdout.flush()
119+
elif item.topic == TOPIC_COMPLETE:
120+
# The full text is also in the payload (and returned by
121+
# the workflow), but the consumer has already rendered it
122+
# incrementally. Just terminate the line.
123+
converter.from_payload(item.data.payload, TextComplete)
124+
print()
125+
break
126+
127+
result = await handle.result()
130128
print(f"\n[workflow result: {len(result)} chars]")
131129

132130

workflow_streams/run_publisher.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
OrderInput,
1515
ProgressEvent,
1616
StatusEvent,
17-
race_with_workflow,
1817
)
1918
from workflow_streams.workflows.order_workflow import OrderWorkflow
2019

@@ -33,24 +32,27 @@ async def main() -> None:
3332
stream = WorkflowStreamClient.create(client, workflow_id)
3433
converter = client.data_converter.payload_converter
3534

36-
async def consume() -> None:
37-
# Single iterator over both topics — avoids a cancellation race
38-
# between two concurrent subscribers. result_type=RawValue
39-
# delivers the underlying Payload so we can dispatch
40-
# heterogeneous events on item.topic.
41-
async for item in stream.subscribe(
42-
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
43-
):
44-
if item.topic == TOPIC_STATUS:
45-
evt = converter.from_payload(item.data.payload, StatusEvent)
46-
print(f"[status] {evt.kind}: order={evt.order_id}")
47-
if evt.kind == "complete":
48-
return
49-
elif item.topic == TOPIC_PROGRESS:
50-
progress = converter.from_payload(item.data.payload, ProgressEvent)
51-
print(f"[progress] {progress.message}")
52-
53-
result = await race_with_workflow(consume(), handle)
35+
# Single iterator over both topics — avoids a cancellation race
36+
# between two concurrent subscribers. result_type=RawValue
37+
# delivers the underlying Payload so we can dispatch heterogeneous
38+
# events on item.topic. The loop ends either on the in-band
39+
# `complete` terminator (break) or because the iterator exhausts
40+
# when the workflow reaches a terminal state without one (e.g. on
41+
# failure). Either way we then await handle.result(), which raises
42+
# if the workflow failed.
43+
async for item in stream.subscribe(
44+
[TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue
45+
):
46+
if item.topic == TOPIC_STATUS:
47+
evt = converter.from_payload(item.data.payload, StatusEvent)
48+
print(f"[status] {evt.kind}: order={evt.order_id}")
49+
if evt.kind == "complete":
50+
break
51+
elif item.topic == TOPIC_PROGRESS:
52+
progress = converter.from_payload(item.data.payload, ProgressEvent)
53+
print(f"[progress] {progress.message}")
54+
55+
result = await handle.result()
5456
print(f"workflow result: {result}")
5557

5658

workflow_streams/shared.py

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
from __future__ import annotations
22

3-
import asyncio
4-
from collections.abc import Coroutine
53
from dataclasses import dataclass
6-
from typing import Any, TypeVar
74

8-
from temporalio.client import WorkflowHandle
95
from temporalio.contrib.workflow_streams import WorkflowStreamState
106

117
TASK_QUEUE = "workflow-stream-sample-task-queue"
@@ -72,57 +68,3 @@ class TickerInput:
7268
@dataclass
7369
class TickEvent:
7470
n: int
75-
76-
77-
T = TypeVar("T")
78-
79-
80-
async def race_with_workflow(
81-
consumer: Coroutine[Any, Any, None],
82-
handle: WorkflowHandle[Any, T],
83-
) -> T:
84-
"""Run a subscriber concurrently with the workflow.
85-
86-
If the workflow finishes before the subscriber sees its terminal
87-
event, cancel the subscriber and surface the workflow's result
88-
(raising on failure). If the subscriber finishes first, wait for
89-
the workflow result. A non-cancellation failure in the subscriber
90-
is propagated either way.
91-
92-
Without this, a workflow that raises before publishing its terminal
93-
event would leave the subscriber blocked on its next poll forever.
94-
"""
95-
consumer_task = asyncio.create_task(consumer)
96-
result_task = asyncio.create_task(handle.result())
97-
we_cancelled_consumer = False
98-
try:
99-
await asyncio.wait(
100-
[consumer_task, result_task],
101-
return_when=asyncio.FIRST_COMPLETED,
102-
)
103-
if not consumer_task.done():
104-
consumer_task.cancel()
105-
we_cancelled_consumer = True
106-
# gather(return_exceptions=True) drains both tasks. Only
107-
# cancellation we initiated is expected — anything else
108-
# propagates.
109-
consumer_outcome, workflow_outcome = await asyncio.gather(
110-
consumer_task, result_task, return_exceptions=True
111-
)
112-
if isinstance(consumer_outcome, asyncio.CancelledError):
113-
if not we_cancelled_consumer:
114-
raise consumer_outcome
115-
elif isinstance(consumer_outcome, BaseException):
116-
raise consumer_outcome
117-
if isinstance(workflow_outcome, BaseException):
118-
raise workflow_outcome
119-
return workflow_outcome
120-
finally:
121-
for task in (consumer_task, result_task):
122-
if not task.done():
123-
task.cancel()
124-
for task in (consumer_task, result_task):
125-
try:
126-
await task
127-
except BaseException:
128-
pass

0 commit comments

Comments
 (0)