147 changes: 112 additions & 35 deletions ghostscope-loader/src/lib.rs
@@ -27,8 +27,12 @@ use ghostscope_protocol::{ParsedTraceEvent, StreamingTraceParser, TraceContext};
use log::log_enabled;
use log::Level as LogLevel;
use std::convert::TryInto;
use std::future::poll_fn;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use std::task::Poll;
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tracing::{debug, error, info, warn};

// Export kernel capabilities detection
@@ -51,11 +55,25 @@ enum EventMap {
RingBuf(RingBuf<MapData>),
PerfEventArray {
_map: PerfEventArray<MapData>,
buffers: Vec<aya::maps::perf::PerfEventArrayBuffer<MapData>>,
cpu_ids: Vec<u32>,
cpu_buffers: Vec<PerfEventCpuBuffer>,
},
}

#[derive(Clone, Copy, Debug)]
struct PerfBufferFd(RawFd);

impl AsRawFd for PerfBufferFd {
fn as_raw_fd(&self) -> RawFd {
self.0
}
}

struct PerfEventCpuBuffer {
cpu_id: u32,
buffer: aya::maps::perf::PerfEventArrayBuffer<MapData>,
readiness: AsyncFd<PerfBufferFd>,
}

pub fn hello() -> String {
format!("Loader: {}", ghostscope_compiler::hello())
}
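Note: `PerfEventCpuBuffer` pairs each per-CPU buffer with an `AsyncFd` registration so tokio can wake the poller when the kernel marks the fd readable. In isolation the registration step looks roughly like the sketch below (a hypothetical standalone helper, assuming a tokio runtime with I/O enabled; the wrapped fd must outlive the registration, which the struct guarantees by owning the buffer alongside it):

```rust
use std::os::unix::io::{AsRawFd, RawFd};
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;

/// Newtype so the raw perf-buffer fd implements `AsRawFd`,
/// which is what `AsyncFd` needs to register it with the reactor.
#[derive(Clone, Copy, Debug)]
struct PerfBufferFd(RawFd);

impl AsRawFd for PerfBufferFd {
    fn as_raw_fd(&self) -> RawFd {
        self.0
    }
}

/// Register the fd for readable readiness only; the loader never
/// writes to it, so `Interest::READABLE` is sufficient.
fn register(fd: RawFd) -> std::io::Result<AsyncFd<PerfBufferFd>> {
    AsyncFd::with_interest(PerfBufferFd(fd), Interest::READABLE)
}
```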
@@ -425,8 +443,7 @@ impl GhostScopeLoader {
);

// Open buffers for all online CPUs
let mut buffers = Vec::new();
let mut cpu_ids = Vec::new();
let mut cpu_buffers = Vec::new();

for cpu_id in online_cpus {
let pages = self.perf_page_count;
@@ -443,25 +460,36 @@ impl GhostScopeLoader {
cpu_id
);
}
buffers.push(buffer);
cpu_ids.push(cpu_id);
let fd = buffer.as_raw_fd();
let readiness =
AsyncFd::with_interest(PerfBufferFd(fd), Interest::READABLE).map_err(
|err| {
LoaderError::Generic(format!(
"Failed to register perf buffer fd for CPU {cpu_id}: {err}"
))
},
)?;
cpu_buffers.push(PerfEventCpuBuffer {
cpu_id,
buffer,
readiness,
});
}
Err(e) => {
warn!("Failed to open perf buffer for CPU {}: {}", cpu_id, e);
}
}
}

if buffers.is_empty() {
if cpu_buffers.is_empty() {
return Err(LoaderError::Generic(
"Failed to open any perf event buffers".to_string(),
));
}

EventMap::PerfEventArray {
_map: perf_array,
buffers,
cpu_ids,
cpu_buffers,
}
} else {
return Err(LoaderError::MapNotFound(
@@ -684,13 +712,14 @@ impl GhostScopeLoader {

match event_map {
EventMap::RingBuf(ringbuf) => {
// Create AsyncFd and wait for readable
// Create AsyncFd and wait for readable; clear readiness to avoid spin
let async_fd = AsyncFd::new(ringbuf.as_raw_fd())
.map_err(|e| LoaderError::Generic(format!("Failed to create AsyncFd: {e}")))?;
let _guard = async_fd
let mut guard = async_fd
.readable()
.await
.map_err(|e| LoaderError::Generic(format!("AsyncFd error: {e}")))?;
guard.clear_ready();

// Read all available events
while let Some(item) = ringbuf.next() {
@@ -705,36 +734,31 @@
}
}
}
EventMap::PerfEventArray {
buffers, cpu_ids, ..
} => {
EventMap::PerfEventArray { cpu_buffers, .. } => {
use bytes::BytesMut;

// Poll all CPU buffers (non-blocking check)
for (idx, buffer) in buffers.iter_mut().enumerate() {
// Check if buffer has events
if !buffer.readable() {
continue;
}
let parser = &mut self.parser;

// Read events from this CPU's buffer
let mut drain_buffer = |entry: &mut PerfEventCpuBuffer| -> Result<bool> {
let mut produced = false;
let mut read_bufs = vec![BytesMut::with_capacity(4096)];
match buffer.read_events(&mut read_bufs) {

match entry.buffer.read_events(&mut read_bufs) {
Ok(result) => {
if result.read > 0 {
produced = true;
info!(
"Read {} events from CPU {} buffer",
result.read, cpu_ids[idx]
result.read, entry.cpu_id
);
}
if result.lost > 0 {
warn!(
"Lost {} events from CPU {} buffer",
result.lost, cpu_ids[idx]
result.lost, entry.cpu_id
);
}

// Parse and collect each event
for (i, data) in read_bufs.iter().enumerate().take(result.read) {
debug!(
"PerfEvent {}: {} bytes - {:02x?}",
@@ -743,27 +767,80 @@
&data[..data.len().min(32)]
);

match self.parser.process_segment(data, trace_context) {
match parser.process_segment(data, trace_context) {
Ok(Some(parsed_event)) => events.push(parsed_event),
Ok(None) => {}
Err(e) => {
return Err(LoaderError::Generic(
format!("Fatal: Failed to parse trace event from PerfEventArray CPU {}: {e}",
cpu_ids[idx])
));
let cpu = entry.cpu_id;
return Err(LoaderError::Generic(format!(
"Fatal: Failed to parse trace event from PerfEventArray CPU {cpu}: {e}"
)));
}
}
}
}
Err(e) => {
warn!("Failed to read from CPU {} buffer: {}", cpu_ids[idx], e);
warn!("Failed to read from CPU {} buffer: {}", entry.cpu_id, e);
}
}
}

// If no events were collected, yield to avoid busy waiting
if events.is_empty() {
tokio::task::yield_now().await;
Ok(produced)
};

loop {
// Drain any buffers that already report data without waiting.
let mut made_progress = false;
for entry in cpu_buffers.iter_mut() {
if entry.buffer.readable() {
made_progress |= drain_buffer(entry)?;
}
}

if made_progress {
break;
}

// Wait for at least one buffer to become readable.
let ready_idx = poll_fn(|cx| {
for (idx, entry) in cpu_buffers.iter().enumerate() {
match entry.readiness.poll_read_ready(cx) {
Poll::Ready(Ok(mut guard)) => {
guard.clear_ready();
return Poll::Ready(Ok(idx));
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {}
}
}
Poll::Pending
})
.await
.map_err(|e| {
LoaderError::Generic(format!(
"AsyncFd error while waiting for perf events: {e}"
))
})?;

// Drain the buffer that triggered readiness.
made_progress |= drain_buffer(
cpu_buffers
.get_mut(ready_idx)
.expect("ready index should be valid"),
)?;

// Drain any other buffers now advertising data.
for (idx, entry) in cpu_buffers.iter_mut().enumerate() {
if idx == ready_idx || !entry.buffer.readable() {
continue;
}
made_progress |= drain_buffer(entry)?;
}

if made_progress {
break;
}
// No events were produced despite readiness (e.g. lost event markers).
// Loop back and wait again.
}
}
}
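Note: the wait step above registers the current task with every per-CPU fd through a single `poll_fn` future and resumes on the first fd that reports readable. A minimal standalone version of that helper might look like the following (a sketch, not part of the PR, assuming tokio with I/O enabled):

```rust
use std::future::poll_fn;
use std::io;
use std::os::unix::io::AsRawFd;
use std::task::Poll;
use tokio::io::unix::AsyncFd;

/// Wait until any of the registered fds is readable and return its index.
/// Every fd that is still pending has registered this task's waker, so the
/// future is woken as soon as one of them becomes ready.
async fn wait_any_readable<T: AsRawFd>(fds: &[AsyncFd<T>]) -> io::Result<usize> {
    poll_fn(|cx| {
        for (idx, fd) in fds.iter().enumerate() {
            match fd.poll_read_ready(cx) {
                Poll::Ready(Ok(mut guard)) => {
                    // Consume the readiness edge so the next wait blocks
                    // until the kernel reports fresh data.
                    guard.clear_ready();
                    return Poll::Ready(Ok(idx));
                }
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => {}
            }
        }
        Poll::Pending
    })
    .await
}
```

The drain step then reads the ready buffer first, sweeps any other buffers whose `readable()` flag is set, and loops back to the wait only when nothing was produced (for example when the wake came solely from lost-event records).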
2 changes: 1 addition & 1 deletion ghostscope/src/cli/runtime.rs
@@ -113,7 +113,7 @@ async fn run_cli_with_session(
)
.await
{
error!("Failed to compile and load script: {}", e);
error!("Failed to compile and load script: {:#}", e);
info!("GhostScope encountered an error during script compilation. Exiting gracefully.");
return Err(e);
}
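Note: the only change in this file swaps `{}` for `{:#}` in the log line. With `anyhow`-style errors, which the script compiler below produces, the alternate form prints the whole context chain instead of just the outermost message. A minimal sketch of the difference (hypothetical function names):

```rust
use anyhow::{anyhow, Context, Result};

// Hypothetical stand-in for the real compile-and-load path.
fn compile_and_load() -> Result<()> {
    Err(anyhow!("permission denied")).context("failed to open perf buffer")
}

fn main() {
    if let Err(e) = compile_and_load() {
        // `{}`   -> failed to open perf buffer
        // `{:#}` -> failed to open perf buffer: permission denied
        eprintln!("Failed to compile and load script: {:#}", e);
    }
}
```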
37 changes: 23 additions & 14 deletions ghostscope/src/script/compiler.rs
@@ -32,11 +32,18 @@ async fn create_and_attach_loader(
.as_ref()
.map(|c| c.ebpf_config.proc_module_offsets_max_entries as u32)
.unwrap_or(4096);
let pin_path = ghostscope_process::maps::proc_offsets_pin_path();
if let Err(e) = ghostscope_process::maps::ensure_pinned_proc_offsets_exists(max_entries) {
warn!(
"Failed to ensure pinned proc_module_offsets map exists ({} entries): {}",
max_entries, e
error!(
"Failed to ensure pinned proc_module_offsets map exists at {} ({} entries): {:#}",
pin_path.display(),
max_entries,
e
);
return Err(e.context(format!(
"Unable to prepare pinned proc_module_offsets map at {}",
pin_path.display()
)));
}

let mut loader = GhostScopeLoader::new(&config.ebpf_bytecode)
@@ -398,14 +405,17 @@ pub async fn compile_and_load_script_for_tui(
}
Err(e) => {
error!(
"Failed to attach uprobe for trace_id {}: {}",
"Failed to attach uprobe for trace_id {}: {:#}",
config.assigned_trace_id, e
);
tracing::info!(
"Attachment hints: check privileges, target binary availability, PID validity, and function addresses if needed."
);
// Update corresponding result to failed
for result in &mut results {
if result.pc_address == config.function_address.unwrap_or(0) {
result.status =
ExecutionStatus::Failed(format!("Failed to attach uprobe: {e}"));
ExecutionStatus::Failed(format!("Failed to attach uprobe: {e:#}"));
success_count -= 1;
failed_count += 1;
break;
@@ -682,17 +692,16 @@ pub async fn compile_and_load_script_for_cli(
}
Err(e) => {
error!(
"Failed to attach uprobe for trace_id {}: {}",
"Failed to attach uprobe for trace_id {}: {:#}",
config.assigned_trace_id, e
);
return Err(anyhow::anyhow!(
"Failed to attach uprobe: {}. Possible reasons: \
1. Need root permissions (run with sudo), \
2. Target binary doesn't exist or lacks debug info, \
3. Process not running or PID invalid, \
4. Function addresses not accessible",
e
));
tracing::info!(
"Attachment hints: check privileges, target binary availability, PID validity, and function addresses if needed."
);
return Err(e.context(format!(
"Failed to attach uprobe for trace_id {}",
config.assigned_trace_id
)));
}
}
}
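Note: instead of flattening the attach failure into one hand-written multi-reason string, the CLI path now logs a generic hint and returns the original error wrapped with `context`, so the root cause survives for the `{:#}` log in `runtime.rs`. Roughly, as a sketch with a hypothetical attach function:

```rust
use anyhow::{anyhow, Result};

// Hypothetical stand-in for the aya attach call.
fn attach_uprobe() -> Result<()> {
    Err(anyhow!("uprobe_register failed: Operation not permitted"))
}

fn attach_with_context(trace_id: u32) -> Result<()> {
    // Wrap rather than replace: the attach failure stays in the chain as
    // the cause, and the trace_id is added as outer context.
    attach_uprobe()
        .map_err(|e| e.context(format!("Failed to attach uprobe for trace_id {trace_id}")))
}

fn main() {
    if let Err(e) = attach_with_context(3) {
        // Prints: Failed to attach uprobe for trace_id 3: uprobe_register failed: ...
        eprintln!("{e:#}");
    }
}
```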
14 changes: 8 additions & 6 deletions ghostscope/src/tracing/instance.rs
@@ -54,12 +54,14 @@ impl TraceInstance {
info!("Trace {} is already enabled", self.trace_id);
Ok(())
} else if let Some(ref mut loader) = self.loader {
info!(
"Enabling trace {} for target '{}' at PC 0x{:x} in binary '{}'",
self.trace_id, self.target_display, self.pc, self.binary_path
);
if loader.is_uprobe_attached() {
warn!("Uprobe already attached for trace {}", self.trace_id);
let already_attached = loader.is_uprobe_attached();
if !already_attached {
info!(
"Enabling trace {} for target '{}' at PC 0x{:x} in binary '{}'",
self.trace_id, self.target_display, self.pc, self.binary_path
);
}
if already_attached {
self.is_enabled = true;
Ok(())
} else if loader.get_attachment_info().is_some() {