Skip to content

Commit 883f159

Browse files
committed
PR feedback (move resource_pool_workflow, misc factoring)
1 parent 7ac42cc commit 883f159

6 files changed

Lines changed: 17 additions & 18 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
from .resource_pool_client import ResourcePoolClient
2+
from .resource_pool_workflow import ResourcePoolWorkflow

resource_pool/pool_client/resource_pool_client.py

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from temporalio import workflow
66

7-
from resource_pool.resource_pool_workflow import ResourcePoolWorkflow
7+
from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
88
from resource_pool.shared import (
99
AcquiredResource,
1010
AcquireRequest,
@@ -18,24 +18,29 @@ class ResourcePoolClient:
1818
def __init__(self, pool_workflow_id: str) -> None:
1919
self.pool_workflow_id = pool_workflow_id
2020
self.acquired_resources: list[AcquiredResource] = []
21-
self.register_signal_handler()
2221

23-
def register_signal_handler(self) -> None:
2422
signal_name = f"assign_resource_{self.pool_workflow_id}"
2523
if workflow.get_signal_handler(signal_name) is None:
26-
workflow.set_signal_handler(signal_name, self.assign_resource)
24+
workflow.set_signal_handler(signal_name, self._handle_acquire_response)
2725
else:
2826
raise RuntimeError(
2927
f"{signal_name} already registered - if you use multiple ResourcePoolClients within the "
3028
f"same workflow, they must use different pool_workflow_ids"
3129
)
3230

33-
async def send_acquire_signal(self) -> None:
31+
def _handle_acquire_response(self, response: AcquireResponse) -> None:
32+
self.acquired_resources.append(
33+
AcquiredResource(
34+
resource=response.resource, release_key=response.release_key
35+
)
36+
)
37+
38+
async def _send_acquire_signal(self) -> None:
3439
await workflow.get_external_workflow_handle_for(
3540
ResourcePoolWorkflow.run, self.pool_workflow_id
3641
).signal("acquire_resource", AcquireRequest(workflow.info().workflow_id))
3742

38-
async def send_release_signal(self, acquired_resource: AcquiredResource) -> None:
43+
async def _send_release_signal(self, acquired_resource: AcquiredResource) -> None:
3944
await workflow.get_external_workflow_handle_for(
4045
ResourcePoolWorkflow.run, self.pool_workflow_id
4146
).signal(
@@ -46,13 +51,6 @@ async def send_release_signal(self, acquired_resource: AcquiredResource) -> None
4651
),
4752
)
4853

49-
def assign_resource(self, response: AcquireResponse) -> None:
50-
self.acquired_resources.append(
51-
AcquiredResource(
52-
resource=response.resource, release_key=response.release_key
53-
)
54-
)
55-
5654
@asynccontextmanager
5755
async def acquire_resource(
5856
self,
@@ -63,7 +61,7 @@ async def acquire_resource(
6361
_warn_when_workflow_has_timeouts()
6462

6563
if reattach is None:
66-
await self.send_acquire_signal()
64+
await self._send_acquire_signal()
6765
await workflow.wait_condition(
6866
lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
6967
)
@@ -84,7 +82,7 @@ async def acquire_resource(
8482
yield resource
8583
finally:
8684
if not resource.detached:
87-
await self.send_release_signal(resource)
85+
await self._send_release_signal(resource)
8886

8987

9088
def _warn_when_workflow_has_timeouts() -> None:
File renamed without changes.

resource_pool/starter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
55
from temporalio.common import WorkflowIDConflictPolicy
66

7-
from resource_pool.resource_pool_workflow import (
7+
from resource_pool.pool_client.resource_pool_workflow import (
88
ResourcePoolWorkflow,
99
ResourcePoolWorkflowInput,
1010
)

resource_pool/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from temporalio.client import Client
55
from temporalio.worker import Worker
66

7-
from resource_pool.resource_pool_workflow import ResourcePoolWorkflow
7+
from resource_pool.pool_client.resource_pool_workflow import ResourcePoolWorkflow
88
from resource_pool.resource_user_workflow import ResourceUserWorkflow, use_resource
99

1010

tests/resource_pool/workflow_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from temporalio.common import WorkflowIDConflictPolicy
88
from temporalio.worker import Worker
99

10-
from resource_pool.resource_pool_workflow import (
10+
from resource_pool.pool_client.resource_pool_workflow import (
1111
ResourcePoolWorkflow,
1212
ResourcePoolWorkflowInput,
1313
)

0 commit comments

Comments
 (0)