Skip to content

Commit 1a6be3f

Browse files
Add graph-level webhooks for execution failure and completion (#607)
* Add graph execution webhooks for failure and execution events * Removed unused imports
1 parent 488ed90 commit 1a6be3f

5 files changed

Lines changed: 137 additions & 32 deletions

File tree

state-manager/app/controller/errored_state.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
from app.models.state_status_enum import StateStatusEnum
1010
from app.singletons.logs_manager import LogsManager
1111
from app.models.db.graph_template_model import GraphTemplate
12+
from app.tasks.webhook import dispatch_webhook
13+
from datetime import datetime
14+
from fastapi import BackgroundTasks
1215

1316
logger = LogsManager().get_logger()
14-
15-
async def errored_state(namespace_name: str, state_id: PydanticObjectId, body: ErroredRequestModel, x_exosphere_request_id: str) -> ErroredResponseModel:
16-
17+
async def errored_state(namespace_name: str, state_id: PydanticObjectId, body: ErroredRequestModel, x_exosphere_request_id: str, background_tasks: BackgroundTasks | None = None,) -> ErroredResponseModel:
18+
if background_tasks is None:
19+
background_tasks = BackgroundTasks()
1720
try:
1821
logger.info(f"Errored state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
1922

@@ -70,6 +73,26 @@ async def errored_state(namespace_name: str, state_id: PydanticObjectId, body: E
7073
state.error = body.error
7174
await state.save()
7275

76+
if (
77+
not retry_created
78+
and graph_template.webhook
79+
and "GRAPH_FAILED" in graph_template.webhook.events
80+
):
81+
background_tasks.add_task(
82+
dispatch_webhook,
83+
url=graph_template.webhook.url,
84+
payload={
85+
"event": "GRAPH_FAILED",
86+
"namespace": namespace_name,
87+
"graph_name": state.graph_name,
88+
"run_id": state.run_id,
89+
"failed_state_id": str(state.id),
90+
"node_name": state.node_name,
91+
"error": body.error,
92+
"timestamp": datetime.utcnow().isoformat(),
93+
},
94+
headers=graph_template.webhook.headers,
95+
)
7396
return ErroredResponseModel(status=StateStatusEnum.ERRORED, retry_created=retry_created)
7497

7598
except Exception as e:
Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,101 @@
11
from beanie import PydanticObjectId
2-
from app.models.executed_models import ExecutedRequestModel, ExecutedResponseModel
3-
42
from fastapi import HTTPException, status, BackgroundTasks
53

4+
from app.models.executed_models import ExecutedRequestModel, ExecutedResponseModel
65
from app.models.db.state import State
76
from app.models.state_status_enum import StateStatusEnum
87
from app.singletons.logs_manager import LogsManager
98
from app.tasks.create_next_states import create_next_states
109

1110
logger = LogsManager().get_logger()
1211

13-
async def executed_state(namespace_name: str, state_id: PydanticObjectId, body: ExecutedRequestModel, x_exosphere_request_id: str, background_tasks: BackgroundTasks) -> ExecutedResponseModel:
1412

13+
async def executed_state(
14+
namespace_name: str,
15+
state_id: PydanticObjectId,
16+
body: ExecutedRequestModel,
17+
x_exosphere_request_id: str,
18+
background_tasks: BackgroundTasks,
19+
) -> ExecutedResponseModel:
1520
try:
16-
logger.info(f"Executed state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
21+
logger.info(
22+
f"Executed state {state_id} for namespace {namespace_name}",
23+
x_exosphere_request_id=x_exosphere_request_id,
24+
)
1725

1826
state = await State.find_one(State.id == state_id)
1927
if not state or not state.id:
20-
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")
28+
raise HTTPException(
29+
status_code=status.HTTP_404_NOT_FOUND,
30+
detail="State not found",
31+
)
2132

2233
if state.status != StateStatusEnum.QUEUED:
23-
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is not queued")
24-
25-
next_state_ids = []
34+
raise HTTPException(
35+
status_code=status.HTTP_400_BAD_REQUEST,
36+
detail="State is not queued",
37+
)
38+
39+
next_state_ids: list[PydanticObjectId] = []
40+
41+
# ---- Handle outputs ----
2642
if len(body.outputs) == 0:
2743
state.status = StateStatusEnum.EXECUTED
2844
state.outputs = {}
2945
await state.save()
3046

3147
next_state_ids.append(state.id)
3248

33-
else:
49+
else:
50+
# First output updates the current state
3451
state.outputs = body.outputs[0]
3552
state.status = StateStatusEnum.EXECUTED
3653
await state.save()
54+
3755
next_state_ids.append(state.id)
3856

57+
# Remaining outputs create new states
3958
new_states = []
4059
for output in body.outputs[1:]:
41-
new_states.append(State(
42-
node_name=state.node_name,
43-
namespace_name=state.namespace_name,
44-
identifier=state.identifier,
45-
graph_name=state.graph_name,
46-
run_id=state.run_id,
47-
status=StateStatusEnum.EXECUTED,
48-
inputs=state.inputs,
49-
outputs=output,
50-
error=None,
51-
parents=state.parents
52-
))
53-
54-
if len(new_states) > 0:
55-
inserted_ids = (await State.insert_many(new_states)).inserted_ids
60+
new_states.append(
61+
State(
62+
node_name=state.node_name,
63+
namespace_name=state.namespace_name,
64+
identifier=state.identifier,
65+
graph_name=state.graph_name,
66+
run_id=state.run_id,
67+
status=StateStatusEnum.EXECUTED,
68+
inputs=state.inputs,
69+
outputs=output,
70+
error=None,
71+
parents=state.parents,
72+
)
73+
)
74+
75+
if new_states:
76+
inserted_ids = (
77+
await State.insert_many(new_states)
78+
).inserted_ids
5679
next_state_ids.extend(inserted_ids)
5780

58-
background_tasks.add_task(create_next_states, next_state_ids, state.identifier, state.namespace_name, state.graph_name, state.parents)
81+
# ---- Create next states ----
82+
background_tasks.add_task(
83+
create_next_states,
84+
next_state_ids,
85+
state.identifier,
86+
state.namespace_name,
87+
state.graph_name,
88+
state.parents,
89+
)
5990

60-
return ExecutedResponseModel(status=StateStatusEnum.EXECUTED)
91+
return ExecutedResponseModel(
92+
status=StateStatusEnum.EXECUTED
93+
)
6194

6295
except Exception as e:
63-
logger.error(f"Error executing state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e)
64-
raise e
96+
logger.error(
97+
f"Error executing state {state_id} for namespace {namespace_name}",
98+
x_exosphere_request_id=x_exosphere_request_id,
99+
error=e,
100+
)
101+
raise

state-manager/app/models/db/graph_template_model.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from app.models.retry_policy_model import RetryPolicyModel
1515
from app.models.store_config_model import StoreConfig
1616
from app.models.trigger_models import Trigger
17+
from app.models.webhook_config_model import WebhookConfig
1718

1819
class GraphTemplate(BaseDatabaseModel):
1920
name: str = Field(..., description="Name of the graph")
@@ -25,6 +26,7 @@ class GraphTemplate(BaseDatabaseModel):
2526
triggers: List[Trigger] = Field(default_factory=list, description="Triggers of the graph")
2627
retry_policy: RetryPolicyModel = Field(default_factory=RetryPolicyModel, description="Retry policy of the graph")
2728
store_config: StoreConfig = Field(default_factory=StoreConfig, description="Store config of the graph")
29+
webhook: WebhookConfig | None = Field(default=None, description="Optional webhook configuration for graph execution events")
2830

2931
_node_by_identifier: Dict[str, NodeTemplate] | None = PrivateAttr(default=None)
3032
_parents_by_identifier: Dict[str, set[str]] | None = PrivateAttr(default=None) # type: ignore
@@ -318,7 +320,7 @@ def get_path_by_identifier(self, identifier: str) -> set[str]:
318320

319321
@staticmethod
320322
async def get(namespace: str, graph_name: str) -> "GraphTemplate":
321-
graph_template = await GraphTemplate.find_one(GraphTemplate.namespace == namespace, GraphTemplate.name == graph_name)
323+
graph_template = await GraphTemplate.find_one(GraphTemplate.namespace == namespace,GraphTemplate.name == graph_name)
322324
if not graph_template:
323325
raise ValueError(f"Graph template not found for namespace: {namespace} and graph name: {graph_name}")
324326
return graph_template
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from pydantic import BaseModel, Field
2+
from typing import List, Dict, Optional
3+
4+
5+
class WebhookConfig(BaseModel):
6+
url: str = Field(..., description="Webhook endpoint URL")
7+
events: List[str] = Field(default_factory=list, description="Subscribed events")
8+
headers: Optional[Dict[str, str]] = Field(
9+
default=None,
10+
description="Optional HTTP headers for webhook requests"
11+
)

state-manager/app/tasks/webhook.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import logging
2+
from datetime import datetime
3+
from typing import Optional
4+
5+
import httpx
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
async def dispatch_webhook(
11+
*,
12+
url: str,
13+
payload: dict,
14+
headers: Optional[dict] = None,
15+
) -> None:
16+
"""
17+
Dispatch a webhook event.
18+
This must never raise exceptions (best-effort delivery).
19+
"""
20+
try:
21+
async with httpx.AsyncClient(timeout=5) as client:
22+
await client.post(
23+
url,
24+
json=payload,
25+
headers=headers or {},
26+
)
27+
except Exception as exc:
28+
logger.warning(
29+
"Webhook dispatch failed",
30+
exc_info=exc,
31+
extra={"url": url},
32+
)

0 commit comments

Comments
 (0)