Skip to content

Commit a2ca8cc

Browse files
committed
refactor, change folder name, fix imports and update readme to match style
1 parent bfc170c commit a2ca8cc

10 files changed

Lines changed: 36 additions & 53 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ Some examples require extra dependencies. See each sample's directory for specif
5656
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
5757
<!-- Keep this list in alphabetical order -->
5858
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
59+
* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows.
5960
* [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
6061
* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
6162
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.

batch-sliding-window/README.md

Lines changed: 0 additions & 29 deletions
This file was deleted.

batch_sliding_window/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Batch Sliding Window
2+
3+
This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows.
4+
5+
A `SlidingWindowWorkflow` starts a configured number (sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record. When a child completes, a new child is started.
6+
7+
The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. A `RecordProcessorWorkflow` reports its completion through a signal to its parent, which allows notification of a parent that called continue-as-new.
8+
9+
A single instance of `SlidingWindowWorkflow` has limited window size and throughput. To support larger window size and overall throughput, multiple instances of `SlidingWindowWorkflow` run in parallel.
10+
11+
### Running This Sample
12+
13+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the worker:
14+
15+
uv run run_worker.py
16+
17+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
18+
19+
uv run run_starter.py
20+
21+
The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration.
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99
- RecordLoader: Activity for loading records from external sources
1010
"""
1111

12-
from .batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
13-
from .sliding_window_workflow import (
12+
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
13+
from batch_sliding_window.sliding_window_workflow import (
1414
SlidingWindowWorkflow,
1515
SlidingWindowWorkflowInput,
1616
SlidingWindowState,
1717
)
18-
from .record_processor_workflow import RecordProcessorWorkflow
19-
from .record_loader_activity import (
18+
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
19+
from batch_sliding_window.record_loader_activity import (
2020
RecordLoader,
2121
GetRecordsInput,
2222
GetRecordsOutput,
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from temporalio.common import WorkflowIDReusePolicy
77
from temporalio.exceptions import ApplicationError
88

9-
from .record_loader_activity import RecordLoader
10-
from .sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput
9+
from batch_sliding_window.record_loader_activity import RecordLoader
10+
from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput
1111

1212

1313
@dataclass
File renamed without changes.

batch-sliding-window/record_processor_workflow.py renamed to batch_sliding_window/record_processor_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from temporalio import workflow
55

6-
from .record_loader_activity import SingleRecord
6+
from batch_sliding_window.record_loader_activity import SingleRecord
77

88

99
@workflow.defn
Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
#!/usr/bin/env python3
2-
"""Standalone starter for the batch sliding window sample."""
2+
"""Starter for the batch sliding window sample."""
33

44
import asyncio
55
import logging
6-
import sys
7-
from pathlib import Path
8-
9-
# Add the python-samples directory to the path to enable imports
10-
sys.path.insert(0, str(Path(__file__).parent.parent))
116

127
from temporalio.client import Client
138

@@ -35,8 +30,8 @@ async def main():
3530
handle = await client.start_workflow(
3631
ProcessBatchWorkflow.run,
3732
workflow_input,
38-
id="batch-sliding-window-example",
39-
task_queue="batch-sliding-window",
33+
id="batch_sliding_window_example",
34+
task_queue="batch_sliding_window",
4035
)
4136

4237
print(f"Started workflow: {handle.id}")
Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
11
#!/usr/bin/env python3
2-
"""Standalone worker runner for the batch sliding window sample."""
2+
"""Worker for the batch sliding window sample."""
33

44
import asyncio
55
import logging
6-
import sys
7-
from pathlib import Path
8-
9-
# Add the python-samples directory to the path to enable imports
10-
sys.path.insert(0, str(Path(__file__).parent.parent))
116

127
from temporalio import worker
138
from temporalio.client import Client
@@ -32,7 +27,7 @@ async def main():
3227
# Create worker
3328
temporal_worker = worker.Worker(
3429
client,
35-
task_queue="batch-sliding-window",
30+
task_queue="batch_sliding_window",
3631
workflows=[
3732
ProcessBatchWorkflow,
3833
SlidingWindowWorkflow,

batch-sliding-window/sliding_window_workflow.py renamed to batch_sliding_window/sliding_window_workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
from temporalio import workflow
66
from temporalio.common import WorkflowIDReusePolicy
77

8-
from .record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord
9-
from .record_processor_workflow import RecordProcessorWorkflow
8+
from batch_sliding_window.record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord
9+
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
1010

1111

1212
@dataclass

0 commit comments

Comments
 (0)