Skip to content

Commit bcf8582

Browse files
committed
Add SANO sample
1 parent 26c1244 commit bcf8582

10 files changed

Lines changed: 336 additions & 12 deletions

File tree

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
This sample demonstrates how to execute Nexus operations directly from client code,
2+
without wrapping them in a workflow. It shows both synchronous and asynchronous
3+
(workflow-backed) operations, plus listing and counting operations.
4+
5+
## Note: Standalone Nexus operations require a server version that supports this feature.
6+
7+
### Sample directory structure
8+
9+
- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations
10+
- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation
11+
- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service
12+
- [starter.py](./starter.py) - Client that executes standalone Nexus operations
13+
14+
15+
### Instructions
16+
17+
Start a Temporal server. (See the main samples repo [README](../README.md)).
18+
19+
Create the Nexus endpoint:
20+
21+
```
22+
temporal operator nexus endpoint create \
23+
--name nexus-standalone-operations-endpoint \
24+
--target-namespace default \
25+
--target-task-queue nexus-standalone-operations
26+
```
27+
28+
In one terminal, start the worker:
29+
```
30+
uv run nexus_standalone_operations/worker.py
31+
```
32+
33+
In another terminal, run the starter:
34+
```
35+
uv run nexus_standalone_operations/starter.py
36+
```
37+
38+
### Expected output
39+
40+
```
41+
Echo result: hello
42+
Hello result: Hello, World!
43+
44+
Listing Nexus operations:
45+
OperationId: echo-..., Operation: echo, Status: COMPLETED
46+
OperationId: hello-..., Operation: hello, Status: COMPLETED
47+
48+
Total Nexus operations: 2
49+
```
50+
51+
If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run.
52+
The same goes for the total number of operations.

nexus_standalone_operations/__init__.py

Whitespace-only changes.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Nexus service handler and backing workflow for standalone operations sample."""
2+
3+
from __future__ import annotations
4+
5+
import uuid
6+
7+
import nexusrpc.handler
8+
from temporalio import nexus, workflow
9+
10+
from nexus_standalone_operations.service import (
11+
EchoInput,
12+
EchoOutput,
13+
HelloInput,
14+
HelloOutput,
15+
MyNexusService,
16+
)
17+
18+
19+
@workflow.defn
20+
class HelloWorkflow:
21+
@workflow.run
22+
async def run(self, input: HelloInput) -> HelloOutput:
23+
return HelloOutput(greeting=f"Hello, {input.name}!")
24+
25+
26+
@nexusrpc.handler.service_handler(service=MyNexusService)
27+
class MyNexusServiceHandler:
28+
@nexusrpc.handler.sync_operation
29+
async def echo(
30+
self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput
31+
) -> EchoOutput:
32+
return EchoOutput(message=input.message)
33+
34+
@nexus.workflow_run_operation
35+
async def hello(
36+
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
37+
) -> nexus.WorkflowHandle[HelloOutput]:
38+
return await ctx.start_workflow(
39+
HelloWorkflow.run,
40+
input,
41+
id=str(uuid.uuid4()),
42+
)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""Nexus service definition for standalone operations sample.
2+
3+
Defines a Nexus service with two operations:
4+
- echo: a synchronous operation that echoes the input message
5+
- hello: an asynchronous (workflow-backed) operation that returns a greeting
6+
7+
This service definition is used by both the handler (to validate operation
8+
signatures) and the client (to create type-safe nexus clients).
9+
"""
10+
11+
from dataclasses import dataclass
12+
13+
import nexusrpc
14+
15+
16+
@dataclass
17+
class EchoInput:
18+
message: str
19+
20+
21+
@dataclass
22+
class EchoOutput:
23+
message: str
24+
25+
26+
@dataclass
27+
class HelloInput:
28+
name: str
29+
30+
31+
@dataclass
32+
class HelloOutput:
33+
greeting: str
34+
35+
36+
@nexusrpc.service
37+
class MyNexusService:
38+
echo: nexusrpc.Operation[EchoInput, EchoOutput]
39+
hello: nexusrpc.Operation[HelloInput, HelloOutput]
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Starter that demonstrates standalone Nexus operation execution.
2+
3+
Unlike other Nexus samples that call operations from within a workflow, this
4+
sample executes Nexus operations directly from client code using the standalone
5+
Nexus operation APIs.
6+
"""
7+
8+
import asyncio
9+
import uuid
10+
from datetime import timedelta
11+
12+
from temporalio.client import Client
13+
from temporalio.envconfig import ClientConfig
14+
15+
from nexus_standalone_operations.service import (
16+
EchoInput,
17+
HelloInput,
18+
MyNexusService,
19+
)
20+
21+
ENDPOINT_NAME = "nexus-standalone-operations-endpoint"
22+
23+
24+
async def main() -> None:
25+
config = ClientConfig.load_client_connect_config()
26+
_ = config.setdefault("target_host", "localhost:7233")
27+
client = await Client.connect(**config)
28+
29+
# Create a typed NexusClient bound to the endpoint and service.
30+
# The endpoint must be pre-created on the server (see README).
31+
nexus_client = client.create_nexus_client(
32+
service=MyNexusService, endpoint=ENDPOINT_NAME
33+
)
34+
35+
# Start sync echo operation and await the result immediately.
36+
echo_result = await nexus_client.execute_operation(
37+
MyNexusService.echo,
38+
EchoInput(message="hello"),
39+
id=f"echo-{uuid.uuid4()}",
40+
schedule_to_close_timeout=timedelta(seconds=10),
41+
)
42+
print(f"Echo result: {echo_result.message}")
43+
44+
# Start async (workflow-backed) hello operation and get a NexusOperationHandle.
45+
handle = await nexus_client.start_operation(
46+
MyNexusService.hello,
47+
HelloInput(name="World"),
48+
id=f"hello-{uuid.uuid4()}",
49+
schedule_to_close_timeout=timedelta(seconds=10),
50+
)
51+
52+
print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}")
53+
54+
# Use the NexusOperationHandle to await the result of the operation.
55+
hello_result = await handle.result()
56+
print(f"`MyNexusService.Hello` result: {hello_result.greeting}")
57+
58+
# List nexus operations.
59+
query = f'Endpoint = "{ENDPOINT_NAME}"'
60+
print("\nListing Nexus operations:")
61+
async for op in client.list_nexus_operations(query):
62+
print(
63+
f" OperationId: {op.operation_id},",
64+
f" Operation: {op.operation},",
65+
f" Status: {op.status.name}",
66+
)
67+
68+
# Count nexus operations.
69+
count = await client.count_nexus_operations(query)
70+
print(f"\nTotal Nexus operations: {count.count}")
71+
72+
73+
if __name__ == "__main__":
74+
asyncio.run(main())
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""Worker that hosts the Nexus service for standalone operations sample."""
2+
3+
import asyncio
4+
import logging
5+
6+
from temporalio.client import Client
7+
from temporalio.envconfig import ClientConfig
8+
from temporalio.worker import Worker
9+
10+
from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler
11+
12+
interrupt_event = asyncio.Event()
13+
14+
TASK_QUEUE = "nexus-standalone-operations"
15+
16+
17+
async def main() -> None:
18+
logging.basicConfig(level=logging.INFO)
19+
20+
config = ClientConfig.load_client_connect_config()
21+
_ = config.setdefault("target_host", "localhost:7233")
22+
client = await Client.connect(**config)
23+
24+
async with Worker(
25+
client,
26+
task_queue=TASK_QUEUE,
27+
workflows=[HelloWorkflow],
28+
nexus_service_handlers=[MyNexusServiceHandler()],
29+
):
30+
logging.info("Worker started, ctrl+c to exit")
31+
_ = await interrupt_event.wait()
32+
logging.info("Shutting down")
33+
34+
35+
if __name__ == "__main__":
36+
loop = asyncio.new_event_loop()
37+
try:
38+
loop.run_until_complete(main())
39+
except KeyboardInterrupt:
40+
interrupt_event.set()
41+
loop.run_until_complete(loop.shutdown_asyncgens())

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,6 @@ ignore_errors = true
140140
[[tool.mypy.overrides]]
141141
module = "opentelemetry.*"
142142
ignore_errors = true
143+
144+
[tool.uv.sources]
145+
temporalio = { git = "https://github.com/temporalio/sdk-python", branch = "amazzeo/sano" }

tests/nexus_standalone_operations/__init__.py

Whitespace-only changes.
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
import os
3+
import uuid
4+
from datetime import timedelta
5+
6+
import pytest
7+
from temporalio.client import Client, NexusOperationFailureError
8+
from temporalio.service import RPCError
9+
from temporalio.worker import Worker
10+
11+
from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler
12+
from nexus_standalone_operations.service import (
13+
EchoInput,
14+
EchoOutput,
15+
HelloInput,
16+
HelloOutput,
17+
MyNexusService,
18+
)
19+
from nexus_standalone_operations.worker import TASK_QUEUE
20+
from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint
21+
22+
23+
async def test_nexus_standalone_operations(client: Client):
24+
if not os.getenv("ENABLE_STANDALONE_NEXUS_TESTS"):
25+
pytest.skip(
26+
"Standalone Nexus operations not yet supported by default dev server. Set ENABLE_STANDALONE_NEXUS_TESTS=1 to enable."
27+
)
28+
29+
endpoint_name = f"test-nexus-standalone-{uuid.uuid4()}"
30+
31+
create_response = await create_nexus_endpoint(
32+
name=endpoint_name,
33+
task_queue=TASK_QUEUE,
34+
client=client,
35+
)
36+
try:
37+
async with Worker(
38+
client,
39+
task_queue=TASK_QUEUE,
40+
workflows=[HelloWorkflow],
41+
nexus_service_handlers=[MyNexusServiceHandler()],
42+
):
43+
nexus_client = client.create_nexus_client(
44+
service=MyNexusService, endpoint=endpoint_name
45+
)
46+
47+
# Test sync echo operation (with retry for endpoint propagation)
48+
echo_result = None
49+
for _ in range(30):
50+
try:
51+
echo_result = await nexus_client.execute_operation(
52+
MyNexusService.echo,
53+
EchoInput(message="test-echo"),
54+
id=str(uuid.uuid4()),
55+
schedule_to_close_timeout=timedelta(seconds=10),
56+
)
57+
break
58+
except (RPCError, NexusOperationFailureError):
59+
await asyncio.sleep(0.5)
60+
assert isinstance(echo_result, EchoOutput)
61+
assert echo_result.message == "test-echo"
62+
63+
# Test async hello operation
64+
hello_result = await nexus_client.execute_operation(
65+
MyNexusService.hello,
66+
HelloInput(name="Test"),
67+
id=str(uuid.uuid4()),
68+
schedule_to_close_timeout=timedelta(seconds=10),
69+
)
70+
assert isinstance(hello_result, HelloOutput)
71+
assert hello_result.greeting == "Hello, Test!"
72+
73+
# Test count operations
74+
count = await client.count_nexus_operations(f'Endpoint = "{endpoint_name}"')
75+
assert count.count >= 0
76+
finally:
77+
_ = await delete_nexus_endpoint(
78+
id=create_response.endpoint.id,
79+
version=create_response.endpoint.version,
80+
client=client,
81+
)

uv.lock

Lines changed: 4 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)