1 change: 1 addition & 0 deletions .cirrus.yml
@@ -27,6 +27,7 @@ task:
- export LD_LIBRARY_PATH=./rdma-core/build/lib
- just test-basic-with-cov
- just test-rc-pingpong-with-cov
- just test-rc-pingpong-with-events-with-cov
- just test-cmtime-with-cov
- just generate-cov
- sed -i 's#/tmp/cirrus-ci-build/##g' lcov.info
5 changes: 5 additions & 0 deletions Justfile
@@ -20,6 +20,11 @@ test-rc-pingpong-with-cov:
sleep 2
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 127.0.0.1

test-rc-pingpong-with-events-with-cov:
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e &
sleep 2
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e 127.0.0.1

test-cmtime-with-cov:
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} &
sleep 2
210 changes: 126 additions & 84 deletions examples/rc_pingpong.rs
@@ -22,7 +22,8 @@ use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use sideway::ibverbs::address::{AddressHandleAttribute, Gid};
use sideway::ibverbs::completion::{
CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, WorkCompletionStatus,
CompletionChannel, CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, PollCompletionQueueError,
WorkCompletionStatus,
};
use sideway::ibverbs::device::{DeviceInfo, DeviceList};
use sideway::ibverbs::device_context::Mtu;
@@ -69,6 +70,9 @@ pub struct Args {
/// Get CQE with timestamp
#[arg(long, short = 't', default_value_t = false)]
ts: bool,
/// Use CQ events instead of busy polling
#[arg(long, short = 'e', default_value_t = false)]
use_events: bool,
/// If no value provided, start a server and wait for connection, otherwise, connect to server at [host]
#[arg(name = "host")]
server_ip: Option<String>,
@@ -156,6 +160,13 @@ fn main() -> anyhow::Result<()> {
}
}

// Create completion channel if using events
let comp_channel = if args.use_events {
Some(CompletionChannel::new(&context).expect("Couldn't create completion channel"))
} else {
None
};
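// A completion channel is ibverbs' event primitive (presumably wrapping
// ibv_create_comp_channel here): it exposes a file descriptor the process
// can sleep on, or hand to poll/epoll, instead of spinning on the CQ.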

let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));
let send_data: Vec<u8> = vec![0; args.size as _];
let send_mr = unsafe {
@@ -189,16 +200,28 @@ fn main() -> anyhow::Result<()> {
);
}

// Associate completion channel with CQ if using events
if let Some(ref channel) = comp_channel {
cq_builder.setup_comp_channel(channel, 0);
}

let cq = cq_builder.setup_cqe(rx_depth + 1).build_ex().unwrap();

let cq_handle = GenericCompletionQueue::from(Arc::clone(&cq));

// Request initial notification if using events
if args.use_events {
cq_handle
.req_notify_cq(false)
.expect("Couldn't request CQ notification");
}
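// Assumption: the `false` argument mirrors ibverbs' `solicited_only` flag,
// i.e. request an event for any new completion, not only solicited ones.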

let mut builder = pd.create_qp_builder();

let mut qp = builder
.setup_max_inline_data(128)
.setup_send_cq(cq_handle.clone())
.setup_recv_cq(cq_handle)
.setup_recv_cq(cq_handle.clone())
.setup_max_send_wr(1)
.setup_max_recv_wr(rx_depth)
.build_ex()
@@ -324,100 +347,119 @@ fn main() -> anyhow::Result<()> {
outstanding_send = true;
}
// poll for the completion
{
loop {
match cq.start_poll() {
Ok(mut poller) => {
while let Some(wc) = poller.next() {
if wc.status() != WorkCompletionStatus::Success as u32 {
panic!(
"Failed status {:#?} ({}) for wr_id {}",
Into::<WorkCompletionStatus>::into(wc.status()),
wc.status(),
wc.wr_id()
);
}
match wc.wr_id() {
SEND_WR_ID => {
scnt += 1;
outstanding_send = false;
},
RECV_WR_ID => {
rcnt += 1;
rout -= 1;

// Post more receives if the receive side credit is low
if rout <= rx_depth / 2 {
let to_post = rx_depth - rout;
for _ in 0..to_post {
let mut guard = qp.start_post_recv();
let recv_handle = guard.construct_wr(RECV_WR_ID);
unsafe {
recv_handle.setup_sge(
recv_mr.lkey(),
recv_data.as_mut_ptr() as _,
args.size,
);
};
guard.post().unwrap();
}
rout += to_post;
let mut num_cq_events: u32 = 0;
loop {
// If using events, wait for CQ event before polling
if args.use_events {
if let Some(ref channel) = comp_channel {
// Get the CQ event (this blocks until an event arrives)
let _event_cq = channel.get_cq_event().expect("Failed to get CQ event");
num_cq_events += 1;

// Re-arm the notification BEFORE polling to avoid missing events
cq_handle
.req_notify_cq(false)
.expect("Couldn't request CQ notification");
}
}

// Poll for completions
match cq.start_poll() {
Ok(mut poller) => {
while let Some(wc) = poller.next() {
if wc.status() != WorkCompletionStatus::Success as u32 {
panic!(
"Failed status {:#?} ({}) for wr_id {}",
Into::<WorkCompletionStatus>::into(wc.status()),
wc.status(),
wc.wr_id()
);
}
match wc.wr_id() {
SEND_WR_ID => {
scnt += 1;
outstanding_send = false;
},
RECV_WR_ID => {
rcnt += 1;
rout -= 1;

// Post more receives if the receive side credit is low
if rout <= rx_depth / 2 {
let to_post = rx_depth - rout;
for _ in 0..to_post {
let mut guard = qp.start_post_recv();
let recv_handle = guard.construct_wr(RECV_WR_ID);
unsafe {
recv_handle.setup_sge(recv_mr.lkey(), recv_data.as_mut_ptr() as _, args.size);
};
guard.post().unwrap();
}
rout += to_post;
}
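// Topping up in batches once credits fall below half of rx_depth amortizes
// the posting overhead and keeps the receive queue from running dry, which
// on an RC QP would surface as RNR NAKs to the sender.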

if args.ts {
let timestamp = wc.completion_timestamp();
if ts_param.last_completion_with_timestamp != 0 {
let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time {
timestamp - ts_param.completion_recv_prev_time
} else {
completion_timestamp_mask - ts_param.completion_recv_prev_time
+ timestamp
+ 1
};

ts_param.completion_recv_max_time_delta =
ts_param.completion_recv_max_time_delta.max(delta);
ts_param.completion_recv_min_time_delta =
ts_param.completion_recv_min_time_delta.min(delta);
ts_param.completion_recv_total_time_delta += delta;
ts_param.completion_with_time_iters += 1;
}

ts_param.completion_recv_prev_time = timestamp;
ts_param.last_completion_with_timestamp = 1;
} else {
ts_param.last_completion_with_timestamp = 0;
if args.ts {
let timestamp = wc.completion_timestamp();
if ts_param.last_completion_with_timestamp != 0 {
let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time {
timestamp - ts_param.completion_recv_prev_time
} else {
completion_timestamp_mask - ts_param.completion_recv_prev_time + timestamp + 1
};

ts_param.completion_recv_max_time_delta =
ts_param.completion_recv_max_time_delta.max(delta);
ts_param.completion_recv_min_time_delta =
ts_param.completion_recv_min_time_delta.min(delta);
ts_param.completion_recv_total_time_delta += delta;
ts_param.completion_with_time_iters += 1;
}
},
_ => {
panic!("Unknown error!");
},
}

if scnt < args.iter && !outstanding_send {
// Post another send if we haven't reached the iteration limit
let mut guard = qp.start_post_send();
let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send();
unsafe {
send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size);
ts_param.completion_recv_prev_time = timestamp;
ts_param.last_completion_with_timestamp = 1;
} else {
ts_param.last_completion_with_timestamp = 0;
}
guard.post()?;
outstanding_send = true;
},
_ => {
panic!("Unknown error!");
},
}

if scnt < args.iter && !outstanding_send {
// Post another send if we haven't reached the iteration limit
let mut guard = qp.start_post_send();
let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send();
unsafe {
send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size);
}
guard.post()?;
outstanding_send = true;
}
},
Err(_) => {
}
},
Err(PollCompletionQueueError::CompletionQueueEmpty) => {
// CQ is empty - if not using events, continue busy polling
if !args.use_events {
continue;
}
},
}
},
Err(e) => {
panic!("Failed to poll CQ: {:?}", e);
},
}

// Check if we're done
if scnt >= args.iter && rcnt >= args.iter {
break;
}
// Check if we're done
if scnt >= args.iter && rcnt >= args.iter {
break;
}
}

// Acknowledge all CQ events before cleanup
if num_cq_events > 0 {
cq_handle.ack_events(num_cq_events);
}
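// Every event taken from the channel must eventually be acknowledged
// (ibv_ack_cq_events in plain ibverbs); destroying a CQ with unacknowledged
// events can block. Acking once in a batch also keeps the relatively
// expensive, mutex-protected ack call out of the hot loop.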

let end_time = clock.now();
let time = end_time.duration_since(start_time);
let bytes = args.size as u64 * args.iter as u64 * 2;
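Taken together, the event-driven path added here follows the standard ibverbs sequence: arm the CQ, block on the channel, re-arm, drain, and acknowledge. The sketch below condenses that loop using only the sideway calls visible in this diff (CompletionChannel::new, get_cq_event, req_notify_cq, start_poll, ack_events); the surrounding device/PD/QP setup is assumed to match the example above, and handle_wc and is_done are hypothetical stand-ins for the application logic.

// Condensed sketch of the event-driven completion loop added in this PR.
// `channel`, `cq`, and `cq_handle` are assumed to be set up as in the diff.
let mut num_cq_events: u32 = 0;
cq_handle.req_notify_cq(false).expect("arm CQ"); // arm once before the first wait
loop {
    // Block until the channel signals that the CQ has something for us.
    let _event_cq = channel.get_cq_event().expect("get CQ event");
    num_cq_events += 1;

    // Re-arm BEFORE draining, so a completion that lands while we poll
    // still raises the next event instead of being silently missed.
    cq_handle.req_notify_cq(false).expect("re-arm CQ");

    // Drain everything currently queued; an event only means "non-empty".
    if let Ok(mut poller) = cq.start_poll() {
        while let Some(wc) = poller.next() {
            handle_wc(&wc); // hypothetical per-completion handler
        }
    }

    if is_done() { // hypothetical termination check
        break;
    }
}
// Acknowledge all events in one batch before tearing the CQ down.
cq_handle.ack_events(num_cq_events);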