Skip to content

Commit 3d36c84

Browse files
committed
Reference impl from client instead of interface
1 parent 41fa63f commit 3d36c84

2 files changed

Lines changed: 116 additions & 11 deletions

File tree

hello_nexus/caller/app.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,6 @@
55
from temporalio.client import Client
66
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
77

8-
from hello_nexus.caller.workflows_via_interface import (
9-
Echo2CallerWorkflow,
10-
Echo3CallerWorkflow,
11-
EchoCallerWorkflow,
12-
Hello2CallerWorkflow,
13-
HelloCallerWorkflow,
14-
)
15-
168
interrupt_event = asyncio.Event()
179

1810

@@ -39,11 +31,31 @@ async def execute_workflow(workflow_cls: Type[Any], input: Any) -> None:
3931

4032

4133
if __name__ == "__main__":
42-
if len(sys.argv) != 2:
43-
print("Usage: python -m nexus.caller.app [echo|hello]")
34+
if len(sys.argv) != 3:
35+
print("Usage: python -m nexus.caller.app [echo|hello] [impl|interface]")
4436
sys.exit(1)
4537

46-
[wf_name] = sys.argv[1:]
38+
[wf_name, impl_or_interface] = sys.argv[1:]
39+
40+
if impl_or_interface == "impl":
41+
from hello_nexus.caller.workflows_via_impl import (
42+
Echo2CallerWorkflow,
43+
Echo3CallerWorkflow,
44+
EchoCallerWorkflow,
45+
Hello2CallerWorkflow,
46+
HelloCallerWorkflow,
47+
)
48+
elif impl_or_interface == "interface":
49+
from hello_nexus.caller.workflows_via_interface import (
50+
Echo2CallerWorkflow,
51+
Echo3CallerWorkflow,
52+
EchoCallerWorkflow,
53+
Hello2CallerWorkflow,
54+
HelloCallerWorkflow,
55+
)
56+
else:
57+
raise ValueError(f"Invalid impl_or_interface: {impl_or_interface}")
58+
4759
fn = {
4860
"echo": lambda: execute_workflow(EchoCallerWorkflow, "hello"),
4961
"echo2": lambda: execute_workflow(Echo2CallerWorkflow, "hello"),
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
from datetime import timedelta
2+
3+
import xray
4+
from temporalio import workflow
5+
from temporalio.exceptions import FailureError
6+
from temporalio.workflow import NexusClient
7+
8+
from hello_nexus.handler.nexus_service import (
9+
EchoInput,
10+
EchoOutput,
11+
HelloInput,
12+
HelloOutput,
13+
MyNexusService,
14+
)
15+
16+
17+
class CallerWorkflowBase:
18+
def __init__(self):
19+
self.nexus_client = NexusClient(
20+
MyNexusService, # or string name "my-nexus-service",
21+
"my-nexus-endpoint-name-python",
22+
schedule_to_close_timeout=timedelta(seconds=30),
23+
)
24+
25+
26+
@workflow.defn
27+
class EchoCallerWorkflow(CallerWorkflowBase):
28+
@xray.start_as_current_workflow_method_span()
29+
@workflow.run
30+
async def run(self, message: str) -> EchoOutput:
31+
op_output = await self.nexus_client.execute_operation(
32+
MyNexusService.echo,
33+
EchoInput(message),
34+
)
35+
return op_output
36+
37+
38+
@workflow.defn
39+
class Echo2CallerWorkflow(CallerWorkflowBase):
40+
@xray.start_as_current_workflow_method_span()
41+
@workflow.run
42+
async def run(self, message: str) -> EchoOutput:
43+
op_output = await self.nexus_client.execute_operation(
44+
MyNexusService.echo2,
45+
EchoInput(message),
46+
)
47+
return op_output
48+
49+
50+
@workflow.defn
51+
class Echo3CallerWorkflow(CallerWorkflowBase):
52+
@xray.start_as_current_workflow_method_span()
53+
@workflow.run
54+
async def run(self, message: str) -> EchoOutput:
55+
op_output = await self.nexus_client.execute_operation(
56+
MyNexusService.echo3,
57+
EchoInput(message),
58+
)
59+
return op_output
60+
61+
62+
@workflow.defn
63+
class HelloCallerWorkflow(CallerWorkflowBase):
64+
@xray.start_as_current_workflow_method_span()
65+
@workflow.run
66+
async def run(self, name: str) -> HelloOutput:
67+
handle = await self.nexus_client.start_operation(
68+
MyNexusService.hello,
69+
HelloInput(name),
70+
)
71+
assert handle.cancel()
72+
try:
73+
await handle
74+
except FailureError:
75+
handle = await self.nexus_client.start_operation(
76+
MyNexusService.hello,
77+
HelloInput(name),
78+
)
79+
result = await handle
80+
return result
81+
raise AssertionError("Expected Nexus operation to be cancelled")
82+
83+
84+
@workflow.defn
85+
class Hello2CallerWorkflow(CallerWorkflowBase):
86+
@xray.start_as_current_workflow_method_span()
87+
@workflow.run
88+
async def run(self, name: str) -> HelloOutput:
89+
handle = await self.nexus_client.start_operation(
90+
MyNexusService.hello2,
91+
HelloInput(name),
92+
)
93+
return await handle

0 commit comments

Comments
 (0)