Skip to content

Commit 86c40ea

Browse files
committed
Cleanup
1 parent 5afcfc0 commit 86c40ea

6 files changed

Lines changed: 53 additions & 31 deletions

File tree

nexus_sync_operations/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
This sample shows how to create a Nexus service that is backed by a long-running workflow and
2-
exposes operations that use signals, queries, and updates against that workflow.
2+
exposes operations that execute updates and queries against that workflow. The long-running
3+
workflow, and the updates/queries are private implementation detail of the nexus service: the caller
4+
does not know how the operations are implemented.
35

46
### Sample directory structure
57

nexus_sync_operations/caller/workflows.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
"""
2+
This is a workflow that calls nexus operations. The caller does not have information about how these
3+
operations are implemented by the nexus service.
4+
"""
5+
16
from temporalio import workflow
27

38
from message_passing.introduction import Language
4-
from message_passing.introduction.workflows import (
5-
GetLanguagesInput,
6-
SetLanguageInput,
7-
)
9+
from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput
810

911
with workflow.unsafe.imports_passed_through():
1012
from nexus_sync_operations.service import GreetingService

nexus_sync_operations/endpoint_description.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,3 @@
22
- operation: `get_languages`
33
- operation: `get_language`
44
- operation: `set_language`
5-
- operation: `set_language_using_activity`
6-
- operation: `approve`

nexus_sync_operations/handler/service_handler.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""
2-
This file demonstrates how to implement a Nexus service that is backed by a long-running
3-
workflow and exposes operations that perform signals, updates, and queries against that
4-
workflow.
2+
This file demonstrates how to implement a Nexus service that is backed by a long-running workflow
3+
and exposes operations that perform updates, and queries against that workflow.
54
"""
65

76
from __future__ import annotations
87

98
import nexusrpc
9+
from temporalio import nexus
1010
from temporalio.client import Client, WorkflowHandle
1111
from temporalio.common import WorkflowIDConflictPolicy
1212

@@ -21,29 +21,38 @@
2121

2222
@nexusrpc.handler.service_handler(service=GreetingService)
2323
class GreetingServiceHandler:
24-
def __init__(
25-
self,
26-
greeting_workflow_handle: WorkflowHandle[GreetingWorkflow, str],
27-
):
28-
self.greeting_workflow_handle = greeting_workflow_handle
24+
# This nexus service is backed by a long-running "entity" workflow. This means that the workflow
25+
# is always running in the background, allowing the service to be stateful and durable. The
26+
# service interacts with it via messages (updates and queries). All of this is implementation
27+
# detail private to the nexus handler: the nexus caller does not know how the operations are
28+
# implemented or what is providing the backing storage.
29+
LONG_RUNNING_WORKFLOW_ID = "nexus-sync-operations-greeting-workflow"
2930

3031
@classmethod
31-
async def create(cls, client: Client, task_queue: str) -> GreetingServiceHandler:
32-
# Obtain a workflow handle to the long-running workflow that backs this service, starting
33-
# the workflow if it is not already running.
34-
return cls(await cls._get_workflow_handle(client, task_queue))
35-
36-
@staticmethod
37-
async def _get_workflow_handle(
38-
client: Client, task_queue: str
39-
) -> WorkflowHandle[GreetingWorkflow, str]:
40-
return await client.start_workflow(
32+
async def start(cls, client: Client, task_queue: str) -> None:
33+
# Start the long-running "entity" workflow, if it is not already running.
34+
await client.start_workflow(
4135
GreetingWorkflow.run,
42-
id="nexus-sync-operations-greeting-workflow",
36+
id=cls.LONG_RUNNING_WORKFLOW_ID,
4337
task_queue=task_queue,
4438
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
4539
)
4640

41+
@property
42+
def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
43+
# In nexus operation handler code, nexus.client() is always available, returning a client
44+
# connected to the handler namespace (it's the same client instance that your nexus worker
45+
# is using to poll the server for nexus tasks). This client can be used to interact with the
46+
# handler namespace, for example to send signals, queries, or updates. Remember however,
47+
# that a sync_operation handler must return quickly (no more than a few seconds). To do
48+
# long-running work in a nexus operation handler, use
49+
# temporalio.nexus.workflow_run_operation (see the hello_nexus sample).
50+
return nexus.client().get_workflow_handle_for(
51+
GreetingWorkflow.run, self.LONG_RUNNING_WORKFLOW_ID
52+
)
53+
54+
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
55+
# query against a long-running workflow that is private to the nexus service.
4756
@nexusrpc.handler.sync_operation
4857
async def get_languages(
4958
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput
@@ -52,12 +61,16 @@ async def get_languages(
5261
GreetingWorkflow.get_languages, input
5362
)
5463

64+
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
65+
# query against a long-running workflow that is private to the nexus service.
5566
@nexusrpc.handler.sync_operation
5667
async def get_language(
5768
self, ctx: nexusrpc.handler.StartOperationContext, input: None
5869
) -> Language:
5970
return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language)
6071

72+
# 👉 This is a handler for a nexus operation whose internal implementation involves executing an
73+
# update against a long-running workflow that is private to the nexus service.
6174
@nexusrpc.handler.sync_operation
6275
async def set_language(
6376
self,

nexus_sync_operations/handler/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ async def main(client: Optional[Client] = None):
2222
"localhost:7233",
2323
namespace=NAMESPACE,
2424
)
25-
greeting_service_handler = await GreetingServiceHandler.create(client, TASK_QUEUE)
25+
greeting_service_handler = GreetingServiceHandler()
26+
await greeting_service_handler.start(client, TASK_QUEUE)
2627

2728
async with Worker(
2829
client,

nexus_sync_operations/service.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
"""
2+
This module defines a Nexus service that exposes three operations.
3+
4+
It is used by the nexus service handler to validate that the operation handlers implement the
5+
correct input and output types, and by the caller workflow to create a type-safe client. It does not
6+
contain the implementation of the operations; see nexus_sync_operations.handler.service_handler for
7+
that.
8+
"""
9+
110
import nexusrpc
211

312
from message_passing.introduction import Language
4-
from message_passing.introduction.workflows import (
5-
GetLanguagesInput,
6-
SetLanguageInput,
7-
)
13+
from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput
814

915

1016
@nexusrpc.service

0 commit comments

Comments
 (0)