Skip to content

Commit 755fe64

Browse files
committed
added custom metric sample
1 parent 1b6145a commit 755fe64

6 files changed

Lines changed: 169 additions & 0 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
.venv
22
.idea
33
__pycache__
4+
.vscode
5+
.DS_Store

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ Some examples require extra dependencies. See each sample's directory for specif
6161
* [context_propagation](context_propagation) - Context propagation through workflows/activities via interceptor.
6262
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
6363
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
64+
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
6465
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
6566
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
6667
* [gevent_async](gevent_async) - Combine gevent and Temporal.

custom_metric/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Custom Metric
2+
3+
1. Run the server with `temporal server start-dev`
4+
2. Run the client with `uv run custom_metric/client.py`
5+
3. Run the workflow worker with `uv run custom_metric/workflow_worker.py`
6+
4. Run the activity worker with `uv run custom_metric/activity_worker.py`
7+
5. Go to `http://127.0.0.1:9090/metrics` in your browser
8+
9+
You'll get something like the following:
10+
11+
```txt
12+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100"} 0
13+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="500"} 0
14+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000"} 0
15+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="5000"} 0
16+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="10000"} 1
17+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="100000"} 1
18+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="1000000"} 1
19+
custom_activity_schedule_to_start_latency_bucket{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow",le="+Inf"} 1
20+
custom_activity_schedule_to_start_latency_sum{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 6336
21+
custom_activity_schedule_to_start_latency_count{activity_type="print_message",namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",workflow_type="ExecuteActivityWorkflow"} 1
22+
...
23+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100"} 0
24+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="500"} 0
25+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000"} 0
26+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="5000"} 0
27+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="10000"} 1
28+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="100000"} 1
29+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="1000000"} 1
30+
temporal_activity_schedule_to_start_latency_bucket{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue",le="+Inf"} 1
31+
temporal_activity_schedule_to_start_latency_sum{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 6336
32+
temporal_activity_schedule_to_start_latency_count{namespace="default",service_name="temporal-core-sdk",task_queue="custom-metric-task-queue"} 1
33+
```

custom_metric/activity_worker.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
3+
4+
from temporalio import activity
5+
from temporalio.client import Client
6+
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
7+
from temporalio.worker import (
8+
ActivityInboundInterceptor,
9+
ExecuteActivityInput,
10+
Interceptor,
11+
Worker,
12+
)
13+
14+
15+
class SimpleWorkerInterceptor(Interceptor):
16+
17+
def intercept_activity(
18+
self, next: ActivityInboundInterceptor
19+
) -> ActivityInboundInterceptor:
20+
return CustomScheduleToStartInterceptor(next)
21+
22+
23+
class CustomScheduleToStartInterceptor(ActivityInboundInterceptor):
24+
25+
async def execute_activity(self, input: ExecuteActivityInput):
26+
27+
schedule_to_start = (
28+
activity.info().started_time
29+
- activity.info().current_attempt_scheduled_time
30+
)
31+
# Could do the original schedule time instead of current attempt
32+
# schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time
33+
34+
meter = activity.metric_meter()
35+
histogram = meter.create_histogram_timedelta(
36+
"custom_activity_schedule_to_start_latency",
37+
description="Time between activity scheduling and start",
38+
unit="duration",
39+
)
40+
histogram.record(
41+
schedule_to_start, {"workflow_type": activity.info().workflow_type}
42+
)
43+
return await self.next.execute_activity(input)
44+
45+
46+
@activity.defn
47+
def print_message():
48+
print("in the activity")
49+
50+
51+
async def main():
52+
runtime = Runtime(
53+
telemetry=TelemetryConfig(metrics=PrometheusConfig(bind_address="0.0.0.0:9090"))
54+
)
55+
client = await Client.connect(
56+
"localhost:7233",
57+
runtime=runtime,
58+
)
59+
worker = Worker(
60+
client,
61+
task_queue="custom-metric-task-queue",
62+
activities=[print_message],
63+
activity_executor=ThreadPoolExecutor(5),
64+
interceptors=[SimpleWorkerInterceptor()],
65+
)
66+
67+
await worker.run()
68+
69+
70+
if __name__ == "__main__":
71+
asyncio.run(main())

custom_metric/client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import asyncio
2+
import uuid
3+
4+
from temporalio.client import Client
5+
6+
from custom_metric.workflow_worker import ExecuteActivityWorkflow
7+
8+
9+
async def main():
10+
11+
client = await Client.connect(
12+
"localhost:7233",
13+
)
14+
15+
await client.start_workflow(
16+
ExecuteActivityWorkflow.run,
17+
id="execute-activity-workflow-" + str(uuid.uuid4()),
18+
task_queue="custom-metric-task-queue",
19+
)
20+
21+
22+
if __name__ == "__main__":
23+
asyncio.run(main())

custom_metric/workflow_worker.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import workflow
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
with workflow.unsafe.imports_passed_through():
9+
from custom_metric.activity_worker import print_message
10+
11+
12+
@workflow.defn
13+
class ExecuteActivityWorkflow:
14+
15+
@workflow.run
16+
async def run(self):
17+
await workflow.execute_activity(
18+
print_message,
19+
start_to_close_timeout=timedelta(seconds=5),
20+
)
21+
return None
22+
23+
24+
async def main():
25+
26+
client = await Client.connect(
27+
"localhost:7233",
28+
)
29+
worker = Worker(
30+
client,
31+
task_queue="custom-metric-task-queue",
32+
workflows=[ExecuteActivityWorkflow],
33+
)
34+
35+
await worker.run()
36+
37+
38+
if __name__ == "__main__":
39+
asyncio.run(main())

0 commit comments

Comments
 (0)