Skip to content

Commit 72a24b0

Browse files
committed
Add 'async with' syntactic sugar
1 parent 90aaf99 commit 72a24b0

5 files changed

Lines changed: 126 additions & 77 deletions

File tree

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
from contextlib import asynccontextmanager
2+
from datetime import timedelta
3+
from typing import Optional, AsyncGenerator
4+
5+
from temporalio.client import Client
6+
from temporalio import workflow, activity
7+
from temporalio.common import WorkflowIDConflictPolicy
8+
9+
from resource_locking.lock_manager_workflow import LockManagerWorkflowInput, LockManagerWorkflow
10+
from resource_locking.shared import AcquireResponse, LOCK_MANAGER_WORKFLOW_ID, AcquireRequest, AcquiredResource
11+
12+
# Use this class in workflow code that that needs to run on locked resources.
13+
class ResourceAllocator:
14+
def __init__(self, client: Client):
15+
self.client = client
16+
17+
@activity.defn
18+
async def send_acquire_signal(self):
19+
info = activity.info()
20+
21+
# This will start and signal the workflow if it isn't running, otherwise it will signal the current run.
22+
await self.client.start_workflow(
23+
workflow=LockManagerWorkflow.run,
24+
arg=LockManagerWorkflowInput(
25+
resources={},
26+
waiters=[],
27+
),
28+
id=LOCK_MANAGER_WORKFLOW_ID,
29+
task_queue="default",
30+
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
31+
start_signal="acquire_resource",
32+
start_signal_args=[AcquireRequest(info.workflow_id)]
33+
)
34+
35+
@classmethod
36+
@asynccontextmanager
37+
async def acquire_resource(cls, *, already_acquired_resource: Optional[AcquiredResource] = None, max_wait_time: timedelta = timedelta(minutes=5)):
38+
warn_when_workflow_has_timeouts()
39+
40+
resource = already_acquired_resource
41+
if resource is None:
42+
async def assign_resource(input: AcquireResponse):
43+
workflow.set_signal_handler("assign_resource", None)
44+
nonlocal resource
45+
resource = AcquiredResource(
46+
resource=input.resource,
47+
release_signal_name=input.release_signal_name,
48+
)
49+
50+
workflow.set_signal_handler("assign_resource", assign_resource)
51+
52+
await workflow.execute_activity(
53+
ResourceAllocator.send_acquire_signal,
54+
start_to_close_timeout=timedelta(seconds=10),
55+
)
56+
57+
await workflow.wait_condition(lambda: resource is not None, timeout=max_wait_time)
58+
59+
# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
60+
# finally block will release the resource if an activity fails. This is why we asserted the lack of
61+
# workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
62+
try:
63+
resource.autorelease = True
64+
yield resource
65+
finally:
66+
if resource.autorelease:
67+
handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
68+
await handle.signal(resource.release_signal_name)
69+
70+
def warn_when_workflow_has_timeouts():
71+
if has_timeout(workflow.info().run_timeout):
72+
workflow.logger.warning(
73+
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
74+
)
75+
if has_timeout(workflow.info().execution_timeout):
76+
workflow.logger.warning(
77+
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
78+
)
79+
80+
def has_timeout(timeout: Optional[timedelta]) -> bool:
81+
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
82+
# continue_as_new call).
83+
return timeout is not None and timeout > timedelta(0)
Lines changed: 13 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import asyncio
2-
from dataclasses import dataclass
2+
from dataclasses import dataclass, field
33
from datetime import timedelta
4-
from typing import Optional
4+
from typing import Optional, Callable
55

66
from temporalio import activity, workflow
77

8+
from resource_locking.resource_allocator import ResourceAllocator
89
from resource_locking.shared import (
910
LOCK_MANAGER_WORKFLOW_ID,
1011
AcquireRequest,
11-
AcquireResponse,
12+
AcquireResponse, AcquiredResource,
1213
)
1314

1415
@dataclass
@@ -38,7 +39,7 @@ class ResourceLockingWorkflowInput:
3839
should_continue_as_new: bool
3940

4041
# Used to transfer resource ownership between iterations during continue_as_new
41-
already_assigned_resource: Optional[AcquireResponse]
42+
already_acquired_resource: Optional[AcquiredResource] = field(default=None)
4243

4344

4445
class FailWorkflowException(Exception):
@@ -51,53 +52,13 @@ class FailWorkflowException(Exception):
5152

5253
@workflow.defn(failure_exception_types=[FailWorkflowException])
5354
class ResourceLockingWorkflow:
54-
def __init__(self):
55-
self.assigned_resource: Optional[AcquireResponse] = None
56-
57-
@workflow.signal(name="assign_resource")
58-
def handle_assign_resource(self, input: AcquireResponse):
59-
self.assigned_resource = input
60-
6155
@workflow.run
6256
async def run(self, input: ResourceLockingWorkflowInput):
63-
if has_timeout(workflow.info().run_timeout):
64-
# See "locking" comment below for rationale
65-
raise FailWorkflowException(
66-
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout})"
67-
)
68-
if has_timeout(workflow.info().execution_timeout):
69-
raise FailWorkflowException(
70-
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout})"
71-
)
72-
73-
sem_handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
74-
75-
info = workflow.info()
76-
if input.already_assigned_resource is None:
77-
await sem_handle.signal("acquire_resource", AcquireRequest(info.workflow_id))
78-
elif info.continued_run_id:
79-
self.assigned_resource = input.already_assigned_resource
80-
else:
81-
raise FailWorkflowException(
82-
f"Only set 'already_assigned_resource' when using continue_as_new"
83-
)
84-
85-
await workflow.wait_condition(
86-
lambda: self.assigned_resource is not None, timeout=MAX_RESOURCE_WAIT_TIME
87-
)
88-
if self.assigned_resource is None:
89-
raise FailWorkflowException(
90-
f"No resource was assigned after {MAX_RESOURCE_WAIT_TIME}"
91-
)
92-
93-
# From this point forward, we own the resource. Note that this is a lock, not a lease! Our finally block will
94-
# release the resource if an activity fails. This is why we asserted the lack of workflow-level timeouts
95-
# above - the finally block wouldn't run if there was a timeout.
96-
try:
57+
async with ResourceAllocator.acquire_resource(already_acquired_resource=input.already_acquired_resource) as resource:
9758
for iteration in ["first", "second", "third"]:
9859
await workflow.execute_activity(
9960
use_resource,
100-
UseResourceActivityInput(self.assigned_resource.resource, iteration),
61+
UseResourceActivityInput(resource.resource, iteration),
10162
start_to_close_timeout=timedelta(seconds=10),
10263
)
10364

@@ -111,17 +72,11 @@ async def run(self, input: ResourceLockingWorkflowInput):
11172
next_input = ResourceLockingWorkflowInput(
11273
iteration_to_fail_after=input.iteration_to_fail_after,
11374
should_continue_as_new=False,
114-
already_assigned_resource=self.assigned_resource,
75+
already_acquired_resource=resource,
11576
)
77+
78+
# By default, ResourceAllocator will release the resource when we return. We want to hold the resource
79+
# across continue-as-new for the sake of demonstration.
80+
resource.autorelease = False
81+
11682
workflow.continue_as_new(next_input)
117-
finally:
118-
# Only release the resource if we didn't continue-as-new. workflow.continue_as_new raises to halt workflow
119-
# execution, but this code in this finally block will still run. It wouldn't successfully send the signal...
120-
# the if statement just avoids some warnings in the log.
121-
if not input.should_continue_as_new:
122-
await sem_handle.signal(self.assigned_resource.release_signal_name)
123-
124-
def has_timeout(timeout: Optional[timedelta]) -> bool:
125-
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
126-
# continue_as_new call).
127-
return timeout is not None and timeout > timedelta(0)

resource_locking/shared.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from dataclasses import dataclass
1+
from dataclasses import dataclass, field
2+
from typing import Optional
23

34
LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
45

@@ -10,3 +11,7 @@ class AcquireRequest:
1011
class AcquireResponse:
1112
release_signal_name: str
1213
resource: str
14+
15+
@dataclass
16+
class AcquiredResource(AcquireResponse):
17+
autorelease: bool = field(default=True)

resource_locking/starter.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,39 +3,25 @@
33

44
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
55

6-
from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
7-
from resource_locking.lock_manager_workflow import (
8-
LockManagerWorkflow,
9-
LockManagerWorkflowInput,
10-
)
6+
from resource_locking.lock_manager_workflow import LockManagerWorkflow, LockManagerWorkflowInput
117
from resource_locking.resource_locking_workflow import (
128
ResourceLockingWorkflow,
139
ResourceLockingWorkflowInput,
1410
)
11+
from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
12+
from temporalio.common import WorkflowIDConflictPolicy
1513

1614

1715
async def main():
1816
# Connect client
1917
client = await Client.connect("localhost:7233")
2018

21-
# Start the LockManagerWorkflow
22-
lock_manager_handle = await client.start_workflow(
23-
workflow=LockManagerWorkflow.run,
24-
arg=LockManagerWorkflowInput(
25-
resources={ "resource_a": None, "resource_b": None },
26-
waiters=[],
27-
),
28-
id=LOCK_MANAGER_WORKFLOW_ID,
29-
task_queue="default",
30-
)
31-
3219
# Start the ResourceLockingWorkflows
3320
resource_locking_handles: list[WorkflowHandle[Any, Any]] = []
3421
for i in range(0, 4):
3522
input = ResourceLockingWorkflowInput(
3623
iteration_to_fail_after=None,
3724
should_continue_as_new=False,
38-
already_assigned_resource=None,
3925
)
4026
if i == 0:
4127
input.should_continue_as_new = True
@@ -50,6 +36,20 @@ async def main():
5036
)
5137
resource_locking_handles.append(resource_locking_handle)
5238

39+
# Add some resources
40+
lock_manager_handle = await client.start_workflow(
41+
workflow=LockManagerWorkflow.run,
42+
arg=LockManagerWorkflowInput(
43+
resources={},
44+
waiters=[],
45+
),
46+
id=LOCK_MANAGER_WORKFLOW_ID,
47+
task_queue="default",
48+
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
49+
start_signal="add_resources",
50+
start_signal_args=[["resource_a", "resource_b"]],
51+
)
52+
5353
for resource_locking_handle in resource_locking_handles:
5454
try:
5555
await resource_locking_handle.result()

resource_locking/worker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from temporalio.client import Client
55
from temporalio.worker import Worker
66

7+
from resource_locking.resource_allocator import ResourceAllocator
78
from resource_locking.lock_manager_workflow import LockManagerWorkflow
89
from resource_locking.resource_locking_workflow import (
910
ResourceLockingWorkflow,
@@ -17,12 +18,17 @@ async def main():
1718
# Start client
1819
client = await Client.connect("localhost:7233")
1920

21+
resource_allocator = ResourceAllocator(client)
22+
2023
# Run a worker for the workflow
2124
worker = Worker(
2225
client,
2326
task_queue="default",
2427
workflows=[LockManagerWorkflow, ResourceLockingWorkflow],
25-
activities=[use_resource],
28+
activities=[
29+
use_resource,
30+
resource_allocator.send_acquire_signal,
31+
],
2632
)
2733

2834
await worker.run()

0 commit comments

Comments
 (0)