Skip to content

Commit 93dafbf

Browse files
Expose workflow controls in Python sync client
Add sync facade and workflow-handle helpers for run visibility, history export, repair, and archive operations so Python callers can use the same control-plane methods through async handles and the blocking client.
1 parent aa14813 commit 93dafbf

4 files changed

Lines changed: 239 additions & 0 deletions

File tree

src/durable_workflow/client.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,29 @@ async def describe(self) -> WorkflowExecution:
523523
"""Return the server's current view of this workflow. See :meth:`Client.describe_workflow`."""
524524
return await self._client.describe_workflow(self.workflow_id)
525525

526+
async def get_history(self) -> Any:
527+
"""Fetch this run's durable history. See :meth:`Client.get_history`."""
528+
if self.run_id is None:
529+
raise ValueError("run_id is required to fetch workflow history from a handle")
530+
return await self._client.get_history(self.workflow_id, self.run_id)
531+
532+
async def export_history(self) -> Any:
533+
"""Export this run's history as a replay bundle. See :meth:`Client.export_history`."""
534+
if self.run_id is None:
535+
raise ValueError("run_id is required to export workflow history from a handle")
536+
return await self._client.export_history(self.workflow_id, self.run_id)
537+
538+
async def list_runs(self) -> WorkflowRunList:
539+
"""List all runs in this workflow execution chain. See :meth:`Client.list_workflow_runs`."""
540+
return await self._client.list_workflow_runs(self.workflow_id)
541+
542+
async def describe_run(self, run_id: str | None = None) -> WorkflowRun:
543+
"""Return one run's detailed status. See :meth:`Client.describe_workflow_run`."""
544+
selected_run_id = run_id or self.run_id
545+
if selected_run_id is None:
546+
raise ValueError("run_id is required to describe a workflow run from a handle")
547+
return await self._client.describe_workflow_run(self.workflow_id, selected_run_id)
548+
526549
async def signal(self, signal_name: str, args: list[Any] | None = None) -> None:
527550
"""Deliver an external signal to this workflow. See :meth:`Client.signal_workflow`."""
528551
await self._client.signal_workflow(self.workflow_id, signal_name, args=args)
@@ -539,6 +562,14 @@ async def terminate(self, *, reason: str | None = None) -> None:
539562
"""Forcefully stop this workflow. See :meth:`Client.terminate_workflow`."""
540563
await self._client.terminate_workflow(self.workflow_id, reason=reason)
541564

565+
async def repair(self) -> WorkflowCommandResult:
566+
"""Ask the server to repair this workflow. See :meth:`Client.repair_workflow`."""
567+
return await self._client.repair_workflow(self.workflow_id)
568+
569+
async def archive(self, *, reason: str | None = None) -> WorkflowCommandResult:
570+
"""Move this terminal workflow into the archive tier. See :meth:`Client.archive_workflow`."""
571+
return await self._client.archive_workflow(self.workflow_id, reason=reason)
572+
542573
async def update(
543574
self,
544575
update_name: str,

src/durable_workflow/sync.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
ScheduleTriggerResult,
1717
TaskQueueDescription,
1818
TaskQueueList,
19+
WorkflowCommandResult,
1920
WorkflowExecution,
2021
WorkflowHandle,
2122
WorkflowList,
23+
WorkflowRun,
24+
WorkflowRunList,
2225
)
2326
from .metrics import MetricsRecorder
2427
from .retry_policy import TransportRetryPolicy
@@ -54,6 +57,20 @@ def describe(self) -> WorkflowExecution:
5457
result: WorkflowExecution = _run(self._handle.describe())
5558
return result
5659

60+
def get_history(self) -> Any:
61+
return _run(self._handle.get_history())
62+
63+
def export_history(self) -> Any:
64+
return _run(self._handle.export_history())
65+
66+
def list_runs(self) -> WorkflowRunList:
67+
result: WorkflowRunList = _run(self._handle.list_runs())
68+
return result
69+
70+
def describe_run(self, run_id: str | None = None) -> WorkflowRun:
71+
result: WorkflowRun = _run(self._handle.describe_run(run_id))
72+
return result
73+
5774
def signal(self, signal_name: str, args: list[Any] | None = None) -> None:
5875
_run(self._handle.signal(signal_name, args=args))
5976

@@ -66,6 +83,14 @@ def cancel(self, *, reason: str | None = None) -> None:
6683
def terminate(self, *, reason: str | None = None) -> None:
6784
_run(self._handle.terminate(reason=reason))
6885

86+
def repair(self) -> WorkflowCommandResult:
87+
result: WorkflowCommandResult = _run(self._handle.repair())
88+
return result
89+
90+
def archive(self, *, reason: str | None = None) -> WorkflowCommandResult:
91+
result: WorkflowCommandResult = _run(self._handle.archive(reason=reason))
92+
return result
93+
6994
def update(
7095
self,
7196
update_name: str,
@@ -262,6 +287,14 @@ def get_history(self, workflow_id: str, run_id: str) -> Any:
262287
def export_history(self, workflow_id: str, run_id: str) -> Any:
263288
return _run(self._async.export_history(workflow_id, run_id))
264289

290+
def list_workflow_runs(self, workflow_id: str) -> WorkflowRunList:
291+
result: WorkflowRunList = _run(self._async.list_workflow_runs(workflow_id))
292+
return result
293+
294+
def describe_workflow_run(self, workflow_id: str, run_id: str) -> WorkflowRun:
295+
result: WorkflowRun = _run(self._async.describe_workflow_run(workflow_id, run_id))
296+
return result
297+
265298
def signal_workflow(self, workflow_id: str, signal_name: str, *, args: list[Any] | None = None) -> None:
266299
_run(self._async.signal_workflow(workflow_id, signal_name, args=args))
267300

@@ -274,6 +307,14 @@ def cancel_workflow(self, workflow_id: str, *, reason: str | None = None) -> Non
274307
def terminate_workflow(self, workflow_id: str, *, reason: str | None = None) -> None:
275308
_run(self._async.terminate_workflow(workflow_id, reason=reason))
276309

310+
def repair_workflow(self, workflow_id: str) -> WorkflowCommandResult:
311+
result: WorkflowCommandResult = _run(self._async.repair_workflow(workflow_id))
312+
return result
313+
314+
def archive_workflow(self, workflow_id: str, *, reason: str | None = None) -> WorkflowCommandResult:
315+
result: WorkflowCommandResult = _run(self._async.archive_workflow(workflow_id, reason=reason))
316+
return result
317+
277318
def update_workflow(
278319
self,
279320
workflow_id: str,

tests/test_client.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,57 @@ async def test_describe_run_request_matches_polyglot_fixture(self, client: Clien
426426
assert run.actions == response["actions"]
427427

428428

429+
class TestWorkflowHandleControlPlane:
430+
@pytest.mark.asyncio
431+
async def test_run_visibility_delegates_to_client(self, client: Client) -> None:
432+
handle = WorkflowHandle(client, workflow_id="wf-1", run_id="r1", workflow_type="greeter")
433+
client.get_history = AsyncMock(return_value={"events": []})
434+
client.export_history = AsyncMock(return_value={"schema": "durable.workflow.history.v2"})
435+
client.list_workflow_runs = AsyncMock(return_value="runs")
436+
client.describe_workflow_run = AsyncMock(return_value="run")
437+
438+
assert await handle.get_history() == {"events": []}
439+
assert await handle.export_history() == {"schema": "durable.workflow.history.v2"}
440+
assert await handle.list_runs() == "runs"
441+
assert await handle.describe_run() == "run"
442+
443+
client.get_history.assert_awaited_once_with("wf-1", "r1")
444+
client.export_history.assert_awaited_once_with("wf-1", "r1")
445+
client.list_workflow_runs.assert_awaited_once_with("wf-1")
446+
client.describe_workflow_run.assert_awaited_once_with("wf-1", "r1")
447+
448+
@pytest.mark.asyncio
449+
async def test_run_specific_methods_require_run_id(self, client: Client) -> None:
450+
handle = WorkflowHandle(client, workflow_id="wf-1")
451+
452+
with pytest.raises(ValueError, match="run_id is required"):
453+
await handle.get_history()
454+
with pytest.raises(ValueError, match="run_id is required"):
455+
await handle.export_history()
456+
with pytest.raises(ValueError, match="run_id is required"):
457+
await handle.describe_run()
458+
459+
@pytest.mark.asyncio
460+
async def test_describe_run_accepts_explicit_run_id(self, client: Client) -> None:
461+
handle = WorkflowHandle(client, workflow_id="wf-1")
462+
client.describe_workflow_run = AsyncMock(return_value="run")
463+
464+
assert await handle.describe_run("r2") == "run"
465+
client.describe_workflow_run.assert_awaited_once_with("wf-1", "r2")
466+
467+
@pytest.mark.asyncio
468+
async def test_maintenance_delegates_to_client(self, client: Client) -> None:
469+
handle = WorkflowHandle(client, workflow_id="wf-1", run_id="r1", workflow_type="greeter")
470+
client.repair_workflow = AsyncMock(return_value="repair")
471+
client.archive_workflow = AsyncMock(return_value="archive")
472+
473+
assert await handle.repair() == "repair"
474+
assert await handle.archive(reason="retention") == "archive"
475+
476+
client.repair_workflow.assert_awaited_once_with("wf-1")
477+
client.archive_workflow.assert_awaited_once_with("wf-1", reason="retention")
478+
479+
429480
class TestSignalWorkflow:
430481
@pytest.mark.asyncio
431482
async def test_signal(self, client: Client) -> None:

tests/test_sync.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import httpx
88
import pytest
99

10+
from durable_workflow.client import WorkflowHandle
1011
from durable_workflow.sync import Client, SyncWorkflowHandle
1112

1213

@@ -161,6 +162,101 @@ def test_describe_task_queue(self) -> None:
161162
assert mock.call_args.args[:2] == ("GET", "/api/task-queues/orders%2Fhigh%20priority")
162163

163164

165+
class TestSyncClientRunVisibility:
166+
def test_list_workflow_runs(self) -> None:
167+
client = Client("http://localhost:8080")
168+
resp = _mock_response(
169+
200,
170+
{
171+
"workflow_id": "wf-1",
172+
"run_count": 2,
173+
"runs": [
174+
{"workflow_id": "wf-1", "run_id": "r1", "workflow_type": "greeter", "status": "completed"},
175+
{
176+
"workflow_id": "wf-1",
177+
"run_id": "r2",
178+
"workflow_type": "greeter",
179+
"status": "running",
180+
"is_current_run": True,
181+
},
182+
],
183+
},
184+
)
185+
with patch.object(client._async._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
186+
result = client.list_workflow_runs("wf-1")
187+
188+
assert result.workflow_id == "wf-1"
189+
assert result.run_count == 2
190+
assert [run.run_id for run in result.runs] == ["r1", "r2"]
191+
assert result.runs[1].is_current_run is True
192+
assert mock.call_args.args[:2] == ("GET", "/api/workflows/wf-1/runs")
193+
194+
def test_describe_workflow_run(self) -> None:
195+
client = Client("http://localhost:8080")
196+
resp = _mock_response(
197+
200,
198+
{
199+
"workflow_id": "wf-1",
200+
"run_id": "r1",
201+
"workflow_type": "greeter",
202+
"status": "completed",
203+
"status_bucket": "terminal",
204+
"actions": {"archive": {"enabled": True}},
205+
},
206+
)
207+
with patch.object(client._async._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
208+
run = client.describe_workflow_run("wf-1", "r1")
209+
210+
assert run.workflow_id == "wf-1"
211+
assert run.run_id == "r1"
212+
assert run.status_bucket == "terminal"
213+
assert run.actions == {"archive": {"enabled": True}}
214+
assert mock.call_args.args[:2] == ("GET", "/api/workflows/wf-1/runs/r1")
215+
216+
217+
class TestSyncClientMaintenance:
218+
def test_repair_workflow(self) -> None:
219+
client = Client("http://localhost:8080")
220+
resp = _mock_response(
221+
200,
222+
{
223+
"workflow_id": "wf-1",
224+
"outcome": "accepted",
225+
"command_status": "queued",
226+
"command_id": "cmd-1",
227+
},
228+
)
229+
with patch.object(client._async._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
230+
result = client.repair_workflow("wf-1")
231+
232+
assert result.workflow_id == "wf-1"
233+
assert result.outcome == "accepted"
234+
assert result.command_status == "queued"
235+
assert result.command_id == "cmd-1"
236+
assert mock.call_args.args[:2] == ("POST", "/api/workflows/wf-1/repair")
237+
238+
def test_archive_workflow(self) -> None:
239+
client = Client("http://localhost:8080")
240+
resp = _mock_response(
241+
200,
242+
{
243+
"workflow_id": "wf-1",
244+
"outcome": "accepted",
245+
"command_status": "completed",
246+
"command_id": "cmd-2",
247+
},
248+
)
249+
with patch.object(client._async._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
250+
result = client.archive_workflow("wf-1", reason="retention policy")
251+
252+
assert result.workflow_id == "wf-1"
253+
assert result.outcome == "accepted"
254+
assert result.command_status == "completed"
255+
assert result.command_id == "cmd-2"
256+
assert mock.call_args.args[:2] == ("POST", "/api/workflows/wf-1/archive")
257+
assert mock.call_args.kwargs["json"] == {"reason": "retention policy"}
258+
259+
164260
class TestSyncClientUpdate:
165261
def test_update(self) -> None:
166262
client = Client("http://localhost:8080")
@@ -187,6 +283,26 @@ def test_handle_update(self) -> None:
187283
assert result["outcome"] == "completed"
188284

189285

286+
class TestSyncWorkflowHandleControlPlane:
287+
def test_run_visibility_and_maintenance_delegate_to_async_handle(self) -> None:
288+
async_handle = WorkflowHandle(AsyncMock(), workflow_id="wf-1", run_id="r1", workflow_type="greeter")
289+
async_handle.get_history = AsyncMock(return_value={"events": []}) # type: ignore[method-assign]
290+
async_handle.export_history = AsyncMock(return_value={"schema": "durable.workflow.history.v2"}) # type: ignore[method-assign]
291+
async_handle.list_runs = AsyncMock(return_value=[]) # type: ignore[method-assign]
292+
async_handle.describe_run = AsyncMock(return_value={"run_id": "r1"}) # type: ignore[method-assign]
293+
async_handle.repair = AsyncMock(return_value={"outcome": "accepted"}) # type: ignore[method-assign]
294+
async_handle.archive = AsyncMock(return_value={"outcome": "completed"}) # type: ignore[method-assign]
295+
handle = SyncWorkflowHandle(async_handle)
296+
297+
assert handle.get_history() == {"events": []}
298+
assert handle.export_history() == {"schema": "durable.workflow.history.v2"}
299+
assert handle.list_runs() == []
300+
assert handle.describe_run() == {"run_id": "r1"}
301+
assert handle.repair() == {"outcome": "accepted"}
302+
assert handle.archive(reason="retention") == {"outcome": "completed"}
303+
async_handle.archive.assert_awaited_once_with(reason="retention")
304+
305+
190306
class TestSyncClientContextManager:
191307
def test_context_manager(self) -> None:
192308
with Client("http://localhost:8080") as client:

0 commit comments

Comments
 (0)