Skip to content

Commit 41c602e

Browse files
committed
Respond to upstream
1 parent a9927e9 commit 41c602e

4 files changed

Lines changed: 65 additions & 62 deletions

File tree

hello_nexus/basic/handler/service_handler.py

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,12 @@
1414
import uuid
1515

1616
from nexusrpc.handler import (
17-
OperationHandler,
1817
StartOperationContext,
19-
SyncOperationHandler,
20-
operation_handler,
2118
service_handler,
19+
sync_operation_handler,
2220
)
23-
from temporalio.nexus.handler import (
24-
WorkflowOperationToken,
25-
WorkflowRunOperationHandler,
26-
start_workflow,
27-
)
21+
from temporalio import nexus
22+
from temporalio.nexus import workflow_run_operation_handler
2823

2924
from hello_nexus.basic.handler.db_client import MyDBClient
3025
from hello_nexus.basic.handler.workflows import WorkflowStartedByNexusOperation
@@ -49,21 +44,16 @@ def __init__(self, connected_db_client: MyDBClient):
4944
#
5045
# The token will be used by the caller if it subsequently wants to cancel the Nexus
5146
# operation.
52-
@operation_handler
53-
def my_workflow_run_operation(
54-
self,
55-
) -> OperationHandler[MyInput, MyOutput]:
56-
async def start(
57-
ctx: StartOperationContext, input: MyInput
58-
) -> WorkflowOperationToken[MyOutput]:
59-
# You could use self.connected_db_client here.
60-
return await start_workflow(
61-
WorkflowStartedByNexusOperation.run,
62-
input,
63-
id=str(uuid.uuid4()),
64-
)
65-
66-
return WorkflowRunOperationHandler.from_callable(start)
47+
@workflow_run_operation_handler
48+
async def my_workflow_run_operation(
49+
self, ctx: StartOperationContext, input: MyInput
50+
) -> nexus.WorkflowHandle[MyOutput]:
51+
# You could use self.connected_db_client here.
52+
return await nexus.start_workflow(
53+
WorkflowStartedByNexusOperation.run,
54+
input,
55+
id=str(uuid.uuid4()),
56+
)
6757

6858
# This is a Nexus operation that responds synchronously to all requests. That means
6959
# that unlike the workflow run operation above, in this case the `start` method
@@ -75,12 +65,9 @@ async def start(
7565
#
7666
# Sync operations are free to make arbitrary network calls, or perform CPU-bound
7767
# computations. Total execution duration must not exceed 10s.
78-
@operation_handler
79-
def my_sync_operation(
80-
self,
81-
) -> OperationHandler[MyInput, MyOutput]:
82-
async def start(ctx: StartOperationContext, input: MyInput) -> MyOutput:
83-
# You could use self.connected_db_client here.
84-
return MyOutput(message=f"Hello {input.name} from sync operation!")
85-
86-
return SyncOperationHandler.from_callable(start)
68+
@sync_operation_handler
69+
async def my_sync_operation(
70+
self, ctx: StartOperationContext, input: MyInput
71+
) -> MyOutput:
72+
# You could use self.connected_db_client here.
73+
return MyOutput(message=f"Hello {input.name} from sync operation!")

hello_nexus/basic/handler/service_handler_with_operation_handler_classes.py

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ class directly.
2020

2121
import uuid
2222

23+
from nexusrpc import OperationInfo
2324
from nexusrpc.handler import (
25+
CancelOperationContext,
26+
FetchOperationInfoContext,
27+
FetchOperationResultContext,
2428
OperationHandler,
2529
StartOperationContext,
2630
StartOperationResultAsync,
2731
StartOperationResultSync,
28-
SyncOperationHandler,
2932
operation_handler,
3033
service_handler,
3134
)
32-
from temporalio.nexus.handler import WorkflowRunOperationHandler, start_workflow
35+
from temporalio import nexus
3336

3437
from hello_nexus.basic.handler.db_client import MyDBClient
3538
from hello_nexus.basic.handler.service_handler import MyInput, MyNexusService, MyOutput
@@ -63,7 +66,7 @@ def my_workflow_run_operation(
6366
# This is a Nexus operation that is backed by a Temporal workflow. That means that it
6467
# responds asynchronously to all requests: it starts a workflow and responds with a token
6568
# that the handler can associate with the worklow is started.
66-
class MyWorkflowRunOperation(WorkflowRunOperationHandler[MyInput, MyOutput]):
69+
class MyWorkflowRunOperation(OperationHandler[MyInput, MyOutput]):
6770
# You can add an __init__ method taking any required arguments, since you are in
6871
# control of instantiating the OperationHandler inside the operation handler method
6972
# above decorated with @operation_handler.
@@ -78,12 +81,25 @@ class MyWorkflowRunOperation(WorkflowRunOperationHandler[MyInput, MyOutput]):
7881
async def start(
7982
self, ctx: StartOperationContext, input: MyInput
8083
) -> StartOperationResultAsync:
81-
token = await start_workflow(
84+
handle = await nexus.start_workflow(
8285
WorkflowStartedByNexusOperation.run,
8386
input,
8487
id=str(uuid.uuid4()),
8588
)
86-
return StartOperationResultAsync(token.encode())
89+
return StartOperationResultAsync(handle.to_token())
90+
91+
async def fetch_info(
92+
self, ctx: FetchOperationInfoContext, input: MyInput
93+
) -> OperationInfo:
94+
raise NotImplementedError
95+
96+
async def cancel(self, ctx: CancelOperationContext, input: MyInput) -> None:
97+
raise NotImplementedError
98+
99+
async def fetch_result(
100+
self, ctx: FetchOperationResultContext, input: MyInput
101+
) -> MyOutput:
102+
raise NotImplementedError
87103

88104

89105
# This is a Nexus operation that responds synchronously to all requests. That means that
@@ -96,7 +112,7 @@ async def start(
96112
#
97113
# Sync operations are free to make arbitrary network calls, or perform CPU-bound
98114
# computations. Total execution duration must not exceed 10s.
99-
class MySyncOperation(SyncOperationHandler[MyInput, MyOutput]):
115+
class MySyncOperation(OperationHandler[MyInput, MyOutput]):
100116
# You can add an __init__ method taking any required arguments, since you are in
101117
# control of instantiating the OperationHandler inside the operation handler method
102118
# above decorated with @operation_handler.
@@ -110,3 +126,16 @@ async def start(
110126
) -> StartOperationResultSync[MyOutput]:
111127
output = MyOutput(message=f"Hello {input.name} from sync operation!")
112128
return StartOperationResultSync(output)
129+
130+
async def fetch_info(
131+
self, ctx: FetchOperationInfoContext, input: MyInput
132+
) -> OperationInfo:
133+
raise NotImplementedError
134+
135+
async def cancel(self, ctx: CancelOperationContext, input: MyInput) -> None:
136+
raise NotImplementedError
137+
138+
async def fetch_result(
139+
self, ctx: FetchOperationResultContext, input: MyInput
140+
) -> MyOutput:
141+
raise NotImplementedError

hello_nexus/without_service_definition/app.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,12 @@
1010
from typing import Optional
1111

1212
from nexusrpc.handler import (
13-
OperationHandler,
1413
StartOperationContext,
15-
operation_handler,
1614
service_handler,
1715
)
18-
from temporalio import workflow
16+
from temporalio import nexus, workflow
1917
from temporalio.client import Client
20-
from temporalio.nexus.handler import (
21-
WorkflowOperationToken,
22-
WorkflowRunOperationHandler,
23-
start_workflow,
24-
)
18+
from temporalio.nexus import workflow_run_operation_handler
2519
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
2620
from temporalio.workflow import NexusClient
2721

@@ -50,20 +44,15 @@ class MyNexusServiceHandler:
5044
# which returns a WorkflowOperationToken. (Temporal server will then take care of
5145
# delivering the workflow result to the caller, using the Nexus RPC callback
5246
# mechanism).
53-
@operation_handler
54-
def my_workflow_run_operation(
55-
self,
56-
) -> OperationHandler[str, str]:
57-
async def start(
58-
ctx: StartOperationContext, name: str
59-
) -> WorkflowOperationToken[str]:
60-
return await start_workflow(
61-
HandlerWorkflow.run,
62-
name,
63-
id=str(uuid.uuid4()),
64-
)
65-
66-
return WorkflowRunOperationHandler.from_callable(start)
47+
@workflow_run_operation_handler
48+
async def my_workflow_run_operation(
49+
self, ctx: StartOperationContext, name: str
50+
) -> nexus.WorkflowHandle[str]:
51+
return await nexus.start_workflow(
52+
HandlerWorkflow.run,
53+
name,
54+
id=str(uuid.uuid4()),
55+
)
6756

6857

6958
#

tests/hello_nexus/helpers.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
import temporalio.api.nexus.v1
77
import temporalio.api.operatorservice
88
import temporalio.api.operatorservice.v1
9-
import temporalio.nexus
10-
import temporalio.nexus.handler
119
from temporalio.client import Client
1210

1311

0 commit comments

Comments
 (0)