Skip to content

Commit 94fafc4

Browse files
Add list_pending_activities sample: find workflows with pending activities
1 parent 5e9d1c7 commit 94fafc4

3 files changed

Lines changed: 359 additions & 0 deletions

File tree

list_pending_activities/README.md

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# List Pending Activities
2+
3+
A command-line tool that queries a Temporal Cloud namespace to find all workflows with pending activities. Supports optional filters and saves results to a local JSON file.
4+
5+
## How it works
6+
7+
1. Builds a [visibility query](https://docs.temporal.io/visibility#list-filter) from the optional filters you provide
8+
2. Calls `client.list_workflows()` to retrieve matching workflows
9+
3. Calls `handle.describe()` on each workflow to check for pending activities
10+
4. Prints results to the console and saves them to `output/pending_activities_<timestamp>.json`
11+
12+
Both parent and child workflows are found — child workflows are independent executions in the visibility store and are queried the same way.
13+
14+
## Authentication
15+
16+
The script supports two auth modes. If `TEMPORAL_API_KEY` is set, it uses API key auth via the regional endpoint. Otherwise it falls back to mTLS via the namespace endpoint.
17+
18+
**API key:**
19+
```bash
20+
export TEMPORAL_API_KEY="your-api-key"
21+
python find_pending.py
22+
```
23+
24+
**mTLS (default):**
25+
```bash
26+
python find_pending.py
27+
```
28+
29+
Requires `client.pem` and `client.key` in the certs directory.
30+
31+
### Environment variables
32+
33+
| Variable | Default | Description |
34+
|---|---|---|
35+
| `TEMPORAL_API_KEY` | (not set) | API key for auth. If set, uses the regional API endpoint. |
36+
| `TEMPORAL_NAMESPACE` | `deepika-test-namespace.a2dd6` | Namespace to query. |
37+
| `TEMPORAL_ADDRESS` | Regional or namespace endpoint | Overrides the target host for either auth mode. |
38+
| `TEMPORAL_CERTS_DIR` | `/Users/deepikaawasthi/temporal/temporal-certs` | Directory containing `client.pem` and `client.key` for mTLS. |
39+
40+
## Usage
41+
42+
All flags are optional — use any combination to narrow the search.
43+
44+
```bash
45+
# No filters — scans all workflows in the namespace
46+
python find_pending.py
47+
48+
# Filter by task queue
49+
python find_pending.py --task-queue my-queue
50+
51+
# Filter by workflow type
52+
python find_pending.py --workflow-type MyWorkflow
53+
54+
# Filter by execution status
55+
python find_pending.py --status Running
56+
57+
# Filter by start time range
58+
python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z"
59+
60+
# Filter by close time range
61+
python find_pending.py --close-time-after "2026-03-20T00:00:00Z" --close-time-before "2026-03-25T00:00:00Z"
62+
63+
# Combine any filters
64+
python find_pending.py --task-queue my-queue --workflow-type MyWorkflow --status Running --start-time-after "2026-03-20T00:00:00Z"
65+
```
66+
67+
### Available filters
68+
69+
| Flag | Visibility Query | Description |
70+
|---|---|---|
71+
| `--task-queue` | `TaskQueue="..."` | Filter by task queue name |
72+
| `--workflow-type` | `WorkflowType="..."` | Filter by workflow type name |
73+
| `--status` | `ExecutionStatus="..."` | Filter by status: `Running`, `Completed`, `Failed`, `Canceled`, `Terminated`, `ContinuedAsNew`, `TimedOut` |
74+
| `--start-time-after` | `StartTime>="..."` | Workflows started at or after this time |
75+
| `--start-time-before` | `StartTime<="..."` | Workflows started at or before this time |
76+
| `--close-time-after` | `CloseTime>="..."` | Workflows closed at or after this time |
77+
| `--close-time-before` | `CloseTime<="..."` | Workflows closed at or before this time |
78+
79+
All times are in ISO 8601 format (e.g. `2026-03-01T00:00:00Z`).
80+
81+
## Output
82+
83+
Results are printed to the console and saved to `output/pending_activities_<timestamp>.json`:
84+
85+
```json
86+
{
87+
"generated_at": "2026-03-25T10:04:12.832303",
88+
"query_used": "WorkflowType=\"PendingActivitiesWorkflow\" AND ExecutionStatus=\"Running\"",
89+
"total_workflows": 1,
90+
"workflows": [
91+
{
92+
"workflow_id": "hello-pending-activities-workflow",
93+
"run_id": "019d25f3-65f4-7c71-9c86-acfb68faec15",
94+
"pending_activity_count": 3,
95+
"pending_activities": [
96+
{
97+
"activity_id": "1",
98+
"activity_type": "say_hello",
99+
"state": "1",
100+
"attempt": 1
101+
}
102+
]
103+
}
104+
]
105+
}
106+
```
107+
108+
## Notes
109+
110+
- With no filters the script scans **all** workflows in the namespace. Use filters to narrow the scope for large namespaces.
111+
- Only workflows with at least one pending activity appear in the output.
112+
- The `output/` directory is created automatically on first run.

list_pending_activities/__init__.py

Whitespace-only changes.
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
"""Find workflows with pending activities and save results locally.
2+
3+
All filters are optional — use any combination to narrow the search.
4+
5+
Authentication:
6+
API key: export TEMPORAL_API_KEY="your-api-key"
7+
mTLS: Falls back to certs if TEMPORAL_API_KEY is not set.
8+
9+
Usage:
10+
python find_pending.py
11+
python find_pending.py --task-queue my-queue
12+
python find_pending.py --workflow-type MyWorkflow --status Running
13+
python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z"
14+
python find_pending.py --close-time-after "2026-03-20T00:00:00Z"
15+
"""
16+
17+
import argparse
18+
import asyncio
19+
import json
20+
import os
21+
from datetime import datetime
22+
23+
from temporalio.client import Client
24+
from temporalio.service import TLSConfig
25+
26+
DEFAULT_NAMESPACE = "deepika-test-namespace.a2dd6" # namespace - <ns>.<account-id>
27+
DEFAULT_API_HOST = "us-east-1.aws.api.temporal.io:7233" # regional endpoint for your namespace
28+
DEFAULT_MTLS_HOST = "deepika-test-namespace.a2dd6.tmprl.cloud:7233" # namespace endpoint for your namespace
29+
DEFAULT_CERTS_DIR = "/Users/deepikaawasthi/temporal/temporal-certs" # certs directory
30+
31+
32+
def resolve_api_key() -> str | None:
33+
"""Read API key from TEMPORAL_API_KEY env var, or return None to fall back to mTLS."""
34+
return os.environ.get("TEMPORAL_API_KEY")
35+
36+
37+
async def create_client(api_key: str | None = None) -> Client:
38+
namespace = os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_NAMESPACE)
39+
40+
if api_key:
41+
target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_API_HOST)
42+
print(f"Authenticating with API key to {target_host}")
43+
return await Client.connect(
44+
target_host,
45+
namespace=namespace,
46+
api_key=api_key,
47+
tls=True,
48+
)
49+
50+
# Fall back to mTLS
51+
target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_MTLS_HOST)
52+
certs_dir = os.environ.get("TEMPORAL_CERTS_DIR", DEFAULT_CERTS_DIR)
53+
print(f"Authenticating with mTLS to {target_host}")
54+
55+
with open(os.path.join(certs_dir, "client.pem"), "rb") as f:
56+
client_cert = f.read()
57+
with open(os.path.join(certs_dir, "client.key"), "rb") as f:
58+
client_key = f.read()
59+
60+
return await Client.connect(
61+
target_host,
62+
namespace=namespace,
63+
tls=TLSConfig(
64+
client_cert=client_cert,
65+
client_private_key=client_key,
66+
),
67+
)
68+
69+
70+
def build_query(
71+
task_queue: str | None = None,
72+
workflow_type: str | None = None,
73+
status: str | None = None,
74+
start_time_after: str | None = None,
75+
start_time_before: str | None = None,
76+
close_time_after: str | None = None,
77+
close_time_before: str | None = None,
78+
) -> str:
79+
"""Build a visibility query from optional filters."""
80+
clauses = []
81+
82+
if task_queue:
83+
clauses.append(f'TaskQueue="{task_queue}"')
84+
if workflow_type:
85+
clauses.append(f'WorkflowType="{workflow_type}"')
86+
if status:
87+
clauses.append(f'ExecutionStatus="{status}"')
88+
if start_time_after:
89+
clauses.append(f'StartTime>="{start_time_after}"')
90+
if start_time_before:
91+
clauses.append(f'StartTime<="{start_time_before}"')
92+
if close_time_after:
93+
clauses.append(f'CloseTime>="{close_time_after}"')
94+
if close_time_before:
95+
clauses.append(f'CloseTime<="{close_time_before}"')
96+
97+
return " AND ".join(clauses) if clauses else ""
98+
99+
100+
async def find_workflows_with_pending_activities(
101+
client: Client,
102+
query: str,
103+
) -> list[dict]:
104+
"""List workflows matching the query, describe each, return those with pending activities."""
105+
106+
results = []
107+
108+
async for wf in client.list_workflows(query=query or None):
109+
handle = client.get_workflow_handle(wf.id, run_id=wf.run_id)
110+
desc = await handle.describe()
111+
112+
pending = desc.raw_description.pending_activities
113+
if not pending:
114+
continue
115+
116+
activities_info = []
117+
for pa in pending:
118+
activities_info.append(
119+
{
120+
"activity_id": pa.activity_id,
121+
"activity_type": pa.activity_type.name,
122+
"state": str(pa.state),
123+
"attempt": pa.attempt,
124+
}
125+
)
126+
127+
parent_exec = desc.raw_description.parent_execution
128+
parent_id = parent_exec.workflow_id if parent_exec else None
129+
130+
results.append(
131+
{
132+
"workflow_id": wf.id,
133+
"run_id": wf.run_id,
134+
"workflow_type": str(getattr(wf, "workflow_type", "")),
135+
"parent_workflow_id": parent_id,
136+
"pending_activity_count": len(pending),
137+
"pending_activities": activities_info,
138+
}
139+
)
140+
141+
return results
142+
143+
144+
def save_results(results: list[dict], query: str) -> str:
145+
"""Save results to a JSON file in the output/ directory. Returns the file path."""
146+
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output")
147+
os.makedirs(output_dir, exist_ok=True)
148+
149+
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
150+
filepath = os.path.join(output_dir, f"pending_activities_{timestamp}.json")
151+
152+
with open(filepath, "w") as f:
153+
json.dump(
154+
{
155+
"generated_at": datetime.now().isoformat(),
156+
"query_used": query,
157+
"total_workflows": len(results),
158+
"workflows": results,
159+
},
160+
f,
161+
indent=2,
162+
)
163+
164+
return filepath
165+
166+
167+
def print_results(results: list[dict]) -> None:
168+
print("-" * 80)
169+
for entry in results:
170+
print(f"Workflow ID : {entry['workflow_id']}")
171+
print(f"Run ID : {entry['run_id']}")
172+
print(f"Workflow Type : {entry['workflow_type']}")
173+
print(f"Parent WF ID : {entry['parent_workflow_id'] or '(none — top-level)'}")
174+
print(f"Pending Count : {entry['pending_activity_count']}")
175+
for act in entry["pending_activities"]:
176+
print(
177+
f" - Activity ID: {act['activity_id']}, "
178+
f"Type: {act['activity_type']}, "
179+
f"State: {act['state']}, "
180+
f"Attempt: {act['attempt']}"
181+
)
182+
print("-" * 80)
183+
184+
185+
async def main():
186+
parser = argparse.ArgumentParser(
187+
description="Find workflows with pending activities. All filters are optional."
188+
)
189+
parser.add_argument("--task-queue", default=None, help="Filter by task queue name")
190+
parser.add_argument("--workflow-type", default=None, help="Filter by workflow type name")
191+
parser.add_argument(
192+
"--status",
193+
default=None,
194+
choices=["Running", "Completed", "Failed", "Canceled", "Terminated", "ContinuedAsNew", "TimedOut"],
195+
help="Filter by execution status (default: all statuses)",
196+
)
197+
parser.add_argument(
198+
"--start-time-after",
199+
default=None,
200+
help='Workflows started at or after this time (ISO 8601, e.g. "2026-03-01T00:00:00Z")',
201+
)
202+
parser.add_argument(
203+
"--start-time-before",
204+
default=None,
205+
help='Workflows started at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")',
206+
)
207+
parser.add_argument(
208+
"--close-time-after",
209+
default=None,
210+
help='Workflows closed at or after this time (ISO 8601, e.g. "2026-03-20T00:00:00Z")',
211+
)
212+
parser.add_argument(
213+
"--close-time-before",
214+
default=None,
215+
help='Workflows closed at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")',
216+
)
217+
args = parser.parse_args()
218+
219+
query = build_query(
220+
task_queue=args.task_queue,
221+
workflow_type=args.workflow_type,
222+
status=args.status,
223+
start_time_after=args.start_time_after,
224+
start_time_before=args.start_time_before,
225+
close_time_after=args.close_time_after,
226+
close_time_before=args.close_time_before,
227+
)
228+
229+
print(f"Query: {query or '(no filters — scanning all workflows)'}\n")
230+
231+
api_key = resolve_api_key()
232+
client = await create_client(api_key=api_key)
233+
results = await find_workflows_with_pending_activities(client, query)
234+
235+
if not results:
236+
print("No workflows with pending activities found.")
237+
return
238+
239+
print(f"Found {len(results)} workflow(s) with pending activities:\n")
240+
print_results(results)
241+
242+
filepath = save_results(results, query)
243+
print(f"\nResults saved to: {filepath}")
244+
245+
246+
if __name__ == "__main__":
247+
asyncio.run(main())

0 commit comments

Comments
 (0)