Skip to content

Commit c32fa47

Browse files
committed
Run linter
1 parent d22e8a0 commit c32fa47

7 files changed

Lines changed: 65 additions & 40 deletions

File tree

batch_sliding_window/__init__.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,10 @@
99
- RecordLoader: Activity for loading records from external sources
1010
"""
1111

12-
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
12+
from batch_sliding_window.batch_workflow import (
13+
ProcessBatchWorkflow,
14+
ProcessBatchWorkflowInput,
15+
)
1316
from batch_sliding_window.sliding_window_workflow import (
1417
SlidingWindowWorkflow,
1518
SlidingWindowWorkflowInput,
@@ -34,4 +37,4 @@
3437
"GetRecordsInput",
3538
"GetRecordsOutput",
3639
"SingleRecord",
37-
]
40+
]

batch_sliding_window/batch_workflow.py

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -7,16 +7,20 @@
77
from temporalio.exceptions import ApplicationError
88

99
from batch_sliding_window.record_loader_activity import RecordLoader
10-
from batch_sliding_window.sliding_window_workflow import SlidingWindowWorkflow, SlidingWindowWorkflowInput
10+
from batch_sliding_window.sliding_window_workflow import (
11+
SlidingWindowWorkflow,
12+
SlidingWindowWorkflowInput,
13+
)
1114

1215

1316
@dataclass
1417
class ProcessBatchWorkflowInput:
1518
"""Input for the ProcessBatchWorkflow.
16-
17-
A single input structure is preferred to multiple workflow arguments
19+
20+
A single input structure is preferred to multiple workflow arguments
1821
to simplify backward compatible API changes.
1922
"""
23+
2024
page_size: int # Number of children started by a single sliding window workflow run
2125
sliding_window_size: int # Maximum number of children to run in parallel
2226
partitions: int # How many sliding windows to run in parallel
@@ -25,8 +29,8 @@ class ProcessBatchWorkflowInput:
2529
@workflow.defn
2630
class ProcessBatchWorkflow:
2731
"""Sample workflow that partitions the data set into continuous ranges.
28-
29-
A real application can choose any other way to divide the records
32+
33+
A real application can choose any other way to divide the records
3034
into multiple collections.
3135
"""
3236

@@ -44,7 +48,9 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int:
4448
)
4549

4650
partitions = self._divide_into_partitions(record_count, input.partitions)
47-
window_sizes = self._divide_into_partitions(input.sliding_window_size, input.partitions)
51+
window_sizes = self._divide_into_partitions(
52+
input.sliding_window_size, input.partitions
53+
)
4854

4955
workflow.logger.info(
5056
f"ProcessBatchWorkflow started",
@@ -53,22 +59,22 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int:
5359
"record_count": record_count,
5460
"partitions": partitions,
5561
"window_sizes": window_sizes,
56-
}
62+
},
5763
)
5864

5965
# Start child workflows for each partition
6066
tasks = []
6167
offset = 0
62-
68+
6369
for i in range(input.partitions):
6470
# Make child id more user-friendly
6571
child_id = f"{workflow.info().workflow_id}/{i}"
66-
72+
6773
# Define partition boundaries
6874
maximum_partition_offset = offset + partitions[i]
6975
if maximum_partition_offset > record_count:
7076
maximum_partition_offset = record_count
71-
77+
7278
child_input = SlidingWindowWorkflowInput(
7379
page_size=input.page_size,
7480
sliding_window_size=window_sizes[i],
@@ -77,7 +83,7 @@ async def run(self, input: ProcessBatchWorkflowInput) -> int:
7783
progress=0,
7884
current_records=None,
7985
)
80-
86+
8187
task = workflow.execute_child_workflow(
8288
SlidingWindowWorkflow.run,
8389
child_input,
@@ -100,4 +106,4 @@ def _divide_into_partitions(self, number: int, n: int) -> List[int]:
100106
for i in range(remainder):
101107
partitions[i] += 1
102108

103-
return partitions
109+
return partitions

batch_sliding_window/record_loader_activity.py

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77
@dataclass
88
class GetRecordsInput:
99
"""Input for the GetRecords activity."""
10+
1011
page_size: int
1112
offset: int
1213
max_offset: int
@@ -15,12 +16,14 @@ class GetRecordsInput:
1516
@dataclass
1617
class SingleRecord:
1718
"""Represents a single record to be processed."""
19+
1820
id: int
1921

2022

2123
@dataclass
2224
class GetRecordsOutput:
2325
"""Output from the GetRecords activity."""
26+
2427
records: List[SingleRecord]
2528

2629

@@ -33,7 +36,7 @@ def __init__(self, record_count: int):
3336
@activity.defn
3437
async def get_record_count(self) -> int:
3538
"""Get the total record count.
36-
39+
3740
Used to partition processing across parallel sliding windows.
3841
The sample implementation just returns a fake value passed during worker initialization.
3942
"""
@@ -42,13 +45,15 @@ async def get_record_count(self) -> int:
4245
@activity.defn
4346
async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
4447
"""Get records loaded from an external data source.
45-
48+
4649
The sample returns fake records.
4750
"""
4851
if input.max_offset > self.record_count:
49-
raise ValueError(f"max_offset({input.max_offset}) > record_count({self.record_count})")
52+
raise ValueError(
53+
f"max_offset({input.max_offset}) > record_count({self.record_count})"
54+
)
5055

5156
limit = min(input.offset + input.page_size, input.max_offset)
5257
records = [SingleRecord(id=i) for i in range(input.offset, limit)]
53-
54-
return GetRecordsOutput(records=records)
58+
59+
return GetRecordsOutput(records=records)

batch_sliding_window/record_processor_workflow.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -13,10 +13,10 @@ class RecordProcessorWorkflow:
1313
@workflow.run
1414
async def run(self, record: SingleRecord) -> None:
1515
await self._process_record(record)
16-
16+
1717
# Notify parent about completion via signal
1818
parent = workflow.info().parent
19-
19+
2020
# This workflow is always expected to have a parent.
2121
# But for unit testing it might be useful to skip the notification if there is none.
2222
if parent:
@@ -29,5 +29,5 @@ async def _process_record(self, record: SingleRecord) -> None:
2929
# Use workflow.random() to get a random number to ensure workflow determinism
3030
sleep_duration = workflow.random().randint(1, 10)
3131
await workflow.sleep(sleep_duration)
32-
33-
workflow.logger.info(f"Processed record {record}")
32+
33+
workflow.logger.info(f"Processed record {record}")

batch_sliding_window/sliding_window_workflow.py

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -5,13 +5,18 @@
55
from temporalio import workflow
66
from temporalio.common import WorkflowIDReusePolicy
77

8-
from batch_sliding_window.record_loader_activity import RecordLoader, GetRecordsInput, SingleRecord
8+
from batch_sliding_window.record_loader_activity import (
9+
RecordLoader,
10+
GetRecordsInput,
11+
SingleRecord,
12+
)
913
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
1014

1115

1216
@dataclass
1317
class SlidingWindowWorkflowInput:
1418
"""Contains SlidingWindowWorkflow arguments."""
19+
1520
page_size: int
1621
sliding_window_size: int
1722
offset: int # inclusive
@@ -24,6 +29,7 @@ class SlidingWindowWorkflowInput:
2429
@dataclass
2530
class SlidingWindowState:
2631
"""Used as a 'state' query result."""
32+
2733
current_records: List[int] # record ids currently being processed
2834
children_started_by_this_run: int
2935
offset: int
@@ -33,7 +39,7 @@ class SlidingWindowState:
3339
@workflow.defn
3440
class SlidingWindowWorkflow:
3541
"""Workflow processes a range of records using a requested number of child workflows.
36-
42+
3743
As soon as a child workflow completes a new one is started.
3844
"""
3945

@@ -54,7 +60,7 @@ async def run(self, input: SlidingWindowWorkflowInput) -> int:
5460
"offset": input.offset,
5561
"maximum_offset": input.maximum_offset,
5662
"progress": input.progress,
57-
}
63+
},
5864
)
5965

6066
# Initialize state from input
@@ -111,13 +117,15 @@ async def _execute(self, input: SlidingWindowWorkflowInput) -> int:
111117

112118
return await self._continue_as_new_or_complete(input)
113119

114-
async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput) -> int:
120+
async def _continue_as_new_or_complete(
121+
self, input: SlidingWindowWorkflowInput
122+
) -> int:
115123
"""Continue-as-new after starting page_size children or complete if done."""
116124
# Update offset based on children started in this run
117125
new_offset = input.offset + len(self.children_started_by_this_run)
118-
126+
119127
if new_offset < input.maximum_offset:
120-
# In Python, await start_child_workflow() already waits until
128+
# In Python, await start_child_workflow() already waits until
121129
# the start has been accepted by the server, so no additional wait needed
122130

123131
# Continue-as-new with updated state
@@ -129,7 +137,7 @@ async def _continue_as_new_or_complete(self, input: SlidingWindowWorkflowInput)
129137
progress=self.progress,
130138
current_records=self.current_records,
131139
)
132-
140+
133141
workflow.continue_as_new(new_input)
134142

135143
# Last run in the continue-as-new chain
@@ -152,4 +160,4 @@ def _handle_state_query(self) -> SlidingWindowState:
152160
children_started_by_this_run=len(self.children_started_by_this_run),
153161
offset=self.offset,
154162
progress=self.progress,
155-
)
163+
)

batch_sliding_window/starter.py

Lines changed: 11 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,10 @@
77

88
from temporalio.client import Client
99

10-
from batch_sliding_window.batch_workflow import ProcessBatchWorkflow, ProcessBatchWorkflowInput
10+
from batch_sliding_window.batch_workflow import (
11+
ProcessBatchWorkflow,
12+
ProcessBatchWorkflowInput,
13+
)
1114

1215

1316
async def main():
@@ -17,32 +20,32 @@ async def main():
1720

1821
# Create client
1922
client = await Client.connect("localhost:7233")
20-
23+
2124
# Create unique workflow ID with timestamp
2225
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
2326
workflow_id = f"batch_sliding_window_example_{timestamp}"
24-
27+
2528
# Define workflow input
2629
workflow_input = ProcessBatchWorkflowInput(
2730
page_size=5,
2831
sliding_window_size=10,
2932
partitions=3,
3033
)
31-
34+
3235
print(f"Starting workflow with ID: {workflow_id}")
3336
print(f"Input: {workflow_input}")
34-
37+
3538
# Start workflow
3639
handle = await client.start_workflow(
3740
ProcessBatchWorkflow.run,
3841
workflow_input,
3942
id=workflow_id,
4043
task_queue="batch_sliding_window_task_queue",
4144
)
42-
45+
4346
print(f"Workflow started with ID: {handle.id}")
4447
print(f"Waiting for workflow to complete...")
45-
48+
4649
# Wait for result
4750
try:
4851
result = await handle.result()
@@ -54,4 +57,4 @@ async def main():
5457

5558

5659
if __name__ == "__main__":
57-
asyncio.run(main())
60+
asyncio.run(main())

batch_sliding_window/worker.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -45,4 +45,4 @@ async def main():
4545

4646

4747
if __name__ == "__main__":
48-
asyncio.run(main())
48+
asyncio.run(main())

0 commit comments

Comments (0)