Skip to content

Commit 01f3ded

Browse files
committed
Cleaned things up and got everything functioning
1 parent a2ca8cc commit 01f3ded

6 files changed

Lines changed: 65 additions & 56 deletions

File tree

batch_sliding_window/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ A single instance of `SlidingWindowWorkflow` has limited window size and through
1212

1313
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:
1414

15-
uv run run_worker.py
15+
uv run worker.py
1616

1717
This will start the worker. Then, in another terminal, run the following to execute the workflow:
1818

19-
uv run run_starter.py
19+
uv run starter.py
2020

2121
The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration.

batch_sliding_window/record_processor_workflow.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ async def run(self, record: SingleRecord) -> None:
2121
# But for unit testing it might be useful to skip the notification if there is none.
2222
if parent:
2323
# Don't specify run_id as parent calls continue-as-new
24-
await workflow.external_workflow_handle(parent.workflow_id).signal(
25-
"report_completion", record.id
26-
)
24+
handle = workflow.get_external_workflow_handle(parent.workflow_id)
25+
await handle.signal("report_completion", record.id)
2726

2827
async def _process_record(self, record: SingleRecord) -> None:
2928
"""Simulate application specific record processing."""

batch_sliding_window/run_starter.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

batch_sliding_window/sliding_window_workflow.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class SlidingWindowWorkflow:
3939

4040
def __init__(self):
4141
self.current_records: Set[int] = set()
42-
self.children_started_by_this_run: List[workflow.ChildWorkflowHandle] = []
42+
self.children_started_by_this_run = []
4343
self.offset = 0
4444
self.progress = 0
4545
self._completion_signals_received = 0
@@ -117,9 +117,8 @@ async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput)
117117
new_offset = input.offset + len(self.children_started_by_this_run)
118118

119119
if new_offset < input.maximum_offset:
120-
# Wait for all children started by this run to begin execution
121-
for child in self.children_started_by_this_run:
122-
await child.get_workflow_execution()
120+
# In Python, await start_child_workflow() already waits until
121+
# the start has been accepted by the server, so no additional wait needed
123122

124123
# Continue-as-new with updated state
125124
new_input = SlidingWindowWorkflowInput(

batch_sliding_window/starter.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env python3
2+
"""Starter for the batch sliding window sample."""
3+
4+
import asyncio
5+
import datetime
6+
import logging
7+
8+
from temporalio.client import Client
9+
10+
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
11+
12+
13+
async def main():
14+
"""Start the ProcessBatchWorkflow."""
15+
# Set up logging
16+
logging.basicConfig(level=logging.INFO)
17+
18+
# Create client
19+
client = await Client.connect("localhost:7233")
20+
21+
# Create unique workflow ID with timestamp
22+
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
23+
workflow_id = f"batch_sliding_window_example_{timestamp}"
24+
25+
# Define workflow input
26+
workflow_input = ProcessBatchWorkflowInput(
27+
page_size=5,
28+
sliding_window_size=10,
29+
partitions=3,
30+
)
31+
32+
print(f"Starting workflow with ID: {workflow_id}")
33+
print(f"Input: {workflow_input}")
34+
35+
# Start workflow
36+
handle = await client.start_workflow(
37+
ProcessBatchWorkflow.run,
38+
workflow_input,
39+
id=workflow_id,
40+
task_queue="batch_sliding_window_task_queue",
41+
)
42+
43+
print(f"Workflow started with ID: {handle.id}")
44+
print(f"Waiting for workflow to complete...")
45+
46+
# Wait for result
47+
try:
48+
result = await handle.result()
49+
print(f"Workflow completed successfully!")
50+
print(f"Total records processed: {result}")
51+
except Exception as e:
52+
print(f"Workflow failed with error: {e}")
53+
raise
54+
55+
56+
if __name__ == "__main__":
57+
asyncio.run(main())
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async def main():
2727
# Create worker
2828
temporal_worker = worker.Worker(
2929
client,
30-
task_queue="batch_sliding_window",
30+
task_queue="batch_sliding_window_task_queue",
3131
workflows=[
3232
ProcessBatchWorkflow,
3333
SlidingWindowWorkflow,

0 commit comments

Comments
 (0)