diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3b0b7cd5635..4d608622f2d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -537,6 +537,13 @@ jobs: --no-fail-fast \ --target x86_64-unknown-linux-gnu \ --verbose + - name: Build cudf test library + run: cargo +nightly build --locked -p vortex-test-e2e-cuda --target x86_64-unknown-linux-gnu + - name: Download and run cudf-test-harness + run: | + curl -fsSL https://github.com/vortex-data/cudf-test-harness/releases/latest/download/cudf-test-harness-x86_64.tar.gz | tar -xz + cd cudf-test-harness-x86_64 + ./cudf-test-harness check $GITHUB_WORKSPACE/target/x86_64-unknown-linux-gnu/debug/libvortex_test_e2e_cuda.so rust-test-other: name: "Rust tests (${{ matrix.os }})" diff --git a/Cargo.lock b/Cargo.lock index 1d3854d23ef..944b8fa0832 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10420,6 +10420,8 @@ name = "vortex-cuda" version = "0.1.0" dependencies = [ "arc-swap", + "arrow-data 57.2.0", + "arrow-schema 57.2.0", "async-trait", "codspeed-criterion-compat-walltime", "cudarc", @@ -11100,12 +11102,10 @@ dependencies = [ name = "vortex-test-e2e-cuda" version = "0.1.0" dependencies = [ - "cudarc", - "rstest", - "tokio", - "vortex-array", + "arrow-schema 57.2.0", + "futures", + "vortex", "vortex-cuda", - "vortex-error", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c63e310ab97..01914a4d9b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,9 @@ members = [ "vortex-datafusion", "vortex-duckdb", "vortex-cuda", + "vortex-cuda/cub", "vortex-cuda/macros", "vortex-cuda/nvcomp", - "vortex-cuda/cub", "vortex-cxx", "vortex-ffi", "fuzz", diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index ed85085545a..30db4d6a30a 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -22,6 +22,8 @@ _test-harness = [] [dependencies] arc-swap = { workspace = true } +arrow-data = { workspace = true, features = ["ffi"] } +arrow-schema = { workspace = true, features = ["ffi"] } async-trait = { workspace = true } cudarc = { workspace = true, features = ["f16"] } fastlanes = { workspace = true } diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs new file mode 100644 index 00000000000..7e8fe44ad6e --- /dev/null +++ b/vortex-cuda/src/arrow/canonical.rs @@ -0,0 +1,332 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use async_trait::async_trait; +use futures::future::BoxFuture; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ToCanonical; +use vortex_array::arrays::BoolArrayParts; +use vortex_array::arrays::DecimalArrayParts; +use vortex_array::arrays::PrimitiveArrayParts; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::StructArrayParts; +use vortex_array::arrays::VarBinViewArrayParts; +use vortex_array::buffer::BufferHandle; +use vortex_array::validity::Validity; +use vortex_buffer::BufferMut; +use vortex_dtype::DecimalType; +use vortex_dtype::datetime::AnyTemporal; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; + +use crate::CudaExecutionCtx; +use crate::arrow::ArrowArray; +use crate::arrow::ArrowDeviceArray; +use crate::arrow::DeviceType; +use crate::arrow::ExportDeviceArray; +use crate::arrow::PrivateData; +use crate::arrow::SyncEvent; +use crate::executor::CudaArrayExt; + +/// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by +/// first decoding the array on the GPU and then converting the canonical type to the nearest +/// Arrow equivalent. +#[derive(Debug)] +pub(crate) struct CanonicalDeviceArrayExport; + +#[async_trait] +impl ExportDeviceArray for CanonicalDeviceArrayExport { + async fn export_device_array( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let cuda_array = array.execute_cuda(ctx).await?; + + let (arrow_array, sync_event) = export_canonical(cuda_array, ctx).await?; + + Ok(ArrowDeviceArray { + array: arrow_array, + sync_event, + device_id: ctx.stream().context().ordinal() as i64, + device_type: DeviceType::Cuda, + _reserved: Default::default(), + }) + } +} + +fn export_canonical( + cuda_array: Canonical, + ctx: &mut CudaExecutionCtx, +) -> BoxFuture<'_, VortexResult<(ArrowArray, SyncEvent)>> { + Box::pin(async { + match cuda_array { + Canonical::Struct(struct_array) => export_struct(struct_array, ctx).await, + Canonical::Primitive(primitive) => { + let len = primitive.len(); + let PrimitiveArrayParts { + buffer, validity, .. + } = primitive.into_parts(); + + check_validity_empty(validity)?; + + let buffer = ensure_device_resident(buffer, ctx).await?; + + export_fixed_size(buffer, len, 0, ctx) + } + Canonical::Null(null_array) => { + let len = null_array.len(); + + // The null array has no buffers, no children, just metadata. + let mut array = ArrowArray::empty(); + array.length = len as i64; + array.null_count = len as i64; + array.release = Some(release_array); + + // we don't need a sync event for Null since no data is copied. + Ok((array, None)) + } + Canonical::Decimal(decimal) => { + let len = decimal.len(); + let DecimalArrayParts { + values, + values_type, + validity, + .. + } = decimal.into_parts(); + + // verify that there is no null buffer + check_validity_empty(validity)?; + + // TODO(aduffy): GPU kernel for upcasting. + vortex_ensure!( + values_type >= DecimalType::I32, + "cannot export DecimalArray with values type {values_type}. must be i32 or wider." + ); + + let buffer = if values.is_on_device() { + values + } else { + ctx.move_to_device(values)?.await? + }; + + export_fixed_size(buffer, len, 0, ctx) + } + Canonical::Extension(extension) => { + if !extension.ext_dtype().is::() { + vortex_bail!("only support temporal extension types currently"); + } + + let values = extension.storage().to_primitive(); + let len = extension.len(); + + let PrimitiveArrayParts { + buffer, validity, .. + } = values.into_parts(); + + check_validity_empty(validity)?; + + let buffer = ensure_device_resident(buffer, ctx).await?; + export_fixed_size(buffer, len, 0, ctx) + } + + Canonical::Bool(bool_array) => { + let BoolArrayParts { + bits, + offset, + len, + validity, + .. + } = bool_array.into_parts(); + + check_validity_empty(validity)?; + + export_fixed_size(bits, len, offset, ctx) + } + Canonical::VarBinView(view) => { + let len = view.len(); + + let VarBinViewArrayParts { + views, + buffers, + validity, + .. + } = view.into_parts(); + + check_validity_empty(validity)?; + + export_variadic(Some(views), buffers.to_vec(), len, ctx).await + } + c => todo!("support for exporting {} arrays", c.dtype()), + } + }) +} + +async fn export_struct( + array: StructArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = array.len(); + let StructArrayParts { + validity, fields, .. + } = array.into_parts(); + + check_validity_empty(validity)?; + + // We need the children to be held across await points. + let mut children = Vec::with_capacity(fields.len()); + + for field in fields.iter() { + let cuda_field = field.clone().execute_cuda(ctx).await?; + let (arrow_field, _) = export_canonical(cuda_field, ctx).await?; + children.push(arrow_field); + } + + let mut private_data = PrivateData::new(vec![None], children, ctx)?; + let sync_event: SyncEvent = private_data.sync_event(); + + // Populate the ArrowArray with the child arrays. + let mut arrow_struct = ArrowArray::empty(); + arrow_struct.length = len as i64; + arrow_struct.n_children = fields.len() as i64; + arrow_struct.children = private_data.children.as_mut_ptr(); + + // StructArray _can_ contain a validity buffer. In this case, we just write a null pointer + // for it. + arrow_struct.n_buffers = 1; + arrow_struct.buffers = private_data.buffer_ptrs.as_mut_ptr(); + arrow_struct.release = Some(release_array); + arrow_struct.private_data = Box::into_raw(private_data).cast(); + + Ok((arrow_struct, sync_event)) +} + +/// Export fixed-size array data that owns a single buffer of values. +fn export_fixed_size( + buffer: BufferHandle, + len: usize, + offset: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + vortex_ensure!( + buffer.is_on_device(), + "buffer must already be copied to device before calling" + ); + + // TODO(aduffy): currently the null buffer is always None, in the future we will need + // to pass it. + let mut private_data = PrivateData::new(vec![None, Some(buffer)], vec![], ctx)?; + let sync_event: SyncEvent = private_data.sync_event(); + + // Return a copy of the CudaEvent + let arrow_array = ArrowArray { + length: len as i64, + null_count: 0, + offset: offset as i64, + // 1 (optional) buffer for nulls, one buffer for the data + n_buffers: 2, + buffers: private_data.buffer_ptrs.as_mut_ptr(), + n_children: 0, + children: std::ptr::null_mut(), + release: Some(release_array), + dictionary: std::ptr::null_mut(), + private_data: Box::into_raw(private_data).cast(), + }; + + Ok((arrow_array, sync_event)) +} + +async fn export_variadic( + buffer: Option, + variadic_buffers: Vec, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let mut buffers = vec![]; + + // push an empty buffer for the nulls. + buffers.push(None); + + if let Some(buf) = buffer { + buffers.push(Some(ensure_device_resident(buf, ctx).await?)); + } + + // We create a new buffer that contains the lengths of the variadic buffers as i64. + let mut variadic_buffer_lens = BufferMut::with_capacity(variadic_buffers.len()); + for buffer in variadic_buffers { + variadic_buffer_lens.push(buffer.len() as i64); + buffers.push(Some(ensure_device_resident(buffer, ctx).await?)); + } + + let variadic_buffer_lens = ensure_device_resident( + BufferHandle::new_host(variadic_buffer_lens.freeze().into_byte_buffer()), + ctx, + ) + .await?; + + buffers.push(Some(variadic_buffer_lens)); + + let mut private_data = PrivateData::new(buffers, vec![], ctx)?; + let sync_event = private_data.sync_event(); + + let arrow_array = ArrowArray { + length: len as i64, + n_buffers: private_data.buffers.len() as i64, + buffers: private_data.buffer_ptrs.as_mut_ptr(), + n_children: 0, + children: std::ptr::null_mut(), + offset: 0, + null_count: 0, + dictionary: std::ptr::null_mut(), + private_data: Box::into_raw(private_data).cast(), + release: Some(release_array), + }; + + Ok((arrow_array, sync_event)) +} + +/// Check that the validity buffer is empty and does not need to be copied over the device boundary. +fn check_validity_empty(validity: Validity) -> VortexResult<()> { + if let Validity::AllInvalid | Validity::Array(_) = validity { + vortex_bail!("Exporting array with non-trivial validity not supported yet") + } + + Ok(()) +} + +async fn ensure_device_resident( + buffer_handle: BufferHandle, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + if buffer_handle.is_on_device() { + Ok(buffer_handle) + } else { + ctx.move_to_device(buffer_handle)?.await + } +} + +// export some nested data + +unsafe extern "C" fn release_array(array: *mut ArrowArray) { + // SAFETY: this is only safe if we're dropping an ArrowArray that was created from Rust + // code. This is necessary to ensure that the fields inside the CudaPrivateData + // get dropped to free native/GPU memory. + unsafe { + let private_data_ptr = + std::ptr::replace(&raw mut (*array).private_data, std::ptr::null_mut()); + + if !private_data_ptr.is_null() { + let mut private_data = Box::from_raw(private_data_ptr.cast::()); + let children = std::mem::take(&mut private_data.children); + for child in children { + release_array(child); + } + drop(private_data); + } + + // update the release function to NULL to avoid any possibility of double-frees. + (*array).release = None; + } +} diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs new file mode 100644 index 00000000000..6d8b53221a5 --- /dev/null +++ b/vortex-cuda/src/arrow/mod.rs @@ -0,0 +1,216 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! This module implements the Arrow C Device data interface extension for sharing GPU-resident +//! data. +//! +//! This is an extension to the Arrow C Data Interface. +//! +//! More documentation at + +mod canonical; + +use std::ffi::c_void; +use std::fmt::Debug; +use std::ptr::NonNull; +use std::sync::Arc; + +use async_trait::async_trait; +pub(crate) use canonical::CanonicalDeviceArrayExport; +use cudarc::driver::CudaEvent; +use cudarc::driver::CudaStream; +use cudarc::driver::sys; +use cudarc::runtime::sys::cudaEvent_t; +use vortex_array::Array; +use vortex_array::ArrayRef; +use vortex_array::buffer::BufferHandle; +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::CudaBufferExt; +use crate::CudaExecutionCtx; + +#[derive(Debug, Copy, Clone)] +#[repr(i32)] +pub enum DeviceType { + /// Host-resident data buffer + Cpu = 1, + Cuda = 2, + CudaHost = 3, + // OpenCL = 4, + // Vulkan = 7, + // Metal = 8, + // Vpi = 9, + // Rocm = 10, + // RocmHost = 11, + CudaManaged = 13, + // OneApi = 14, + // WebGPU = 15, + // Hexagon = 16, +} + +/// A (potentially null) pointer to a `cudaEvent_t`. +pub type SyncEvent = Option>; + +/// The C Device data interface representation of an Arrow array. +/// +/// This array contains on-device pointers to Arrow array data, along with a synchronization +/// event that the client must wait on. +#[repr(C)] +#[derive(Debug)] +pub struct ArrowDeviceArray { + array: ArrowArray, + device_id: i64, + device_type: DeviceType, + sync_event: SyncEvent, + + // unused space reserved for future fields + _reserved: [i64; 3], +} + +unsafe impl Send for ArrowDeviceArray {} +unsafe impl Sync for ArrowDeviceArray {} + +/// An FFI-compatible version of the ArrowArray that holds pointers to device buffers. +#[repr(C)] +#[derive(Debug)] +pub(crate) struct ArrowArray { + length: i64, + null_count: i64, + offset: i64, + n_buffers: i64, + n_children: i64, + buffers: *mut sys::CUdeviceptr, + children: *mut *mut ArrowArray, + // NOTE: we don't support exporting dictionary arrays, so we leave this as an opaque pointer. + dictionary: *mut (), + release: Option, + // When exported, this MUST contain everything that is owned by this array. + // for example, any buffer pointed to in `buffers` must be here, as well + // as the `buffers` pointer itself. + // In other words, everything in ArrowArray must be owned by + // `private_data` and can assume that they do not outlive `private_data`. + private_data: *mut c_void, +} + +impl ArrowArray { + #[allow(unused)] + pub fn empty() -> Self { + Self { + length: 0, + null_count: 0, + offset: 0, + n_buffers: 0, + n_children: 0, + buffers: std::ptr::null_mut(), + children: std::ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +unsafe impl Send for ArrowArray {} +unsafe impl Sync for ArrowArray {} + +#[expect( + unused, + reason = "cuda_stream and cuda_buffers need to have deferred drop" +)] +pub(crate) struct PrivateData { + /// Hold a reference to the CudaStream so that it stays alive even after CudaExecutionCtx + /// has been dropped. + pub(crate) cuda_stream: Arc, + /// The single boxed slice which owns all buffers that the Rust code allocated on the device. + pub(crate) buffers: Box<[Option]>, + /// Boxed slice of buffer pointers. We return a pointer to the start of this allocation over + /// the interface, so we hold it here so the Box contents are not freed. + pub(crate) buffer_ptrs: Box<[sys::CUdeviceptr]>, + pub(crate) cuda_event: CudaEvent, + pub(crate) cuda_event_ptr: cudaEvent_t, + pub(crate) children: Box<[*mut ArrowArray]>, +} + +impl PrivateData { + pub(crate) fn new( + buffers: Vec>, + children: Vec, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult> { + let buffers = buffers.into_boxed_slice(); + let buffer_ptrs: Box<[sys::CUdeviceptr]> = buffers + .iter() + .map(|buf| { + match buf { + None => { + // null pointer + Ok(sys::CUdeviceptr::default()) + } + Some(handle) => handle.cuda_device_ptr(), + } + }) + .collect::>>()? + .into_boxed_slice(); + + let children = children + .into_iter() + .map(|array| Box::into_raw(Box::new(array))) + .collect::>(); + + // generate the synchronization event + let cuda_event = ctx + .stream() + .record_event(None) + .map_err(|_| vortex_err!("failed to create cudaEvent_t"))?; + + Ok(Box::new(Self { + buffers, + buffer_ptrs, + cuda_stream: Arc::clone(ctx.stream()), + children, + cuda_event_ptr: cuda_event.cu_event().cast(), + cuda_event, + })) + } + + pub(crate) fn sync_event(&mut self) -> SyncEvent { + NonNull::new(&raw mut self.cuda_event_ptr) + } +} + +#[async_trait] +pub trait DeviceArrayExt: Array { + async fn export_device_array( + self, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult; +} + +#[async_trait] +impl DeviceArrayExt for ArrayRef { + async fn export_device_array( + self, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let exporter = Arc::clone(ctx.exporter()); + exporter.export_device_array(self, ctx).await + } +} + +/// A type that can convert a Vortex array into an [`ArrowDeviceArray`]. +#[async_trait] +pub trait ExportDeviceArray: Debug + Send + Sync + 'static { + /// Export a Vortex array as an [`ArrowDeviceArray`]. + /// + /// The Arrow Device Array is part of the Arrow C Device data interface extension to the Arrow + /// specification. It enables passing Vortex arrays to other processes that consume Arrow + /// arrays, such as cudf. + /// + /// See . + async fn export_device_array( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult; +} diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 520f4551e96..f1ad5e06af0 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -124,8 +124,15 @@ pub trait CudaBufferExt { /// /// # Errors /// - /// Returns an error if the buffer is not on the device. + /// Returns an error if the buffer is not a CUDA buffer. fn cuda_view(&self) -> VortexResult>; + + /// Returns the on-device pointer for the start of the buffer handle. + /// + /// # Errors + /// + /// Returns an error if the buffer is not a CUDA buffer. + fn cuda_device_ptr(&self) -> VortexResult; } impl CudaBufferExt for BufferHandle { @@ -141,6 +148,18 @@ impl CudaBufferExt for BufferHandle { Ok(cuda_buf.as_view::()) } + + fn cuda_device_ptr(&self) -> VortexResult { + let ptr = self + .as_device_opt() + .ok_or_else(|| vortex_err!("Buffer is not on device"))? + .as_any() + .downcast_ref::() + .ok_or_else(|| vortex_err!("expected CudaDeviceBuffer"))? + .device_ptr; + + Ok(ptr) + } } impl Debug for CudaDeviceBuffer { diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index 426d428d163..f5717a94dcd 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -27,6 +27,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use crate::CudaSession; +use crate::ExportDeviceArray; use crate::session::CudaSessionExt; use crate::stream::VortexCudaStream; @@ -149,17 +150,22 @@ impl CudaExecutionCtx { } /// See `VortexCudaStream::move_to_device`. - pub fn move_to_device( + pub fn move_to_device( &self, handle: BufferHandle, ) -> VortexResult>> { - self.stream.move_to_device::(handle) + self.stream.move_to_device(handle) } /// Returns a reference to the underlying CUDA stream. pub fn stream(&self) -> &Arc { &self.stream.0 } + + /// Get a handle to the exporter that can convert arrays into `ArrowDeviceArray`. + pub fn exporter(&self) -> &Arc { + self.cuda_session.export_device_array() + } } /// Support trait for CUDA-accelerated decompression of arrays. diff --git a/vortex-cuda/src/kernel/arrays/dict.rs b/vortex-cuda/src/kernel/arrays/dict.rs index e8f10a56d61..a65eef98d04 100644 --- a/vortex-cuda/src/kernel/arrays/dict.rs +++ b/vortex-cuda/src/kernel/arrays/dict.rs @@ -105,13 +105,13 @@ async fn execute_dict_prim_typed(values_buffer)?.await? + ctx.move_to_device(values_buffer)?.await? }; let codes_device = if codes_buffer.is_on_device() { codes_buffer } else { - ctx.move_to_device::(codes_buffer)?.await? + ctx.move_to_device(codes_buffer)?.await? }; // Allocate output buffer on device @@ -204,13 +204,13 @@ async fn execute_dict_decimal_typed< let values_device = if values_buffer.is_on_device() { values_buffer } else { - ctx.move_to_device::(values_buffer)?.await? + ctx.move_to_device(values_buffer)?.await? }; let codes_device = if codes_buffer.is_on_device() { codes_buffer } else { - ctx.move_to_device::(codes_buffer)?.await? + ctx.move_to_device(codes_buffer)?.await? }; // Allocate output buffer on device (codes_len * value_byte_width bytes) diff --git a/vortex-cuda/src/kernel/encodings/alp.rs b/vortex-cuda/src/kernel/encodings/alp.rs index da1e8669420..de4deb61b4f 100644 --- a/vortex-cuda/src/kernel/encodings/alp.rs +++ b/vortex-cuda/src/kernel/encodings/alp.rs @@ -74,7 +74,7 @@ where let device_input: BufferHandle = if buffer.is_on_device() { buffer } else { - ctx.move_to_device::(buffer)?.await? + ctx.move_to_device(buffer)?.await? }; // Get CUDA view of input diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index c1e905aff77..c9385b6a862 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -83,7 +83,7 @@ where let device_input: BufferHandle = if packed.is_on_device() { packed } else { - ctx.move_to_device::(packed)?.await? + ctx.move_to_device(packed)?.await? }; // Get CUDA view of input diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs index 1a68e2a3d7c..a93cea4d0d5 100644 --- a/vortex-cuda/src/kernel/encodings/for_.rs +++ b/vortex-cuda/src/kernel/encodings/for_.rs @@ -73,7 +73,7 @@ where let device_buffer: BufferHandle = if buffer.is_on_device() { buffer } else { - ctx.move_to_device::

(buffer)?.await? + ctx.move_to_device(buffer)?.await? }; // Get CUDA view of the buffer diff --git a/vortex-cuda/src/kernel/encodings/runend.rs b/vortex-cuda/src/kernel/encodings/runend.rs index f66774f36a0..9ba2601f3a3 100644 --- a/vortex-cuda/src/kernel/encodings/runend.rs +++ b/vortex-cuda/src/kernel/encodings/runend.rs @@ -116,13 +116,13 @@ async fn decode_runend_typed(ends_buffer)?.await? + ctx.move_to_device(ends_buffer)?.await? }; let values_device = if values_buffer.is_on_device() { values_buffer } else { - ctx.move_to_device::(values_buffer)?.await? + ctx.move_to_device(values_buffer)?.await? }; let output_slice = ctx.device_alloc::(output_len)?; diff --git a/vortex-cuda/src/kernel/encodings/zigzag.rs b/vortex-cuda/src/kernel/encodings/zigzag.rs index c6c545e394f..1e4c97263b9 100644 --- a/vortex-cuda/src/kernel/encodings/zigzag.rs +++ b/vortex-cuda/src/kernel/encodings/zigzag.rs @@ -78,7 +78,7 @@ where let device_buffer: BufferHandle = if buffer.is_on_device() { buffer } else { - ctx.move_to_device::(buffer)?.await? + ctx.move_to_device(buffer)?.await? }; // Get CUDA view of the buffer diff --git a/vortex-cuda/src/kernel/filter/mod.rs b/vortex-cuda/src/kernel/filter/mod.rs index 102b9ef34a5..23e12daa167 100644 --- a/vortex-cuda/src/kernel/filter/mod.rs +++ b/vortex-cuda/src/kernel/filter/mod.rs @@ -95,7 +95,7 @@ async fn filter_sized(input)?.await? + ctx.move_to_device(input)?.await? }; // Construct the inputs for the cub::DeviceSelect::Flagged call. diff --git a/vortex-cuda/src/kernel/filter/varbinview.rs b/vortex-cuda/src/kernel/filter/varbinview.rs index f6bc39a7729..5ba1bddd510 100644 --- a/vortex-cuda/src/kernel/filter/varbinview.rs +++ b/vortex-cuda/src/kernel/filter/varbinview.rs @@ -28,7 +28,7 @@ pub(super) async fn filter_varbinview( let d_views = if views.is_on_device() { views } else { - ctx.move_to_device::(views)?.await? + ctx.move_to_device(views)?.await? }; let filtered_views = filter_sized::(d_views, mask, ctx).await?; diff --git a/vortex-cuda/src/kernel/patches/mod.rs b/vortex-cuda/src/kernel/patches/mod.rs index 9922c7bf56d..20dd03a15bb 100644 --- a/vortex-cuda/src/kernel/patches/mod.rs +++ b/vortex-cuda/src/kernel/patches/mod.rs @@ -73,13 +73,13 @@ pub(crate) async fn execute_patches< let d_patch_indices = if indices_buffer.is_on_device() { indices_buffer } else { - ctx.move_to_device::(indices_buffer)?.await? + ctx.move_to_device(indices_buffer)?.await? }; let d_patch_values = if values_buffer.is_on_device() { values_buffer } else { - ctx.move_to_device::(values_buffer)?.await? + ctx.move_to_device(values_buffer)?.await? }; let d_target_view = target.as_view::(); @@ -175,11 +175,7 @@ mod tests { .. } = values.into_parts(); - let handle = ctx - .move_to_device::(cuda_buffer) - .unwrap() - .await - .unwrap(); + let handle = ctx.move_to_device(cuda_buffer).unwrap().await.unwrap(); let device_buf = handle .as_device() .as_any() diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index c186ea687c0..9221b2a1288 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -5,6 +5,7 @@ use std::process::Command; +pub mod arrow; mod canonical; mod device_buffer; pub mod executor; @@ -14,6 +15,7 @@ mod session; mod stream; mod stream_pool; +pub use arrow::ExportDeviceArray; pub use canonical::CanonicalCudaExt; pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index b21901aab18..33233582116 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -12,6 +12,8 @@ use vortex_session::Ref; use vortex_session::SessionExt; use vortex_utils::aliases::dash_map::DashMap; +use crate::ExportDeviceArray; +use crate::arrow::CanonicalDeviceArrayExport; use crate::executor::CudaExecute; pub use crate::executor::CudaExecutionCtx; use crate::kernel::KernelLoader; @@ -29,6 +31,7 @@ const DEFAULT_STREAM_POOL_CAPACITY: usize = 4; pub struct CudaSession { context: Arc, kernels: Arc>, + export_device_array: Arc, kernel_loader: Arc, stream_pool: Arc, } @@ -52,6 +55,7 @@ impl CudaSession { context, kernels: Arc::new(DashMap::default()), kernel_loader: Arc::new(KernelLoader::new()), + export_device_array: Arc::new(CanonicalDeviceArrayExport), stream_pool, } } @@ -116,6 +120,11 @@ impl CudaSession { self.kernel_loader .load_function(module_name, type_suffixes, &self.context) } + + /// Get a handle to the exporter that converts Vortex arrays to `ArrowDeviceArray`. + pub fn export_device_array(&self) -> &Arc { + &self.export_device_array + } } impl Default for CudaSession { diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index 98f1af8d994..fad54b36ab4 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -15,7 +15,6 @@ use cudarc::driver::result::stream; use futures::future::BoxFuture; use kanal::Sender; use vortex_array::buffer::BufferHandle; -use vortex_buffer::Buffer; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -98,7 +97,7 @@ impl VortexCudaStream { /// # Returns /// /// A future that resolves to the device buffer handle when the copy completes. - pub fn move_to_device( + pub fn move_to_device( &self, handle: BufferHandle, ) -> VortexResult>> { @@ -106,8 +105,7 @@ impl VortexCudaStream { .as_host_opt() .ok_or_else(|| vortex_err!("Buffer is not on host"))?; - let buffer: Buffer = Buffer::from_byte_buffer(host_buffer.clone()); - self.copy_to_device(buffer) + self.copy_to_device(host_buffer.clone()) } } diff --git a/vortex-test/e2e-cuda/Cargo.toml b/vortex-test/e2e-cuda/Cargo.toml index c1318c2f898..52eedb4e71e 100644 --- a/vortex-test/e2e-cuda/Cargo.toml +++ b/vortex-test/e2e-cuda/Cargo.toml @@ -12,13 +12,14 @@ repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } +[lib] +crate-type = ["cdylib"] + [lints] workspace = true [dependencies] -cudarc = { workspace = true } -rstest = { workspace = true } -tokio = { workspace = true, features = ["rt", "macros"] } -vortex-array = { workspace = true, features = ["_test-harness"] } +arrow-schema = { workspace = true, features = ["ffi"] } +futures = { workspace = true, features = ["executor"] } +vortex = { workspace = true } vortex-cuda = { workspace = true, features = ["_test-harness"] } -vortex-error = { workspace = true } diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 6b479935adb..06e30909d43 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -1,4 +1,90 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! End-to-end CUDA tests for Vortex. +//! This file is a simple C-compatible API that is called from the cudf-test-harness at CI time. +//! +//! The flow is: +//! +//! * test harness calls `dlopen` in this library +//! * invokes the `export_array` function to get back the device array +//! * pass the arrays to `cudf`'s `from_arrow_device_column` +//! * run some operations on the loaded column view +//! * call `array->release()` to drop the data allocated from the Rust side + +#![allow(clippy::unwrap_used, clippy::expect_used)] + +use std::sync::LazyLock; + +use arrow_schema::ffi::FFI_ArrowSchema; +use futures::executor::block_on; +use vortex::array::Array; +use vortex::array::IntoArray; +use vortex::array::arrays::DecimalArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::VarBinViewArray; +use vortex::array::session::ArraySession; +use vortex::array::validity::Validity; +use vortex::dtype::DecimalDType; +use vortex::dtype::FieldNames; +use vortex::expr::session::ExprSession; +use vortex::io::session::RuntimeSession; +use vortex::layout::session::LayoutSession; +use vortex::metrics::VortexMetrics; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::DeviceArrayExt; + +static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::empty() + .with::() + .with::() + .with::() + .with::() + .with::() + .with::() +}); + +#[unsafe(no_mangle)] +pub extern "C" fn export_array( + schema_ptr: &mut FFI_ArrowSchema, + array_ptr: &mut ArrowDeviceArray, +) -> i32 { + let mut ctx = CudaSession::create_execution_ctx(&SESSION).unwrap(); + + let primitive = PrimitiveArray::from_iter(0u32..1024); + let string = + VarBinViewArray::from_iter_str((0..1024).map(|idx| format!("this is string {idx}"))); + let decimal = DecimalArray::from_iter(0i64..1024, DecimalDType::new(19, 2)); + + let array = StructArray::new( + FieldNames::from_iter(["prims", "strings", "decimals"]), + vec![ + primitive.into_array(), + string.into_array(), + decimal.into_array(), + ], + 1024, + Validity::NonNullable, + ) + .into_array(); + + let data_type = array + .dtype() + .to_arrow_dtype() + .expect("converting schema to Arrow DataType"); + + *schema_ptr = FFI_ArrowSchema::try_from(data_type).expect("data_type to FFI_ArrowSchema"); + + match block_on(array.export_device_array(&mut ctx)) { + Ok(exported) => { + *array_ptr = exported; + 0 + } + Err(err) => { + eprintln!("error in export_device_array: {err}"); + 1 + } + } +}