Skip to content

Commit 032c2fb

Browse files
Guard Python workflow re-registration
Issue: zorporation/durable-workflow#448 Loop-ID: build-02
1 parent b423426 commit 032c2fb

5 files changed

Lines changed: 134 additions & 0 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@ server. Query routing and synchronous pre-accept update validator execution are
133133
still server-side follow-ups; use those paths only with deployments that
134134
advertise support for the target workflow type.
135135

136+
Workers fingerprint registered workflow class definitions and advertise those
137+
fingerprints during registration. Re-registering the same `worker_id` with a
138+
changed class body for an already advertised workflow type raises `RuntimeError` immediately;
139+
restart the worker process with a new id before serving changed workflow code.
140+
136141
## Features
137142

138143
- **Async-first**: Built on `httpx` and `asyncio`
@@ -141,6 +146,7 @@ advertise support for the target workflow type.
141146
- **HTTP/JSON protocol**: No gRPC, no protobuf dependencies
142147
- **Codec envelopes**: Avro payloads by default, with JSON decode compatibility for existing history
143148
- **Payload-size warnings**: Structured warnings before oversized workflow, activity, signal, update, query, or search-attribute payloads reach the server
149+
- **Workflow definition guard**: Worker registration refuses same-id hot reloads when a workflow class definition changed
144150
- **Worker interceptors**: Typed hooks around workflow tasks, activity calls, and query tasks for tracing, logging, and custom metrics
145151
- **Metrics hooks**: Pluggable counters and histograms, with an optional Prometheus adapter
146152

src/durable_workflow/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,7 @@ async def register_worker(
11031103
worker_id: str,
11041104
task_queue: str,
11051105
supported_workflow_types: list[str] | None = None,
1106+
workflow_definition_fingerprints: dict[str, str] | None = None,
11061107
supported_activity_types: list[str] | None = None,
11071108
runtime: str = "python",
11081109
sdk_version: str | None = None,
@@ -1123,6 +1124,8 @@ async def register_worker(
11231124
"supported_workflow_types": supported_workflow_types or [],
11241125
"supported_activity_types": supported_activity_types or [],
11251126
}
1127+
if workflow_definition_fingerprints is not None:
1128+
body["workflow_definition_fingerprints"] = workflow_definition_fingerprints
11261129
return await self._request("POST", "/worker/register", worker=True, json=body)
11271130

11281131
async def poll_workflow_task(

src/durable_workflow/worker.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
import asyncio
1919
import contextlib
20+
import hashlib
21+
import inspect
2022
import logging
2123
import time
2224
import traceback
2325
import uuid
2426
from collections.abc import Awaitable, Callable, Iterable
27+
from types import FunctionType
2528
from typing import Any
2629

2730
from . import serializer
@@ -54,6 +57,7 @@
5457
log = logging.getLogger("durable_workflow.worker")
5558

5659
_TERMINAL_WORKFLOW_STATUSES = {"completed", "failed", "terminated", "canceled", "cancelled"}
60+
_WORKER_WORKFLOW_FINGERPRINTS: dict[tuple[str, str], str] = {}
5761

5862

5963
def _command_payload_codec(codec: object) -> str:
@@ -72,6 +76,70 @@ def _string_or_none(value: Any) -> str | None:
7276
return value if isinstance(value, str) and value else None
7377

7478

79+
def _callable_fingerprint_payload(value: object) -> str:
80+
if isinstance(value, staticmethod | classmethod):
81+
value = value.__func__
82+
if not isinstance(value, FunctionType):
83+
return repr(value)
84+
85+
try:
86+
return inspect.getsource(value)
87+
except (OSError, TypeError):
88+
code = value.__code__
89+
return repr(
90+
(
91+
code.co_argcount,
92+
code.co_posonlyargcount,
93+
code.co_kwonlyargcount,
94+
code.co_code,
95+
code.co_consts,
96+
code.co_names,
97+
code.co_varnames,
98+
)
99+
)
100+
101+
102+
def _workflow_definition_fingerprint(cls: type) -> str:
    """Compute a ``sha256:``-prefixed fingerprint of a workflow class definition.

    The digest covers, in order: a fixed version tag, the class's module and
    qualified name, the workflow name reported by ``_workflow_name``, the four
    handler registries stored on the class, and finally the class source text.
    When the source cannot be retrieved, every non-dunder entry of the class
    ``__dict__`` is hashed instead via ``_callable_fingerprint_payload``.

    Args:
        cls: The workflow class to fingerprint.

    Returns:
        The hex digest prefixed with ``"sha256:"``.
    """
    digest = hashlib.sha256(b"durable-workflow-python.workflow-definition.v1\0")
    identity = f"{cls.__module__}.{cls.__qualname__}\0"
    digest.update(identity.encode())
    workflow_name = f"{_workflow_name(cls)}\0"
    digest.update(workflow_name.encode())

    registry_names = (
        "__workflow_signals__",
        "__workflow_queries__",
        "__workflow_updates__",
        "__workflow_update_validators__",
    )
    for name in registry_names:
        # A missing or falsy registry is hashed as an empty mapping.
        entries = getattr(cls, name, {}) or {}
        ordered_entries = sorted(entries.items())
        digest.update(name.encode() + repr(ordered_entries).encode())

    try:
        class_source = inspect.getsource(cls)
    except (OSError, TypeError):
        # No retrievable source: hash the class body attribute by attribute,
        # in sorted order so the result does not depend on definition order.
        for attr_name, attr_value in sorted(cls.__dict__.items()):
            is_dunder = attr_name.startswith("__") and attr_name.endswith("__")
            if not is_dunder:
                digest.update(attr_name.encode())
                digest.update(_callable_fingerprint_payload(attr_value).encode())
    else:
        digest.update(class_source.encode())

    return "sha256:" + digest.hexdigest()
128+
129+
130+
def _guard_worker_workflow_fingerprints(worker_id: str, fingerprints: dict[str, str]) -> None:
    """Reject a changed workflow definition for an already-seen worker id.

    Each ``(worker_id, workflow_type)`` pair is looked up in the module-level
    ``_WORKER_WORKFLOW_FINGERPRINTS`` registry. If any pair was previously
    recorded with a different fingerprint, the call fails. Otherwise all
    supplied fingerprints are recorded.

    Validation happens for every entry before anything is recorded, so a
    rejected call leaves the registry untouched rather than partially updated
    with the workflow types that happened to be checked first.

    Args:
        worker_id: Identifier of the worker being constructed.
        fingerprints: Mapping of workflow type name to definition fingerprint.

    Raises:
        RuntimeError: If a workflow type was already recorded for this worker
            id with a different fingerprint.
    """
    for workflow_type, fingerprint in fingerprints.items():
        previous = _WORKER_WORKFLOW_FINGERPRINTS.get((worker_id, workflow_type))
        if previous is not None and previous != fingerprint:
            raise RuntimeError(
                "Workflow definition changed for worker "
                f"{worker_id!r} and workflow type {workflow_type!r}; "
                "restart the worker process before re-registering this workflow type."
            )

    # Every entry passed validation; commit them together.
    _WORKER_WORKFLOW_FINGERPRINTS.update(
        {
            (worker_id, workflow_type): fingerprint
            for workflow_type, fingerprint in fingerprints.items()
        }
    )
141+
142+
75143
def _manifest_version(manifest: Any) -> str:
76144
if isinstance(manifest, dict):
77145
value = manifest.get("version")
@@ -166,8 +234,13 @@ def __init__(
166234
self.client = client
167235
self.task_queue = task_queue
168236
self.workflows = {_workflow_name(w): w for w in workflows}
237+
self.workflow_definition_fingerprints = {
238+
workflow_type: _workflow_definition_fingerprint(workflow_cls)
239+
for workflow_type, workflow_cls in self.workflows.items()
240+
}
169241
self.activities = {_activity_name(a): a for a in activities}
170242
self.worker_id = worker_id or f"py-worker-{uuid.uuid4().hex[:8]}"
243+
_guard_worker_workflow_fingerprints(self.worker_id, self.workflow_definition_fingerprints)
171244
self._poll_timeout = poll_timeout
172245
self._stop = asyncio.Event()
173246
self._wf_semaphore = asyncio.Semaphore(max_concurrent_workflow_tasks)
@@ -244,6 +317,7 @@ async def _register(self) -> None:
244317
worker_id=self.worker_id,
245318
task_queue=self.task_queue,
246319
supported_workflow_types=list(self.workflows),
320+
workflow_definition_fingerprints=self.workflow_definition_fingerprints,
247321
supported_activity_types=list(self.activities),
248322
)
249323
log.info("worker %s registered on %s", self.worker_id, self.task_queue)

tests/test_client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,11 +629,13 @@ async def test_register(self, client: Client) -> None:
629629
worker_id="w1",
630630
task_queue="q1",
631631
supported_workflow_types=["greeter"],
632+
workflow_definition_fingerprints={"greeter": "sha256:abc"},
632633
supported_activity_types=["greet"],
633634
)
634635
assert result["registered"] is True
635636
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
636637
assert body["runtime"] == "python"
638+
assert body["workflow_definition_fingerprints"] == {"greeter": "sha256:abc"}
637639

638640
@pytest.mark.asyncio
639641
async def test_register_advertises_installed_package_version(self, client: Client) -> None:

tests/test_worker.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,57 @@ async def test_register(self, mock_client: AsyncMock) -> None:
127127
call_kwargs = mock_client.register_worker.call_args.kwargs
128128
assert call_kwargs["task_queue"] == "q1"
129129
assert "test-wf" in call_kwargs["supported_workflow_types"]
130+
assert call_kwargs["workflow_definition_fingerprints"]["test-wf"].startswith("sha256:")
130131
assert "test-act" in call_kwargs["supported_activity_types"]
131132

133+
def test_constructor_rejects_changed_workflow_definition_for_same_worker_id(
    self, mock_client: AsyncMock
) -> None:
    """A second Worker with the same id but a changed workflow body must raise."""

    @workflow.defn(name="reloadable-wf")
    class ReloadableWorkflowV1:
        def run(self, ctx):  # type: ignore[no-untyped-def]
            return "v1"

    @workflow.defn(name="reloadable-wf")
    class ReloadableWorkflowV2:
        def run(self, ctx):  # type: ignore[no-untyped-def]
            return "v2"

    def build(workflow_cls):  # type: ignore[no-untyped-def]
        # Both workers share the queue and worker id; only the class differs.
        return Worker(
            mock_client,
            task_queue="q1",
            workflows=[workflow_cls],
            activities=[],
            worker_id="reload-worker",
        )

    build(ReloadableWorkflowV1)

    with pytest.raises(RuntimeError, match="Workflow definition changed"):
        build(ReloadableWorkflowV2)
162+
163+
def test_constructor_allows_same_workflow_definition_for_same_worker_id(
    self, mock_client: AsyncMock
) -> None:
    """Constructing two Workers with one id and an unchanged workflow must not raise."""
    for _attempt in range(2):
        Worker(
            mock_client,
            task_queue="q1",
            workflows=[TestWorkflow],
            activities=[],
            worker_id="stable-worker",
        )
180+
132181
@pytest.mark.asyncio
133182
async def test_register_calls_cluster_info(self, mock_client: AsyncMock) -> None:
134183
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])

0 commit comments

Comments
 (0)