Skip to content

Commit d27133b

Browse files
committed
Update worker versioning sample to use deployments
1 parent 0c66e2f commit d27133b

13 files changed

Lines changed: 494 additions & 236 deletions

.claude/settings.local.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"WebFetch(domain:patch-diff.githubusercontent.com)",
5+
"Read(//mnt/chonky/dev/temporal/sdk-python/**)",
6+
"Bash(find:*)",
7+
"Bash(python:*)"
8+
],
9+
"deny": [],
10+
"ask": []
11+
}
12+
}

worker_versioning/README.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,24 @@
1-
# Worker Versioning Sample
1+
## Worker Versioning
22

3-
This sample shows you how you can use the [Worker Versioning](https://docs.temporal.io/workers#worker-versioning)
4-
feature to deploy incompatible changes to workflow code more easily.
3+
This sample demonstrates how to use Temporal's Worker Versioning feature to safely deploy updates to workflow and activity code. It shows the difference between auto-upgrading and pinned workflows, and how to manage worker deployments with different build IDs.
54

6-
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from the root directory:
5+
The sample creates multiple worker versions (1.0, 1.1, and 2.0) within one deployment and demonstrates:
6+
- **Auto-upgrading workflows**: Automatically and controllably migrate to newer worker versions
7+
- **Pinned workflows**: Stay on the original worker version throughout their lifecycle
8+
- **Compatible vs incompatible changes**: How to make safe updates using `workflow.patched`
79

8-
uv run worker_versioning/example.py
10+
### Steps to run this sample:
911

10-
This will add some Build IDs to a Task Queue, and will also run Workers with those versions to show how you can
11-
mark add versions, mark them as compatible (or not) with one another, and run Workers at specific versions. You'll
12-
see that only the workers only process Workflow Tasks assigned versions they are compatible with.
12+
1) Run a [Temporal service](https://github.com/temporalio/samples-python/tree/main/#how-to-use).
13+
14+
2) Start the main application (this will guide you through the sample):
15+
```bash
16+
uv run worker_versioning/app.py
17+
```
18+
19+
3) Follow the prompts to start workers in separate terminals:
20+
- When prompted, run: `uv run worker_versioning/workerv1.py`
21+
- When prompted, run: `uv run worker_versioning/workerv1_1.py`
22+
- When prompted, run: `uv run worker_versioning/workerv2.py`
23+
24+
The sample will show how auto-upgrading workflows migrate to newer workers while pinned workflows remain on their original version.

worker_versioning/activities.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
1+
from dataclasses import dataclass
12
from temporalio import activity
23

34

5+
@dataclass
6+
class IncompatibleActivityInput:
7+
"""Input for the incompatible activity."""
8+
called_by: str
9+
more_data: str
10+
11+
412
@activity.defn
5-
async def greet(inp: str) -> str:
6-
return f"Hi from {inp}"
13+
async def some_activity(called_by: str) -> str:
14+
"""Basic activity for the workflow."""
15+
return f"SomeActivity called by {called_by}"
716

817

918
@activity.defn
10-
async def super_greet(inp: str, some_number: int) -> str:
11-
return f"Hi from {inp} with {some_number}"
19+
async def some_incompatible_activity(input_data: IncompatibleActivityInput) -> str:
20+
"""Incompatible activity that takes different input."""
21+
return f"SomeIncompatibleActivity called by {input_data.called_by} with {input_data.more_data}"

worker_versioning/app.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
"""Main application for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
import uuid
6+
from temporalio.client import Client
7+
8+
from worker_versioning.constants import TASK_QUEUE, DEPLOYMENT_NAME
9+
10+
logging.basicConfig(level=logging.INFO)
11+
12+
13+
async def main() -> None:
14+
client = await Client.connect("localhost:7233")
15+
16+
# Wait for v1 worker and set as current version
17+
logging.info(
18+
"Waiting for v1 worker to appear. Run `python worker_versioning/workerv1.py` in another terminal"
19+
)
20+
await wait_for_worker_and_make_current(client, "1.0")
21+
22+
# Start auto-upgrading and pinned workflows
23+
auto_upgrade_workflow_id = "worker-versioning-versioning-autoupgrade_" + str(
24+
uuid.uuid4()
25+
)
26+
auto_upgrade_execution = await client.start_workflow(
27+
"AutoUpgrading",
28+
id=auto_upgrade_workflow_id,
29+
task_queue=TASK_QUEUE,
30+
)
31+
32+
pinned_workflow_id = "worker-versioning-versioning-pinned_" + str(uuid.uuid4())
33+
pinned_execution = await client.start_workflow(
34+
"Pinned",
35+
id=pinned_workflow_id,
36+
task_queue=TASK_QUEUE,
37+
)
38+
39+
logging.info("Started auto-upgrading workflow: %s", auto_upgrade_execution.id)
40+
logging.info("Started pinned workflow: %s", pinned_execution.id)
41+
42+
# Signal both workflows a few times to drive them
43+
await advance_workflows(auto_upgrade_execution, pinned_execution)
44+
45+
# Now wait for the v1.1 worker to appear and become current
46+
logging.info(
47+
"Waiting for v1.1 worker to appear. Run `python worker_versioning/workerv1_1.py` in another terminal"
48+
)
49+
await wait_for_worker_and_make_current(client, "1.1")
50+
51+
# Once it has, we will continue to advance the workflows.
52+
# The auto-upgrade workflow will now make progress on the new worker, while the pinned one will
53+
# keep progressing on the old worker.
54+
await advance_workflows(auto_upgrade_execution, pinned_execution)
55+
56+
# Finally we'll start the v2 worker, and again it'll become the new current version
57+
logging.info(
58+
"Waiting for v2 worker to appear. Run `python worker_versioning/workerv2.py` in another terminal"
59+
)
60+
await wait_for_worker_and_make_current(client, "2.0")
61+
62+
# Once it has we'll start one more new workflow, another pinned one, to demonstrate that new
63+
# pinned workflows start on the current version.
64+
pinned_workflow_2_id = "worker-versioning-versioning-pinned-2_" + str(uuid.uuid4())
65+
pinned_execution_2 = await client.start_workflow(
66+
"Pinned",
67+
id=pinned_workflow_2_id,
68+
task_queue=TASK_QUEUE,
69+
)
70+
logging.info("Started pinned workflow v2: %s", pinned_execution_2.id)
71+
72+
# Now we'll conclude all workflows. You should be able to see in your server UI that the pinned
73+
# workflow always stayed on 1.0, while the auto-upgrading workflow migrated.
74+
for handle in [auto_upgrade_execution, pinned_execution, pinned_execution_2]:
75+
await handle.signal("do_next_signal", "conclude")
76+
await handle.result()
77+
78+
logging.info("All workflows completed")
79+
80+
81+
async def advance_workflows(auto_upgrade_execution, pinned_execution):
82+
"""Signal both workflows a few times to drive them."""
83+
for i in range(3):
84+
await auto_upgrade_execution.signal("do_next_signal", "do-activity")
85+
await pinned_execution.signal("do_next_signal", "some-signal")
86+
87+
88+
async def wait_for_worker_and_make_current(client: Client, build_id: str) -> None:
89+
import temporalio.api.workflowservice.v1 as wsv1
90+
from temporalio.common import WorkerDeploymentVersion
91+
92+
target_version = WorkerDeploymentVersion(
93+
deployment_name=DEPLOYMENT_NAME, build_id=build_id
94+
)
95+
96+
# Wait for the worker to appear
97+
while True:
98+
try:
99+
describe_request = wsv1.DescribeWorkerDeploymentRequest(
100+
namespace=client.namespace,
101+
deployment_name=DEPLOYMENT_NAME,
102+
)
103+
response = await client.workflow_service.describe_worker_deployment(
104+
describe_request
105+
)
106+
107+
# Check if our version is present in the version summaries
108+
for version_summary in response.worker_deployment_info.version_summaries:
109+
if (
110+
version_summary.deployment_version.deployment_name
111+
== target_version.deployment_name
112+
and version_summary.deployment_version.build_id
113+
== target_version.build_id
114+
):
115+
break
116+
else:
117+
await asyncio.sleep(1)
118+
continue
119+
120+
break
121+
122+
except Exception:
123+
await asyncio.sleep(1)
124+
continue
125+
126+
# Once the version is available, set it as current
127+
set_request = wsv1.SetWorkerDeploymentCurrentVersionRequest(
128+
namespace=client.namespace,
129+
deployment_name=DEPLOYMENT_NAME,
130+
build_id=target_version.build_id,
131+
)
132+
await client.workflow_service.set_worker_deployment_current_version(set_request)
133+
134+
135+
if __name__ == "__main__":
136+
asyncio.run(main())

worker_versioning/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""Constants for the worker versioning sample."""
2+
3+
# Task queue name
4+
TASK_QUEUE = "worker-versioning"
5+
6+
# Deployment name
7+
DEPLOYMENT_NAME = "my-deployment"

worker_versioning/example.py

Lines changed: 0 additions & 116 deletions
This file was deleted.

worker_versioning/workerv1.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Worker v1 for the worker versioning sample."""
2+
3+
import asyncio
4+
import logging
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker, WorkerDeploymentConfig
7+
from temporalio.common import WorkerDeploymentVersion
8+
9+
from worker_versioning.constants import TASK_QUEUE, DEPLOYMENT_NAME
10+
from worker_versioning.workflows import AutoUpgradingWorkflowV1, PinnedWorkflowV1
11+
from worker_versioning.activities import some_activity, some_incompatible_activity
12+
13+
logging.basicConfig(level=logging.INFO)
14+
15+
16+
async def main() -> None:
17+
"""Run worker v1."""
18+
client = await Client.connect("localhost:7233")
19+
20+
# Create worker v1
21+
worker = Worker(
22+
client,
23+
task_queue=TASK_QUEUE,
24+
workflows=[AutoUpgradingWorkflowV1, PinnedWorkflowV1],
25+
activities=[some_activity, some_incompatible_activity],
26+
deployment_config=WorkerDeploymentConfig(
27+
version=WorkerDeploymentVersion(
28+
deployment_name=DEPLOYMENT_NAME, build_id="1.0"
29+
),
30+
use_worker_versioning=True,
31+
),
32+
)
33+
34+
logging.info("Starting worker v1 (build 1.0)")
35+
await worker.run()
36+
37+
38+
if __name__ == "__main__":
39+
asyncio.run(main())

0 commit comments

Comments
 (0)