Skip to content

Refactor: lock-free MPSC for deferred-completion registration#895

Open
jvjhfhg wants to merge 1 commit into
hw-native-sys:mainfrom
jvjhfhg:fix/deferred-perf
Open

Refactor: lock-free MPSC for deferred-completion registration#895
jvjhfhg wants to merge 1 commit into
hw-native-sys:mainfrom
jvjhfhg:fix/deferred-perf

Conversation

@jvjhfhg
Copy link
Copy Markdown
Collaborator

@jvjhfhg jvjhfhg commented May 29, 2026

Summary

  • Replace the do { register_deferred(...); if (Skipped) SPIN_WAIT_HINT(); } while (Skipped) loop in complete_slot_task with a lock-free push to a new AICoreCompletionMailbox; the consumer side drains the mailbox from inside poll_and_complete. The wait_list busy lock is no longer in the FIN-handling critical path.
  • Preserve the parallel non-deferred completion path via a PTO2TaskSlotState::any_subtask_deferred discriminator (packed into otherwise-padding so PTO2TaskSlotState stays 64 bytes). Non-deferred tasks never touch the mailbox or the lock and complete inline on the FIN thread, matching pre-refactor behavior.
  • Mirror to the a5 runtime.

Why

register_deferred takes AsyncWaitList::busy for every FIN, including non-deferred tasks (just to do a find_entry_by_token and return NotDeferred). Under contention with poll_and_complete (the only other lock holder), complete_slot_task previously spun in do/while. Goal: registration never blocks on another scheduler's poll — a structural prerequisite for deferred-heavy workloads, even though current benchmark cases don't hit the contention regime hard enough to show measurable speedup.

Approach

Producer side (FIN thread, scheduler_completion.cpp::complete_slot_task):

  1. Read slab.error_code and slab.count locally (volatile, no lock).
  2. If slab.count > 0: release-store slot_state.any_subtask_deferred = true, then push N MSG_KIND_CONDITION messages to the mailbox via CAS-on-head.
  3. Call on_subtask_complete (acq_rel, baseline path).
  4. On mixed_complete, acquire-load any_subtask_deferred:
    • true → push one MSG_KIND_TASK_NORMAL_DONE sentinel carrying &slot_state in addr. Skip inline completion; consumer handles it.
    • false → run inline on_mixed_task_complete + deferred_release (unchanged).

The any_subtask_deferred flag uses the acq_rel barrier inside on_subtask_complete (fetch_add) so the last subtask sees writes from all earlier subtasks of the same task. This makes "is this task deferred" a local atomic check rather than a global wait_list scan.

Consumer side (pto_scheduler.h::poll_and_complete):

  1. Gated by (wait_list.count > 0 || pending_completion_count > 0 || aicore_completion_mailbox_has_pending()) in scheduler_dispatch.cpp — empty state skips the entire block, no try_lock attempted.
  2. try_lock(busy) — non-blocking; lose the race → return immediately, next iteration retries.
  3. drain_aicore_completion_mailbox_locked walks the mailbox in seq order, dispatches per kind:
    • CONDITION: find or create entry, append condition.
    • TASK_NORMAL_DONE: find entry → set normal_done; if no entry exists (defensive; the producer-side fast-path means this shouldn't happen), inline-complete via try_inline_complete_locked rather than growing entries[].
  4. Iterate entries[] reverse, poll each unsatisfied condition, complete when normal_done && waiting_count <= 0.

Lock free for all FIN handlers; busy is only acquired by the (at most one at a time) consumer.

Files changed

  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h: kind field in 64B message; MSG_KIND_CONDITION / MSG_KIND_TASK_NORMAL_DONE enums.
  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h: try_push_condition / try_push_normal_done (MPSC CAS-on-head producers); drain rewritten to create entries directly + DrainCompletionSink for inline-complete handoff; mpsc_skipped_count diagnostic counter; pending_normal_done[] stash (defensive, untouched by fast-path).
  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h: PTO2TaskSlotState::any_subtask_deferred atomic packed into existing padding, reset in reset_for_reuse.
  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp: producer fast-path replaces the do-while spin.
  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp: extend poll-gate with aicore_completion_mailbox_has_pending.
  • src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h: try_inline_complete_locked definition + in-place drain of deferred_release on capacity overflow inside poll_and_complete.
  • tests/ut/cpp/{a2a3,a5}/test_aicore_completion_mailbox.cpp: new cpput suite (7 + 5 tests covering push/drain, normal_done attach, ring fullness, multi-producer no-loss / per-producer order).
  • tests/ut/cpp/CMakeLists.txt: wire the two new test targets.

Validation

  • cpput: 27/27 no_hardware tests pass (was 26 + 1 new).
  • a2a3sim deferred_notify_demo: max_diff = 0 on both ranks.
  • a2a3sim dummy_task: all 3 scene cases PASSED (covers non-deferred fast-path on sim).
  • a5sim deferred_notify_demo: max_diff = 0 on both ranks.
  • a2a3 onboard, device 3, tools/benchmark_rounds.sh -n 30: 7/7 passing benchmarks within ±1.5% of baseline scheduler time (round-trip noise floor). mpsc_skipped_count stayed at 0 across all runs. See .docs/25.comm-api-refactor/07.mpsc-benchmark-results.md for the full table.
  • spmd_paged_attention Case1/Case2 hang in both baseline and after — pre-existing failure (reproduces on d873e48c with --build), unrelated to this change; should be tracked as a separate issue.

Test plan

  • cmake --build tests/ut/cpp/build && ctest -L no_hardware -j8 clean.
  • python examples/a2a3/.../deferred_notify_demo/test_deferred_notify_demo.py -p a2a3sim passes, max_diff = 0.
  • python examples/a5/.../deferred_notify_demo/test_deferred_notify_demo.py -p a5sim passes, max_diff = 0.
  • python tests/st/a2a3/.../dummy_task/test_dummy_task.py -p a2a3sim all PASSED.
  • Smoke a non-deferred onboard case (alternating_matmul_add Case1 or similar) shows no sched_error_code set.

Known caveats

  • spmd_paged_attention Case1/Case2 hang predates this PR. Confirmed on the parent of the first MPSC commit; intentionally not in scope here.
  • mpsc_skipped_count is a diagnostic. Non-zero value indicates either the 4096-slot mailbox is undersized for the workload or the consumer is starved; neither has been observed in the benchmark suite. Hook it up to L2 perf summary in a follow-up if deferred-heavy workloads emerge.
  • The legacy register_deferred API and the pending_completions[] / pending_normal_done[] stashes are retained for the cpput unit tests and as defensive fallbacks. After the producer-side fast-path lands, the stash arrays are never written in practice; a follow-up can delete them and simplify the poll-gate to a 2-arg expression. Not bundled here to keep this PR mechanically smaller.

@gemini-code-assist
Copy link
Copy Markdown

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 29, 2026

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4510fb57-02d7-4652-ac53-695ba976aed3

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR implements lock-free MPSC producer helpers for AICore completion mailboxes, reworks drain logic to handle CONDITION and TASK_NORMAL_DONE message kinds with a pending stash for orphaned normal-done entries, adds scheduler-aware inline completion, and replaces legacy deferred registration with mailbox-based routing across A2A3 and A5 runtimes.

Changes

AICore Completion Mailbox MPSC Integration

Layer / File(s) Summary
Mailbox message schema with kind discrimination
src/a2a3/runtime/.../aicore_completion_mailbox.h, src/a5/runtime/.../aicore_completion_mailbox.h
Adds uint32_t kind field to distinguish MSG_KIND_CONDITION from MSG_KIND_TASK_NORMAL_DONE messages, with padding adjustments to preserve layout.
MPSC producer helpers for condition and normal-done messages
src/a2a3/runtime/.../pto_async_wait.h, src/a5/runtime/.../pto_async_wait.h
Implements lock-free CAS-based try-push functions publishing CONDITION and TASK_NORMAL_DONE messages to mailbox with per-slot sequence release semantics.
Drain API and pending-normal-done storage
src/a2a3/runtime/.../pto_async_wait.h, src/a5/runtime/.../pto_async_wait.h
Introduces DrainCompletionSink for scheduler-aware completion, PendingNormalDone stash for orphaned messages, MAX_PENDING_NORMAL_DONE capacity, and diagnostic mpsc_skipped_count counter.
Message kind discrimination in drain loop
src/a2a3/runtime/.../pto_async_wait.h, src/a5/runtime/.../pto_async_wait.h
Reworks drain to branch on kind: CONDITION appends via append_condition_locked, TASK_NORMAL_DONE stashes or absorbs into entries, invalid kinds return error; adds helper functions for pending absorption.
Task slot any_subtask_deferred coordination flag
src/a2a3/runtime/.../pto_runtime2_types.h, src/a5/runtime/.../pto_runtime2_types.h
Adds atomic boolean and padding to PTO2TaskSlotState to track deferred-completion conditions pushed by subtasks, with reset on slot reuse.
Legacy register_deferred updated to absorb stashed normal-done
src/a2a3/runtime/.../pto_async_wait.h, src/a5/runtime/.../pto_async_wait.h
Extends register_deferred with created_entry flag to absorb stashed NORMAL_DONE sentinels when allocating fresh entries, preserving ordering contracts.
Scheduler inline-completion fast path
src/a2a3/runtime/scheduler/pto_scheduler.h, src/a5/runtime/scheduler/pto_scheduler.h
Adds try_inline_complete_locked to invoke on_mixed_task_complete while locked and drain deferred-release buffer in-place when full, with completion counter.
Scheduler poll_and_complete refactored for drain sink
src/a2a3/runtime/scheduler/pto_scheduler.h, src/a5/runtime/scheduler/pto_scheduler.h
Constructs DrainCompletionSink with scheduler/buffer metadata and passes to drain; changes deferred-release overflow from error-return to in-place drain-and-append.
Deferred completion via MPSC mailbox instead of register_deferred
src/a2a3/runtime/scheduler/scheduler_completion.cpp, src/a5/runtime/scheduler/scheduler_completion.cpp
Replaces legacy deferred-registration path with MPSC mailbox routing: publishes any_subtask_deferred, spin-pushes conditions to mailbox, conditionally pushes TASK_NORMAL_DONE and gates completion via defer_completion_to_consumer.
Dispatch loop polls when mailbox has pending work
src/a2a3/runtime/scheduler/scheduler_dispatch.cpp, src/a5/runtime/scheduler/scheduler_dispatch.cpp
Extends dispatch condition to include mailbox pending check, triggering earlier poll-and-complete when mailbox has entries.
A2A3 mailbox MPSC unit tests
tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp
Validates condition/normal-done push/drain, message attachment, capacity enforcement, multi-producer concurrency with sequence validation, and producer/drainer stress.
A5 mailbox MPSC unit tests
tests/ut/cpp/a5/test_aicore_completion_mailbox.cpp
Validates condition/normal-done push/drain, message attachment, capacity limits, and multi-producer concurrency with sequence ordering checks.
CMake test target registration
tests/ut/cpp/CMakeLists.txt
Adds test_aicore_completion_mailbox and test_a5_aicore_completion_mailbox targets using existing runtime test helpers.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A mailbox takes flight on atomic wings,
Conditions and sentinels dance the MPSC ring,
Stashed normal-dones wait, then absorb just right,
Inline completions shine with scheduler's light,
Hop hop, the drain loop decides the fate—
Lock-free, concurrent, and perfectly straight!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 27.27% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Refactor: lock-free MPSC for deferred-completion registration' accurately captures the main change: replacing a spin-based deferred registration mechanism with a lock-free MPSC mailbox approach. It is concise, specific, and directly reflects the primary objective described in the PR.
Description check ✅ Passed The description is comprehensive and directly related to the changeset. It explains the rationale (reduce lock contention in the FIN critical path), the architectural approach (producer push via MPSC, consumer drain on poll), implementation details, files changed, validation results, and test plan. The description clearly covers all aspects of the proposed refactor.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`:
- Around line 91-92: The code may dereference mailbox when rt_ is nullptr
because mailbox is set to nullptr but later passed to
aicore_completion_mailbox_try_push_condition; to fix, add a null-check or assert
before the push loop: if (mailbox == nullptr) skip/defer the push path (set
defer_completion_to_consumer = true) or assert/mailbox availability depending on
intended behavior. Locate the variables mailbox and rt_ and the call
aicore_completion_mailbox_try_push_condition (and the conditions involving
slot_state.payload and cond_count) and ensure the push branch is guarded so
mailbox is never dereferenced when nullptr.

In
`@src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`:
- Around line 87-88: The code may dereference a null mailbox
(rt_->aicore_mailbox) when rt_ is nullptr but cond_count > 0; before entering
the loop that pushes completions (the push loop that uses mailbox and
defer_completion_to_consumer), add a null-check guard similar to the a2a3
mirror: if mailbox is null, skip or short-circuit the push/completion branch
(and handle defer_completion_to_consumer accordingly) to avoid dereferencing
rt_->aicore_mailbox; update both the initial push loop and the secondary block
around the same logic referenced later (the code around the existing mailbox
usage at lines corresponding to 119-124) so all paths validate mailbox before
use.

In `@tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp`:
- Around line 324-330: The test currently calls
wait_list.drain_aicore_completion_mailbox_locked(mb, err) while the drainer
thread may still be looping, creating a race on the single-consumer mailbox
(race on tail/entries/count and duplicate processing into entries[0]); fix by
ensuring the drainer thread is stopped and joined before the final
single-consumer drain: set drainer_stop.store(true, std::memory_order_release)
and call drainer.join() before invoking
wait_list.drain_aicore_completion_mailbox_locked(mb, err) so that only the main
thread consumes the mailbox (refer to drainer_stop, drainer.join,
wait_list::drain_aicore_completion_mailbox_locked, entries[], tail,
condition_count, MAX_COMPLETIONS_PER_TASK).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 234b466d-4cfc-40a1-890f-f39d377f49fb

📥 Commits

Reviewing files that changed from the base of the PR and between c22db51 and bbd4fce.

📒 Files selected for processing (15)
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp
  • src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp
  • src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp
  • tests/ut/cpp/CMakeLists.txt
  • tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp
  • tests/ut/cpp/a5/test_aicore_completion_mailbox.cpp

Comment on lines +91 to +92
volatile AICoreCompletionMailbox *mailbox = rt_ != nullptr ? rt_->aicore_mailbox : nullptr;
bool defer_completion_to_consumer = false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Potential null pointer dereference if rt_ is nullptr.

Line 91 defensively sets mailbox = nullptr when rt_ == nullptr, but lines 127-132 call aicore_completion_mailbox_try_push_condition on mailbox without a null guard. If a task has slot_state.payload != nullptr with cond_count > 0 in an environment where rt_ is nullptr (e.g., certain test harnesses), this would dereference null.

Either assert that mailbox != nullptr when entering the push loop, or guard the push path:

Suggested fix
         if (cond_count > 0) {
+            if (mailbox == nullptr) {
+                int32_t expected = PTO2_ERROR_NONE;
+                sched_->sm_header->sched_error_code.compare_exchange_strong(
+                    expected, PTO2_ERROR_ASYNC_REGISTRATION_FAILED, std::memory_order_acq_rel,
+                    std::memory_order_acquire
+                );
+                completed_.store(true, std::memory_order_release);
+                return;
+            }
             // Publish "this task is deferred" before on_subtask_complete ...
             slot_state.any_subtask_deferred.store(true, std::memory_order_release);

Also applies to: 127-132

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`
around lines 91 - 92, The code may dereference mailbox when rt_ is nullptr
because mailbox is set to nullptr but later passed to
aicore_completion_mailbox_try_push_condition; to fix, add a null-check or assert
before the push loop: if (mailbox == nullptr) skip/defer the push path (set
defer_completion_to_consumer = true) or assert/mailbox availability depending on
intended behavior. Locate the variables mailbox and rt_ and the call
aicore_completion_mailbox_try_push_condition (and the conditions involving
slot_state.payload and cond_count) and ensure the push branch is guarded so
mailbox is never dereferenced when nullptr.

Comment on lines +87 to +88
volatile AICoreCompletionMailbox *mailbox = rt_ != nullptr ? rt_->aicore_mailbox : nullptr;
bool defer_completion_to_consumer = false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Same null mailbox issue as a2a3 mirror.

The a5 version has the same potential null pointer dereference when rt_ is nullptr but cond_count > 0. Apply the same null guard before entering the push loop.

Also applies to: 119-124

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`
around lines 87 - 88, The code may dereference a null mailbox
(rt_->aicore_mailbox) when rt_ is nullptr but cond_count > 0; before entering
the loop that pushes completions (the push loop that uses mailbox and
defer_completion_to_consumer), add a null-check guard similar to the a2a3
mirror: if mailbox is null, skip or short-circuit the push/completion branch
(and handle defer_completion_to_consumer accordingly) to avoid dereferencing
rt_->aicore_mailbox; update both the initial push loop and the secondary block
around the same logic referenced later (the code around the existing mailbox
usage at lines corresponding to 119-124) so all paths validate mailbox before
use.

Comment on lines +324 to +330
producer.join();
// Final drain pass to consume any in-flight messages, then stop the
// drain thread.
int32_t err = PTO2_ERROR_NONE;
(void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
drainer_stop.store(true, std::memory_order_release);
drainer.join();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Race: two concurrent consumers drain the single-consumer mailbox.

drain_aicore_completion_mailbox_locked is a single-consumer drain (MPSC), with no internal locking around tail/entries/count updates. At Line 328 the main thread drains while the drainer thread is still looping (drainer_stop isn't set until Line 329), so two consumers run concurrently. Beyond the data race on tail, both can process the same slot and append twice to entries[0], pushing condition_count past MAX_COMPLETIONS_PER_TASK (64) — an out-of-bounds write into conditions[]. Stop and join the drainer first, then do the final pass as the sole consumer.

🔒️ Proposed fix: drain final pass only after the drainer stops
     producer.join();
-    // Final drain pass to consume any in-flight messages, then stop the
-    // drain thread.
-    int32_t err = PTO2_ERROR_NONE;
-    (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
-    drainer_stop.store(true, std::memory_order_release);
-    drainer.join();
+    // Stop and join the drain thread first so the final pass runs as the sole
+    // consumer (the drain path is single-consumer / MPSC).
+    drainer_stop.store(true, std::memory_order_release);
+    drainer.join();
+    // Final drain pass to consume any in-flight messages left after the
+    // drainer observed the stop flag.
+    int32_t err = PTO2_ERROR_NONE;
+    (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
producer.join();
// Final drain pass to consume any in-flight messages, then stop the
// drain thread.
int32_t err = PTO2_ERROR_NONE;
(void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
drainer_stop.store(true, std::memory_order_release);
drainer.join();
producer.join();
// Stop and join the drain thread first so the final pass runs as the sole
// consumer (the drain path is single-consumer / MPSC).
drainer_stop.store(true, std::memory_order_release);
drainer.join();
// Final drain pass to consume any in-flight messages left after the
// drainer observed the stop flag.
int32_t err = PTO2_ERROR_NONE;
(void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp` around lines 324 - 330,
The test currently calls wait_list.drain_aicore_completion_mailbox_locked(mb,
err) while the drainer thread may still be looping, creating a race on the
single-consumer mailbox (race on tail/entries/count and duplicate processing
into entries[0]); fix by ensuring the drainer thread is stopped and joined
before the final single-consumer drain: set drainer_stop.store(true,
std::memory_order_release) and call drainer.join() before invoking
wait_list.drain_aicore_completion_mailbox_locked(mb, err) so that only the main
thread consumes the mailbox (refer to drainer_stop, drainer.join,
wait_list::drain_aicore_completion_mailbox_locked, entries[], tail,
condition_count, MAX_COMPLETIONS_PER_TASK).

@jvjhfhg jvjhfhg force-pushed the fix/deferred-perf branch 5 times, most recently from 9bed01c to 99f7607 Compare May 30, 2026 09:25
Replace the lock-based deferred-completion path (register_deferred under
the AsyncWaitList::busy spinlock) with a lock-free MPSC ring,
AICoreCompletionMailbox. FIN-handling scheduler threads push completion
observations; the single dispatch thread drains them under the busy lock.
This removes the producer-side lock contention that serialized FIN
handling against the dispatch poll, and is especially impactful for
deferred-heavy workloads.

Mailbox / transport:
- AICoreCompletionMailbox: bounded power-of-two ring, per-slot seq
  publication (absolute claim position = head+1, so generations
  self-disambiguate with no empty-marker reset). Producers claim a slot
  with a relaxed weak CAS on head and publish via a release seq store;
  the consumer drains in seq order behind an acquire seq load.
- Two message kinds: CONDITION (one per deferred slab entry) and
  TASK_NORMAL_DONE (carries the slot_state pointer).
- Split kernel-safe types (DeferredCompletionEntry/Slab + constants) into
  aicore_completion_mailbox_types.h; the ring + std::atomic push/drain
  helpers stay in the AICPU-only aicore_completion_mailbox.h.
- Memory ordering minimized to the two load-bearing edges: payload
  publish (producer seq release / consumer seq acquire) and slot reuse
  (consumer tail release / producer tail acquire). Everything else relaxed.

Scheduler:
- complete_slot_task pushes deferred conditions + a normal-done sentinel
  through the mailbox instead of register_deferred.
- The producer ordering (all CONDITIONs of a task precede its NORMAL_DONE,
  enforced by on_subtask_complete's acq_rel) plus in-order drain means an
  entry is always materialized on first arrival, so the old out-of-order
  staging (pending_completions / pending_normal_done) and the dead
  register_deferred path are removed.

a2a3 and a5 kept symmetric. Unit tests cover push/drain, multi-producer
ordering, capacity, and producer-interleaved-with-drain; a5 suite mirrors
a2a3. UT: a5 7/7, a2a3 7/7.
@jvjhfhg jvjhfhg force-pushed the fix/deferred-perf branch from 99f7607 to 9dcc2fe Compare May 30, 2026 10:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant