Skip to content

Commit 48d1c3a

Browse files
Add Python bridge webhook client
1 parent 5d9485c commit 48d1c3a

5 files changed

Lines changed: 339 additions & 0 deletions

File tree

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,27 @@ decisions for success, retryability, malformed output, cancellation, deadline
420420
exceeded, handler crash, decode failure, and unsupported payload
421421
codec/reference states without treating stderr as a machine signal.
422422

423+
Bridge adapters can hand bounded webhook ingress into the server through
424+
`Client.send_webhook_bridge_event()`. The method returns the server's typed
425+
bridge outcome for accepted, duplicate, and rejected events, including
426+
machine-readable HTTP 422 rejection outcomes:
427+
428+
```python
429+
outcome = await client.send_webhook_bridge_event(
430+
"pagerduty",
431+
action="signal_workflow",
432+
idempotency_key="pagerduty-event-3003",
433+
target={"workflow_id": "wf-remediation-42", "signal_name": "incident_escalated"},
434+
input={"severity": "critical", "service": "checkout"},
435+
correlation={"provider": "pagerduty", "event_type": "incident.triggered"},
436+
)
437+
438+
if outcome.accepted:
439+
print(outcome.workflow_id, outcome.control_plane_outcome)
440+
else:
441+
print(outcome.outcome, outcome.reason)
442+
```
443+
423444
## Development
424445

425446
```bash

src/durable_workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
parse_auth_composition_contract,
1818
)
1919
from .client import (
20+
BridgeAdapterOutcome,
2021
Client,
2122
ScheduleAction,
2223
ScheduleBackfillResult,
@@ -130,6 +131,7 @@
130131
"ActivityInfo",
131132
"ActivityInterceptorContext",
132133
"ActivityRetryPolicy",
134+
"BridgeAdapterOutcome",
133135
"ChildWorkflowRetryPolicy",
134136
"ChildWorkflowFailed",
135137
"Client",

src/durable_workflow/client.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ def _route_for_metrics(path: str) -> str:
7070
parts[3] = "{run_id}"
7171
elif parts[0] == "schedules" and len(parts) >= 2:
7272
parts[1] = "{schedule_id}"
73+
elif parts[:2] == ["bridge-adapters", "webhook"] and len(parts) >= 3:
74+
parts[2] = "{adapter}"
7375
elif (
7476
parts[:2] == ["worker", "workflow-tasks"]
7577
or parts[:2] == ["worker", "activity-tasks"]
@@ -362,6 +364,47 @@ class ScheduleBackfillResult:
362364
results: list[dict[str, Any]] | None = None
363365

364366

367+
@dataclass
368+
class BridgeAdapterOutcome:
369+
"""Machine-readable result returned by a bridge adapter event."""
370+
371+
schema: str
372+
version: int
373+
adapter: str
374+
action: str | None
375+
accepted: bool
376+
outcome: str
377+
idempotency_key: str | None = None
378+
reason: str | None = None
379+
target: dict[str, Any] | None = None
380+
correlation: dict[str, Any] | None = None
381+
workflow_id: str | None = None
382+
run_id: str | None = None
383+
workflow_type: str | None = None
384+
control_plane_outcome: str | None = None
385+
raw: dict[str, Any] | None = None
386+
387+
@classmethod
388+
def from_dict(cls, data: dict[str, Any]) -> BridgeAdapterOutcome:
389+
return cls(
390+
schema=str(data.get("schema", "")),
391+
version=int(data.get("version", 0)),
392+
adapter=str(data.get("adapter", "")),
393+
action=data.get("action"),
394+
accepted=bool(data.get("accepted", False)),
395+
outcome=str(data.get("outcome", "")),
396+
idempotency_key=data.get("idempotency_key"),
397+
reason=data.get("reason"),
398+
target=data.get("target") if isinstance(data.get("target"), dict) else None,
399+
correlation=data.get("correlation") if isinstance(data.get("correlation"), dict) else None,
400+
workflow_id=data.get("workflow_id"),
401+
run_id=data.get("run_id"),
402+
workflow_type=data.get("workflow_type"),
403+
control_plane_outcome=data.get("control_plane_outcome"),
404+
raw=data,
405+
)
406+
407+
365408
class WorkflowHandle:
366409
"""Convenience wrapper for operating on one workflow ID."""
367410

@@ -696,6 +739,68 @@ async def _do_request() -> httpx.Response:
696739
self.metrics.increment(CLIENT_REQUESTS, tags=tags)
697740
self.metrics.record(CLIENT_REQUEST_DURATION_SECONDS, time.perf_counter() - start, tags=tags)
698741

742+
async def _request_bridge_outcome(self, path: str, *, json: Any = None, context: str = "") -> dict[str, Any]:
743+
start = time.perf_counter()
744+
route = _route_for_metrics(path)
745+
status_code = "none"
746+
outcome = "pending"
747+
748+
async def _do_request() -> httpx.Response:
749+
resp = await self._http.request(
750+
"POST",
751+
f"/api{path}",
752+
headers=self._headers(worker=False),
753+
json=json,
754+
)
755+
if resp.status_code != 422:
756+
resp.raise_for_status()
757+
return resp
758+
759+
try:
760+
try:
761+
resp = await self.retry_policy.execute(_do_request)
762+
except httpx.HTTPStatusError as exc:
763+
status_code = str(exc.response.status_code)
764+
outcome = "http_error"
765+
try:
766+
body = exc.response.json()
767+
except ValueError:
768+
body = exc.response.text
769+
_raise_for_status(exc.response.status_code, body, context=context)
770+
raise
771+
772+
status_code = str(resp.status_code)
773+
if not resp.content:
774+
raise ServerError(
775+
resp.status_code,
776+
{"reason": "invalid_bridge_outcome", "message": "expected JSON object, got empty response"},
777+
)
778+
data = resp.json()
779+
if not isinstance(data, dict):
780+
raise ServerError(
781+
resp.status_code,
782+
{
783+
"reason": "invalid_bridge_outcome",
784+
"message": f"expected JSON object, got {type(data).__name__}",
785+
},
786+
)
787+
outcome = "bridge_rejected" if resp.status_code == 422 else "ok"
788+
return data
789+
except Exception as exc:
790+
if outcome == "pending":
791+
outcome = type(exc).__name__
792+
raise
793+
finally:
794+
tags = {
795+
"method": "POST",
796+
"route": route,
797+
"plane": "control",
798+
"status_code": status_code,
799+
"outcome": outcome,
800+
}
801+
self.metrics.increment(CLIENT_REQUESTS, tags=tags)
802+
self.metrics.record(CLIENT_REQUEST_DURATION_SECONDS, time.perf_counter() - start, tags=tags)
803+
699804
async def get_cluster_info(self) -> dict[str, Any]:
700805
"""Fetch server build identity, capabilities, and protocol manifests."""
701806
result = await self._request("GET", "/cluster/info", worker=False, context="get_cluster_info")
@@ -911,6 +1016,40 @@ async def get_history(self, workflow_id: str, run_id: str) -> Any:
9111016
"GET", f"/workflows/{workflow_id}/runs/{run_id}/history", context=workflow_id
9121017
)
9131018

1019+
async def send_webhook_bridge_event(
1020+
self,
1021+
adapter: str,
1022+
*,
1023+
action: str,
1024+
idempotency_key: str,
1025+
target: dict[str, Any],
1026+
input: dict[str, Any] | None = None,
1027+
correlation: dict[str, Any] | None = None,
1028+
) -> BridgeAdapterOutcome:
1029+
"""Send one bounded webhook bridge event and return its contract outcome.
1030+
1031+
The bridge endpoint intentionally returns machine-readable rejected
1032+
outcomes as HTTP 422. This method returns those outcomes instead of
1033+
raising :class:`InvalidArgument`, while auth and unexpected server
1034+
failures still use the normal SDK exception mapping.
1035+
"""
1036+
body: dict[str, Any] = {
1037+
"action": action,
1038+
"idempotency_key": idempotency_key,
1039+
"target": target,
1040+
}
1041+
if input is not None:
1042+
body["input"] = input
1043+
if correlation is not None:
1044+
body["correlation"] = correlation
1045+
1046+
data = await self._request_bridge_outcome(
1047+
f"/bridge-adapters/webhook/{quote(adapter, safe='._:-')}",
1048+
json=body,
1049+
context=f"bridge adapter {adapter}",
1050+
)
1051+
return BridgeAdapterOutcome.from_dict(data)
1052+
9141053
async def signal_workflow(
9151054
self, workflow_id: str, signal_name: str, *, args: list[Any] | None = None
9161055
) -> None:

tests/test_client.py

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,145 @@ async def test_signal_request_matches_polyglot_fixture(self, client: Client) ->
335335
assert sdk["args"]["signal_name"] == semantic["signal_name"]
336336

337337

338+
class TestWebhookBridgeAdapters:
339+
@pytest.mark.asyncio
340+
async def test_start_workflow_bridge_event_returns_accepted_outcome(self, client: Client) -> None:
341+
resp = _mock_response(202, {
342+
"schema": "durable-workflow.v2.bridge-adapter-outcome.contract",
343+
"version": 1,
344+
"adapter": "stripe",
345+
"action": "start_workflow",
346+
"accepted": True,
347+
"outcome": "accepted",
348+
"idempotency_key": "stripe-event-1001",
349+
"target": {
350+
"workflow_id": "bridge-stripe-derived",
351+
"workflow_type": "orders.fulfillment",
352+
"task_queue": "external-workflows",
353+
"business_key": "order-1001",
354+
},
355+
"correlation": {
356+
"provider": "stripe",
357+
"event_type": "checkout.session.completed",
358+
},
359+
"workflow_id": "bridge-stripe-derived",
360+
"run_id": "run-bridge-1",
361+
"workflow_type": "orders.fulfillment",
362+
"control_plane_outcome": "started_new",
363+
})
364+
365+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
366+
outcome = await client.send_webhook_bridge_event(
367+
"stripe",
368+
action="start_workflow",
369+
idempotency_key="stripe-event-1001",
370+
target={
371+
"workflow_type": "orders.fulfillment",
372+
"task_queue": "external-workflows",
373+
"business_key": "order-1001",
374+
"duplicate_policy": "use_existing",
375+
},
376+
input={"order_id": "order-1001"},
377+
correlation={
378+
"provider": "stripe",
379+
"event_type": "checkout.session.completed",
380+
},
381+
)
382+
383+
assert outcome.accepted is True
384+
assert outcome.outcome == "accepted"
385+
assert outcome.workflow_id == "bridge-stripe-derived"
386+
assert outcome.control_plane_outcome == "started_new"
387+
assert outcome.target is not None
388+
assert outcome.target["business_key"] == "order-1001"
389+
390+
call_args = mock.call_args
391+
assert call_args.args[0] == "POST"
392+
assert call_args.args[1] == "/api/bridge-adapters/webhook/stripe"
393+
body = call_args.kwargs.get("json") or call_args[1].get("json")
394+
assert body == {
395+
"action": "start_workflow",
396+
"idempotency_key": "stripe-event-1001",
397+
"target": {
398+
"workflow_type": "orders.fulfillment",
399+
"task_queue": "external-workflows",
400+
"business_key": "order-1001",
401+
"duplicate_policy": "use_existing",
402+
},
403+
"input": {"order_id": "order-1001"},
404+
"correlation": {
405+
"provider": "stripe",
406+
"event_type": "checkout.session.completed",
407+
},
408+
}
409+
410+
@pytest.mark.asyncio
411+
async def test_signal_bridge_event_returns_rejected_outcome_for_422(self, client: Client) -> None:
412+
resp = _mock_response(422, {
413+
"schema": "durable-workflow.v2.bridge-adapter-outcome.contract",
414+
"version": 1,
415+
"adapter": "pagerduty",
416+
"action": "signal_workflow",
417+
"accepted": False,
418+
"outcome": "rejected",
419+
"reason": "unknown_target",
420+
"idempotency_key": "pagerduty-event-3003",
421+
"target": {
422+
"workflow_id": "wf-remediation-42",
423+
"signal_name": "incident_escalated",
424+
},
425+
"correlation": {
426+
"provider": "pagerduty",
427+
"event_type": "incident.triggered",
428+
},
429+
})
430+
431+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp):
432+
outcome = await client.send_webhook_bridge_event(
433+
"pagerduty",
434+
action="signal_workflow",
435+
idempotency_key="pagerduty-event-3003",
436+
target={
437+
"workflow_id": "wf-remediation-42",
438+
"signal_name": "incident_escalated",
439+
},
440+
input={
441+
"severity": "critical",
442+
"service": "checkout",
443+
},
444+
correlation={
445+
"provider": "pagerduty",
446+
"event_type": "incident.triggered",
447+
},
448+
)
449+
450+
assert outcome.accepted is False
451+
assert outcome.outcome == "rejected"
452+
assert outcome.reason == "unknown_target"
453+
assert outcome.idempotency_key == "pagerduty-event-3003"
454+
455+
@pytest.mark.asyncio
456+
async def test_bridge_adapter_path_escapes_adapter_segment(self, client: Client) -> None:
457+
resp = _mock_response(202, {
458+
"schema": "durable-workflow.v2.bridge-adapter-outcome.contract",
459+
"version": 1,
460+
"adapter": "ops/foo",
461+
"action": "update_workflow",
462+
"accepted": True,
463+
"outcome": "accepted",
464+
})
465+
466+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
467+
await client.send_webhook_bridge_event(
468+
"ops/foo",
469+
action="update_workflow",
470+
idempotency_key="evt-1",
471+
target={"workflow_id": "wf-1", "update_name": "acknowledge"},
472+
)
473+
474+
assert mock.call_args.args[1] == "/api/bridge-adapters/webhook/ops%2Ffoo"
475+
476+
338477
class TestCancelWorkflow:
339478
@pytest.mark.asyncio
340479
async def test_cancel(self, client: Client) -> None:

0 commit comments

Comments
 (0)