Refactor: lock-free MPSC for deferred-completion registration#895
Refactor: lock-free MPSC for deferred-completion registration#895jvjhfhg wants to merge 1 commit into
Conversation
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR implements lock-free MPSC producer helpers for AICore completion mailboxes, reworks drain logic to handle CONDITION and TASK_NORMAL_DONE message kinds with a pending stash for orphaned normal-done entries, adds scheduler-aware inline completion, and replaces legacy deferred registration with mailbox-based routing across A2A3 and A5 runtimes. ChangesAICore Completion Mailbox MPSC Integration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`:
- Around line 91-92: The code may dereference mailbox when rt_ is nullptr
because mailbox is set to nullptr but later passed to
aicore_completion_mailbox_try_push_condition; to fix, add a null-check or assert
before the push loop: if (mailbox == nullptr) skip/defer the push path (set
defer_completion_to_consumer = true) or assert/mailbox availability depending on
intended behavior. Locate the variables mailbox and rt_ and the call
aicore_completion_mailbox_try_push_condition (and the conditions involving
slot_state.payload and cond_count) and ensure the push branch is guarded so
mailbox is never dereferenced when nullptr.
In
`@src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`:
- Around line 87-88: The code may dereference a null mailbox
(rt_->aicore_mailbox) when rt_ is nullptr but cond_count > 0; before entering
the loop that pushes completions (the push loop that uses mailbox and
defer_completion_to_consumer), add a null-check guard similar to the a2a3
mirror: if mailbox is null, skip or short-circuit the push/completion branch
(and handle defer_completion_to_consumer accordingly) to avoid dereferencing
rt_->aicore_mailbox; update both the initial push loop and the secondary block
around the same logic referenced later (the code around the existing mailbox
usage at lines corresponding to 119-124) so all paths validate mailbox before
use.
In `@tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp`:
- Around line 324-330: The test currently calls
wait_list.drain_aicore_completion_mailbox_locked(mb, err) while the drainer
thread may still be looping, creating a race on the single-consumer mailbox
(race on tail/entries/count and duplicate processing into entries[0]); fix by
ensuring the drainer thread is stopped and joined before the final
single-consumer drain: set drainer_stop.store(true, std::memory_order_release)
and call drainer.join() before invoking
wait_list.drain_aicore_completion_mailbox_locked(mb, err) so that only the main
thread consumes the mailbox (refer to drainer_stop, drainer.join,
wait_list::drain_aicore_completion_mailbox_locked, entries[], tail,
condition_count, MAX_COMPLETIONS_PER_TASK).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 234b466d-4cfc-40a1-890f-f39d377f49fb
📒 Files selected for processing (15)
src/a2a3/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.hsrc/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.hsrc/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.hsrc/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.hsrc/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cppsrc/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cppsrc/a5/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.hsrc/a5/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.hsrc/a5/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.hsrc/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.hsrc/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cppsrc/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpptests/ut/cpp/CMakeLists.txttests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpptests/ut/cpp/a5/test_aicore_completion_mailbox.cpp
| volatile AICoreCompletionMailbox *mailbox = rt_ != nullptr ? rt_->aicore_mailbox : nullptr; | ||
| bool defer_completion_to_consumer = false; |
There was a problem hiding this comment.
Potential null pointer dereference if rt_ is nullptr.
Line 91 defensively sets mailbox = nullptr when rt_ == nullptr, but lines 127-132 call aicore_completion_mailbox_try_push_condition on mailbox without a null guard. If a task has slot_state.payload != nullptr with cond_count > 0 in an environment where rt_ is nullptr (e.g., certain test harnesses), this would dereference null.
Either assert that mailbox != nullptr when entering the push loop, or guard the push path:
Suggested fix
if (cond_count > 0) {
+ if (mailbox == nullptr) {
+ int32_t expected = PTO2_ERROR_NONE;
+ sched_->sm_header->sched_error_code.compare_exchange_strong(
+ expected, PTO2_ERROR_ASYNC_REGISTRATION_FAILED, std::memory_order_acq_rel,
+ std::memory_order_acquire
+ );
+ completed_.store(true, std::memory_order_release);
+ return;
+ }
// Publish "this task is deferred" before on_subtask_complete ...
slot_state.any_subtask_deferred.store(true, std::memory_order_release);Also applies to: 127-132
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`
around lines 91 - 92, The code may dereference mailbox when rt_ is nullptr
because mailbox is set to nullptr but later passed to
aicore_completion_mailbox_try_push_condition; to fix, add a null-check or assert
before the push loop: if (mailbox == nullptr) skip/defer the push path (set
defer_completion_to_consumer = true) or assert/mailbox availability depending on
intended behavior. Locate the variables mailbox and rt_ and the call
aicore_completion_mailbox_try_push_condition (and the conditions involving
slot_state.payload and cond_count) and ensure the push branch is guarded so
mailbox is never dereferenced when nullptr.
| volatile AICoreCompletionMailbox *mailbox = rt_ != nullptr ? rt_->aicore_mailbox : nullptr; | ||
| bool defer_completion_to_consumer = false; |
There was a problem hiding this comment.
Same null mailbox issue as a2a3 mirror.
The a5 version has the same potential null pointer dereference when rt_ is nullptr but cond_count > 0. Apply the same null guard before entering the push loop.
Also applies to: 119-124
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@src/a5/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp`
around lines 87 - 88, The code may dereference a null mailbox
(rt_->aicore_mailbox) when rt_ is nullptr but cond_count > 0; before entering
the loop that pushes completions (the push loop that uses mailbox and
defer_completion_to_consumer), add a null-check guard similar to the a2a3
mirror: if mailbox is null, skip or short-circuit the push/completion branch
(and handle defer_completion_to_consumer accordingly) to avoid dereferencing
rt_->aicore_mailbox; update both the initial push loop and the secondary block
around the same logic referenced later (the code around the existing mailbox
usage at lines corresponding to 119-124) so all paths validate mailbox before
use.
| producer.join(); | ||
| // Final drain pass to consume any in-flight messages, then stop the | ||
| // drain thread. | ||
| int32_t err = PTO2_ERROR_NONE; | ||
| (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err); | ||
| drainer_stop.store(true, std::memory_order_release); | ||
| drainer.join(); |
There was a problem hiding this comment.
Race: two concurrent consumers drain the single-consumer mailbox.
drain_aicore_completion_mailbox_locked is a single-consumer drain (MPSC), with no internal locking around tail/entries/count updates. At Line 328 the main thread drains while the drainer thread is still looping (drainer_stop isn't set until Line 329), so two consumers run concurrently. Beyond the data race on tail, both can process the same slot and append twice to entries[0], pushing condition_count past MAX_COMPLETIONS_PER_TASK (64) — an out-of-bounds write into conditions[]. Stop and join the drainer first, then do the final pass as the sole consumer.
🔒️ Proposed fix: drain final pass only after the drainer stops
producer.join();
- // Final drain pass to consume any in-flight messages, then stop the
- // drain thread.
- int32_t err = PTO2_ERROR_NONE;
- (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);
- drainer_stop.store(true, std::memory_order_release);
- drainer.join();
+ // Stop and join the drain thread first so the final pass runs as the sole
+ // consumer (the drain path is single-consumer / MPSC).
+ drainer_stop.store(true, std::memory_order_release);
+ drainer.join();
+ // Final drain pass to consume any in-flight messages left after the
+ // drainer observed the stop flag.
+ int32_t err = PTO2_ERROR_NONE;
+ (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| producer.join(); | |
| // Final drain pass to consume any in-flight messages, then stop the | |
| // drain thread. | |
| int32_t err = PTO2_ERROR_NONE; | |
| (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err); | |
| drainer_stop.store(true, std::memory_order_release); | |
| drainer.join(); | |
| producer.join(); | |
| // Stop and join the drain thread first so the final pass runs as the sole | |
| // consumer (the drain path is single-consumer / MPSC). | |
| drainer_stop.store(true, std::memory_order_release); | |
| drainer.join(); | |
| // Final drain pass to consume any in-flight messages left after the | |
| // drainer observed the stop flag. | |
| int32_t err = PTO2_ERROR_NONE; | |
| (void)wait_list.drain_aicore_completion_mailbox_locked(mb, err); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/ut/cpp/a2a3/test_aicore_completion_mailbox.cpp` around lines 324 - 330,
The test currently calls wait_list.drain_aicore_completion_mailbox_locked(mb,
err) while the drainer thread may still be looping, creating a race on the
single-consumer mailbox (race on tail/entries/count and duplicate processing
into entries[0]); fix by ensuring the drainer thread is stopped and joined
before the final single-consumer drain: set drainer_stop.store(true,
std::memory_order_release) and call drainer.join() before invoking
wait_list.drain_aicore_completion_mailbox_locked(mb, err) so that only the main
thread consumes the mailbox (refer to drainer_stop, drainer.join,
wait_list::drain_aicore_completion_mailbox_locked, entries[], tail,
condition_count, MAX_COMPLETIONS_PER_TASK).
9bed01c to
99f7607
Compare
Replace the lock-based deferred-completion path (register_deferred under the AsyncWaitList::busy spinlock) with a lock-free MPSC ring, AICoreCompletionMailbox. FIN-handling scheduler threads push completion observations; the single dispatch thread drains them under the busy lock. This removes the producer-side lock contention that serialized FIN handling against the dispatch poll, and is especially impactful for deferred-heavy workloads. Mailbox / transport: - AICoreCompletionMailbox: bounded power-of-two ring, per-slot seq publication (absolute claim position = head+1, so generations self-disambiguate with no empty-marker reset). Producers claim a slot with a relaxed weak CAS on head and publish via a release seq store; the consumer drains in seq order behind an acquire seq load. - Two message kinds: CONDITION (one per deferred slab entry) and TASK_NORMAL_DONE (carries the slot_state pointer). - Split kernel-safe types (DeferredCompletionEntry/Slab + constants) into aicore_completion_mailbox_types.h; the ring + std::atomic push/drain helpers stay in the AICPU-only aicore_completion_mailbox.h. - Memory ordering minimized to the two load-bearing edges: payload publish (producer seq release / consumer seq acquire) and slot reuse (consumer tail release / producer tail acquire). Everything else relaxed. Scheduler: - complete_slot_task pushes deferred conditions + a normal-done sentinel through the mailbox instead of register_deferred. - The producer ordering (all CONDITIONs of a task precede its NORMAL_DONE, enforced by on_subtask_complete's acq_rel) plus in-order drain means an entry is always materialized on first arrival, so the old out-of-order staging (pending_completions / pending_normal_done) and the dead register_deferred path are removed. a2a3 and a5 kept symmetric. Unit tests cover push/drain, multi-producer ordering, capacity, and producer-interleaved-with-drain; a5 suite mirrors a2a3. UT: a5 7/7, a2a3 7/7.
99f7607 to
9dcc2fe
Compare
Summary
do { register_deferred(...); if (Skipped) SPIN_WAIT_HINT(); } while (Skipped)loop incomplete_slot_taskwith a lock-free push to a newAICoreCompletionMailbox; the consumer side drains the mailbox from insidepoll_and_complete. The wait_listbusylock is no longer in the FIN-handling critical path.PTO2TaskSlotState::any_subtask_deferreddiscriminator (packed into otherwise-padding soPTO2TaskSlotStatestays 64 bytes). Non-deferred tasks never touch the mailbox or the lock and complete inline on the FIN thread, matching pre-refactor behavior.Why
register_deferredtakesAsyncWaitList::busyfor every FIN, including non-deferred tasks (just to do afind_entry_by_tokenand returnNotDeferred). Under contention withpoll_and_complete(the only other lock holder),complete_slot_taskpreviously spun indo/while. Goal: registration never blocks on another scheduler's poll — a structural prerequisite for deferred-heavy workloads, even though current benchmark cases don't hit the contention regime hard enough to show measurable speedup.Approach
Producer side (FIN thread,
scheduler_completion.cpp::complete_slot_task):slab.error_codeandslab.countlocally (volatile, no lock).slab.count > 0: release-storeslot_state.any_subtask_deferred = true, then push NMSG_KIND_CONDITIONmessages to the mailbox via CAS-on-head.on_subtask_complete(acq_rel, baseline path).mixed_complete, acquire-loadany_subtask_deferred:MSG_KIND_TASK_NORMAL_DONEsentinel carrying&slot_stateinaddr. Skip inline completion; consumer handles it.on_mixed_task_complete+deferred_release(unchanged).The any_subtask_deferred flag uses the acq_rel barrier inside
on_subtask_complete(fetch_add) so the last subtask sees writes from all earlier subtasks of the same task. This makes "is this task deferred" a local atomic check rather than a global wait_list scan.Consumer side (
pto_scheduler.h::poll_and_complete):(wait_list.count > 0 || pending_completion_count > 0 || aicore_completion_mailbox_has_pending())inscheduler_dispatch.cpp— empty state skips the entire block, notry_lockattempted.try_lock(busy)— non-blocking; lose the race → return immediately, next iteration retries.drain_aicore_completion_mailbox_lockedwalks the mailbox in seq order, dispatches perkind:normal_done; if no entry exists (defensive; the producer-side fast-path means this shouldn't happen), inline-complete viatry_inline_complete_lockedrather than growingentries[].entries[]reverse, poll each unsatisfied condition, complete whennormal_done && waiting_count <= 0.Lock free for all FIN handlers;
busyis only acquired by the (at most one at a time) consumer.Files changed
src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/aicore_completion_mailbox.h:kindfield in 64B message;MSG_KIND_CONDITION/MSG_KIND_TASK_NORMAL_DONEenums.src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/pto_async_wait.h:try_push_condition/try_push_normal_done(MPSC CAS-on-head producers); drain rewritten to create entries directly +DrainCompletionSinkfor inline-complete handoff;mpsc_skipped_countdiagnostic counter;pending_normal_done[]stash (defensive, untouched by fast-path).src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h:PTO2TaskSlotState::any_subtask_deferredatomic packed into existing padding, reset inreset_for_reuse.src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp: producer fast-path replaces the do-while spin.src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_dispatch.cpp: extend poll-gate withaicore_completion_mailbox_has_pending.src/{a2a3,a5}/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h:try_inline_complete_lockeddefinition + in-place drain ofdeferred_releaseon capacity overflow insidepoll_and_complete.tests/ut/cpp/{a2a3,a5}/test_aicore_completion_mailbox.cpp: new cpput suite (7 + 5 tests covering push/drain, normal_done attach, ring fullness, multi-producer no-loss / per-producer order).tests/ut/cpp/CMakeLists.txt: wire the two new test targets.Validation
no_hardwaretests pass (was 26 + 1 new).deferred_notify_demo:max_diff = 0on both ranks.dummy_task: all 3 scene cases PASSED (covers non-deferred fast-path on sim).deferred_notify_demo:max_diff = 0on both ranks.tools/benchmark_rounds.sh -n 30: 7/7 passing benchmarks within ±1.5% of baseline scheduler time (round-trip noise floor).mpsc_skipped_countstayed at 0 across all runs. See.docs/25.comm-api-refactor/07.mpsc-benchmark-results.mdfor the full table.spmd_paged_attentionCase1/Case2 hang in both baseline and after — pre-existing failure (reproduces ond873e48cwith--build), unrelated to this change; should be tracked as a separate issue.Test plan
cmake --build tests/ut/cpp/build && ctest -L no_hardware -j8clean.python examples/a2a3/.../deferred_notify_demo/test_deferred_notify_demo.py -p a2a3simpasses,max_diff = 0.python examples/a5/.../deferred_notify_demo/test_deferred_notify_demo.py -p a5simpasses,max_diff = 0.python tests/st/a2a3/.../dummy_task/test_dummy_task.py -p a2a3simall PASSED.alternating_matmul_add Case1or similar) shows nosched_error_codeset.Known caveats
spmd_paged_attentionCase1/Case2 hang predates this PR. Confirmed on the parent of the first MPSC commit; intentionally not in scope here.mpsc_skipped_countis a diagnostic. Non-zero value indicates either the 4096-slot mailbox is undersized for the workload or the consumer is starved; neither has been observed in the benchmark suite. Hook it up to L2 perf summary in a follow-up if deferred-heavy workloads emerge.register_deferredAPI and thepending_completions[]/pending_normal_done[]stashes are retained for the cpput unit tests and as defensive fallbacks. After the producer-side fast-path lands, the stash arrays are never written in practice; a follow-up can delete them and simplify the poll-gate to a 2-arg expression. Not bundled here to keep this PR mechanically smaller.