Skip to content

Commit bff48ab

Browse files
bokelleyclaude
andauthored
feat(decisioning): handoff_to_workflow — externally-completed task primitive (#336)
Adds a third dispatch arm distinct from sync return and TaskHandoff: ``ctx.handoff_to_workflow(fn)`` for adopter-owned external workflows that complete on their own schedule (human queue review, batch jobs, Airflow DAGs, ML pipelines, scheduled cron). Naming choice: ``handoff_to_workflow`` over ``handoff_to_human``. The primitive isn't human-specific — the external completer could be a person, a nightly batch job, an ML pipeline, or any adopter-owned system that calls back via ``registry.complete()`` later. ``workflow`` matches industry-standard vocabulary (Camunda, Temporal, Step Functions) and pairs symmetrically with the existing ``handoff_to_task`` (framework's task vs. adopter's workflow). Mental model: * Sync return — Seller answers immediately. * ``handoff_to_task(fn)`` — Framework runs ``fn`` in background; ``fn`` returns terminal artifact within seconds-to-minutes. * ``handoff_to_workflow(fn)`` — ``fn`` runs ONCE to register work into adopter's external system; framework persists ``submitted`` state and returns wire envelope; adopter's external system later calls ``registry.complete()`` / ``registry.fail()`` directly when the work finishes (hours-to-days). Wire-shape parity: all three project to ``{task_id, status: 'submitted'}`` for the async paths. Buyers can't tell which path the seller took. New surfaces: * ``adcp.decisioning.WorkflowHandoff`` marker class + ``is_workflow_handoff()`` type-identity dispatch helper. * ``RequestContext.handoff_to_workflow(fn)`` method. * ``TaskRegistry.discard(task_id)`` Protocol method + ``InMemoryTaskRegistry.discard()`` implementation. Used by the workflow projection's rollback path — if enqueue fn raises, the just-allocated task_id is removed so the buyer never sees an orphan id. * ``adcp.decisioning.dispatch._project_workflow_handoff()``. Allocates task_id via ``registry.issue``, calls enqueue fn (sync via executor + contextvars snapshot, async awaited inline), rolls back via ``registry.discard`` on ``BaseException``, returns Submitted envelope. NO background coroutine. * ``_invoke_platform_method`` checks both markers; routes to the matching projection. TaskHandoff docstring updated: drops the ``handoff_to_human`` v4.5.0 forward-promise (now obsolete — ``handoff_to_workflow`` ships in 4.4.0). Points adopters at the new primitive for queued-approval flows. Public exports: ``WorkflowHandoff`` added to ``adcp.decisioning.__all__``. Test coverage in ``tests/test_decisioning_workflow_handoff.py`` (14 new tests): * Marker shape: type-identity dispatch, subclass rejection. * Wire-shape parity: Submitted envelope identical to TaskHandoff. * Sync + async enqueue both supported. * Rollback: enqueue exception → ``registry.discard()`` → no orphan task_id reaches the buyer (sync + async paths). * Registry persists ``submitted`` state correctly. * External completion via ``registry.complete()`` / ``registry.fail()``. * End-to-end via ``_invoke_platform_method``. * No background coroutine spawned (distinct from TaskHandoff). * Public exports present. One existing test updated: ``tests/test_decisioning_task_registry.py::test_custom_registry_satisfies_protocol_via_duck_typing`` declares ``discard`` on its stub since the Protocol added it. 2266 tests pass (up from 2252). Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent aeb2543 commit bff48ab

7 files changed

Lines changed: 746 additions & 32 deletions

File tree

src/adcp/decisioning/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def create_media_buy(
107107
MaybeAsync,
108108
SalesResult,
109109
TaskHandoff,
110+
WorkflowHandoff,
110111
)
111112

112113
__all__ = [
@@ -147,6 +148,7 @@ def create_media_buy(
147148
"TaskHandoffContext",
148149
"TaskRegistry",
149150
"TaskState",
151+
"WorkflowHandoff",
150152
"create_adcp_server_from_platform",
151153
"serve",
152154
"WorkflowObjectType",

src/adcp/decisioning/context.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from adcp.decisioning.resolve import ResourceResolver, _make_default_resolver
2525
from adcp.decisioning.state import StateReader, _make_default_state_reader
26-
from adcp.decisioning.types import Account, TaskHandoff
26+
from adcp.decisioning.types import Account, TaskHandoff, WorkflowHandoff
2727
from adcp.server.base import ToolContext
2828

2929
if TYPE_CHECKING:
@@ -195,5 +195,85 @@ def handoff_to_task(
195195
Adopter code passes either a coroutine function (``async def
196196
review_async(task_ctx): ...``) or a sync callable; the
197197
dispatcher detects which and runs it appropriately.
198+
199+
For external workflows that complete on their own schedule
200+
(human queue review, batch jobs, Airflow DAGs, ML pipelines)
201+
— use :meth:`handoff_to_workflow` instead. The split is purely
202+
about where the work runs (in-process / framework-managed vs.
203+
adopter-owned external system).
198204
"""
199205
return TaskHandoff(fn)
206+
207+
def handoff_to_workflow(
208+
self,
209+
fn: Callable[[Any], Awaitable[None] | None],
210+
) -> WorkflowHandoff:
211+
"""Promote this call to an externally-completed task.
212+
213+
For workflows that run OUTSIDE the framework's process —
214+
human queue review (trafficker UI), nightly batch jobs,
215+
Airflow DAGs, ML pipelines, scheduled cron. The framework
216+
allocates a ``task_id``, calls ``fn`` ONCE synchronously
217+
(or awaits it if a coroutine) to register the work into the
218+
adopter's external system, persists ``submitted`` state, and
219+
returns the wire envelope. NO background coroutine runs in
220+
the framework.
221+
222+
``fn`` receives a :class:`TaskHandoffContext` carrying
223+
``id`` (framework-allocated task_id) and ``_registry``
224+
(adopter can stash a reference for later completion). The
225+
adopter's external workflow later calls
226+
``registry.complete(task_id, result)`` or
227+
``registry.fail(task_id, error)`` directly when the work
228+
finishes — minutes, hours, or days later.
229+
230+
Buyer experience is identical to :meth:`handoff_to_task` —
231+
same ``{task_id, status: 'submitted'}`` wire envelope, same
232+
``tasks/get`` polling, same push-notification webhook on
233+
terminal state.
234+
235+
**Rollback.** If ``fn`` raises during enqueue, the framework
236+
discards the just-allocated task_id from the registry and
237+
propagates the exception (wrapped to ``AdcpError`` per the
238+
dispatch contract). Adopter enqueue fns that need
239+
transactional persistence wrap their own DB write in their
240+
own transaction; the framework's rollback is registry-side
241+
only.
242+
243+
Example::
244+
245+
class TraffickerSeller(DecisioningPlatform):
246+
def __init__(self, review_queue, task_registry):
247+
self.review_queue = review_queue
248+
# Stash for later completion when human acts
249+
self.task_registry = task_registry
250+
251+
def create_media_buy(self, req, ctx):
252+
if self._needs_human_approval(req):
253+
return ctx.handoff_to_workflow(
254+
lambda task_ctx: self._enqueue(task_ctx, req)
255+
)
256+
return CreateMediaBuySuccess(media_buy_id="mb_1", ...)
257+
258+
def _enqueue(self, task_ctx, req):
259+
self.review_queue.add(
260+
task_id=task_ctx.id,
261+
request_snapshot=req.model_dump(),
262+
)
263+
264+
# Elsewhere — Flask handler for the trafficker UI:
265+
async def on_decision(self, task_id, decision):
266+
if decision.approved:
267+
await self.task_registry.complete(
268+
task_id,
269+
CreateMediaBuySuccess(...).model_dump(),
270+
)
271+
else:
272+
await self.task_registry.fail(
273+
task_id, AdcpError(...).to_wire(),
274+
)
275+
276+
See :class:`adcp.decisioning.WorkflowHandoff` for the full
277+
semantics.
278+
"""
279+
return WorkflowHandoff(fn)

src/adcp/decisioning/dispatch.py

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,13 @@
5353
TaskHandoffContext,
5454
TaskRegistry,
5555
)
56-
from adcp.decisioning.types import AdcpError, TaskHandoff, is_task_handoff
56+
from adcp.decisioning.types import (
57+
AdcpError,
58+
TaskHandoff,
59+
WorkflowHandoff,
60+
is_task_handoff,
61+
is_workflow_handoff,
62+
)
5763

5864
if TYPE_CHECKING:
5965
from pydantic import BaseModel
@@ -806,6 +812,14 @@ async def _invoke_platform_method(
806812
registry=registry,
807813
executor=executor,
808814
)
815+
if is_workflow_handoff(result):
816+
return await _project_workflow_handoff(
817+
result,
818+
ctx,
819+
method_name=method_name,
820+
registry=registry,
821+
executor=executor,
822+
)
809823
return result
810824

811825

@@ -939,6 +953,83 @@ async def _run() -> None:
939953
_BACKGROUND_HANDOFF_TASKS: set[asyncio.Task[None]] = set()
940954

941955

956+
async def _project_workflow_handoff(
957+
handoff: WorkflowHandoff,
958+
ctx: RequestContext[Any],
959+
*,
960+
method_name: str,
961+
registry: TaskRegistry,
962+
executor: ThreadPoolExecutor,
963+
) -> dict[str, Any]:
964+
"""Project a :class:`WorkflowHandoff` to the wire Submitted envelope.
965+
966+
Distinct from :func:`_project_handoff`: NO background coroutine
967+
runs. The framework allocates a ``task_id`` via
968+
:meth:`TaskRegistry.issue` and calls the adopter's enqueue fn
969+
ONCE — synchronously if it's a sync callable, awaited if it's a
970+
coroutine. The enqueue fn registers the work into the adopter's
971+
external system (trafficker UI queue, batch DB, Airflow trigger,
972+
etc.) and returns; the framework then returns the Submitted
973+
envelope to the buyer.
974+
975+
The adopter's external workflow later calls
976+
``registry.complete(task_id, result)`` or
977+
``registry.fail(task_id, error)`` directly — minutes, hours, or
978+
days later. The registry is the long-lived control surface; the
979+
framework's role ends after enqueue.
980+
981+
**Rollback.** If the enqueue fn raises, the just-allocated
982+
task_id is discarded from the registry via
983+
:meth:`TaskRegistry.discard` so the buyer never sees a Submitted
984+
envelope referencing an orphan id their external workflow never
985+
registered. The exception is re-raised; the dispatch wrapper
986+
catches it and projects to ``AdcpError`` per the handler
987+
contract.
988+
989+
:param method_name: Wire-spec verb name — used as ``task_type``
990+
on the registry row so ``tasks/get`` round-trips correctly.
991+
"""
992+
fn = handoff._fn
993+
994+
task_id = await registry.issue(
995+
account_id=ctx.account.id,
996+
task_type=method_name,
997+
)
998+
handoff_ctx = TaskHandoffContext(id=task_id, _registry=registry)
999+
1000+
try:
1001+
if asyncio.iscoroutinefunction(fn):
1002+
await fn(handoff_ctx)
1003+
else:
1004+
ctx_snapshot = contextvars.copy_context()
1005+
loop = asyncio.get_running_loop()
1006+
await loop.run_in_executor(
1007+
executor,
1008+
functools.partial(ctx_snapshot.run, fn, handoff_ctx),
1009+
)
1010+
except BaseException:
1011+
# Rollback: the buyer can't be left with a Submitted envelope
1012+
# referencing a task_id the adopter's external workflow never
1013+
# registered. Discard the just-allocated registry row, then
1014+
# re-raise so the outer dispatch wrapper projects the
1015+
# exception to AdcpError. ``BaseException`` (not Exception)
1016+
# so KeyboardInterrupt / SystemExit also clean up the
1017+
# registry side; framework state should never strand on
1018+
# interpreter teardown.
1019+
await registry.discard(task_id)
1020+
raise
1021+
1022+
# Wire ``Submitted`` envelope — same shape as the TaskHandoff
1023+
# path. Buyers can't tell which path the seller took; that's
1024+
# intentional. ``task_type`` lives on the registry row (for
1025+
# ``tasks/get``), not on the wire envelope, per the same posture
1026+
# as :func:`_project_handoff`.
1027+
return {
1028+
"task_id": task_id,
1029+
"status": "submitted",
1030+
}
1031+
1032+
9421033
__all__ = [
9431034
"REQUIRED_METHODS_PER_SPECIALISM",
9441035
"SPEC_SPECIALISM_ENUM",

src/adcp/decisioning/task_registry.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,28 @@ async def get(
267267
"""
268268
...
269269

270+
async def discard(self, task_id: str) -> None:
271+
"""Remove a task_id from the registry — rollback path.
272+
273+
Used by the WorkflowHandoff dispatch projection
274+
(:func:`adcp.decisioning.dispatch._project_workflow_handoff`)
275+
when the adopter's enqueue fn raises after the task_id has
276+
been allocated. Without rollback, the buyer would receive a
277+
Submitted envelope referencing an orphan task_id their
278+
external workflow never registered.
279+
280+
Idempotent: discarding an unknown task_id is a no-op (no
281+
raise). The discard window is tightly scoped — between
282+
``issue()`` and the framework's projection step, with the
283+
adopter's enqueue fn in between. In practice this is a few
284+
milliseconds.
285+
286+
Adopters MUST NOT call ``discard`` on a task that has
287+
progressed past ``submitted`` — that's the wrong recovery
288+
path; use ``fail()`` instead.
289+
"""
290+
...
291+
270292

271293
# ---------------------------------------------------------------------------
272294
# In-memory reference implementation — v6.0 ships this; v6.1 lands a
@@ -417,6 +439,13 @@ async def get(
417439
return None
418440
return record.to_dict()
419441

442+
async def discard(self, task_id: str) -> None:
443+
async with self._lock:
444+
# Idempotent: pop with default. The Protocol contract
445+
# tolerates discarding an unknown id (no raise) so the
446+
# WorkflowHandoff projection's rollback can be unconditional.
447+
self._records.pop(task_id, None)
448+
420449

421450
# ---------------------------------------------------------------------------
422451
# TaskHandoffContext — what the framework passes into adopter handoff fns

0 commit comments

Comments
 (0)