Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion vortex-cuda/benches/filter_cuda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use cudarc::driver::CudaView;
use cudarc::driver::DevicePtr;
use cudarc::driver::DevicePtrMut;
use cudarc::driver::DeviceRepr;
use cudarc::driver::ValidAsZeroBits;
use cudarc::driver::sys::CUevent_flags;
use futures::executor::block_on;
use vortex::error::VortexExpect;
Expand Down Expand Up @@ -135,7 +136,15 @@ async fn run_filter_timed<T: CubFilterable + DeviceRepr>(
/// Benchmark filter for a specific type.
fn benchmark_filter_type<T>(c: &mut Criterion, type_name: &str)
where
T: CubFilterable + DeviceRepr + From<u8> + Debug + Clone + Send + Sync + 'static,
T: CubFilterable
+ DeviceRepr
+ ValidAsZeroBits
+ From<u8>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
let mut group = c.benchmark_group("cuda");

Expand Down
4 changes: 3 additions & 1 deletion vortex-cuda/src/arrow/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,12 @@ async fn export_arrow_validity_buffer(
) -> VortexResult<(Option<BufferHandle>, i64)> {
let mask = validity.execute_mask(len, ctx.execution_ctx())?;
let null_count = i64::try_from(mask.false_count())?;
let validity_bits = len + arrow_offset;
let validity_bytes = validity_bits.div_ceil(8);

let validity_buffer = match mask {
Mask::AllTrue(_) => return Ok((None, 0)),
Mask::AllFalse(len) => ByteBuffer::zeroed((len + arrow_offset).div_ceil(8)),
Mask::AllFalse(_) => ByteBuffer::zeroed(validity_bytes),
values @ Mask::Values(_) => values.into_bit_buffer().into_inner().2,
};
let validity = ctx
Expand Down
3 changes: 2 additions & 1 deletion vortex-cuda/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use cudarc::driver::CudaSlice;
use cudarc::driver::DeviceRepr;
use cudarc::driver::LaunchArgs;
use cudarc::driver::LaunchConfig;
use cudarc::driver::ValidAsZeroBits;
use futures::future::BoxFuture;
use tracing::debug;
use tracing::trace;
Expand Down Expand Up @@ -256,7 +257,7 @@ impl CudaExecutionCtx {
data: D,
) -> VortexResult<BoxFuture<'static, VortexResult<BufferHandle>>>
where
T: DeviceRepr + Debug + Send + Sync + 'static,
T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static,
D: AsRef<[T]> + Send + 'static,
{
self.stream.copy_to_device(data)
Expand Down
116 changes: 108 additions & 8 deletions vortex-cuda/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
//! CUDA stream utility functions.

use std::fmt::Debug;
use std::mem::size_of;
use std::mem::size_of_val;
use std::ops::Deref;
use std::sync::Arc;

use cudarc::driver::CudaSlice;
use cudarc::driver::CudaStream;
use cudarc::driver::DeviceRepr;
use cudarc::driver::ValidAsZeroBits;
use cudarc::driver::result::stream;
use futures::future::BoxFuture;
use kanal::Sender;
use tracing::warn;
use vortex::array::buffer::BufferHandle;
use vortex::error::VortexResult;
use vortex::error::vortex_ensure;
use vortex::error::vortex_err;

use crate::CudaDeviceBuffer;
Expand Down Expand Up @@ -62,22 +66,32 @@ impl VortexCudaStream {
/// synchronously before returning. For **pinned** host memory the transfer
/// is truly async and the source must stay alive until the copy completes
/// (guaranteed by the returned future capturing it).
///
/// The returned [`BufferHandle`] keeps the source byte length, while its
/// CUDA allocation may include zeroed tail padding. This is needed for
/// Arrow validity buffers passed to cuDF, which reads masks as 32-bit words.
pub(crate) fn copy_to_device<T, D>(
&self,
data: D,
) -> VortexResult<BoxFuture<'static, VortexResult<BufferHandle>>>
where
T: DeviceRepr + Debug + Send + Sync + 'static,
T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static,
D: AsRef<[T]> + Send + 'static,
{
let host_slice: &[T] = data.as_ref();
let byte_count = size_of_val(host_slice);
let allocation_len = padded_device_allocation_len::<T>(byte_count)?;
// `device_alloc` binds the CUDA context to the current thread.
let mut cuda_slice: CudaSlice<T> = self.device_alloc(host_slice.len())?;
let mut cuda_slice: CudaSlice<T> = self.device_alloc::<T>(allocation_len)?;

self.memcpy_htod(host_slice, &mut cuda_slice)
let mut values = cuda_slice.slice_mut(..host_slice.len());
self.memcpy_htod(host_slice, &mut values)
.map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?;

zero_padding(self, &mut cuda_slice, host_slice.len())?;

let cuda_buf = CudaDeviceBuffer::new(cuda_slice);
let buffer = BufferHandle::new_device(Arc::new(cuda_buf)).slice(0..byte_count);
let stream = Arc::clone(&self.0);

Ok(Box::pin(async move {
Expand All @@ -86,7 +100,7 @@ impl VortexCudaStream {
// Keep source memory alive until copy completes.
let _keep_alive = data;

Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
Ok(buffer)
}))
}

Expand All @@ -99,20 +113,62 @@ impl VortexCudaStream {
/// For **pageable** host memory (the common case), `memcpy_htod` stages
/// the source into a driver-managed pinned buffer before returning, so
/// the source data is safe to drop after this call.
///
/// Like [`copy_to_device`](Self::copy_to_device), this preserves the source
/// byte length on the returned handle while keeping any tail padding in the
/// backing CUDA allocation.
pub(crate) fn copy_to_device_sync<T>(&self, data: &[T]) -> VortexResult<BufferHandle>
where
T: DeviceRepr + Debug + Send + Sync + 'static,
T: DeviceRepr + ValidAsZeroBits + Debug + Send + Sync + 'static,
{
let mut cuda_slice: CudaSlice<T> = self.device_alloc(data.len())?;
let byte_count = size_of_val(data);
let allocation_len = padded_device_allocation_len::<T>(byte_count)?;
let mut cuda_slice: CudaSlice<T> = self.device_alloc(allocation_len)?;

self.memcpy_htod(data, &mut cuda_slice)
let mut values = cuda_slice.slice_mut(..data.len());
self.memcpy_htod(data, &mut values)
.map_err(|e| vortex_err!("Failed to schedule H2D copy: {}", e))?;

zero_padding(self, &mut cuda_slice, data.len())?;

let cuda_buf = CudaDeviceBuffer::new(cuda_slice);
Ok(BufferHandle::new_device(Arc::new(cuda_buf)))
Ok(BufferHandle::new_device(Arc::new(cuda_buf)).slice(0..byte_count))
}
}

/// Returns the typed CUDA allocation length for `byte_count`.
///
/// The backing allocation is padded for cuDF's 32-bit validity mask reads.
/// The returned length is in `T` elements.
fn padded_device_allocation_len<T>(byte_count: usize) -> VortexResult<usize> {
let element_size = size_of::<T>();
vortex_ensure!(
element_size != 0,
"cannot copy zero-sized values to CUDA device"
);
let min_allocation_bytes = byte_count.next_multiple_of(size_of::<u32>());
Ok(min_allocation_bytes.div_ceil(element_size))
}

/// Zeroes the allocation tail after the copied values.
///
/// Returned handles are sliced to the copied byte count; the trailing padding
/// exists so a final 32-bit mask read stays within the backing allocation.
fn zero_padding<T: DeviceRepr + ValidAsZeroBits>(
stream: &VortexCudaStream,
cuda_slice: &mut CudaSlice<T>,
copied_len: usize,
) -> VortexResult<()> {
if copied_len >= cuda_slice.len() {
return Ok(());
}

let mut padding = cuda_slice.slice_mut(copied_len..);
stream
.memset_zeros(&mut padding)
.map_err(|e| vortex_err!("Failed to zero device buffer padding: {}", e))
}

/// Registers a callback and asynchronously waits for its completion.
///
/// This function can be used to asynchronously wait for events previously
Expand Down Expand Up @@ -191,3 +247,47 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult<kanal::AsyncRec

Ok(rx.to_async())
}

#[cfg(test)]
mod tests {
use vortex::error::VortexResult;
use vortex::session::VortexSession;

use super::padded_device_allocation_len;
use crate::CudaSession;

#[test]
fn test_padded_device_allocation_len() -> VortexResult<()> {
assert_eq!(padded_device_allocation_len::<u8>(0)?, 0);
assert_eq!(padded_device_allocation_len::<u8>(1)?, 4);
assert_eq!(padded_device_allocation_len::<u8>(4)?, 4);
assert_eq!(padded_device_allocation_len::<u8>(5)?, 8);
assert_eq!(padded_device_allocation_len::<u32>(1)?, 1);
assert_eq!(padded_device_allocation_len::<u32>(5)?, 2);
Ok(())
}

#[crate::test]
async fn test_copy_to_device_preserves_visible_len_with_padding() -> VortexResult<()> {
let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;
let handle = ctx.stream().copy_to_device(vec![0xab_u8])?.await?;

assert_eq!(handle.len(), 1);
let host = handle.try_to_host()?.await?;
assert_eq!(host.as_slice(), &[0xab]);

Ok(())
}

#[crate::test]
async fn test_copy_to_device_sync_preserves_visible_len_with_padding() -> VortexResult<()> {
let ctx = CudaSession::create_execution_ctx(&VortexSession::empty())?;
let handle = ctx.stream().copy_to_device_sync(&[1_u8, 2, 3, 4, 5])?;

assert_eq!(handle.len(), 5);
let host = handle.try_to_host()?.await?;
assert_eq!(host.as_slice(), &[1, 2, 3, 4, 5]);

Ok(())
}
}
48 changes: 26 additions & 22 deletions vortex-test/e2e-cuda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,21 @@ pub unsafe extern "C" fn export_array(
) -> i32 {
let mut ctx = CudaSession::create_execution_ctx(&SESSION).unwrap();

let primitive = PrimitiveArray::from_iter(0u32..5);
let decimal = DecimalArray::from_iter(0i128..5, DecimalDType::new(38, 2));
let strings = VarBinViewArray::from_iter_str([
"one",
"two",
"this string is long three",
"four",
"this string is long five",
let primitive = PrimitiveArray::from_option_iter([Some(0u32), None, Some(2), Some(3), None]);
let decimal = DecimalArray::from_option_iter(
[Some(0i128), Some(1), None, Some(3), Some(4)],
DecimalDType::new(38, 2),
);
let strings = VarBinViewArray::from_iter_nullable_str([
Some("one"),
None,
Some("this string is long three"),
Some("four"),
None,
]);
let dates = TemporalArray::new_date(
PrimitiveArray::from_iter([100i32, 200, 300, 400, 500]).into_array(),
PrimitiveArray::from_option_iter([Some(100i32), None, Some(300), Some(400), None])
.into_array(),
TimeUnit::Days,
);

Expand Down Expand Up @@ -124,24 +128,24 @@ pub unsafe extern "C" fn validate_array(
let array = make_array(array_data);
let struct_array = array.as_struct();

let primitive = UInt32Array::from_iter(0..5);
let decimal = Decimal128Array::from_iter_values(0..5)
let primitive = UInt32Array::from_iter([Some(0), None, Some(2), Some(3), None]);
let decimal = Decimal128Array::from_iter([Some(0i128), Some(1), None, Some(3), Some(4)])
.with_precision_and_scale(38, 2)
.expect("with_precision_and_scale");
let string = StringArray::from_iter_values([
"one",
"two",
"this string is long three",
"four",
"this string is long five",
let string = StringArray::from_iter([
Some("one"),
None,
Some("this string is long three"),
Some("four"),
None,
]);
let date = Date32Array::from(vec![100i32, 200, 300, 400, 500]);
let date = Date32Array::from(vec![Some(100i32), None, Some(300), Some(400), None]);

let expected_fields = Fields::from_iter([
Field::new("prims", primitive.data_type().clone(), false),
Field::new("decimals", decimal.data_type().clone(), false),
Field::new("strings", string.data_type().clone(), false),
Field::new("dates", date.data_type().clone(), false),
Field::new("prims", primitive.data_type().clone(), true),
Field::new("decimals", decimal.data_type().clone(), true),
Field::new("strings", string.data_type().clone(), true),
Field::new("dates", date.data_type().clone(), true),
]);

assert_eq!(
Expand Down
Loading