Skip to content

Commit 78b2f52

Browse files
committed
Refactor
1 parent 4f63fdb commit 78b2f52

3 files changed

Lines changed: 14 additions & 22 deletions

File tree

nexus_sync_operations/handler/service_handler.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,21 @@
2121

2222
@nexusrpc.handler.service_handler(service=GreetingService)
2323
class GreetingServiceHandler:
24-
# This nexus service is backed by a long-running "entity" workflow. This means that the workflow
25-
# is always running in the background, allowing the service to be stateful and durable. The
26-
# service interacts with it via messages (updates and queries). All of this is implementation
27-
# detail private to the nexus handler: the nexus caller does not know how the operations are
28-
# implemented or what is providing the backing storage.
29-
LONG_RUNNING_WORKFLOW_ID = "nexus-sync-operations-greeting-workflow"
24+
def __init__(self, workflow_id: str):
25+
self.workflow_id = workflow_id
3026

3127
@classmethod
32-
async def start(cls, client: Client, task_queue: str) -> None:
28+
async def create(
29+
cls, workflow_id: str, client: Client, task_queue: str
30+
) -> GreetingServiceHandler:
3331
# Start the long-running "entity" workflow, if it is not already running.
3432
await client.start_workflow(
3533
GreetingWorkflow.run,
36-
id=cls.LONG_RUNNING_WORKFLOW_ID,
34+
id=workflow_id,
3735
task_queue=task_queue,
3836
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
3937
)
38+
return cls(workflow_id)
4039

4140
@property
4241
def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
@@ -48,7 +47,7 @@ def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
4847
# long-running work in a nexus operation handler, use
4948
# temporalio.nexus.workflow_run_operation (see the hello_nexus sample).
5049
return nexus.client().get_workflow_handle_for(
51-
GreetingWorkflow.run, self.LONG_RUNNING_WORKFLOW_ID
50+
GreetingWorkflow.run, self.workflow_id
5251
)
5352

5453
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a

nexus_sync_operations/handler/worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ async def main(client: Optional[Client] = None):
2222
"localhost:7233",
2323
namespace=NAMESPACE,
2424
)
25-
greeting_service_handler = GreetingServiceHandler()
26-
await greeting_service_handler.start(client, TASK_QUEUE)
25+
26+
# Create the nexus service handler instance, starting the long-running entity workflow that
27+
# backs the Nexus service
28+
greeting_service_handler = await GreetingServiceHandler.create(
29+
"nexus-sync-operations-greeting-workflow", client, TASK_QUEUE
30+
)
2731

2832
async with Worker(
2933
client,

tests/nexus_sync_operations/nexus_sync_operations_test.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ async def _run_caller_workflow(client: Client, workflow: Type):
9292
client=client,
9393
)
9494
try:
95-
await (
96-
nexus_sync_operations.handler.service_handler.GreetingServiceHandler.start(
97-
client, nexus_sync_operations.handler.worker.TASK_QUEUE
98-
)
99-
)
10095
handler_worker_task = asyncio.create_task(
10196
nexus_sync_operations.handler.worker.main(client)
10297
)
@@ -115,12 +110,6 @@ async def _run_caller_workflow(client: Client, workflow: Type):
115110
nexus_sync_operations.handler.worker.interrupt_event.set()
116111
await handler_worker_task
117112
nexus_sync_operations.handler.worker.interrupt_event.clear()
118-
try:
119-
await client.get_workflow_handle(
120-
nexus_sync_operations.handler.service_handler.GreetingServiceHandler.LONG_RUNNING_WORKFLOW_ID
121-
).terminate()
122-
except Exception:
123-
pass
124113
finally:
125114
await delete_nexus_endpoint(
126115
id=create_response.endpoint.id,

0 commit comments

Comments
 (0)