Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 14 additions & 127 deletions src/a2a3/platform/onboard/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,60 +184,19 @@ int DeviceRunner::destroy_comm_stream(void *stream) {
// `src/common/platform/onboard/host/device_runner_base.cpp`.

int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {
// Validate launch_aicpu_num
if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) {
LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS);
return -1;
}
if (validate_launch_aicpu_num(launch_aicpu_num) != 0) return -1;

// Ensure device is initialized (lazy initialization)
int rc = ensure_device_initialized();
if (rc != 0) {
LOG_ERROR("ensure_device_initialized failed: %d", rc);
return rc;
}

// Lazy-allocate the 8-byte device_wall buffer on first run. AICPU writes
// the run wall (ns) into this buffer via KernelArgs::device_wall_data_base;
// host pulls it back via rtMemcpy D2H after stream sync (see post-sync
// block below). Kept resident across runs; freed in finalize.
if (device_wall_dev_ptr_ == nullptr) {
device_wall_dev_ptr_ = allocate_tensor(sizeof(uint64_t));
if (device_wall_dev_ptr_ != nullptr) {
kernel_args_.args.device_wall_data_base = reinterpret_cast<uint64_t>(device_wall_dev_ptr_);
uint64_t zero = 0;
(void)copy_to_device(device_wall_dev_ptr_, &zero, sizeof(uint64_t));
}
}

// Auto sentinel (block_dim == 0) is resolved directly from
// query_max_block_dim; explicit values still go through validate. The
// auto branch skips validate so we don't pay the ACL syscalls twice.
if (block_dim == 0) {
block_dim = query_max_block_dim(stream_aicore_);
LOG_INFO_V0("block_dim auto-resolved to %d", block_dim);
if (block_dim < 1) {
LOG_ERROR("block_dim auto-resolved to invalid value %d", block_dim);
return -1;
}
} else {
rc = validate_block_dim(stream_aicore_, block_dim);
if (rc != 0) {
return rc;
}
}
block_dim_ = block_dim;
ensure_device_wall_buffer();

block_dim = resolve_block_dim(block_dim);
if (block_dim < 0) return -1;
int num_aicore = block_dim * cores_per_blockdim_;
// Initialize handshake buffers in runtime
if (num_aicore > RUNTIME_MAX_WORKER) {
LOG_ERROR("block_dim (%d) exceeds RUNTIME_MAX_WORKER (%d)", block_dim, RUNTIME_MAX_WORKER);
return -1;
}

runtime.worker_count = num_aicore;
worker_count_ = num_aicore; // Store for print_handshake_results in destructor
runtime.aicpu_thread_num = launch_aicpu_num;

// Scope guards for register-address cleanup on all exit paths. Declared
// before the allocs so that an alloc-failure early-return still triggers
Expand Down Expand Up @@ -276,48 +235,16 @@ int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {
}
}

// Calculate number of AIC cores (1/3 of total)
int num_aic = block_dim; // Round up for 1/3
// Build the profiling-flag bitfield (a2a3 carries an extra dep_gen bit).
uint32_t enable_profiling_flag = PROFILING_FLAG_NONE;
if (enable_dump_tensor_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DUMP_TENSOR);
}
if (enable_l2_swimlane_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_L2_SWIMLANE);
}
if (enable_pmu_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_PMU);
}
if (enable_dep_gen_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DEP_GEN);
}
if (enable_scope_stats_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_SCOPE_STATS);
}
if (enable_dump_tensor_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DUMP_TENSOR);
if (enable_l2_swimlane_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_L2_SWIMLANE);
if (enable_pmu_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_PMU);
if (enable_dep_gen_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DEP_GEN);
if (enable_scope_stats_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_SCOPE_STATS);
kernel_args_.args.enable_profiling_flag = enable_profiling_flag;

for (int i = 0; i < num_aicore; i++) {
runtime.workers[i].aicpu_ready = 0;
runtime.workers[i].aicore_done = 0;
runtime.workers[i].task = 0;
// Set core type: first 1/3 are AIC, remaining 2/3 are AIV
runtime.workers[i].core_type = (i < num_aic) ? CoreType::AIC : CoreType::AIV;
}

// Set function_bin_addr for all tasks: Runtime::func_id_to_addr_[] stores
// a CoreCallable device address; the binary code address is one
// compile-time offset further in. The dispatch path then reads
// resolved_addr_ from the on-device CoreCallable header.
LOG_DEBUG("Setting function_bin_addr for Tasks");
for (int i = 0; i < runtime.get_task_count(); i++) {
Task *task = runtime.get_task(i);
if (task != nullptr) {
uint64_t callable_addr = runtime.get_function_bin_addr(task->func_id);
task->function_bin_addr = callable_addr + CoreCallable::binary_data_offset();
LOG_DEBUG("Task %d (func_id=%d) -> function_bin_addr=0x%lx", i, task->func_id, task->function_bin_addr);
}
}
LOG_DEBUG("");
if (prepare_runtime_for_launch(runtime, block_dim, launch_aicpu_num) != 0) return -1;

auto runtime_args_cleanup = RAIIScopeGuard([this]() {
kernel_args_.finalize_device_kernel_args();
Expand Down Expand Up @@ -458,50 +385,10 @@ int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {
return rc;
}

LOG_INFO_V0("=== aclrtSynchronizeStreamWithTimeout stream_aicpu_ ===");
rc = aclrtSynchronizeStreamWithTimeout(stream_aicpu_, PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT) {
LOG_ERROR(
"Stream sync timeout: stream=AICPU timeout_ms=%d device_id=%d block_dim=%d",
PLATFORM_STREAM_SYNC_TIMEOUT_MS, device_id_, block_dim_
);
return rc;
}
if (rc != 0) {
LOG_ERROR("aclrtSynchronizeStreamWithTimeout (AICPU) failed: %d", rc);
return rc;
}
rc = sync_run_streams();
if (rc != 0) return rc;

LOG_INFO_V0("=== aclrtSynchronizeStreamWithTimeout stream_aicore_ ===");
rc = aclrtSynchronizeStreamWithTimeout(stream_aicore_, PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT) {
LOG_ERROR(
"Stream sync timeout: stream=AICore timeout_ms=%d device_id=%d block_dim=%d",
PLATFORM_STREAM_SYNC_TIMEOUT_MS, device_id_, block_dim_
);
return rc;
}
if (rc != 0) {
LOG_ERROR("aclrtSynchronizeStreamWithTimeout (AICore) failed: %d", rc);
return rc;
}

// Pull the platform-level device wall (ns) back from the 8-byte device
// buffer that AICPU writes through via KernelArgs::device_wall_data_base.
// (We can't use the device_k_args_ shadow here — CANN's
// rtAicpuKernelLaunchExWithArgs copies KernelArgs into AICPU-private
// memory at launch, so AICPU's writes to its local copy don't propagate
// to device_k_args_.) Failure path is a soft warn — wall stays zero.
device_wall_ns_ = 0;
if (device_wall_dev_ptr_ != nullptr) {
int wall_rc = rtMemcpy(
&device_wall_ns_, sizeof(uint64_t), device_wall_dev_ptr_, sizeof(uint64_t), RT_MEMCPY_DEVICE_TO_HOST
);
if (wall_rc != 0) {
LOG_WARN("rtMemcpy(device_wall_ns) D2H failed: %d", wall_rc);
device_wall_ns_ = 0;
}
}
read_device_wall_ns();

// Tear down collectors. stop() joins mgmt then collector in the only safe
// order (mgmt's final-drain pass into L2 has poll as its consumer).
Expand Down
132 changes: 13 additions & 119 deletions src/a5/platform/onboard/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,104 +63,35 @@ DeviceRunner::~DeviceRunner() { finalize(); }
// `src/common/platform/onboard/host/device_runner_base.cpp`.

int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {
if (launch_aicpu_num < 1 || launch_aicpu_num > PLATFORM_MAX_AICPU_THREADS) {
LOG_ERROR("launch_aicpu_num (%d) must be in range [1, %d]", launch_aicpu_num, PLATFORM_MAX_AICPU_THREADS);
return -1;
}
if (validate_launch_aicpu_num(launch_aicpu_num) != 0) return -1;

// Ensure device is initialized (lazy initialization)
int rc = ensure_device_initialized();
if (rc != 0) {
LOG_ERROR("ensure_device_initialized failed: %d", rc);
return rc;
}

// Lazy-allocate the 8-byte device_wall buffer on first run. See the a2a3
// onboard equivalent for the rationale.
if (device_wall_dev_ptr_ == nullptr) {
device_wall_dev_ptr_ = allocate_tensor(sizeof(uint64_t));
if (device_wall_dev_ptr_ != nullptr) {
kernel_args_.args.device_wall_data_base = reinterpret_cast<uint64_t>(device_wall_dev_ptr_);
uint64_t zero = 0;
(void)copy_to_device(device_wall_dev_ptr_, &zero, sizeof(uint64_t));
}
}

// Auto sentinel (block_dim == 0) is resolved directly from
// query_max_block_dim; explicit values still go through validate. The
// auto branch skips validate so we don't pay the ACL syscalls twice.
if (block_dim == 0) {
block_dim = query_max_block_dim(stream_aicore_);
LOG_INFO_V0("block_dim auto-resolved to %d", block_dim);
if (block_dim < 1) {
LOG_ERROR("block_dim auto-resolved to invalid value %d", block_dim);
return -1;
}
} else {
rc = validate_block_dim(stream_aicore_, block_dim);
if (rc != 0) {
return rc;
}
}
block_dim_ = block_dim;
ensure_device_wall_buffer();

block_dim = resolve_block_dim(block_dim);
if (block_dim < 0) return -1;
int num_aicore = block_dim * cores_per_blockdim_;
// Initialize handshake buffers in runtime
if (num_aicore > RUNTIME_MAX_WORKER) {
LOG_ERROR("block_dim (%d) exceeds RUNTIME_MAX_WORKER (%d)", block_dim, RUNTIME_MAX_WORKER);
return -1;
}

runtime.worker_count = num_aicore;
worker_count_ = num_aicore; // Store for print_handshake_results in destructor
runtime.aicpu_thread_num = launch_aicpu_num;

// Get AICore register addresses for register-based task dispatch
rc = init_aicore_register_addresses(&kernel_args_.args.regs, static_cast<uint64_t>(device_id_), mem_alloc_);
if (rc != 0) {
LOG_ERROR("init_aicore_register_addresses failed: %d", rc);
return rc;
}

// Calculate number of AIC cores (1/3 of total)
int num_aic = block_dim; // Round up for 1/3
// Build the profiling-flag bitfield.
uint32_t enable_profiling_flag = PROFILING_FLAG_NONE;
if (enable_dump_tensor_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DUMP_TENSOR);
}
if (enable_l2_swimlane_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_L2_SWIMLANE);
}
if (enable_pmu_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_PMU);
}
if (enable_scope_stats_) {
SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_SCOPE_STATS);
}

for (int i = 0; i < num_aicore; i++) {
runtime.workers[i].aicpu_ready = 0;
runtime.workers[i].aicore_done = 0;
runtime.workers[i].task = 0;
// Set core type: first 1/3 are AIC, remaining 2/3 are AIV
runtime.workers[i].core_type = (i < num_aic) ? CoreType::AIC : CoreType::AIV;
}
// Profiling enablement lives on KernelArgs (no longer mirrored into Handshake).
if (enable_dump_tensor_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_DUMP_TENSOR);
if (enable_l2_swimlane_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_L2_SWIMLANE);
if (enable_pmu_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_PMU);
if (enable_scope_stats_) SET_PROFILING_FLAG(enable_profiling_flag, PROFILING_FLAG_SCOPE_STATS);
kernel_args_.args.enable_profiling_flag = enable_profiling_flag;

// Set function_bin_addr for all tasks: Runtime::func_id_to_addr_[] stores
// a CoreCallable device address; the binary code address is one
// compile-time offset further in.
LOG_DEBUG("Setting function_bin_addr for Tasks");
for (int i = 0; i < runtime.get_task_count(); i++) {
Task *task = runtime.get_task(i);
if (task != nullptr) {
uint64_t callable_addr = runtime.get_function_bin_addr(task->func_id);
task->function_bin_addr = callable_addr + CoreCallable::binary_data_offset();
LOG_DEBUG("Task %d (func_id=%d) -> function_bin_addr=0x%lx", i, task->func_id, task->function_bin_addr);
}
}
LOG_DEBUG("");
if (prepare_runtime_for_launch(runtime, block_dim, launch_aicpu_num) != 0) return -1;

// Scope guards for cleanup on all exit paths
auto regs_cleanup = RAIIScopeGuard([this]() {
Expand Down Expand Up @@ -284,47 +215,10 @@ int DeviceRunner::run(Runtime &runtime, int block_dim, int launch_aicpu_num) {
return rc;
}

LOG_INFO_V0("=== aclrtSynchronizeStreamWithTimeout stream_aicpu_ ===");
rc = aclrtSynchronizeStreamWithTimeout(stream_aicpu_, PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT) {
LOG_ERROR(
"Stream sync timeout: stream=AICPU timeout_ms=%d device_id=%d block_dim=%d",
PLATFORM_STREAM_SYNC_TIMEOUT_MS, device_id_, block_dim_
);
return rc;
}
if (rc != 0) {
LOG_ERROR("aclrtSynchronizeStreamWithTimeout (AICPU) failed: %d", rc);
return rc;
}

LOG_INFO_V0("=== aclrtSynchronizeStreamWithTimeout stream_aicore_ ===");
rc = aclrtSynchronizeStreamWithTimeout(stream_aicore_, PLATFORM_STREAM_SYNC_TIMEOUT_MS);
if (rc == ACL_ERROR_RT_STREAM_SYNC_TIMEOUT) {
LOG_ERROR(
"Stream sync timeout: stream=AICore timeout_ms=%d device_id=%d block_dim=%d",
PLATFORM_STREAM_SYNC_TIMEOUT_MS, device_id_, block_dim_
);
return rc;
}
if (rc != 0) {
LOG_ERROR("aclrtSynchronizeStreamWithTimeout (AICore) failed: %d", rc);
return rc;
}
rc = sync_run_streams();
if (rc != 0) return rc;

// Pull the platform-level device wall (ns) back from the dedicated
// 8-byte device buffer (AICPU writes via KernelArgs::device_wall_data_base
// — CANN copies args at launch so the inline field would be unreachable).
device_wall_ns_ = 0;
if (device_wall_dev_ptr_ != nullptr) {
int wall_rc = rtMemcpy(
&device_wall_ns_, sizeof(uint64_t), device_wall_dev_ptr_, sizeof(uint64_t), RT_MEMCPY_DEVICE_TO_HOST
);
if (wall_rc != 0) {
LOG_WARN("rtMemcpy(device_wall_ns) D2H failed: %d", wall_rc);
device_wall_ns_ = 0;
}
}
read_device_wall_ns();

// Tear down collectors. stop() joins mgmt then collector in the only
// safe order (mgmt's final-drain pass into L2 has poll as its
Expand Down
Loading
Loading