|
46 | 46 | ) |
47 | 47 | from workflow_streams.workflows.chat_workflow import ChatWorkflow |
48 | 48 |
|
49 | | -# A prompt long enough that you can comfortably kill the worker |
50 | | -# mid-stream and watch the retry render. Adjust to taste. |
| 49 | +# Long enough that you can comfortably kill the worker mid-stream and |
| 50 | +# watch the retry render. Adjust to taste. |
51 | 51 | DEFAULT_PROMPT = ( |
52 | | - "Write a 250-word friendly explainer for a new engineer about why " |
53 | | - "durable execution matters in distributed systems. Use short " |
54 | | - "paragraphs and a couple of concrete examples." |
| 52 | + "Write a 500-word comparison of Paxos, Raft, and Viewstamped " |
| 53 | + "Replication for a new distributed-systems engineer. Cover the " |
| 54 | + "core ideas, leader election, normal-case operation, " |
| 55 | + "reconfiguration, and the practical tradeoffs that show up when " |
| 56 | + "implementing each. Use short paragraphs." |
55 | 57 | ) |
56 | 58 |
|
57 | 59 |
|
58 | | -def _ansi_clear(line_count: int) -> None: |
59 | | - """Move the cursor up `line_count` lines and clear to end of screen. |
60 | | -
|
61 | | - Used on RETRY to throw away the failed attempt's rendered output |
62 | | - before the retried attempt starts. Counts logical newlines in the |
63 | | - rendered text; a long line that wraps in the terminal won't be |
64 | | - fully cleared by this — accept the rough edges, ``rich`` would do |
65 | | - it cleanly but we are deliberately stdlib-only here. |
66 | | - """ |
67 | | - sys.stdout.write("\r") |
68 | | - if line_count > 0: |
69 | | - sys.stdout.write(f"\033[{line_count}A") |
70 | | - sys.stdout.write("\033[J") |
71 | | - sys.stdout.flush() |
| 60 | +# ANSI cursor save / restore. ``\033[s`` saves the current cursor |
| 61 | +# position, ``\033[u`` restores it, ``\033[J`` clears from the cursor |
| 62 | +# to the end of the screen. Save once before the first delta, and on |
| 63 | +# RETRY restore + clear-to-end so the failed attempt's rendered output |
| 64 | +# disappears regardless of how it was wrapped by the terminal. Save |
| 65 | +# again afterwards so a second retry can rewind to the same point. |
| 66 | +ANSI_SAVE = "\033[s" |
| 67 | +ANSI_RESTORE_AND_CLEAR = "\033[u\033[J" |
72 | 68 |
|
73 | 69 |
|
74 | 70 | async def main() -> None: |
75 | 71 | client = await Client.connect("localhost:7233") |
76 | 72 | converter = client.data_converter.payload_converter |
77 | 73 |
|
78 | 74 | workflow_id = f"workflow-stream-chat-{uuid.uuid4().hex[:8]}" |
| 75 | + chat_input = ChatInput(prompt=DEFAULT_PROMPT) |
79 | 76 | handle = await client.start_workflow( |
80 | 77 | ChatWorkflow.run, |
81 | | - ChatInput(prompt=DEFAULT_PROMPT), |
| 78 | + chat_input, |
82 | 79 | id=workflow_id, |
83 | 80 | task_queue=CHAT_TASK_QUEUE, |
84 | 81 | ) |
85 | 82 |
|
| 83 | + # Print a header so the user sees something immediately. The |
| 84 | + # response will start streaming below it once the first delta |
| 85 | + # arrives — until then this is the only line on screen. |
| 86 | + print( |
| 87 | + f"[chat {workflow_id}] streaming response from {chat_input.model}, " |
| 88 | + f"awaiting first token..." |
| 89 | + ) |
| 90 | + print() |
| 91 | + sys.stdout.write(ANSI_SAVE) |
| 92 | + sys.stdout.flush() |
| 93 | + |
86 | 94 | stream = WorkflowStreamClient.create(client, workflow_id) |
87 | 95 |
|
88 | | - # Subscribe to all three topics on a single iterator. result_type= |
89 | | - # RawValue lets us dispatch on item.topic and decode against the |
90 | | - # right dataclass per topic. |
91 | | - accumulated: list[str] = [] |
| 96 | + # Subscribe to all three topics on a single iterator. |
| 97 | + # result_type=RawValue lets us dispatch on item.topic and decode |
| 98 | + # against the right dataclass per topic. |
92 | 99 | async for item in stream.subscribe( |
93 | 100 | [TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE], |
94 | 101 | result_type=RawValue, |
95 | 102 | ): |
96 | 103 | if item.topic == TOPIC_RETRY: |
97 | 104 | evt = converter.from_payload(item.data.payload, RetryEvent) |
98 | | - line_count = "".join(accumulated).count("\n") |
99 | | - _ansi_clear(line_count) |
100 | | - print(f"[retry attempt {evt.attempt}] resetting output\n") |
101 | | - accumulated.clear() |
| 105 | + sys.stdout.write(ANSI_RESTORE_AND_CLEAR) |
| 106 | + sys.stdout.flush() |
| 107 | + print(f"[retry attempt {evt.attempt}] resetting output") |
| 108 | + print() |
| 109 | + sys.stdout.write(ANSI_SAVE) |
| 110 | + sys.stdout.flush() |
102 | 111 | elif item.topic == TOPIC_DELTA: |
103 | 112 | delta = converter.from_payload(item.data.payload, TextDelta) |
104 | | - accumulated.append(delta.text) |
105 | 113 | sys.stdout.write(delta.text) |
106 | 114 | sys.stdout.flush() |
107 | 115 | elif item.topic == TOPIC_COMPLETE: |
108 | | - # Newline so the prompt isn't on the same line as the |
109 | | - # last delta. The TextComplete payload is the full text |
110 | | - # (also returned by the workflow), but the consumer has |
111 | | - # already rendered it incrementally so we don't reprint. |
| 116 | + # The full text is also in the payload (and returned by |
| 117 | + # the workflow), but the consumer has already rendered it |
| 118 | + # incrementally. Just terminate the line. |
112 | 119 | converter.from_payload(item.data.payload, TextComplete) |
113 | 120 | print() |
114 | 121 | break |
|
0 commit comments