Skip to content

Commit e8fc392

Browse files
authored
Merge branch 'main' into reuse-policy-terminate-existing
2 parents 32a0596 + 230fdf8 commit e8fc392

355 files changed

Lines changed: 12367 additions & 2888 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
name: Continuous Integration
2+
permissions:
3+
contents: read
4+
actions: write
25
on: # rebuild any PRs and main branch changes
36
pull_request:
47
push:
@@ -12,13 +15,13 @@ jobs:
1215
strategy:
1316
fail-fast: true
1417
matrix:
15-
python: ["3.9", "3.12"]
18+
python: ["3.10", "3.14"]
1619
os: [ubuntu-latest, macos-intel, macos-arm, windows-latest]
1720
include:
1821
- os: macos-intel
19-
runsOn: macos-13
22+
runsOn: macos-15-intel
2023
- os: macos-arm
21-
runsOn: macos-14
24+
runsOn: macos-latest
2225
runs-on: ${{ matrix.runsOn || matrix.os }}
2326
steps:
2427
- uses: astral-sh/setup-uv@v5

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
__pycache__
44
.vscode
55
.DS_Store
6+
.claude

README.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Prerequisites:
1010
* [Temporal CLI installed](https://docs.temporal.io/cli#install)
1111
* [Local Temporal server running](https://docs.temporal.io/cli/server#start-dev)
1212

13-
The SDK requires Python >= 3.9. You can install Python using uv. For example,
13+
The SDK requires Python >= 3.10. You can install Python using uv. For example,
1414

1515
uv python install 3.13
1616

@@ -41,6 +41,7 @@ Some examples require extra dependencies. See each sample's directory for specif
4141
* [hello_async_activity_completion](hello/hello_async_activity_completion.py) - Complete an activity outside of the
4242
function that was called.
4343
* [hello_cancellation](hello/hello_cancellation.py) - Manually react to cancellation inside workflows and activities.
44+
* [hello_change_log_level](hello/hello_change_log_level.py) - Change the level of workflow task failure from WARN to ERROR.
4445
* [hello_child_workflow](hello/hello_child_workflow.py) - Execute a child workflow from a workflow.
4546
* [hello_continue_as_new](hello/hello_continue_as_new.py) - Use continue as new to restart a workflow.
4647
* [hello_cron](hello/hello_cron.py) - Execute a workflow once a minute.
@@ -56,14 +57,17 @@ Some examples require extra dependencies. See each sample's directory for specif
5657
* [hello_signal](hello/hello_signal.py) - Send signals to a workflow.
5758
<!-- Keep this list in alphabetical order -->
5859
* [activity_worker](activity_worker) - Use Python activities from a workflow in another language.
60+
* [batch_sliding_window](batch_sliding_window) - Batch processing with a sliding window of child workflows.
5961
* [bedrock](bedrock) - Orchestrate a chatbot with Amazon Bedrock.
6062
* [cloud_export_to_parquet](cloud_export_to_parquet) - Set up schedule workflow to process exported files on an hourly basis
6163
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
6264
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
6365
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
6466
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
6567
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
68+
* [eager_wf_start](eager_wf_start) - Run a workflow using Eager Workflow Start
6669
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
70+
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
6771
* [gevent_async](gevent_async) - Combine gevent and Temporal.
6872
* [langchain](langchain) - Orchestrate workflows for LangChain.
6973
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
@@ -80,13 +84,10 @@ Some examples require extra dependencies. See each sample's directory for specif
8084
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
8185
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
8286
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
87+
* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU bound operations by running multiple workers.
8388

8489
## Test
8590

86-
Running the tests requires `poe` to be installed.
91+
To run the tests:
8792

88-
uv tool install poethepoet
89-
90-
Once you have `poe` installed you can run:
91-
92-
poe test
93+
uv run poe test

activity_worker/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ First run the Go workflow worker by running this in the `go_workflow` directory
66

77
go run .
88

9-
Then in another terminal, run the sample from this directory:
9+
Then in another terminal, run the sample from the root directory:
1010

11-
uv run activity_worker.py
11+
uv run activity_worker/activity_worker.py
1212

1313
The Python code will invoke the Go workflow which will execute the Python activity and return.

activity_worker/activity_worker.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from temporalio import activity
66
from temporalio.client import Client
7+
from temporalio.envconfig import ClientConfig
78
from temporalio.worker import Worker
89

910
task_queue = "say-hello-task-queue"
@@ -18,7 +19,9 @@ async def say_hello_activity(name: str) -> str:
1819

1920
async def main():
2021
# Create client to localhost on default namespace
21-
client = await Client.connect("localhost:7233")
22+
config = ClientConfig.load_client_connect_config()
23+
config.setdefault("target_host", "localhost:7233")
24+
client = await Client.connect(**config)
2225

2326
# Run activity worker
2427
async with Worker(client, task_queue=task_queue, activities=[say_hello_activity]):

batch_sliding_window/README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Batch Sliding Window
2+
3+
This sample demonstrates a batch processing workflow that maintains a sliding window of record processing workflows.
4+
5+
A `SlidingWindowWorkflow` starts a configured number (sliding window size) of `RecordProcessorWorkflow` children in parallel. Each child processes a single record. When a child completes, a new child is started.
6+
7+
The `SlidingWindowWorkflow` calls continue-as-new after starting a preconfigured number of children to keep its history size bounded. A `RecordProcessorWorkflow` reports its completion through a signal to its parent, which allows notification of a parent that called continue-as-new.
8+
9+
A single instance of `SlidingWindowWorkflow` has limited window size and throughput. To support larger window size and overall throughput, multiple instances of `SlidingWindowWorkflow` run in parallel.
10+
11+
### Running This Sample
12+
13+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from root directory to start the worker:
14+
15+
uv run batch_sliding_window/worker.py
16+
17+
This will start the worker. Then, in another terminal, run the following to execute the workflow:
18+
19+
uv run batch_sliding_window/starter.py
20+
21+
The workflow will process 90 records using a sliding window of 10 parallel workers across 3 partitions, with a page size of 5 records per continue-as-new iteration.

batch_sliding_window/__init__.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"""Sliding Window Batch Processing Sample.
2+
3+
This sample demonstrates a batch processing workflow that maintains a sliding window
4+
of record processing workflows. It includes:
5+
6+
- ProcessBatchWorkflow: Main workflow that partitions work across multiple sliding windows
7+
- SlidingWindowWorkflow: Implements the sliding window pattern with continue-as-new
8+
- RecordProcessorWorkflow: Processes individual records
9+
- RecordLoader: Activity for loading records from external sources
10+
"""
11+
12+
from batch_sliding_window.batch_workflow import (
13+
ProcessBatchWorkflow,
14+
ProcessBatchWorkflowInput,
15+
)
16+
from batch_sliding_window.record_loader_activity import (
17+
GetRecordsInput,
18+
GetRecordsOutput,
19+
RecordLoader,
20+
SingleRecord,
21+
)
22+
from batch_sliding_window.record_processor_workflow import RecordProcessorWorkflow
23+
from batch_sliding_window.sliding_window_workflow import (
24+
SlidingWindowState,
25+
SlidingWindowWorkflow,
26+
SlidingWindowWorkflowInput,
27+
)
28+
29+
__all__ = [
30+
"ProcessBatchWorkflow",
31+
"ProcessBatchWorkflowInput",
32+
"SlidingWindowWorkflow",
33+
"SlidingWindowWorkflowInput",
34+
"SlidingWindowState",
35+
"RecordProcessorWorkflow",
36+
"RecordLoader",
37+
"GetRecordsInput",
38+
"GetRecordsOutput",
39+
"SingleRecord",
40+
]
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
from typing import List
5+
6+
from temporalio import workflow
7+
from temporalio.common import WorkflowIDReusePolicy
8+
from temporalio.exceptions import ApplicationError
9+
10+
from batch_sliding_window.record_loader_activity import RecordLoader
11+
from batch_sliding_window.sliding_window_workflow import (
12+
SlidingWindowWorkflow,
13+
SlidingWindowWorkflowInput,
14+
)
15+
16+
17+
@dataclass
18+
class ProcessBatchWorkflowInput:
19+
"""Input for the ProcessBatchWorkflow.
20+
21+
A single input structure is preferred to multiple workflow arguments
22+
to simplify backward compatible API changes.
23+
"""
24+
25+
page_size: int # Number of children started by a single sliding window workflow run
26+
sliding_window_size: int # Maximum number of children to run in parallel
27+
partitions: int # How many sliding windows to run in parallel
28+
29+
30+
@workflow.defn
31+
class ProcessBatchWorkflow:
32+
"""Sample workflow that partitions the data set into continuous ranges.
33+
34+
A real application can choose any other way to divide the records
35+
into multiple collections.
36+
"""
37+
38+
@workflow.run
39+
async def run(self, input: ProcessBatchWorkflowInput) -> int:
40+
# Get total record count
41+
record_count: int = await workflow.execute_activity_method(
42+
RecordLoader.get_record_count,
43+
start_to_close_timeout=timedelta(seconds=5),
44+
)
45+
46+
if input.sliding_window_size < input.partitions:
47+
raise ApplicationError(
48+
"SlidingWindowSize cannot be less than number of partitions"
49+
)
50+
51+
partitions = self._divide_into_partitions(record_count, input.partitions)
52+
window_sizes = self._divide_into_partitions(
53+
input.sliding_window_size, input.partitions
54+
)
55+
56+
workflow.logger.info(
57+
f"ProcessBatchWorkflow started",
58+
extra={
59+
"input": input,
60+
"record_count": record_count,
61+
"partitions": partitions,
62+
"window_sizes": window_sizes,
63+
},
64+
)
65+
66+
# Start child workflows for each partition
67+
tasks = []
68+
offset = 0
69+
70+
for i in range(input.partitions):
71+
# Make child id more user-friendly
72+
child_id = f"{workflow.info().workflow_id}/{i}"
73+
74+
# Define partition boundaries
75+
maximum_partition_offset = offset + partitions[i]
76+
if maximum_partition_offset > record_count:
77+
maximum_partition_offset = record_count
78+
79+
child_input = SlidingWindowWorkflowInput(
80+
page_size=input.page_size,
81+
sliding_window_size=window_sizes[i],
82+
offset=offset, # inclusive
83+
maximum_offset=maximum_partition_offset, # exclusive
84+
progress=0,
85+
current_records=None,
86+
)
87+
88+
task = workflow.execute_child_workflow(
89+
SlidingWindowWorkflow.run,
90+
child_input,
91+
id=child_id,
92+
id_reuse_policy=WorkflowIDReusePolicy.ALLOW_DUPLICATE,
93+
)
94+
tasks.append(task)
95+
offset += partitions[i]
96+
97+
# Wait for all child workflows to complete
98+
results = await asyncio.gather(*tasks)
99+
return sum(results)
100+
101+
def _divide_into_partitions(self, number: int, n: int) -> List[int]:
102+
"""Divide a number into n partitions as evenly as possible."""
103+
base = number // n
104+
remainder = number % n
105+
partitions = [base] * n
106+
107+
for i in range(remainder):
108+
partitions[i] += 1
109+
110+
return partitions
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from dataclasses import dataclass
2+
from typing import List
3+
4+
from temporalio import activity
5+
6+
7+
@dataclass
8+
class GetRecordsInput:
9+
"""Input for the GetRecords activity."""
10+
11+
page_size: int
12+
offset: int
13+
max_offset: int
14+
15+
16+
@dataclass
17+
class SingleRecord:
18+
"""Represents a single record to be processed."""
19+
20+
id: int
21+
22+
23+
@dataclass
24+
class GetRecordsOutput:
25+
"""Output from the GetRecords activity."""
26+
27+
records: List[SingleRecord]
28+
29+
30+
class RecordLoader:
31+
"""Activities for loading records from an external data source."""
32+
33+
def __init__(self, record_count: int):
34+
self.record_count = record_count
35+
36+
@activity.defn
37+
async def get_record_count(self) -> int:
38+
"""Get the total record count.
39+
40+
Used to partition processing across parallel sliding windows.
41+
The sample implementation just returns a fake value passed during worker initialization.
42+
"""
43+
return self.record_count
44+
45+
@activity.defn
46+
async def get_records(self, input: GetRecordsInput) -> GetRecordsOutput:
47+
"""Get records loaded from an external data source.
48+
49+
The sample returns fake records.
50+
"""
51+
if input.max_offset > self.record_count:
52+
raise ValueError(
53+
f"max_offset({input.max_offset}) > record_count({self.record_count})"
54+
)
55+
56+
limit = min(input.offset + input.page_size, input.max_offset)
57+
records = [SingleRecord(id=i) for i in range(input.offset, limit)]
58+
59+
return GetRecordsOutput(records=records)
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
import random
3+
4+
from temporalio import workflow
5+
6+
from batch_sliding_window.record_loader_activity import SingleRecord
7+
8+
9+
@workflow.defn
10+
class RecordProcessorWorkflow:
11+
"""Workflow that implements processing of a single record."""
12+
13+
@workflow.run
14+
async def run(self, record: SingleRecord) -> None:
15+
await self._process_record(record)
16+
17+
# Notify parent about completion via signal
18+
parent = workflow.info().parent
19+
20+
# This workflow is always expected to have a parent.
21+
# But for unit testing it might be useful to skip the notification if there is none.
22+
if parent:
23+
# Don't specify run_id as parent calls continue-as-new
24+
handle = workflow.get_external_workflow_handle(parent.workflow_id)
25+
await handle.signal("report_completion", record.id)
26+
27+
async def _process_record(self, record: SingleRecord) -> None:
28+
"""Simulate application specific record processing."""
29+
# Use workflow.random() to get a random number to ensure workflow determinism
30+
sleep_duration = workflow.random().randint(1, 10)
31+
await workflow.sleep(sleep_duration)
32+
33+
workflow.logger.info(f"Processed record {record}")

0 commit comments

Comments
 (0)