Skip to content

Commit 8582146

Browse files
authored
Improve HTTP/2 resilience, worker supervision defaults, metrics, chaos tests, and FastAPI example (#375)
1 parent d298333 commit 8582146

21 files changed

Lines changed: 1422 additions & 301 deletions

METRICS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The Conductor Python SDK includes built-in metrics collection using Prometheus t
3333
| `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) |
3434
| `task_execution_queue_full_total` | Counter | `taskType` | Number of times execution queue was full |
3535
| `task_paused_total` | Counter | `taskType` | Number of polls while worker paused |
36+
| `worker_restart_total` | Counter | `taskType` | Number of times TaskHandler restarted a worker subprocess |
3637
| `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count |
3738
| `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) |
3839
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count |

README.md

Lines changed: 221 additions & 99 deletions
Large diffs are not rendered by default.

docs/WORKER.md

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,64 @@ workers = [
259259
)
260260
]
261261

262-
# If there are decorated workers in your application, scan_for_annotated_workers should be set
263-
# default value of scan_for_annotated_workers is False
262+
# TaskHandler scans for @worker_task decorated workers by default.
263+
# Set scan_for_annotated_workers=False if you want to disable auto-discovery.
264264
with TaskHandler(workers, configuration, scan_for_annotated_workers=True) as task_handler:
265265
task_handler.start_processes()
266266
```
267267

268+
### Resilience: auto-restart and health checks
269+
270+
If you run workers as a long-lived service (e.g., alongside FastAPI/Uvicorn), you can optionally enable process
271+
supervision so the `TaskHandler` monitors worker processes and restarts them if they exit unexpectedly:
272+
273+
```python
274+
with TaskHandler(
275+
workers,
276+
configuration,
277+
scan_for_annotated_workers=True,
278+
# Enabled by default. Set to False to disable supervision.
279+
monitor_processes=True,
280+
restart_on_failure=True,
281+
) as task_handler:
282+
task_handler.start_processes()
283+
```
284+
285+
For a `/healthcheck` endpoint, you can use:
286+
287+
```python
288+
task_handler.is_healthy()
289+
task_handler.get_worker_process_status()
290+
```
291+
292+
To disable restarts/monitoring (e.g., for local debugging), set:
293+
294+
```python
295+
TaskHandler(..., monitor_processes=False, restart_on_failure=False)
296+
```
297+
298+
### Mitigation for intermittent HTTP/2 connection termination
299+
300+
The SDK uses `httpx` for outbound calls to the Conductor/Orkes server. By default, it enables HTTP/2 for these calls.
301+
In some environments (certain proxies/load balancers/NATs), long-lived HTTP/2 connections may be terminated, which can
302+
surface as errors like `httpcore.RemoteProtocolError: <ConnectionTerminated ...>`.
303+
304+
The SDK automatically attempts to recover by recreating the underlying HTTP client and retrying the request once.
305+
If your environment is still unstable with HTTP/2, you can force the SDK to use HTTP/1.1 instead via an environment
306+
variable.
307+
308+
#### `CONDUCTOR_HTTP2_ENABLED`
309+
310+
- **What it does**: Controls whether the Conductor Python SDK uses HTTP/2 for outbound requests to the Conductor server.
311+
- **Default**: `true` (HTTP/2 enabled).
312+
- **Scope**: Affects all SDK clients (workers, `OrkesClients`, sync + async). It does *not* change your FastAPI/Uvicorn
313+
server behavior; it only changes how the SDK talks to Conductor.
314+
- **Values**: `false|0|no|off` disables HTTP/2. Anything else enables it.
315+
316+
```shell
317+
export CONDUCTOR_HTTP2_ENABLED=false
318+
```
319+
268320
If you paste the above code in a file called main.py, you can launch the workers by running:
269321
```shell
270322
python3 main.py

examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ python examples/workers_e2e.py
2525
|------|-------------|-----|
2626
| **workers_e2e.py** | ⭐ Start here - sync + async workers | `python examples/workers_e2e.py` |
2727
| **worker_example.py** | Comprehensive patterns (None returns, TaskInProgress) | `python examples/worker_example.py` |
28+
| **fastapi_worker_service.py** | FastAPI exposing a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
2829
| **worker_configuration_example.py** | Hierarchical configuration (env vars) | `python examples/worker_configuration_example.py` |
2930
| **task_context_example.py** | Task context (logs, poll_count, task_id) | `python examples/task_context_example.py` |
3031
| **task_workers.py** | Task worker patterns with dataclasses | `python examples/task_workers.py` |
@@ -416,4 +417,4 @@ export conductor.worker.all.thread_count=20
416417
---
417418

418419
**Repository**: https://github.com/conductor-oss/conductor-python
419-
**License**: Apache 2.0
420+
**License**: Apache 2.0

examples/fastapi_worker_service.py

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
"""
2+
FastAPI + Conductor workers in one process.
3+
4+
Install (example-only deps):
5+
pip install fastapi uvicorn
6+
7+
Run (single web worker; TaskHandler will spawn one process per Conductor worker):
8+
export CONDUCTOR_SERVER_URL="http://localhost:8080/api"
9+
export CONDUCTOR_AUTH_KEY="..."
10+
export CONDUCTOR_AUTH_SECRET="..."
11+
uvicorn examples.fastapi_worker_service:app --host 0.0.0.0 --port 8081 --workers 1
12+
13+
Trigger the workflow via API (waits up to 10s for completion):
14+
curl -s -X POST http://localhost:8081/v1/hello \\
15+
-H 'content-type: application/json' \\
16+
-d '{"name":"Ada","a":2,"b":3}' | jq .
17+
18+
Notes:
19+
- Do NOT run uvicorn with multiple web workers unless you explicitly want multiple independent TaskHandlers polling.
20+
- TaskHandler supervision is enabled by default (monitor + restart worker subprocesses).
21+
"""
22+
23+
from __future__ import annotations
24+
25+
import os
26+
from contextlib import asynccontextmanager
27+
from typing import Optional
28+
29+
from fastapi import FastAPI
30+
from fastapi.responses import JSONResponse
31+
from pydantic import BaseModel, Field
32+
33+
from conductor.client.automator.task_handler import TaskHandler
34+
from conductor.client.configuration.configuration import Configuration
35+
from conductor.client.context.task_context import get_task_context
36+
from conductor.client.orkes_clients import OrkesClients
37+
from conductor.client.worker.worker_task import worker_task
38+
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
39+
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
40+
41+
42+
# ---------------------------------------------------------------------------
43+
# Example worker(s)
44+
# ---------------------------------------------------------------------------
45+
46+
@worker_task(
47+
task_definition_name="fastapi_normalize_name",
48+
poll_interval_millis=100,
49+
register_task_def=True,
50+
overwrite_task_def=False,
51+
)
52+
def normalize_name(name: str) -> str:
53+
# This shows how to access task context safely.
54+
_ = get_task_context()
55+
return name.strip().title()
56+
57+
58+
@worker_task(
59+
task_definition_name="fastapi_add_numbers",
60+
poll_interval_millis=100,
61+
register_task_def=True,
62+
overwrite_task_def=False,
63+
)
64+
def add_numbers(a: int, b: int) -> int:
65+
_ = get_task_context()
66+
return a + b
67+
68+
69+
@worker_task(
70+
task_definition_name="fastapi_build_message",
71+
poll_interval_millis=100,
72+
register_task_def=True,
73+
overwrite_task_def=False,
74+
)
75+
def build_message(normalized_name: str, total: int) -> dict:
76+
ctx = get_task_context()
77+
return {
78+
"message": f"Hello {normalized_name}! {total=}",
79+
"normalized_name": normalized_name,
80+
"total": total,
81+
"task_id": ctx.get_task_id(),
82+
"workflow_id": ctx.get_workflow_instance_id(),
83+
}
84+
85+
86+
def _build_hello_workflow(executor: WorkflowExecutor) -> ConductorWorkflow:
87+
workflow = ConductorWorkflow(executor=executor, name="fastapi_hello_workflow", version=1)
88+
89+
t1 = normalize_name(task_ref_name="normalize_name_ref", name=workflow.input("name"))
90+
t2 = add_numbers(task_ref_name="add_numbers_ref", a=workflow.input("a"), b=workflow.input("b"))
91+
t3 = build_message(
92+
task_ref_name="build_message_ref",
93+
normalized_name=t1.output("result"),
94+
total=t2.output("result"),
95+
)
96+
97+
workflow >> t1 >> t2 >> t3
98+
99+
workflow.output_parameters(
100+
output_parameters={
101+
"message": t3.output("message"),
102+
"normalized_name": t3.output("normalized_name"),
103+
"total": t3.output("total"),
104+
}
105+
)
106+
107+
return workflow
108+
109+
110+
class HelloRequest(BaseModel):
111+
name: str = Field(default="World", description="Name to greet")
112+
a: int = Field(default=1, description="First number")
113+
b: int = Field(default=2, description="Second number")
114+
115+
116+
# ---------------------------------------------------------------------------
117+
# FastAPI app + TaskHandler lifecycle
118+
# ---------------------------------------------------------------------------
119+
120+
task_handler: Optional[TaskHandler] = None
121+
workflow_executor: Optional[WorkflowExecutor] = None
122+
api_config: Optional[Configuration] = None
123+
124+
125+
@asynccontextmanager
126+
async def lifespan(app: FastAPI):
127+
global task_handler, workflow_executor, api_config
128+
129+
api_config = Configuration()
130+
clients = OrkesClients(configuration=api_config)
131+
workflow_executor = clients.get_workflow_executor()
132+
133+
# scan_for_annotated_workers=True will pick up @worker_task functions in this module.
134+
task_handler = TaskHandler(
135+
workers=[],
136+
configuration=api_config,
137+
scan_for_annotated_workers=True,
138+
# Defaults are already True, but keeping these explicit in the example:
139+
monitor_processes=True,
140+
restart_on_failure=True,
141+
)
142+
task_handler.start_processes()
143+
144+
try:
145+
yield
146+
finally:
147+
if task_handler is not None:
148+
task_handler.stop_processes()
149+
task_handler = None
150+
workflow_executor = None
151+
api_config = None
152+
153+
154+
app = FastAPI(lifespan=lifespan)
155+
156+
157+
@app.get("/healthcheck")
158+
def healthcheck():
159+
# 503 if worker processes aren't healthy; useful for container orchestrators.
160+
if task_handler is None:
161+
return JSONResponse({"ok": False, "detail": "workers_not_started"}, status_code=503)
162+
163+
ok = task_handler.is_healthy()
164+
payload = {
165+
"ok": ok,
166+
"workers": task_handler.get_worker_process_status(),
167+
}
168+
return JSONResponse(payload, status_code=200 if ok else 503)
169+
170+
171+
@app.post("/v1/hello")
172+
def hello(req: HelloRequest):
173+
"""
174+
Expose a Conductor workflow as an API:
175+
- Builds an inline workflow definition with 3 SIMPLE tasks
176+
- Starts it and waits up to 10 seconds for completion
177+
- Returns workflow output as the HTTP response
178+
"""
179+
if task_handler is None or workflow_executor is None or api_config is None:
180+
return JSONResponse({"ok": False, "detail": "service_not_ready"}, status_code=503)
181+
if not task_handler.is_healthy():
182+
return JSONResponse(
183+
{"ok": False, "detail": "workers_unhealthy", "workers": task_handler.get_worker_process_status()},
184+
status_code=503,
185+
)
186+
187+
workflow = _build_hello_workflow(executor=workflow_executor)
188+
payload = req.model_dump() if hasattr(req, "model_dump") else req.dict() # pydantic v2/v1
189+
190+
try:
191+
run = workflow.execute(workflow_input=payload, wait_for_seconds=10)
192+
except Exception as e:
193+
return JSONResponse({"ok": False, "detail": "workflow_start_failed", "error": str(e)}, status_code=502)
194+
195+
response = {
196+
"ok": run.status == "COMPLETED",
197+
"workflow_id": run.workflow_id,
198+
"status": run.status,
199+
"output": run.output,
200+
"ui_url": f"{api_config.ui_host}/execution/{run.workflow_id}",
201+
}
202+
return JSONResponse(response, status_code=200 if run.status == "COMPLETED" else 202)
203+
204+
205+
if __name__ == "__main__":
206+
import uvicorn
207+
208+
uvicorn.run(
209+
"examples.fastapi_worker_service:app",
210+
host="0.0.0.0",
211+
port=int(os.getenv("PORT", "8081")),
212+
workers=1,
213+
)

src/conductor/client/automator/json_schema_generator.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,13 @@ def _type_to_json_schema(type_hint, strict_schema: bool = False) -> Optional[Dic
188188
if len(non_none_args) == 1:
189189
# Optional[T] case
190190
inner_schema = _type_to_json_schema(non_none_args[0], strict_schema)
191-
if inner_schema:
192-
# For optional, we could use oneOf or just mark as nullable
193-
# Using nullable for simplicity
194-
inner_schema['nullable'] = True
195-
return inner_schema
191+
if inner_schema is not None:
192+
# Draft-07 JSON Schema does not support OpenAPI's `nullable`.
193+
# Represent Optional[T] as a union with null.
194+
if inner_schema == {}:
195+
# "Any" already includes null, so keep it minimal.
196+
return inner_schema
197+
return {"anyOf": [inner_schema, {"type": "null"}]}
196198
# Multiple non-None types in Union - too complex
197199
return None
198200

0 commit comments

Comments
 (0)