Skip to content

Commit bfc170c

Browse files
committed
Translated the Go sliding window sample into Python
1 parent 090b96d commit bfc170c

8 files changed

Lines changed: 517 additions & 0 deletions

File tree

batch-sliding-window/README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
## Sliding Window Batch Sample
2+
3+
A sample implementation of a batch processing Workflow that maintains a sliding window of record processing Workflows.
4+
5+
A SlidingWindowWorkflow starts a configured number (sliding window size) of RecordProcessorWorkflow children in parallel.
6+
Each child processes a single record. When a child completes, a new child is started.
7+
8+
A SlidingWindowWorkflow calls continue-as-new after starting a preconfigured number of children to keep its history size bounded.
9+
A RecordProcessorWorkflow reports its completion through a Signal to its parent.
10+
This makes it possible to notify a parent that has called continue-as-new.
11+
12+
A single instance of SlidingWindowWorkflow has limited window size and throughput.
13+
To support larger window size and overall throughput multiple instances of SlidingWindowWorkflow run in parallel.
14+
15+
#### Running the Sliding Window Batch Sample
16+
17+
Make sure the [Temporal Server is running locally](https://learn.temporal.io/getting_started/python/dev_environment/#set-up-a-local-temporal-service-for-development-with-temporal-cli).
18+
19+
From the root of the project, start a Worker:
20+
21+
```bash
22+
python python-samples/batch-sliding-window/run_worker.py
23+
```
24+
25+
Start the Workflow Execution:
26+
27+
```bash
28+
python python-samples/batch-sliding-window/starter.py
29+
```

batch-sliding-window/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Sliding Window Batch Processing Sample.
2+
3+
This sample demonstrates a batch processing workflow that maintains a sliding window
4+
of record processing workflows. It includes:
5+
6+
- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows
7+
- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new
8+
- RecordProcessorWorkflow: Processes individual records
9+
- RecordLoader: Activity for loading records from external sources
10+
"""
11+
12+
from .batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
13+
from .sliding_window_workflow import (
14+
SlidingWindowWorkflow,
15+
SlidingWindowWorkflowInput,
16+
SlidingWindowState,
17+
)
18+
from .record_processor_workflow import RecordProcessorWorkflow
19+
from .record_loader_activity import (
20+
RecordLoader,
21+
GetRecordsInput,
22+
GetRecordsOutput,
23+
SingleRecord,
24+
)
25+
26+
# Public API of the sample package, grouped by the module each name is
# imported from.
__all__ = [
    # batch_workflow
    "ProcessBatchWorkflow",
    "ProcessBatchWorkflowInput",
    # sliding_window_workflow
    "SlidingWindowWorkflow",
    "SlidingWindowWorkflowInput",
    "SlidingWindowState",
    # record_processor_workflow
    "RecordProcessorWorkflow",
    # record_loader_activity
    "RecordLoader",
    "GetRecordsInput",
    "GetRecordsOutput",
    "SingleRecord",
]
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import workflow
from temporalio.common import WorkflowIDReusePolicy
from temporalio.exceptions import ApplicationError

from .record_loader_activity import RecordLoader
from .sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput
11+
12+
13+
@dataclass
class ProcessBatchWorkflowInput:
    """Arguments accepted by ProcessBatchWorkflow.

    Everything is bundled into one structure instead of being passed as
    separate workflow arguments, which keeps future API changes backward
    compatible.

    Attributes:
        page_size: Number of children started by a single sliding window
            workflow run.
        sliding_window_size: Maximum number of children to run in parallel.
        partitions: How many sliding windows to run in parallel.
    """

    page_size: int
    sliding_window_size: int
    partitions: int
23+
24+
25+
@workflow.defn
class ProcessBatchWorkflow:
    """Sample workflow that partitions the data set into contiguous ranges.

    Each range is handed to its own SlidingWindowWorkflow child so that
    several sliding windows run in parallel. A real application can choose any
    other way to divide the records into multiple collections.
    """

    @workflow.run
    async def run(self, input: ProcessBatchWorkflowInput) -> int:
        """Fan the batch out over ``input.partitions`` sliding window children.

        Args:
            input: Page size, total sliding window size and partition count.

        Returns:
            The sum of the integers returned by the child workflows
            (presumably the number of records each child processed -- confirm
            against SlidingWindowWorkflow).

        Raises:
            ApplicationError: If ``input.partitions`` is not positive, or if
                ``input.sliding_window_size`` is smaller than
                ``input.partitions``.
        """
        # Validate the configuration before doing any work. A non-positive
        # partition count would otherwise surface as a ZeroDivisionError (or
        # silently start no children) inside _divide_into_partitions.
        if input.partitions <= 0:
            raise ApplicationError("Number of partitions must be positive")
        if input.sliding_window_size < input.partitions:
            # Every partition needs a window of at least one child.
            raise ApplicationError(
                "SlidingWindowSize cannot be less than number of partitions"
            )

        # Get total record count
        record_count = await workflow.execute_activity(
            RecordLoader.get_record_count,
            start_to_close_timeout=timedelta(seconds=5),
        )

        partitions = self._divide_into_partitions(record_count, input.partitions)
        window_sizes = self._divide_into_partitions(
            input.sliding_window_size, input.partitions
        )

        workflow.logger.info(
            "ProcessBatchWorkflow started",
            extra={
                "input": input,
                "record_count": record_count,
                "partitions": partitions,
                "window_sizes": window_sizes,
            },
        )

        # Start one child workflow per partition. Children are started without
        # awaiting them individually so that they run in parallel.
        parent_id = workflow.info().workflow_id
        tasks = []
        offset = 0
        for i, (partition_size, window_size) in enumerate(
            zip(partitions, window_sizes)
        ):
            # Never let a partition reach past the end of the record set.
            maximum_partition_offset = min(offset + partition_size, record_count)

            child_input = SlidingWindowWorkflowInput(
                page_size=input.page_size,
                sliding_window_size=window_size,
                offset=offset,  # inclusive
                maximum_offset=maximum_partition_offset,  # exclusive
                progress=0,
                current_records=None,
            )

            tasks.append(
                workflow.execute_child_workflow(
                    SlidingWindowWorkflow.run,
                    child_input,
                    # Make child id more user-friendly
                    id=f"{parent_id}/{i}",
                    id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
                )
            )
            offset += partition_size

        # Wait for all child workflows to complete
        results = await asyncio.gather(*tasks)
        return sum(results)

    def _divide_into_partitions(self, number: int, n: int) -> List[int]:
        """Divide ``number`` into ``n`` partitions as evenly as possible.

        The first ``number % n`` partitions are one larger than the others, so
        the returned sizes always add up to ``number``.

        Args:
            number: The total to split.
            n: How many partitions to produce; must be positive.

        Returns:
            A list of ``n`` partition sizes.

        Raises:
            ValueError: If ``n`` is not positive.
        """
        if n <= 0:
            raise ValueError(f"n must be positive, got {n}")

        base, remainder = divmod(number, n)
        return [base + 1] * remainder + [base] * (n - remainder)
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from dataclasses import dataclass
2+
from typing import List
3+
4+
from temporalio import activity
5+
6+
7+
@dataclass
class GetRecordsInput:
    """Arguments for the get_records activity.

    Attributes:
        page_size: Maximum number of records returned by one call.
        offset: Id of the first record to return (inclusive).
        max_offset: Upper bound on the returned record ids (exclusive).
    """

    page_size: int
    offset: int
    max_offset: int
13+
14+
15+
@dataclass
class SingleRecord:
    """One record to be processed.

    Attributes:
        id: Identifier of the record.
    """

    id: int
19+
20+
21+
@dataclass
class GetRecordsOutput:
    """Result of the get_records activity.

    Attributes:
        records: The records loaded for the requested page.
    """

    records: List[SingleRecord]
25+
26+
27+
class RecordLoader:
    """Activity implementations that load records from an external data source."""

    def __init__(self, record_count: int):
        # Total number of (fake) records this loader reports and serves.
        self.record_count = record_count

    @activity.defn
    async def get_record_count(self) -> int:
        """Return the total number of records.

        The count is used to partition processing across parallel sliding
        windows. This sample simply hands back the fake value supplied when
        the loader was constructed.
        """
        return self.record_count

    @activity.defn
    async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
        """Return one page of records from the external data source.

        The sample fabricates records whose ids are the integers from
        ``input.offset`` up to, but not including, the smaller of
        ``input.offset + input.page_size`` and ``input.max_offset``.

        Raises:
            ValueError: If ``input.max_offset`` exceeds the record count.
        """
        if input.max_offset > self.record_count:
            raise ValueError(f"max_offset({input.max_offset}) > record_count({self.record_count})")

        first_id = input.offset
        # The page ends at whichever comes first: a full page or the caller's bound.
        stop_id = min(first_id + input.page_size, input.max_offset)

        return GetRecordsOutput(
            records=[SingleRecord(id=record_id) for record_id in range(first_id, stop_id)]
        )
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import asyncio
2+
import random
3+
4+
from temporalio import workflow
5+
6+
from .record_loader_activity import SingleRecord
7+
8+
9+
@workflow.defn
class RecordProcessorWorkflow:
    """Workflow that implements processing of a single record."""

    @workflow.run
    async def run(self, record: SingleRecord) -> None:
        """Process ``record`` and then report completion to the parent workflow.

        Args:
            record: The record to process.
        """
        await self._process_record(record)

        # Notify parent about completion via signal.
        parent = workflow.info().parent

        # This workflow is always expected to have a parent.
        # But for unit testing it might be useful to skip the notification if there is none.
        if parent is None:
            return

        # Don't specify run_id as parent calls continue-as-new; addressing the
        # parent by workflow id only lets the signal reach whichever run is
        # current. The SDK function is get_external_workflow_handle.
        parent_handle = workflow.get_external_workflow_handle(parent.workflow_id)
        await parent_handle.signal("report_completion", record.id)

    async def _process_record(self, record: SingleRecord) -> None:
        """Simulate application specific record processing.

        Sleeps for a random whole number of seconds between 1 and 10
        (inclusive) and then logs the processed record.
        """
        # Use workflow.random() to get a random number to ensure workflow determinism
        sleep_duration = workflow.random().randint(1, 10)
        await workflow.sleep(sleep_duration)

        workflow.logger.info(f"Processed record {record}")
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env python3
2+
"""Standalone starter for the batch sliding window sample."""
3+
4+
import asyncio
5+
import logging
6+
import sys
7+
from pathlib import Path
8+
9+
# Add the python-samples directory to the path to enable imports
10+
sys.path.insert(0, str(Path(__file__).parent.parent))
11+
12+
from temporalio.client import Client
13+
14+
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
15+
16+
17+
async def main():
    """Kick off a ProcessBatchWorkflow run and wait for its result."""
    logging.basicConfig(level=logging.INFO)

    # Sample parameters: 3 sliding windows sharing 10 parallel children,
    # 5 children per sliding window run.
    batch_input = ProcessBatchWorkflowInput(
        page_size=5,
        sliding_window_size=10,
        partitions=3,
    )

    temporal_client = await Client.connect("localhost:7233")

    print(f"Starting workflow with input: {batch_input}")

    handle = await temporal_client.start_workflow(
        ProcessBatchWorkflow.run,
        batch_input,
        id="batch-sliding-window-example",
        task_queue="batch-sliding-window",
    )
    print(f"Started workflow: {handle.id}")

    # Blocking on the result is rarely needed in real use cases, since batch
    # workflows are usually long-running; the sample does it to show the total.
    total_processed = await handle.result()
    print(f"Workflow completed. Total records processed: {total_processed}")


if __name__ == "__main__":
    asyncio.run(main())

batch-sliding-window/run_worker.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env python3
2+
"""Standalone worker runner for the batch sliding window sample."""
3+
4+
import asyncio
5+
import logging
6+
import sys
7+
from pathlib import Path
8+
9+
# Add the python-samples directory to the path to enable imports
10+
sys.path.insert(0, str(Path(__file__).parent.parent))
11+
12+
from temporalio import worker
13+
from temporalio.client import Client
14+
15+
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow
16+
from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow
17+
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
18+
from batch_sliding_window.record_loader_activity import RecordLoader
19+
20+
21+
async def main():
    """Run a worker that hosts every workflow and activity of the sample."""
    logging.basicConfig(level=logging.INFO)

    temporal_client = await Client.connect("localhost:7233")

    # The loader serves fake data; 90 is the record count it reports.
    loader = RecordLoader(record_count=90)

    registered_workflows = [
        ProcessBatchWorkflow,
        SlidingWindowWorkflow,
        RecordProcessorWorkflow,
    ]
    # Bound methods are registered so the activities share the loader instance.
    registered_activities = [
        loader.get_record_count,
        loader.get_records,
    ]

    temporal_worker = worker.Worker(
        temporal_client,
        task_queue="batch-sliding-window",
        workflows=registered_workflows,
        activities=registered_activities,
    )

    print("Starting worker...")
    await temporal_worker.run()


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)