Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/a5/platform/src/host/pmu_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ int PmuCollector::init(
/*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id
);

// RAII rollback: any early return after this point releases the shm
// region + ring address table + per-core PmuAicoreRing + per-core
// PmuBuffers. Per-core rings are tracked separately via
// `guard.add_direct_ptr()` because they're plain alloc_cb allocations
// (no host shadow / not in dev_to_host_). `guard.commit()` runs on the
// success path before the trailing return 0.
profiling_common::InitRollbackGuard<decltype(manager_)> guard(manager_, free_cb);

// ---- Allocate shared header + buffer-state region ----
size_t shm_size = calc_pmu_data_size(num_cores);
void *shm_host_local = nullptr;
Expand Down Expand Up @@ -113,6 +121,7 @@ int PmuCollector::init(
return -1;
}
aicore_rings_dev_[c] = ring_dev;
guard.add_direct_ptr(ring_dev);
state->aicore_ring_ptr = reinterpret_cast<uint64_t>(ring_dev);
reinterpret_cast<uint64_t *>(aicore_ring_addrs_host_)[c] = reinterpret_cast<uint64_t>(ring_dev);

Expand Down Expand Up @@ -176,6 +185,7 @@ int PmuCollector::init(
"PMU collector initialized: %d cores, %d threads, SHM=0x%lx, CSV=%s (opened on first record)", num_cores,
num_threads, reinterpret_cast<unsigned long>(shm_dev_), csv_path_.c_str()
);
guard.commit();
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,46 @@ class BufferPoolManager {
malloc_shadows_.clear();
}

/**
* Abort-path cleanup: free EVERY framework-tracked device pointer (via
* `release_fn`) and every framework-malloc'd host shadow, then clear all
* containers. Distinct from `release_owned_buffers()` + `clear_mappings()`
* because this also catches buffers parked in callers' SPSC free_queues
* (which the framework tracked via `register_mapping` but does not own a
* queue for). Intended for `init()` error paths where `finalize()` has
* not run.
*
* Drains recycled/done/ready first (just discards — release goes via
* dev_to_host_ to avoid double-free) and then iterates the full
* dev→host map. Each unique dev_ptr is released exactly once.
*/
template <typename ReleaseFn>
void release_all_owned(const ReleaseFn &release_fn) {
for (auto &pool : recycled_)
pool.clear();
{
std::scoped_lock<std::mutex> lock(done_mutex_);
std::queue<DoneInfo>().swap(done_queue_);
}
{
std::scoped_lock<std::mutex> lock(ready_mutex_);
std::queue<ReadyBufferInfo>().swap(ready_queue_);
}
for (auto &kv : dev_to_host_) {
if (kv.first != nullptr) {
release_fn(kv.first);
}
// erase-based check (matches release_owned_buffers): atomic
// check-and-remove guards against a double-free if any duplicate
// mapping ever sneaks into dev_to_host_.
if (kv.second != nullptr && malloc_shadows_.erase(kv.second) > 0) {
std::free(kv.second);
}
}
dev_to_host_.clear();
malloc_shadows_.clear();
}

// -------------------------------------------------------------------------
// Per-tick mirror of the shared-memory region
// -------------------------------------------------------------------------
Expand Down
69 changes: 69 additions & 0 deletions src/common/platform/include/host/profiling_common/profiler_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
#include <optional>
#include <thread>
#include <utility>
#include <vector>

#include "common/memory_barrier.h"
#include "common/platform_config.h"
Expand Down Expand Up @@ -189,6 +190,74 @@ using ProfFreeCallback = std::function<int(void *dev_ptr)>;
// safe teardown via `clear_mappings()` / `release_owned_buffers()`. See
// `ProfilerBase::start()` for the inline definition.

/**
 * RAII scope guard for collector `init()` rollback.
 *
 * If the guard is destroyed without `commit()` having been called, it
 *   1. passes every pointer registered via `add_direct_ptr()` to
 *      `release_fn` (used for pointers the collector owns outside the
 *      framework — e.g. PMU per-core `PmuAicoreRing` allocations on a5), and
 *   2. calls `manager.release_all_owned(...)` so every framework-tracked
 *      dev_ptr + host shadow is released as well.
 *
 * Each distinct pointer reaches `release_fn` at most once: a pointer that is
 * registered twice, or that is registered directly AND reported by the
 * manager, is released a single time only.
 *
 * `Manager` must provide `release_all_owned(fn)`, where `fn` is callable as
 * `fn(void *)`. The guard stores a reference to the manager, so the manager
 * must outlive the guard.
 *
 * Pattern:
 *   int Collector::init(...) {
 *       ...
 *       set_memory_context(...);
 *       InitRollbackGuard<Manager> guard(manager_, free_cb);
 *       void *dev_ptr = alloc_paired_buffer(size, &host_ptr);
 *       if (dev_ptr == nullptr) return -1;     // guard runs, frees nothing yet
 *       ...
 *       void *direct = alloc_cb(...);
 *       guard.add_direct_ptr(direct);          // ensure it's freed on abort
 *       ...
 *       guard.commit();                        // success — disarm
 *       initialized_ = true;
 *       return 0;
 *   }
 */
template <typename Manager>
class InitRollbackGuard {
public:
    using ReleaseFn = std::function<int(void *)>;

    /**
     * @param manager    Buffer manager whose tracked buffers are rolled back.
     * @param release_fn Device-pointer release callback; may be empty, in
     *                   which case no device pointer is released but
     *                   `release_all_owned` is still invoked.
     */
    InitRollbackGuard(Manager &manager, ReleaseFn release_fn) :
        manager_(manager),
        release_fn_(std::move(release_fn)),
        committed_(false) {}

    ~InitRollbackGuard() {
        if (committed_) return;

        // direct_ptrs_ holds only non-null, unique entries (enforced by
        // add_direct_ptr), so each one is released exactly once here.
        if (release_fn_) {
            for (void *p : direct_ptrs_) {
                release_fn_(p);
            }
        }

        // Call release_all_owned unconditionally: it also frees malloc'd
        // host shadows (via std::free, no callback needed). Gating on
        // release_fn_ here would leak shadows if a collector ever passed
        // an empty free_cb. Device-pointer release is gated inside the
        // lambda instead, which additionally skips anything already
        // released above so a pointer known to both the guard and the
        // manager is not freed twice.
        manager_.release_all_owned([this](void *p) {
            if (p != nullptr && release_fn_ && !is_direct_ptr(p)) {
                release_fn_(p);
            }
        });
    }

    InitRollbackGuard(const InitRollbackGuard &) = delete;
    InitRollbackGuard &operator=(const InitRollbackGuard &) = delete;

    /**
     * Register a pointer to be released on rollback. Null pointers and
     * pointers that are already registered are ignored. The duplicate check
     * is a linear scan over the pointers registered so far.
     */
    void add_direct_ptr(void *p) {
        if (p != nullptr && !is_direct_ptr(p)) direct_ptrs_.push_back(p);
    }

    /** Disarm the guard: the destructor will release nothing. */
    void commit() noexcept { committed_ = true; }

private:
    /** True if `p` has already been registered via add_direct_ptr(). */
    bool is_direct_ptr(const void *p) const {
        for (const void *registered : direct_ptrs_) {
            if (registered == p) return true;
        }
        return false;
    }

    Manager &manager_;
    ReleaseFn release_fn_;
    std::vector<void *> direct_ptrs_;
    bool committed_;
};

// Result of Module::resolve_entry. Carries everything the unified
// process_entry algorithm needs to (a) refill the originating pool's free
// queue and (b) hand the ready buffer off to the collector.
Expand Down
7 changes: 7 additions & 0 deletions src/common/platform/src/host/scope_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ int ScopeStatsCollector::init(
/*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id
);

// RAII rollback: any early return after this point releases every
// framework-tracked buffer (shm region + per-buffer-state PmuBuffer-style
// entries) via free_cb. `guard.commit()` runs on the success path before
// the trailing return 0.
profiling_common::InitRollbackGuard<decltype(manager_)> guard(manager_, free_cb);

const int num_instances = 1;
size_t shm_size = calc_scope_stats_shm_size(num_instances);
void *shm_host_local = nullptr;
Expand Down Expand Up @@ -115,6 +121,7 @@ int ScopeStatsCollector::init(

initialized_ = true;
shm_dev_ = shm_dev_local;
guard.commit();

// Re-set_memory_context now that the shm region is ready. start(tf) gates
// on shm_host_ being non-null, so this is the moment the collector becomes
Expand Down
7 changes: 7 additions & 0 deletions src/common/platform/src/host/tensor_dump_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ int TensorDumpCollector::initialize(
/*shm_dev=*/nullptr, /*shm_host=*/nullptr, /*shm_size=*/0, device_id
);

// RAII rollback: any early return after this point releases the shm
// region + per-thread arenas + DumpMetaBuffers through the framework's
// dev→host map. `guard.commit()` runs on the success path before the
// trailing return 0.
profiling_common::InitRollbackGuard<decltype(manager_)> guard(manager_, free_cb);

// Allocate dump shared memory (header + buffer states)
size_t shm_size = calc_dump_data_size(num_dump_threads);
void *shm_host_local = nullptr;
Expand Down Expand Up @@ -151,6 +157,7 @@ int TensorDumpCollector::initialize(
arena_size / (1024 * 1024), PLATFORM_DUMP_BUFFERS_PER_THREAD
);

guard.commit();
return 0;
}

Expand Down
Loading