Skip to content

Commit 770e4ea

Browse files
Expose workflow maintenance commands in Python SDK
1 parent 4acd087 commit 770e4ea

5 files changed

Lines changed: 153 additions & 0 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
TaskQueueList,
3232
TaskQueueQueryAdmission,
3333
TaskQueueTaskAdmission,
34+
WorkflowCommandResult,
3435
WorkflowExecution,
3536
WorkflowHandle,
3637
WorkflowList,
@@ -167,6 +168,7 @@
167168
"TaskQueueTaskAdmission",
168169
"Worker",
169170
"WorkerInterceptor",
171+
"WorkflowCommandResult",
170172
"WorkflowExecution",
171173
"WorkflowTaskHandler",
172174
"WorkflowTaskInterceptorContext",

src/durable_workflow/client.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,27 @@ def from_dict(
176176
)
177177

178178

179+
@dataclass
180+
class WorkflowCommandResult:
181+
"""Machine-readable outcome returned by workflow control commands."""
182+
183+
workflow_id: str
184+
outcome: str
185+
command_status: str | None = None
186+
command_id: str | None = None
187+
raw: dict[str, Any] | None = None
188+
189+
@classmethod
190+
def from_dict(cls, data: dict[str, Any], *, workflow_id: str | None = None) -> WorkflowCommandResult:
191+
return cls(
192+
workflow_id=data.get("workflow_id", workflow_id or ""),
193+
outcome=data.get("outcome", ""),
194+
command_status=data.get("command_status"),
195+
command_id=data.get("command_id"),
196+
raw=data,
197+
)
198+
199+
179200
@dataclass
180201
class WorkflowRunList:
181202
"""All known durable runs for one workflow execution, oldest first."""
@@ -1213,6 +1234,19 @@ async def terminate_workflow(self, workflow_id: str, *, reason: str | None = Non
12131234
body["reason"] = reason
12141235
await self._request("POST", f"/workflows/{workflow_id}/terminate", json=body, context=workflow_id)
12151236

1237+
async def repair_workflow(self, workflow_id: str) -> WorkflowCommandResult:
1238+
"""Ask the server to repair a stalled workflow, returning the command outcome."""
1239+
data = await self._request("POST", f"/workflows/{workflow_id}/repair", json={}, context=workflow_id)
1240+
return WorkflowCommandResult.from_dict(data, workflow_id=workflow_id)
1241+
1242+
async def archive_workflow(self, workflow_id: str, *, reason: str | None = None) -> WorkflowCommandResult:
1243+
"""Move a terminal workflow into the archive tier, returning the command outcome."""
1244+
body: dict[str, Any] = {}
1245+
if reason is not None:
1246+
body["reason"] = reason
1247+
data = await self._request("POST", f"/workflows/{workflow_id}/archive", json=body, context=workflow_id)
1248+
return WorkflowCommandResult.from_dict(data, workflow_id=workflow_id)
1249+
12161250
async def update_workflow(
12171251
self,
12181252
workflow_id: str,
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "workflow.archive",
5+
"request": {
6+
"method": "POST",
7+
"path": "/workflows/wf-polyglot-231/archive"
8+
},
9+
"semantic_body": {
10+
"workflow_id": "wf-polyglot-231",
11+
"reason": "retention policy cleanup"
12+
},
13+
"response_body": {
14+
"workflow_id": "wf-polyglot-231",
15+
"outcome": "archived",
16+
"command_status": "accepted",
17+
"command_id": "cmd-polyglot-archive-231"
18+
},
19+
"cli": {
20+
"argv": {
21+
"workflow-id": "wf-polyglot-231",
22+
"--reason": "retention policy cleanup",
23+
"--json": true
24+
},
25+
"expected_body": {
26+
"reason": "retention policy cleanup"
27+
}
28+
},
29+
"sdk_python": {
30+
"method": "archive_workflow",
31+
"args": {
32+
"workflow_id": "wf-polyglot-231",
33+
"reason": "retention policy cleanup"
34+
},
35+
"expected_body": {
36+
"reason": "retention policy cleanup"
37+
}
38+
}
39+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "workflow.repair",
5+
"request": {
6+
"method": "POST",
7+
"path": "/workflows/wf-polyglot-231/repair"
8+
},
9+
"semantic_body": {
10+
"workflow_id": "wf-polyglot-231"
11+
},
12+
"response_body": {
13+
"workflow_id": "wf-polyglot-231",
14+
"outcome": "repair_requested",
15+
"command_status": "accepted",
16+
"command_id": "cmd-polyglot-repair-231"
17+
},
18+
"cli": {
19+
"argv": {
20+
"workflow-id": "wf-polyglot-231",
21+
"--json": true
22+
},
23+
"expected_body": {}
24+
},
25+
"sdk_python": {
26+
"method": "repair_workflow",
27+
"args": {
28+
"workflow_id": "wf-polyglot-231"
29+
},
30+
"expected_body": {}
31+
}
32+
}

tests/test_client.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,52 @@ async def test_terminate_request_matches_polyglot_fixture(self, client: Client)
642642
assert sdk["args"]["workflow_id"] == fixture["semantic_body"]["workflow_id"]
643643

644644

645+
class TestWorkflowMaintenanceCommands:
646+
@pytest.mark.asyncio
647+
async def test_repair_request_matches_polyglot_fixture(self, client: Client) -> None:
648+
fixture_path = Path(__file__).parent / "fixtures" / "control-plane" / "workflow-repair-parity.json"
649+
fixture = json.loads(fixture_path.read_text())
650+
sdk = fixture["sdk_python"]
651+
652+
resp = _mock_response(200, fixture["response_body"])
653+
654+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
655+
result = await client.repair_workflow(**sdk["args"])
656+
657+
call_args = mock.call_args
658+
assert call_args.args[0] == fixture["request"]["method"]
659+
assert call_args.args[1] == f"/api{fixture['request']['path']}"
660+
assert (call_args.kwargs.get("json") or call_args[1].get("json")) == sdk["expected_body"]
661+
662+
assert result.workflow_id == fixture["semantic_body"]["workflow_id"]
663+
assert result.outcome == fixture["response_body"]["outcome"]
664+
assert result.command_status == fixture["response_body"]["command_status"]
665+
assert result.command_id == fixture["response_body"]["command_id"]
666+
667+
@pytest.mark.asyncio
668+
async def test_archive_request_matches_polyglot_fixture(self, client: Client) -> None:
669+
fixture_path = Path(__file__).parent / "fixtures" / "control-plane" / "workflow-archive-parity.json"
670+
fixture = json.loads(fixture_path.read_text())
671+
sdk = fixture["sdk_python"]
672+
673+
resp = _mock_response(200, fixture["response_body"])
674+
675+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
676+
result = await client.archive_workflow(**sdk["args"])
677+
678+
call_args = mock.call_args
679+
assert call_args.args[0] == fixture["request"]["method"]
680+
assert call_args.args[1] == f"/api{fixture['request']['path']}"
681+
body = call_args.kwargs.get("json") or call_args[1].get("json")
682+
683+
assert body == sdk["expected_body"]
684+
assert body["reason"] == fixture["semantic_body"]["reason"]
685+
assert result.workflow_id == fixture["semantic_body"]["workflow_id"]
686+
assert result.outcome == fixture["response_body"]["outcome"]
687+
assert result.command_status == fixture["response_body"]["command_status"]
688+
assert result.command_id == fixture["response_body"]["command_id"]
689+
690+
645691
class TestQueryWorkflow:
646692
@pytest.mark.asyncio
647693
async def test_query(self, client: Client) -> None:

0 commit comments

Comments
 (0)