Skip to content

Commit c1ffffd

Browse files
[cross-repo from server#280] Conformance blocker: make worker-versioning measurable against current artifacts (#174)
1 parent b8feeb3 commit c1ffffd

5 files changed

Lines changed: 193 additions & 1 deletion

File tree

src/durable_workflow/client.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,12 +597,29 @@ class TaskQueueBuildIdCohort:
597597
sdk_versions: list[str]
598598
last_heartbeat_at: str | None = None
599599
first_seen_at: str | None = None
600+
drain_intent: str | None = None
601+
drained_at: str | None = None
602+
promoted_at: str | None = None
603+
rolled_back_at: str | None = None
604+
new_start_selected: bool = False
605+
workflow_definition_fingerprint_count: int = 0
606+
workflow_definition_fingerprint_conflicts: list[dict[str, Any]] | None = None
600607
raw: dict[str, Any] | None = None
601608

602609
@classmethod
603610
def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdCohort:
604611
runtimes = data.get("runtimes")
605612
sdk_versions = data.get("sdk_versions")
613+
fingerprint_conflicts_raw = data.get("workflow_definition_fingerprint_conflicts")
614+
fingerprint_conflicts: list[dict[str, Any]] | None = None
615+
if isinstance(fingerprint_conflicts_raw, list):
616+
fingerprint_conflicts = []
617+
for item in fingerprint_conflicts_raw:
618+
if not isinstance(item, dict):
619+
continue
620+
fingerprint_conflicts.append(
621+
{str(key): value for key, value in item.items() if isinstance(key, str)}
622+
)
606623
return cls(
607624
build_id=data.get("build_id"),
608625
rollout_status=str(data.get("rollout_status") or ""),
@@ -614,6 +631,15 @@ def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdCohort:
614631
sdk_versions=[v for v in sdk_versions if isinstance(v, str)] if isinstance(sdk_versions, list) else [],
615632
last_heartbeat_at=data.get("last_heartbeat_at"),
616633
first_seen_at=data.get("first_seen_at"),
634+
drain_intent=data.get("drain_intent") if isinstance(data.get("drain_intent"), str) else None,
635+
drained_at=data.get("drained_at") if isinstance(data.get("drained_at"), str) else None,
636+
promoted_at=data.get("promoted_at") if isinstance(data.get("promoted_at"), str) else None,
637+
rolled_back_at=data.get("rolled_back_at") if isinstance(data.get("rolled_back_at"), str) else None,
638+
new_start_selected=bool(data.get("new_start_selected")),
639+
workflow_definition_fingerprint_count=int(
640+
data.get("workflow_definition_fingerprint_count") or 0
641+
),
642+
workflow_definition_fingerprint_conflicts=fingerprint_conflicts,
617643
raw=data,
618644
)
619645

@@ -650,28 +676,41 @@ def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdRollout:
650676
class TaskQueueBuildIdRolloutState:
    """Operator-recorded drain intent for one ``(task_queue, build_id)`` cohort.

    Returned by ``drain_task_queue_build_id``,
    ``promote_task_queue_build_id``, and ``resume_task_queue_build_id``.
    ``build_id`` is ``None`` for the unversioned cohort (workers registered
    without a build identifier). ``drain_intent`` is ``"active"`` or
    ``"draining"``. ``drained_at`` is set only when ``drain_intent`` is
    ``"draining"``; repeated drains do not shift the timestamp.
    ``promoted_at`` and ``new_start_selected`` identify the cohort currently
    selected for fresh workflow starts. ``rolled_back_at`` and
    ``deployment`` are copied from the response when present, and ``raw``
    keeps the payload the instance was parsed from.
    """

    namespace: str | None
    task_queue: str
    build_id: str | None
    drain_intent: str
    drained_at: str | None
    promoted_at: str | None = None
    rolled_back_at: str | None = None
    new_start_selected: bool = False
    # Shallow copy of the payload's ``deployment`` mapping, or ``None``.
    deployment: dict[str, Any] | None = None
    # The unmodified payload handed to ``from_dict``.
    raw: dict[str, Any] | None = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdRolloutState:
        """Build a rollout state from a decoded response payload.

        Optional string fields (``namespace``, ``build_id`` and the
        timestamps) are kept only when the payload value is a ``str``; any
        other value, or a missing key, becomes ``None``. ``task_queue`` and
        ``drain_intent`` fall back to ``""`` when missing or falsy.
        ``deployment`` is shallow-copied when it is a ``dict`` and dropped
        otherwise. ``data`` itself is stored on ``raw`` without copying.
        """

        def optional_str(key: str) -> str | None:
            # One place for the "string or nothing" rule so every optional
            # text field, including ``namespace``, is validated the same way.
            value = data.get(key)
            return value if isinstance(value, str) else None

        deployment_raw = data.get("deployment")
        deployment = dict(deployment_raw) if isinstance(deployment_raw, dict) else None
        return cls(
            namespace=optional_str("namespace"),
            task_queue=str(data.get("task_queue") or ""),
            build_id=optional_str("build_id"),
            drain_intent=str(data.get("drain_intent") or ""),
            drained_at=optional_str("drained_at"),
            promoted_at=optional_str("promoted_at"),
            rolled_back_at=optional_str("rolled_back_at"),
            new_start_selected=bool(data.get("new_start_selected")),
            deployment=deployment,
            raw=data,
        )
677716

@@ -1895,6 +1934,24 @@ async def drain_task_queue_build_id(
18951934
action="drain",
18961935
)
18971936

1937+
async def promote_task_queue_build_id(
    self,
    task_queue: str,
    build_id: str | None,
) -> TaskQueueBuildIdRolloutState:
    """Promote a build-id cohort so fresh workflow starts select it.

    Once promoted, new workflow starts on ``task_queue`` pin to
    ``build_id``. Runs that already exist keep the compatibility marker
    they were stamped with and are still routed only to compatible
    workers. ``build_id=None`` promotes the unversioned cohort.
    """
    # Same shared rollout mutation helper the sibling operations use; only
    # the action name differs.
    promoted: TaskQueueBuildIdRolloutState = await self._mutate_task_queue_build_id_rollout(
        task_queue,
        build_id,
        action="promote",
    )
    return promoted
1954+
18981955
async def resume_task_queue_build_id(
18991956
self,
19001957
task_queue: str,

src/durable_workflow/sync.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,16 @@ def drain_task_queue_build_id(
340340
)
341341
return result
342342

343+
def promote_task_queue_build_id(
    self,
    task_queue: str,
    build_id: str | None,
) -> TaskQueueBuildIdRolloutState:
    """Blocking counterpart of the async ``promote_task_queue_build_id``.

    Creates the coroutine on the wrapped async client and hands it to
    ``_run``, returning whatever rollout state that call produces.
    """
    pending = self._async.promote_task_queue_build_id(task_queue, build_id)
    # The annotated local pins the return type for the type checker.
    state: TaskQueueBuildIdRolloutState = _run(pending)
    return state
352+
343353
def resume_task_queue_build_id(
344354
self,
345355
task_queue: str,
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "task_queue.build_id.promote",
5+
"request": {
6+
"method": "POST",
7+
"path": "/task-queues/orders-critical/build-ids/promote",
8+
"body": {
9+
"build_id": "build-2026.04.21-z9"
10+
}
11+
},
12+
"semantic_body": {
13+
"namespace": "orders-prod",
14+
"task_queue": "orders-critical",
15+
"build_id": "build-2026.04.21-z9",
16+
"drain_intent": "active",
17+
"drained_at": null,
18+
"promoted_at": "2026-04-22T10:00:00Z",
19+
"new_start_selected": true
20+
},
21+
"response_body": {
22+
"namespace": "orders-prod",
23+
"task_queue": "orders-critical",
24+
"build_id": "build-2026.04.21-z9",
25+
"drain_intent": "active",
26+
"drained_at": null,
27+
"promoted_at": "2026-04-22T10:00:00Z",
28+
"new_start_selected": true
29+
},
30+
"cli": {
31+
"argv": {
32+
"task-queue": "orders-critical",
33+
"--build-id": "build-2026.04.21-z9",
34+
"--json": true
35+
}
36+
},
37+
"sdk_python": {
38+
"method": "promote_task_queue_build_id",
39+
"args": {
40+
"task_queue": "orders-critical",
41+
"build_id": "build-2026.04.21-z9"
42+
}
43+
}
44+
}

tests/test_client.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,6 +1533,17 @@ async def test_list_task_queue_build_ids_surfaces_cohort_worker_counts(
15331533
"total_worker_count": 3,
15341534
"runtimes": ["worker-runtime"],
15351535
"sdk_versions": ["polyglot-sdk/2.0.0"],
1536+
"drain_intent": "active",
1537+
"drained_at": None,
1538+
"promoted_at": "2026-04-22T10:00:00Z",
1539+
"new_start_selected": True,
1540+
"workflow_definition_fingerprint_count": 2,
1541+
"workflow_definition_fingerprint_conflicts": [
1542+
{
1543+
"workflow_type": "orders.Checkout",
1544+
"fingerprint_count": 2,
1545+
}
1546+
],
15361547
"last_heartbeat_at": "2026-04-22T09:30:00Z",
15371548
"first_seen_at": "2026-04-22T08:00:00Z",
15381549
},
@@ -1571,6 +1582,16 @@ async def test_list_task_queue_build_ids_surfaces_cohort_worker_counts(
15711582
assert alpha.total_worker_count == 3
15721583
assert alpha.runtimes == ["worker-runtime"]
15731584
assert alpha.sdk_versions == ["polyglot-sdk/2.0.0"]
1585+
assert alpha.drain_intent == "active"
1586+
assert alpha.promoted_at == "2026-04-22T10:00:00Z"
1587+
assert alpha.new_start_selected is True
1588+
assert alpha.workflow_definition_fingerprint_count == 2
1589+
assert alpha.workflow_definition_fingerprint_conflicts == [
1590+
{
1591+
"workflow_type": "orders.Checkout",
1592+
"fingerprint_count": 2,
1593+
}
1594+
]
15741595
assert unversioned.build_id is None
15751596
assert unversioned.rollout_status == "stale_only"
15761597
assert unversioned.stale_worker_count == 1
@@ -1607,6 +1628,38 @@ async def test_drain_task_queue_build_id_matches_polyglot_fixture(self, client:
16071628
assert result.drain_intent == semantic["drain_intent"]
16081629
assert result.drained_at == semantic["drained_at"]
16091630

1631+
@pytest.mark.asyncio
async def test_promote_task_queue_build_id_matches_polyglot_fixture(self, client: Client) -> None:
    """Check the promote call and its parsed result against the parity fixture."""
    fixtures_dir = Path(__file__).parent / "fixtures" / "control-plane"
    fixture = json.loads(
        (fixtures_dir / "task-queue-build-id-promote-parity.json").read_text()
    )
    assert fixture["operation"] == "task_queue.build_id.promote"

    sdk_call = fixture["sdk_python"]
    canned = _mock_response(200, fixture["response_body"])

    with patch.object(
        client._http, "request", new_callable=AsyncMock, return_value=canned
    ) as http_request:
        result = await client.promote_task_queue_build_id(**sdk_call["args"])

    # The outgoing HTTP request must match what the fixture prescribes.
    expected_request = fixture["request"]
    sent = http_request.call_args
    assert sent.args[0] == expected_request["method"]
    assert sent.args[1] == f"/api{expected_request['path']}"
    assert sent.kwargs.get("json") == expected_request["body"]

    # The parsed state must carry the fixture's semantic fields.
    semantic = fixture["semantic_body"]
    assert result.namespace == semantic["namespace"]
    assert result.task_queue == semantic["task_queue"]
    assert result.build_id == semantic["build_id"]
    assert result.drain_intent == semantic["drain_intent"]
    assert result.drained_at is None
    assert result.promoted_at == semantic["promoted_at"]
    assert result.new_start_selected == semantic["new_start_selected"]
1662+
16101663
@pytest.mark.asyncio
16111664
async def test_resume_task_queue_build_id_matches_polyglot_fixture(self, client: Client) -> None:
16121665
fixture_path = (

tests/test_sync.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,34 @@ def test_drain_task_queue_build_id(self) -> None:
294294
)
295295
assert mock.call_args.kwargs.get("json") == {"build_id": "build-alpha"}
296296

297+
def test_promote_task_queue_build_id(self) -> None:
    """Sync promote issues the expected POST and parses the rollout state."""
    client = Client("http://localhost:8080")
    payload = {
        "namespace": "default",
        "task_queue": "orders",
        "build_id": "build-alpha",
        "drain_intent": "active",
        "drained_at": None,
        "promoted_at": "2026-04-22T10:00:00Z",
        "new_start_selected": True,
    }
    canned = _mock_response(200, payload)

    with patch.object(
        client._async._http, "request", new_callable=AsyncMock, return_value=canned
    ) as http_request:
        result = client.promote_task_queue_build_id("orders", "build-alpha")

    assert result.drain_intent == "active"
    assert result.promoted_at == "2026-04-22T10:00:00Z"
    assert result.new_start_selected is True
    assert result.build_id == "build-alpha"

    # Method and path travel positionally; the body goes in the ``json`` kwarg.
    sent = http_request.call_args
    assert sent.args[:2] == (
        "POST",
        "/api/task-queues/orders/build-ids/promote",
    )
    assert sent.kwargs.get("json") == {"build_id": "build-alpha"}
324+
297325
def test_resume_task_queue_build_id_for_unversioned_cohort(self) -> None:
298326
client = Client("http://localhost:8080")
299327
resp = _mock_response(

0 commit comments

Comments
 (0)