Skip to content

Commit c8663e5

Browse files
committed
samples: workflow_streams: race the LLM consumer with workflow result
If the LLM activity exhausts its retries (bad OPENAI_API_KEY, provider outage, etc.), the workflow fails before the activity publishes the `complete` terminator. The consumer's previous async-for loop only exited on `complete`, so the script blocked indefinitely on a terminator that would never arrive instead of surfacing the workflow failure. Wrap the subscriber in a `consume()` coroutine and run it through the existing `race_with_workflow` helper (the same pattern `run_publisher.py` uses): if the workflow finishes first the subscriber gets cancelled and the workflow's exception propagates; if the subscriber sees `complete` first, the helper waits for the workflow result and returns it. Found in a Codex code review of today's workflow_streams changes.
1 parent 81bf605 commit c8663e5

1 file changed

Lines changed: 34 additions & 28 deletions

File tree

workflow_streams/run_llm.py

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
TextComplete,
4545
TextDelta,
4646
)
47+
from workflow_streams.shared import race_with_workflow
4748
from workflow_streams.workflows.llm_workflow import LLMWorkflow
4849

4950
# Long enough that you can comfortably kill the worker mid-stream and
@@ -93,34 +94,39 @@ async def main() -> None:
9394

9495
stream = WorkflowStreamClient.create(client, workflow_id)
9596

96-
# Subscribe to all three topics on a single iterator.
97-
# result_type=RawValue lets us dispatch on item.topic and decode
98-
# against the right dataclass per topic.
99-
async for item in stream.subscribe(
100-
[TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE],
101-
result_type=RawValue,
102-
):
103-
if item.topic == TOPIC_RETRY:
104-
evt = converter.from_payload(item.data.payload, RetryEvent)
105-
sys.stdout.write(ANSI_RESTORE_AND_CLEAR)
106-
sys.stdout.flush()
107-
print(f"[retry attempt {evt.attempt}] resetting output")
108-
print()
109-
sys.stdout.write(ANSI_SAVE)
110-
sys.stdout.flush()
111-
elif item.topic == TOPIC_DELTA:
112-
delta = converter.from_payload(item.data.payload, TextDelta)
113-
sys.stdout.write(delta.text)
114-
sys.stdout.flush()
115-
elif item.topic == TOPIC_COMPLETE:
116-
# The full text is also in the payload (and returned by
117-
# the workflow), but the consumer has already rendered it
118-
# incrementally. Just terminate the line.
119-
converter.from_payload(item.data.payload, TextComplete)
120-
print()
121-
break
122-
123-
result = await handle.result()
97+
async def consume() -> None:
98+
# Subscribe to all three topics on a single iterator.
99+
# result_type=RawValue lets us dispatch on item.topic and
100+
# decode against the right dataclass per topic.
101+
async for item in stream.subscribe(
102+
[TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE],
103+
result_type=RawValue,
104+
):
105+
if item.topic == TOPIC_RETRY:
106+
evt = converter.from_payload(item.data.payload, RetryEvent)
107+
sys.stdout.write(ANSI_RESTORE_AND_CLEAR)
108+
sys.stdout.flush()
109+
print(f"[retry attempt {evt.attempt}] resetting output")
110+
print()
111+
sys.stdout.write(ANSI_SAVE)
112+
sys.stdout.flush()
113+
elif item.topic == TOPIC_DELTA:
114+
delta = converter.from_payload(item.data.payload, TextDelta)
115+
sys.stdout.write(delta.text)
116+
sys.stdout.flush()
117+
elif item.topic == TOPIC_COMPLETE:
118+
# The full text is also in the payload (and returned
119+
# by the workflow), but the consumer has already
120+
# rendered it incrementally. Just terminate the line.
121+
converter.from_payload(item.data.payload, TextComplete)
122+
print()
123+
return
124+
125+
# Race the subscriber against the workflow result so that if the
126+
# activity exhausts its retries the workflow's failure surfaces
127+
# here rather than leaving the subscriber blocked on a `complete`
128+
# that will never arrive.
129+
result = await race_with_workflow(consume(), handle)
124130
print(f"\n[workflow result: {len(result)} chars]")
125131

126132

0 commit comments

Comments
 (0)