Skip to content

Commit 84b2f4c

Browse files
authored
Merge branch 'temporalio:main' into main
2 parents 7723e12 + 3bd017d commit 84b2f4c

48 files changed

Lines changed: 3112 additions & 1517 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,13 @@ jobs:
1212
strategy:
1313
fail-fast: true
1414
matrix:
15-
python: ["3.8", "3.12"]
15+
python: ["3.9", "3.12"]
1616
os: [ubuntu-latest, macos-intel, macos-arm, windows-latest]
1717
include:
1818
- os: macos-intel
1919
runsOn: macos-13
2020
- os: macos-arm
2121
runsOn: macos-14
22-
# macOS ARM 3.8 does not have an available Python build at
23-
# https://raw.githubusercontent.com/actions/python-versions/main/versions-manifest.json.
24-
# See https://github.com/actions/setup-python/issues/808 and
25-
# https://github.com/actions/python-versions/pull/259.
26-
exclude:
27-
- os: macos-arm
28-
python: "3.8"
2922
runs-on: ${{ matrix.runsOn || matrix.os }}
3023
steps:
3124
- name: Print build information
@@ -39,11 +32,19 @@ jobs:
3932
# Using fixed Poetry version until
4033
# https://github.com/python-poetry/poetry/pull/7694 is fixed
4134
- run: python -m pip install --upgrade wheel "poetry==1.4.0" poethepoet
42-
- run: poetry install --with pydantic --with dsl --with encryption
35+
- run: poetry install --with pydantic_converter --with dsl --with encryption --with trio_async
4336
- run: poe lint
4437
- run: mkdir junit-xml
45-
- run: poe test -s -o log_cli_level=DEBUG --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
46-
- run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
38+
- run: poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
39+
- run: poe test -s --workflow-environment time-skipping --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--time-skipping.xml
40+
# This must remain the last step since it downgrades pydantic
41+
- name: Uninstall pydantic
42+
shell: bash
43+
run: |
44+
echo y | poetry run pip uninstall pydantic
45+
echo y | poetry run pip uninstall pydantic-core
46+
poetry run pip install pydantic==1.10
47+
poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}--pydantic-v1.xml tests/pydantic_converter_v1/workflow_test.py
4748
4849
# On latest, run gevent test
4950
- name: Gevent test

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This is the set of Python samples for the [Python SDK](https://github.com/tempor
66

77
Prerequisites:
88

9-
* Python >= 3.8
9+
* Python >= 3.9
1010
* [Poetry](https://python-poetry.org)
1111
* [Temporal CLI installed](https://docs.temporal.io/cli#install)
1212
* [Local Temporal server running](https://docs.temporal.io/cli/server#start-dev)
@@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7272
* [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models.
7373
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
7474
* [sentry](sentry) - Report errors to Sentry.
75+
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
7576
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
7677
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
7778

hello/hello_activity_choice.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ class ShoppingList:
5454
@workflow.defn
5555
class PurchaseFruitsWorkflow:
5656
@workflow.run
57-
async def run(self, list: ShoppingList) -> str:
57+
async def run(self, shopping_list: ShoppingList) -> str:
5858
# Order each thing on the list
5959
ordered: List[str] = []
60-
for item in list.items:
60+
for item in shopping_list.items:
6161
if item.fruit is Fruit.APPLE:
6262
order_function = order_apples
6363
elif item.fruit is Fruit.BANANA:
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Waiting for message handlers
2+
3+
This workflow demonstrates how to wait for signal and update handlers to
4+
finish in the following circumstances:
5+
6+
- Before a successful return
7+
- On failure
8+
- On cancellation
9+
10+
Your workflow can also exit via Continue-As-New. In that case you would
11+
usually wait for the handlers to finish immediately before the call to
12+
continue_as_new(); that's not illustrated in this sample.
13+
14+
15+
To run, open two terminals and `cd` to this directory in them.
16+
17+
Run the worker in one terminal:
18+
19+
poetry run python worker.py
20+
21+
And run the workflow-starter code in the other terminal:
22+
23+
poetry run python starter.py
24+
25+
26+
Here's the output you'll see:
27+
28+
```
29+
workflow exit type: SUCCESS
30+
🟢 caller received update result
31+
🟢 caller received workflow result
32+
33+
34+
workflow exit type: FAILURE
35+
🟢 caller received update result
36+
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
37+
38+
39+
workflow exit type: CANCELLATION
40+
🟢 caller received update result
41+
```
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
from enum import IntEnum
3+
4+
TASK_QUEUE = "my-task-queue"
5+
WORKFLOW_ID = "my-workflow-id"
6+
7+
8+
class WorkflowExitType(IntEnum):
9+
SUCCESS = 0
10+
FAILURE = 1
11+
CANCELLATION = 2
12+
13+
14+
@dataclass
15+
class WorkflowInput:
16+
exit_type: WorkflowExitType
17+
18+
19+
@dataclass
20+
class WorkflowResult:
21+
data: str
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
6+
@activity.defn
7+
async def activity_executed_by_update_handler():
8+
await asyncio.sleep(1)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
3+
from temporalio import client, common
4+
5+
from message_passing.waiting_for_handlers import (
6+
TASK_QUEUE,
7+
WORKFLOW_ID,
8+
WorkflowExitType,
9+
WorkflowInput,
10+
)
11+
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow
12+
13+
14+
async def starter(exit_type: WorkflowExitType):
15+
cl = await client.Client.connect("localhost:7233")
16+
wf_handle = await cl.start_workflow(
17+
WaitingForHandlersWorkflow.run,
18+
WorkflowInput(exit_type=exit_type),
19+
id=WORKFLOW_ID,
20+
task_queue=TASK_QUEUE,
21+
id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING,
22+
)
23+
await _check_run(wf_handle, exit_type)
24+
25+
26+
async def _check_run(
27+
wf_handle: client.WorkflowHandle,
28+
exit_type: WorkflowExitType,
29+
):
30+
try:
31+
up_handle = await wf_handle.start_update(
32+
WaitingForHandlersWorkflow.my_update,
33+
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
34+
)
35+
except Exception as e:
36+
print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}")
37+
38+
if exit_type == WorkflowExitType.CANCELLATION:
39+
await wf_handle.cancel()
40+
41+
try:
42+
await up_handle.result()
43+
print(" 🟢 caller received update result")
44+
except Exception as e:
45+
print(
46+
f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}"
47+
)
48+
49+
try:
50+
await wf_handle.result()
51+
print(" 🟢 caller received workflow result")
52+
except BaseException as e:
53+
print(
54+
f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}"
55+
)
56+
57+
58+
async def main():
59+
for exit_type in [
60+
WorkflowExitType.SUCCESS,
61+
WorkflowExitType.FAILURE,
62+
WorkflowExitType.CANCELLATION,
63+
]:
64+
print(f"\n\nworkflow exit type: {exit_type.name}")
65+
await starter(exit_type)
66+
67+
68+
if __name__ == "__main__":
69+
asyncio.run(main())
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from message_passing.waiting_for_handlers import TASK_QUEUE
8+
from message_passing.waiting_for_handlers.activities import (
9+
activity_executed_by_update_handler,
10+
)
11+
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow
12+
13+
interrupt_event = asyncio.Event()
14+
15+
16+
async def main():
17+
logging.basicConfig(level=logging.INFO)
18+
19+
client = await Client.connect("localhost:7233")
20+
21+
async with Worker(
22+
client,
23+
task_queue=TASK_QUEUE,
24+
workflows=[WaitingForHandlersWorkflow],
25+
activities=[
26+
activity_executed_by_update_handler,
27+
],
28+
):
29+
logging.info("Worker started, ctrl+c to exit")
30+
await interrupt_event.wait()
31+
logging.info("Shutting down")
32+
33+
34+
if __name__ == "__main__":
35+
loop = asyncio.new_event_loop()
36+
try:
37+
loop.run_until_complete(main())
38+
except KeyboardInterrupt:
39+
interrupt_event.set()
40+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import exceptions, workflow
5+
6+
from message_passing.waiting_for_handlers import (
7+
WorkflowExitType,
8+
WorkflowInput,
9+
WorkflowResult,
10+
)
11+
from message_passing.waiting_for_handlers.activities import (
12+
activity_executed_by_update_handler,
13+
)
14+
15+
16+
def is_workflow_exit_exception(e: BaseException) -> bool:
17+
"""
18+
True if the exception is of a type that will cause the workflow to exit.
19+
20+
This is as opposed to exceptions that cause a workflow task failure, which
21+
are retried automatically by Temporal.
22+
"""
23+
# 👉 If you have set additional failure_exception_types you should also
24+
# check for these here.
25+
return isinstance(e, (asyncio.CancelledError, exceptions.FailureError))
26+
27+
28+
@workflow.defn
29+
class WaitingForHandlersWorkflow:
30+
@workflow.run
31+
async def run(self, input: WorkflowInput) -> WorkflowResult:
32+
"""
33+
This workflow.run method demonstrates a pattern that can be used to wait for signal and
34+
update handlers to finish in the following circumstances:
35+
36+
- On successful workflow return
37+
- On workflow cancellation
38+
- On workflow failure
39+
40+
Your workflow can also exit via Continue-As-New. In that case you would usually wait for
41+
the handlers to finish immediately before the call to continue_as_new(); that's not
42+
illustrated in this sample.
43+
44+
If you additionally need to perform cleanup or compensation on workflow failure or
45+
cancellation, see the message_passing/waiting_for_handlers_and_compensation sample.
46+
"""
47+
try:
48+
# 👉 Use this `try...except` style, instead of waiting for message
49+
# handlers to finish in a `finally` block. The reason is that some
50+
# exception types cause a workflow task failure as opposed to
51+
# workflow exit, in which case we do *not* want to wait for message
52+
# handlers to finish.
53+
result = await self._my_workflow_application_logic(input)
54+
await workflow.wait_condition(workflow.all_handlers_finished)
55+
return result
56+
# 👉 Catch BaseException since asyncio.CancelledError does not inherit
57+
# from Exception.
58+
except BaseException as e:
59+
if is_workflow_exit_exception(e):
60+
await workflow.wait_condition(workflow.all_handlers_finished)
61+
raise
62+
63+
# Methods below this point can be ignored unless you are interested in
64+
# the implementation details of this sample.
65+
66+
def __init__(self) -> None:
67+
self._update_started = False
68+
69+
@workflow.update
70+
async def my_update(self) -> str:
71+
self._update_started = True
72+
await workflow.execute_activity(
73+
activity_executed_by_update_handler,
74+
start_to_close_timeout=timedelta(seconds=10),
75+
)
76+
return "update-result"
77+
78+
async def _my_workflow_application_logic(
79+
self, input: WorkflowInput
80+
) -> WorkflowResult:
81+
# The main workflow logic is implemented in a separate method in order
82+
# to separate "platform-level" concerns (waiting for handlers to finish
83+
# and error handling) from application logic.
84+
85+
# Wait until handlers have started, so that we are demonstrating that we
86+
# wait for them to finish.
87+
await workflow.wait_condition(lambda: self._update_started)
88+
if input.exit_type == WorkflowExitType.SUCCESS:
89+
return WorkflowResult(data="workflow-result")
90+
elif input.exit_type == WorkflowExitType.FAILURE:
91+
raise exceptions.ApplicationError("deliberately failing workflow")
92+
elif input.exit_type == WorkflowExitType.CANCELLATION:
93+
# Block forever; the starter will send a workflow cancellation request.
94+
await asyncio.Future()
95+
raise AssertionError("unreachable")
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Waiting for message handlers, and performing compensation and cleanup in message handlers
2+
3+
This sample demonstrates how to do the following:
4+
5+
1. Ensure that all update/signal handlers are finished before a successful
6+
workflow return, and on workflow cancellation and failure.
7+
2. Perform compensation/cleanup in an update handler when the workflow is
8+
cancelled or fails.
9+
10+
For a simpler sample showing how to do (1) without (2), see [safe_message_handlers](../safe_message_handlers/README.md).
11+
12+
To run, open two terminals and `cd` to this directory in them.
13+
14+
Run the worker in one terminal:
15+
16+
poetry run python worker.py
17+
18+
And run the workflow-starter code in the other terminal:
19+
20+
poetry run python starter.py
21+
22+
23+
Here's the output you'll see:
24+
25+
```
26+
workflow exit type: SUCCESS
27+
🟢 caller received update result
28+
🟢 caller received workflow result
29+
30+
31+
workflow exit type: FAILURE
32+
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
33+
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
34+
35+
36+
workflow exit type: CANCELLATION
37+
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
38+
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled
39+
```

0 commit comments

Comments
 (0)