Skip to content

Commit c94802d

Browse files
committed
feat(decisioning): wire TaskHandoff finalize path — single-ledger via on_complete hook (#538-impl)
The HITL finalize path that the v1.5 design promised but the previous commits left as a follow-up. Adopters returning ctx.handoff_to_task(...) from finalize_proposal now get the expected behaviour: Submitted envelope to the buyer immediately, handoff fn runs in the background, proposal commits to the store on completion. Single ledger per § D3. Mechanism: _project_handoff gains an optional on_complete callback that runs with the typed result before registry.complete. maybe_intercept_finalize builds a closure capturing proposal_store + proposal_id and threads it through. If the handoff fn raises, on_complete raises, or store.commit raises, the framework calls registry.fail and the proposal stays DRAFT — no half-committed state. 5 new E2E tests cover: happy-path commit-on-completion; handoff path log emission; AdcpError from handoff fn; wrong return type; commit failure under transient store error. 3957 passed, 32 skipped (was 3952). Zero regressions.
1 parent df97cc8 commit c94802d

5 files changed

Lines changed: 426 additions & 28 deletions

File tree

examples/sales_proposal_mode_seller/src/proposal_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,10 @@ async def finalize_proposal(
239239
from the draft; this method lock-prices and returns. Framework
240240
commits via ``proposal_store.commit`` after projection.
241241
242-
For HITL flows, return ``ctx.handoff_to_task(...)`` instead;
243-
that's a follow-up for v1.5+.
242+
For HITL flows, return ``ctx.handoff_to_task(...)`` instead.
243+
The framework projects ``Submitted`` immediately, runs the
244+
handoff fn in the background, and commits the proposal to the
245+
store on completion (single-ledger guarantee per § D3).
244246
"""
245247
del ctx
246248
committed_payload = dict(req.proposal_payload)

src/adcp/decisioning/dispatch.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
)
6767

6868
if TYPE_CHECKING:
69+
from collections.abc import Awaitable, Callable
70+
6971
from pydantic import BaseModel
7072

7173
from adcp.decisioning.accounts import AccountStore
@@ -1216,6 +1218,7 @@ async def _project_handoff(
12161218
method_name: str,
12171219
registry: TaskRegistry,
12181220
executor: ThreadPoolExecutor,
1221+
on_complete: Callable[[Any], Awaitable[None]] | None = None,
12191222
) -> dict[str, Any]:
12201223
"""Promote a TaskHandoff to a background task.
12211224
@@ -1230,9 +1233,11 @@ async def _project_handoff(
12301233
``contextvars.copy_context()`` snapshot. ``create_task``
12311234
inherits the snapshot for free; ``run_in_executor`` doesn't,
12321235
hence the explicit copy.
1233-
3. The background task awaits the handoff fn's return; on success
1234-
calls ``registry.complete(task_id, result.model_dump() if
1235-
Pydantic else result)``; on :class:`AdcpError` calls
1236+
3. The background task awaits the handoff fn's return. On success,
1237+
if ``on_complete`` is provided, the framework awaits it with the
1238+
typed result before persisting. Then ``registry.complete(task_id,
1239+
result.model_dump() if Pydantic else result)``. On
1240+
:class:`AdcpError` from the handoff fn OR ``on_complete``, calls
12361241
``registry.fail(task_id, error.to_wire())``; on any other
12371242
exception, wraps to ``INTERNAL_ERROR`` and calls
12381243
``registry.fail``.
@@ -1244,6 +1249,17 @@ async def _project_handoff(
12441249
etc.) — used as ``task_type`` on the registry row so
12451250
``tasks/get`` round-trips correctly.
12461251
1252+
:param on_complete: Optional framework hook invoked with the typed
1253+
result of the handoff fn before ``registry.complete``. Used by
1254+
the proposal-finalize lifecycle to commit the proposal to
1255+
:class:`ProposalStore` exactly once when the HITL approval
1256+
lands. If the hook raises, the framework treats it like a
1257+
handoff fn failure: ``registry.fail`` is called with the wrapped
1258+
error and ``registry.complete`` is NOT called. This is what
1259+
gives the v1.5 single-ledger guarantee its teeth — a commit
1260+
failure cannot leave the task in 'submitted' forever or land
1261+
the proposal in a half-committed state.
1262+
12471263
The handoff fn is extracted via the type-identity dispatch in
12481264
:func:`adcp.decisioning.types.is_task_handoff`. Subclassed
12491265
TaskHandoff instances (deliberate non-feature) silently take the
@@ -1292,6 +1308,38 @@ async def _run() -> None:
12921308
await registry.fail(task_id, wrapped.to_wire())
12931309
return
12941310

1311+
# Framework completion hook (e.g., proposal_store.commit for
1312+
# finalize). Runs with the TYPED result before model_dump so
1313+
# the closure can pull typed fields (.expires_at, .proposal)
1314+
# off it directly. Failures here are treated identically to a
1315+
# handoff fn failure: registry.fail, no registry.complete. This
1316+
# is the load-bearing seam for the v1.5 single-ledger D3
1317+
# guarantee — if the proposal_store.commit raises, the proposal
1318+
# stays DRAFT and the task lands in 'failed', so the buyer's
1319+
# tasks/get returns the failure rather than a phantom success.
1320+
if on_complete is not None:
1321+
try:
1322+
await on_complete(result)
1323+
except AdcpError as exc:
1324+
await registry.fail(task_id, exc.to_wire())
1325+
return
1326+
except Exception as exc:
1327+
logger.exception(
1328+
"Unhandled exception in on_complete hook for task %s — wrapping",
1329+
task_id,
1330+
)
1331+
wrapped = AdcpError(
1332+
"INTERNAL_ERROR",
1333+
message=(
1334+
f"Post-completion hook for {method_name!r} raised "
1335+
f"{type(exc).__name__}; see details for cause"
1336+
),
1337+
recovery="terminal",
1338+
details=_internal_error_details(exc),
1339+
)
1340+
await registry.fail(task_id, wrapped.to_wire())
1341+
return
1342+
12951343
# Persist terminal artifact. Pydantic responses get
12961344
# ``model_dump()``; dict responses pass through.
12971345
#

src/adcp/decisioning/handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,7 @@ async def get_products( # type: ignore[override]
11891189
params,
11901190
ctx,
11911191
executor=self._executor,
1192+
registry=self._registry,
11921193
)
11931194
if finalize_response is not None:
11941195
return cast("GetProductsResponse", finalize_response)

src/adcp/decisioning/proposal_dispatch.py

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
from adcp.decisioning.context import RequestContext
8181
from adcp.decisioning.proposal_manager import ProposalManager
8282
from adcp.decisioning.proposal_store import ProposalStore
83+
from adcp.decisioning.task_registry import TaskRegistry
8384

8485
logger = logging.getLogger("adcp.decisioning.proposal_dispatch")
8586

@@ -146,6 +147,7 @@ async def maybe_intercept_finalize(
146147
ctx: RequestContext[Any],
147148
*,
148149
executor: ThreadPoolExecutor,
150+
registry: TaskRegistry,
149151
) -> dict[str, Any] | None:
150152
"""Intercept ``buying_mode='refine' + action='finalize'`` requests.
151153
@@ -243,29 +245,52 @@ async def maybe_intercept_finalize(
243245
)
244246

245247
if is_task_handoff(result):
246-
# HITL slow path. Per § D2: framework projects Submitted; the
247-
# handoff fn completes via the standard TaskRegistry flow. The
248-
# framework wraps the handoff so the proposal commit happens at
249-
# task-completion time — single ledger, no race.
250-
#
251-
# NOTE: TaskHandoff projection lives in dispatch._project_handoff.
252-
# Returning the handoff back through the standard path requires
253-
# threading more state than the seam-helper signature carries
254-
# today. v1.5 lands the inline path; the handoff path lands as a
255-
# follow-up once the dispatch surface exposes a hook. Adopters
256-
# who declare finalize=True + return TaskHandoff today will see
257-
# their handoff projected as Submitted but the framework won't
258-
# auto-commit on completion — the handoff body must call
259-
# store.commit explicitly, or fail closed.
260-
raise AdcpError(
261-
"INTERNAL_ERROR",
262-
message=(
263-
"TaskHandoff[FinalizeProposalSuccess] return path is not "
264-
"wired in v1.5 — the inline FinalizeProposalSuccess path "
265-
"is the supported route. File a follow-up issue if you "
266-
"need HITL finalize."
267-
),
268-
recovery="terminal",
248+
# HITL slow path. Per § D2 + § D3: framework projects Submitted
249+
# immediately; the handoff fn completes via the standard
250+
# TaskRegistry flow. The framework wires the proposal_store
251+
# commit as the on_complete hook so the SAME asyncio task that
252+
# calls registry.complete also calls store.commit — single
253+
# ledger, no race. If either fails, registry.fail is called and
254+
# the proposal stays DRAFT (handler at dispatch._project_handoff
255+
# treats on_complete failures identically to handoff fn
256+
# failures).
257+
from adcp.decisioning.dispatch import _project_handoff
258+
259+
async def _commit_on_handoff_completion(success: Any) -> None:
260+
# Type-check the handoff fn's return shape. Adopter mistakes
261+
# here surface as INTERNAL_ERROR on tasks/get rather than
262+
# silently corrupting state.
263+
if not isinstance(success, FinalizeProposalSuccess):
264+
raise AdcpError(
265+
"INTERNAL_ERROR",
266+
message=(
267+
f"finalize_proposal handoff fn returned "
268+
f"{type(success).__name__}; expected "
269+
"FinalizeProposalSuccess."
270+
),
271+
recovery="terminal",
272+
)
273+
await _await_maybe(
274+
store.commit(
275+
proposal_id,
276+
expires_at=success.expires_at,
277+
proposal_payload=dict(success.proposal),
278+
)
279+
)
280+
finalize_succeeded_log(
281+
proposal_id=proposal_id,
282+
account_id=account_id,
283+
expires_at=success.expires_at,
284+
path="handoff",
285+
)
286+
287+
return await _project_handoff(
288+
result,
289+
ctx,
290+
method_name="finalize_proposal",
291+
registry=registry,
292+
executor=executor,
293+
on_complete=_commit_on_handoff_completion,
269294
)
270295

271296
if not isinstance(result, FinalizeProposalSuccess):

0 commit comments

Comments
 (0)