From 7af4940b7870d4c795638dd9a7305f6a96205a52 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Tue, 9 Jun 2026 15:58:12 +0000 Subject: [PATCH] feat[gpu]: export binary as Arrow device array binary Signed-off-by: Alexander Droste --- vortex-cuda/Cargo.toml | 4 + vortex-cuda/benches/arrow_binary_cuda.rs | 127 +++++++ vortex-cuda/kernels/src/arrow_binary.cu | 221 +++++++++++++ vortex-cuda/src/arrow/canonical.rs | 402 ++++++++++++++++++++++- vortex-cuda/src/arrow/mod.rs | 1 + vortex-test/e2e-cuda/src/lib.rs | 3 + 6 files changed, 751 insertions(+), 7 deletions(-) create mode 100644 vortex-cuda/benches/arrow_binary_cuda.rs create mode 100644 vortex-cuda/kernels/src/arrow_binary.cu diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index f528ee62d8d..c4c083b97a3 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -112,3 +112,7 @@ harness = false [[bench]] name = "arrow_validity_cuda" harness = false + +[[bench]] +name = "arrow_binary_cuda" +harness = false diff --git a/vortex-cuda/benches/arrow_binary_cuda.rs b/vortex-cuda/benches/arrow_binary_cuda.rs new file mode 100644 index 00000000000..bd3154e471a --- /dev/null +++ b/vortex-cuda/benches/arrow_binary_cuda.rs @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA kernel-time benchmark for Arrow Device export of binary view arrays as Arrow Binary. + +#![expect(clippy::cast_possible_truncation)] + +#[allow(dead_code)] +mod bench_config; +mod timed_launch_strategy; + +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::Throughput; +use futures::executor::block_on; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::VarBinViewArray; +use vortex::array::arrays::varbinview::BinaryView; +use vortex::array::buffer::BufferHandle; +use vortex::array::validity::Validity; +use vortex::buffer::Buffer; +use vortex::buffer::ByteBuffer; +use vortex::dtype::DType; +use vortex::dtype::Nullability; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::session::VortexSession; +use vortex_cuda::CudaExecutionCtx; +use vortex_cuda::CudaSession; +use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +use crate::timed_launch_strategy::TimedLaunchStrategy; + +const BINARY_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")]; +const VALUE_BYTES: usize = 16; + +/// Build a non-nullable binary view array of `len` out-of-line values on the device. +async fn out_of_line_binary(len: usize, ctx: &mut CudaExecutionCtx) -> VortexResult { + let values = ByteBuffer::copy_from(vec![b'x'; len * VALUE_BYTES]); + let views = Buffer::from_iter((0..len).map(|idx| { + let offset = idx * VALUE_BYTES; + BinaryView::make_view( + &values.slice(offset..offset + VALUE_BYTES), + 0, + offset as u32, + ) + })); + + let views = ctx + .ensure_on_device(BufferHandle::new_host(views.into_byte_buffer())) + .await?; + let values = ctx.ensure_on_device(BufferHandle::new_host(values)).await?; + + Ok(VarBinViewArray::new_handle( + views, + Arc::from([values]), + DType::Binary(Nullability::NonNullable), + Validity::NonNullable, + ) + .into_array()) +} + +unsafe fn release_arrow_device_array(array: &mut ArrowDeviceArray) { + unsafe { + if let Some(release) = array.array.release { + release(&raw mut array.array); + } + } +} + +fn benchmark_arrow_binary_export(c: &mut Criterion) { + let mut group = c.benchmark_group("cuda"); + + for &(len, len_label) in BINARY_BENCH_SIZES { + // Kernels read views and value bytes and write offsets plus gathered values. + group.throughput(Throughput::Bytes( + (len * (size_of::() + VALUE_BYTES + size_of::())) as u64, + )); + group.bench_with_input( + BenchmarkId::new("cuda/arrow_binary_kernel_time/out_of_line", len_label), + &len, + |b, &len| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = timed.timer(); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + let array = block_on(out_of_line_binary(len, &mut cuda_ctx)) + .vortex_expect("failed to create binary fixture"); + + for _ in 0..iters { + let mut exported = + block_on(array.clone().export_device_array(&mut cuda_ctx)) + .vortex_expect("failed to export device array"); + unsafe { release_arrow_device_array(&mut exported) }; + } + + Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + group.finish(); +} + +criterion::criterion_group! { + name = benches; + config = bench_config::cuda_bench_config(); + targets = benchmark_arrow_binary_export +} + +#[cuda_available] +criterion::criterion_main!(benches); + +#[cuda_not_available] +fn main() {} diff --git a/vortex-cuda/kernels/src/arrow_binary.cu b/vortex-cuda/kernels/src/arrow_binary.cu new file mode 100644 index 00000000000..ae9e2fa8ecb --- /dev/null +++ b/vortex-cuda/kernels/src/arrow_binary.cu @@ -0,0 +1,221 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "config.cuh" + +#include +#include + +namespace { + +constexpr uint32_t MAX_INLINED_SIZE = 12; + +struct BinaryView { + uint32_t size; + uint8_t inline_data[MAX_INLINED_SIZE]; +}; + +struct BinaryViewRef { + uint32_t size; + uint8_t prefix[4]; + uint32_t buffer_index; + uint32_t offset; +}; + +// Return whether a row is valid in a little-endian Arrow/Vortex bitmap, treating a missing (null) +// validity bitmap as all-valid. +__device__ bool is_valid(const uint8_t *const validity, uint64_t idx) { + return validity == nullptr || ((validity[idx / 8] >> (idx % 8)) & 1); +} + +// Initialize scan input from BinaryView sizes. Null rows contribute zero bytes so the gather kernel +// never needs to read their view payload. +// +// Threads stride by blockDim within the block's element range so warp accesses to views and scan +// stay coalesced. +__device__ void init_scan_device(const BinaryView *const __restrict views, + const uint8_t *const __restrict validity, + const uint64_t *const __restrict data_buffer_lens, + int32_t *const __restrict scan, + uint32_t *const status, + uint64_t data_buffer_count, + uint64_t len) { + const uint64_t scan_len = len + 1; + const uint64_t elements_per_block = blockDim.x * ELEMENTS_PER_THREAD; + const uint64_t block_start = blockIdx.x * elements_per_block; + const uint64_t block_stop = min(block_start + elements_per_block, scan_len); + + for (uint64_t idx = block_start + threadIdx.x; idx < block_stop; idx += blockDim.x) { + if (idx >= len || !is_valid(validity, idx)) { + scan[idx] = 0; + continue; + } + + const BinaryView view = views[idx]; + const uint32_t size = view.size; + if (size > static_cast(INT32_MAX)) { + scan[idx] = 0; + atomicMax(status, 2u); + continue; + } + + if (size > MAX_INLINED_SIZE) { + const BinaryViewRef *const view_ref = reinterpret_cast(&view); + const uint64_t buffer_index = static_cast(view_ref->buffer_index); + const uint64_t offset = static_cast(view_ref->offset); + const uint64_t end = offset + static_cast(size); + if (buffer_index >= data_buffer_count || end < offset || end > data_buffer_lens[buffer_index]) { + scan[idx] = 0; + atomicMax(status, 1u); + continue; + } + } + + scan[idx] = static_cast(size); + } +} + +// Detect i32 overflow of the CUB exclusive-sum offsets by checking signs. init_scan rejects +// per-row sizes above i32::MAX, so consecutive true prefix sums differ by less than 2^31 and the +// first overflowing prefix lands in [2^31, 2^32), which wraps to a negative offset in the scan +// output. No negative offset therefore proves no prefix overflowed. +__device__ void +validate_offsets_device(const int32_t *const __restrict offsets, uint32_t *const status, uint64_t scan_len) { + const uint64_t elements_per_block = blockDim.x * ELEMENTS_PER_THREAD; + const uint64_t block_start = blockIdx.x * elements_per_block; + const uint64_t block_stop = min(block_start + elements_per_block, scan_len); + + for (uint64_t idx = block_start + threadIdx.x; idx < block_stop; idx += blockDim.x) { + if (offsets[idx] < 0) { + atomicMax(status, 2u); + } + } +} + +__device__ uint64_t upper_bound_offsets(const int32_t *const offsets, uint64_t len, uint64_t value) { + uint64_t first = 0; + while (len > 0) { + const uint64_t half = len / 2; + const uint64_t mid = first + half; + if (static_cast(offsets[mid]) <= value) { + first = mid + 1; + len -= half + 1; + } else { + len = half; + } + } + return first; +} + +// Resolve the payload pointer for one view, pointing inline views at their global view bytes. +__device__ const uint8_t * +input_ptr(const BinaryView *const views, uint64_t row, const uint64_t *const data_buffer_ptrs) { + const BinaryView *const view = views + row; + if (view->size <= MAX_INLINED_SIZE) { + return view->inline_data; + } + + const BinaryViewRef *const view_ref = reinterpret_cast(view); + return reinterpret_cast(data_buffer_ptrs[view_ref->buffer_index]) + view_ref->offset; +} + +// Copy BinaryView payload bytes into one contiguous Arrow Binary values buffer. +// +// Each thread owns a contiguous ELEMENTS_PER_THREAD output byte range, so the row lookup runs once +// per range and then advances sequentially. Byte stores from such strided ranges waste almost the +// whole memory transaction, so threads stage 16 output bytes in registers and write them with one +// vector store. Full chunks start at a multiple of ELEMENTS_PER_THREAD, keeping the stores aligned. +__device__ void gather_device(const BinaryView *const __restrict views, + const uint64_t *const __restrict data_buffer_ptrs, + const int32_t *const __restrict offsets, + uint8_t *const __restrict output, + uint64_t len, + uint64_t total_bytes) { + const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t start = start_elem(worker, total_bytes); + const uint64_t stop = stop_elem(worker, total_bytes); + if (start == stop) { + return; + } + + uint64_t row = upper_bound_offsets(offsets, len + 1, start) - 1; + uint64_t row_start = static_cast(offsets[row]); + uint64_t row_end = static_cast(offsets[row + 1]); + const uint8_t *input = input_ptr(views, row, data_buffer_ptrs); + + const auto next_byte = [&](uint64_t byte_idx) -> uint8_t { + while (byte_idx >= row_end) { + row++; + row_start = static_cast(offsets[row]); + row_end = static_cast(offsets[row + 1]); + input = input_ptr(views, row, data_buffer_ptrs); + } + return input[byte_idx - row_start]; + }; + + uint64_t byte_idx = start; + for (; byte_idx + 16 <= stop; byte_idx += 16) { + while (byte_idx >= row_end) { + row++; + row_start = static_cast(offsets[row]); + row_end = static_cast(offsets[row + 1]); + input = input_ptr(views, row, data_buffer_ptrs); + } + + // Fast path: the group sits inside one row and the source allows word loads. + const uint8_t *const src = input + (byte_idx - row_start); + if (byte_idx + 16 <= row_end && (reinterpret_cast(src) & 3) == 0) { + const uint32_t *const src_words = reinterpret_cast(src); + *reinterpret_cast(output + byte_idx) = + make_uint4(src_words[0], src_words[1], src_words[2], src_words[3]); + continue; + } + + uint32_t words[4]; +#pragma unroll + for (uint32_t word = 0; word < 4; word++) { + uint32_t value = 0; +#pragma unroll + for (uint32_t byte = 0; byte < 4; byte++) { + const uint64_t lane = byte_idx + word * 4 + byte; + value |= static_cast(next_byte(lane)) << (byte * 8); + } + words[word] = value; + } + *reinterpret_cast(output + byte_idx) = make_uint4(words[0], words[1], words[2], words[3]); + } + + for (; byte_idx < stop; byte_idx++) { + output[byte_idx] = next_byte(byte_idx); + } +} + +} // namespace + +// Fill the CUB scan input with per-row binary lengths plus a final zero sentinel. A null validity +// pointer marks an all-valid array. +extern "C" __global__ void arrow_binary_init_scan(const BinaryView *const views, + const uint8_t *const validity, + const uint64_t *const data_buffer_lens, + int32_t *const scan, + uint32_t *const status, + uint64_t data_buffer_count, + uint64_t len) { + init_scan_device(views, validity, data_buffer_lens, scan, status, data_buffer_count, len); +} + +// Check that the scanned Arrow Binary offsets never overflowed the i32 range. +extern "C" __global__ void +arrow_binary_validate_offsets(const int32_t *const offsets, uint32_t *const status, uint64_t scan_len) { + validate_offsets_device(offsets, status, scan_len); +} + +// Gather inline and referenced BinaryView payloads into Arrow Binary's contiguous values buffer. +extern "C" __global__ void arrow_binary_gather(const BinaryView *const views, + const uint64_t *const data_buffer_ptrs, + const int32_t *const offsets, + uint8_t *const output, + uint64_t len, + uint64_t total_bytes) { + gather_device(views, data_buffer_ptrs, offsets, output, len, total_bytes); +} diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index d5924819ef9..0c9f28cc325 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -7,9 +7,11 @@ use std::ptr; use std::sync::Arc; use async_trait::async_trait; +use cudarc::driver::CudaSlice; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchConfig; use cudarc::driver::PushKernelArg; +use cudarc::driver::result as cuda_driver; use futures::future::BoxFuture; use vortex::array::ArrayRef; use vortex::array::Canonical; @@ -25,6 +27,7 @@ use vortex::array::arrays::ListViewArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; +use vortex::array::arrays::VarBinViewArray; use vortex::array::arrays::bool::BoolDataParts; use vortex::array::arrays::decimal::DecimalDataParts; use vortex::array::arrays::dict::DictOwnedExt; @@ -66,6 +69,7 @@ use crate::arrow::SyncEvent; use crate::arrow::arrow_device_export_dictionary_codes_dtype; use crate::arrow::cuda_decimal_value_type; use crate::arrow::list_view::export_device_list_view; +use crate::cub::exclusive_sum_i32; use crate::executor::CudaArrayExt; /// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by @@ -206,6 +210,10 @@ fn export_canonical( export_fixed_size_list(fixed_size_list, ctx).await } Canonical::VarBinView(varbinview) => { + if matches!(varbinview.dtype(), DType::Binary(_)) { + return export_binary(varbinview, ctx).await; + } + let len = varbinview.len(); let VarBinViewDataParts { views, @@ -438,6 +446,236 @@ where Ok(BufferHandle::new_device(Arc::new(output_device))) } +/// Export Vortex binary views as standard Arrow `Binary`. +/// +/// cuDF imports Arrow `Binary` through the Arrow Device path, but does not currently accept +/// Arrow `BinaryView`. This path keeps conversion on the CUDA stream by building `i32` offsets +/// from view sizes and gathering inline/out-of-line view bytes into one contiguous values buffer. +async fn export_binary( + varbinview: VarBinViewArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = varbinview.len(); + let VarBinViewDataParts { + views, + buffers: data_buffers, + validity, + .. + } = varbinview.into_data_parts(); + + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; + let views = ctx.ensure_on_device(views).await?; + let (offsets, values) = + export_binary_buffers(&views, &data_buffers, validity_buffer.as_ref(), len, ctx).await?; + + let buffers = vec![validity_buffer, Some(offsets), Some(values)]; + + let mut private_data = PrivateData::new(buffers, vec![], ctx)?; + let sync_event = private_data.sync_event(); + let arrow_array = ArrowArray { + length: len as i64, + null_count, + offset: 0, + // Arrow Binary layout: optional null bitmap, i32 offsets, contiguous bytes. + n_buffers: 3, + buffers: private_data.buffer_ptrs.as_mut_ptr(), + n_children: 0, + children: ptr::null_mut(), + release: Some(release_array), + dictionary: ptr::null_mut(), + private_data: Box::into_raw(private_data).cast(), + }; + + Ok((arrow_array, sync_event)) +} + +/// Build Arrow Binary offsets and values from VarBinView buffers on the active CUDA stream. +async fn export_binary_buffers( + views: &BufferHandle, + data_buffers: &[BufferHandle], + validity: Option<&BufferHandle>, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(BufferHandle, BufferHandle)> { + let mut device_data_buffers = Vec::with_capacity(data_buffers.len()); + for buffer in data_buffers { + device_data_buffers.push(ctx.ensure_on_device(buffer.clone()).await?); + } + + let mut ptr_values = Vec::with_capacity(device_data_buffers.len()); + let mut len_values = Vec::with_capacity(device_data_buffers.len()); + for buffer in &device_data_buffers { + ptr_values.push(buffer.cuda_device_ptr()?); + len_values.push(u64::try_from(buffer.len())?); + } + if device_data_buffers.is_empty() { + // Kernels never dereference these when data_buffer_count is zero, but the arguments + // still need a real device allocation. + ptr_values.push(0); + len_values.push(0); + } + let data_buffer_ptrs = device_buffer_from(ptr_values, ctx).await?; + let data_buffer_lens = device_buffer_from(len_values, ctx).await?; + let status = device_buffer_from(vec![0u32], ctx).await?; + + let scan_input = init_binary_scan( + views, + validity, + &data_buffer_lens, + device_data_buffers.len(), + &status, + len, + ctx, + )?; + let output_offsets = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new( + exclusive_sum_i32(&scan_input, len + 1, ctx)?, + ))); + validate_binary_offsets(&output_offsets, len, &status, ctx)?; + + // One status read covers init_scan and offset validation. Both must pass before gather may + // dereference view payloads through the scanned offsets. + check_binary_status(&status).await?; + + let total_bytes = total_binary_bytes(&output_offsets, len).await?; + let output_values = gather_binary_values( + views, + &data_buffer_ptrs, + &output_offsets, + total_bytes, + len, + ctx, + )?; + + Ok((output_offsets, output_values)) +} + +/// Copy a small host vector to a device-resident buffer. +async fn device_buffer_from( + values: Vec, + ctx: &mut CudaExecutionCtx, +) -> VortexResult +where + Buffer: From>, +{ + ctx.ensure_on_device(BufferHandle::new_host( + Buffer::from(values).into_byte_buffer(), + )) + .await +} + +async fn check_binary_status(status: &BufferHandle) -> VortexResult<()> { + match Buffer::::from_byte_buffer(status.try_to_host()?.await?)[0] { + 0 => Ok(()), + 1 => vortex_bail!( + "cannot export BinaryView as Arrow Binary: a view references an invalid data buffer" + ), + 2 => vortex_bail!( + "cannot export BinaryView as Arrow Binary: offsets exceed i32 range required by Arrow Binary" + ), + status => vortex_bail!("unexpected Arrow Binary export status {status}"), + } +} + +fn init_binary_scan( + views: &BufferHandle, + validity: Option<&BufferHandle>, + data_buffer_lens: &BufferHandle, + data_buffer_count: usize, + status: &BufferHandle, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult> { + let scan_len = len + 1; + let views_view = views.cuda_view::()?; + // A null pointer signals an all-valid array to the kernel. + let validity_ptr = validity + .map(|v| v.cuda_device_ptr()) + .transpose()? + .unwrap_or(0); + let lens_view = data_buffer_lens.cuda_view::()?; + let status_view = status.cuda_view::()?; + let data_buffer_count_u64 = data_buffer_count as u64; + let len_u64 = len as u64; + let scan_input = ctx.device_alloc::(scan_len)?; + let kernel = ctx.load_function_with_suffixes("arrow_binary", &["init_scan"])?; + + ctx.launch_kernel(&kernel, scan_len, |args| { + args.arg(&views_view) + .arg(&validity_ptr) + .arg(&lens_view) + .arg(&scan_input) + .arg(&status_view) + .arg(&data_buffer_count_u64) + .arg(&len_u64); + })?; + + Ok(scan_input) +} + +/// Flag scanned offsets that overflowed i32. A negative offset is proof of overflow because +/// init_scan caps every row size at `i32::MAX`, so the first overflowing prefix sum always wraps +/// into the negative range. +fn validate_binary_offsets( + offsets: &BufferHandle, + len: usize, + status: &BufferHandle, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<()> { + let scan_len = len + 1; + let offsets_view = offsets.cuda_view::()?; + let status_view = status.cuda_view::()?; + let scan_len_u64 = scan_len as u64; + let kernel = ctx.load_function_with_suffixes("arrow_binary", &["validate_offsets"])?; + + ctx.launch_kernel(&kernel, scan_len, |args| { + args.arg(&offsets_view).arg(&status_view).arg(&scan_len_u64); + }) +} + +async fn total_binary_bytes(offsets: &BufferHandle, len: usize) -> VortexResult { + let total = Buffer::::from_byte_buffer( + offsets + .slice_typed::(len..len + 1) + .try_to_host()? + .await?, + )[0]; + usize::try_from(total).map_err(Into::into) +} + +fn gather_binary_values( + views: &BufferHandle, + data_buffer_ptrs: &BufferHandle, + offsets: &BufferHandle, + total_bytes: usize, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let output_values = ctx.device_alloc::(total_bytes.max(1))?; + + if total_bytes != 0 { + let views_view = views.cuda_view::()?; + let ptrs_view = data_buffer_ptrs.cuda_view::()?; + let offsets_view = offsets.cuda_view::()?; + let len_u64 = len as u64; + let total_bytes_u64 = total_bytes as u64; + let kernel = ctx.load_function_with_suffixes("arrow_binary", &["gather"])?; + + ctx.launch_kernel(&kernel, total_bytes, |args| { + args.arg(&views_view) + .arg(&ptrs_view) + .arg(&offsets_view) + .arg(&output_values) + .arg(&len_u64) + .arg(&total_bytes_u64); + })?; + } + + Ok( + BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(output_values))) + .slice(0..total_bytes), + ) +} + /// Export Vortex validity as an Arrow validity byte buffer on the CUDA device. /// /// Returns `None` for the buffer when Arrow can omit validity because all rows are valid. @@ -853,6 +1091,13 @@ unsafe extern "C" fn release_array(array: *mut ArrowArray) { if !private_data_ptr.is_null() { let mut private_data = Box::from_raw(private_data_ptr.cast::()); + // Release may run on a foreign thread; bind this array's context before synchronizing + // so async frees cannot race consumer-side reads. + let cuda_context = Arc::clone(private_data.cuda_stream.context()); + match cuda_context.bind_to_thread() { + Ok(()) => cuda_context.record_err(cuda_driver::ctx::synchronize()), + Err(err) => cuda_context.record_err(Err::<(), _>(err)), + } release_children(&mut private_data); release_dictionary(&mut private_data); } @@ -1164,6 +1409,47 @@ mod tests { .collect()) } + fn private_data_buffer_bytes( + array: &ArrowArray, + buffer_idx: usize, + ) -> VortexResult { + let private_data = unsafe { &*array.private_data.cast::() }; + let buffer = private_data.buffers[buffer_idx] + .as_ref() + .vortex_expect("buffer should be present"); + Ok(buffer.to_host_sync()) + } + + // Assert Arrow Binary export uses the standard null bitmap, i32 offsets, and values layout. + fn assert_binary_layout( + array: &ArrowArray, + expected_len: i64, + expected_null_count: i64, + expected_offsets: &[i32], + expected_values: &[u8], + ) -> VortexResult<()> { + assert_eq!(array.length, expected_len); + assert_eq!(array.null_count, expected_null_count); + assert_eq!(array.offset, 0); + assert_eq!(array.n_buffers, 3); + assert_eq!(array.n_children, 0); + assert!(array.release.is_some()); + assert!(!array.private_data.is_null()); + + let buffers = + unsafe { std::slice::from_raw_parts(array.buffers, usize::try_from(array.n_buffers)?) }; + assert_eq!(buffers[0].is_null(), expected_null_count == 0); + assert!(!buffers[1].is_null()); + assert!(!buffers[2].is_null()); + assert_eq!(private_data_buffer_i32_values(array, 1)?, expected_offsets); + assert_eq!( + private_data_buffer_bytes(array, 2)?.as_ref(), + expected_values + ); + + Ok(()) + } + fn assert_exported_decimal_values( value_buffer: &BufferHandle, expected: &[T], @@ -1637,7 +1923,7 @@ mod tests { } #[crate::test] - async fn test_export_binaryview_inline_outline_values() -> VortexResult<()> { + async fn test_export_binary_inline_outline_values() -> VortexResult<()> { let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) .vortex_expect("failed to create execution context"); @@ -1653,14 +1939,98 @@ mod tests { let mut exported = array.export_device_array_with_schema(&mut ctx).await?; let field = Field::try_from(&exported.schema)?; - assert_eq!(field, Field::new("", DataType::BinaryView, true)); - assert_varbinview_layout(&exported.array.array, 5, 1, &[out_of_line.len()])?; + assert_eq!(field, Field::new("", DataType::Binary, true)); + assert_binary_layout( + &exported.array.array, + 5, + 1, + &[0, 0, 3, 3, 8, i32::try_from(8 + out_of_line.len())?], + &[b"\x00\xff\xfe".as_slice(), b"short", out_of_line].concat(), + )?; assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); unsafe { release_exported_array(&raw mut exported.array.array) }; Ok(()) } + #[crate::test] + async fn test_export_binary_empty_and_all_null() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let empty = VarBinViewArray::from_iter_nullable_bin(std::iter::empty::>()) + .into_array(); + let mut exported = empty.export_device_array_with_schema(&mut ctx).await?; + let field = Field::try_from(&exported.schema)?; + assert_eq!(field, Field::new("", DataType::Binary, true)); + assert_binary_layout(&exported.array.array, 0, 0, &[0], b"")?; + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + unsafe { release_exported_array(&raw mut exported.array.array) }; + + let all_null = + VarBinViewArray::from_iter_nullable_bin([None::<&[u8]>, None::<&[u8]>]).into_array(); + let mut exported = all_null.export_device_array_with_schema(&mut ctx).await?; + let field = Field::try_from(&exported.schema)?; + assert_eq!(field, Field::new("", DataType::Binary, true)); + assert_binary_layout(&exported.array.array, 2, 2, &[0, 0, 0], b"")?; + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + unsafe { release_exported_array(&raw mut exported.array.array) }; + + Ok(()) + } + + #[crate::test] + async fn test_export_binary_invalid_data_buffer_ref_errors() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let view = BinaryView::make_view(b"this references a missing data buffer", 0, 0); + let array = VarBinViewArray::new_handle( + BufferHandle::new_host(Buffer::from_iter([view]).into_byte_buffer()), + Arc::from([]), + DType::Binary(Nullability::NonNullable), + Validity::NonNullable, + ) + .into_array(); + + let err = array + .export_device_array_with_schema(&mut ctx) + .await + .expect_err("missing binary data buffer should fail"); + assert!( + err.to_string() + .contains("a view references an invalid data buffer") + ); + + Ok(()) + } + + #[crate::test] + async fn test_export_binary_i32_offset_overflow_errors() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let view = BinaryView::new_ref(i32::MAX as u32 + 1, [0; 4], 0, 0); + let array = VarBinViewArray::new_handle( + BufferHandle::new_host(Buffer::from_iter([view]).into_byte_buffer()), + Arc::from([]), + DType::Binary(Nullability::NonNullable), + Validity::NonNullable, + ) + .into_array(); + + let err = array + .export_device_array_with_schema(&mut ctx) + .await + .expect_err("oversized binary value should fail Arrow Binary export"); + assert!( + err.to_string() + .contains("offsets exceed i32 range required by Arrow Binary") + ); + + Ok(()) + } + #[crate::test] async fn test_export_list() -> VortexResult<()> { let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) @@ -1863,7 +2233,7 @@ mod tests { )] #[case::binary( multi_buffer_varbinview(DType::Binary(Nullability::NonNullable)), - DataType::BinaryView + DataType::Binary )] #[crate::test] async fn test_export_varbinview_multiple_variadic_buffers( @@ -1877,8 +2247,19 @@ mod tests { let mut exported = array.export_device_array_with_schema(&mut ctx).await?; let field = Field::try_from(&exported.schema)?; + let is_binary = expected_data_type == DataType::Binary; assert_eq!(field, Field::new("", expected_data_type, false)); - assert_varbinview_layout(&exported.array.array, 3, 0, &expected_data_buffer_lengths)?; + if is_binary { + assert_binary_layout( + &exported.array.array, + 3, + 0, + &[0, 6, 36, 67], + b"inlinefirst value stored out-of-linesecond value stored out-of-line", + )?; + } else { + assert_varbinview_layout(&exported.array.array, 3, 0, &expected_data_buffer_lengths)?; + } assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); unsafe { release_exported_array(&raw mut exported.array.array) }; @@ -2288,8 +2669,15 @@ mod tests { .slice(1..4)?; let mut exported = binary.export_device_array_with_schema(&mut ctx).await?; let field = Field::try_from(&exported.schema)?; - assert_eq!(field, Field::new("", DataType::BinaryView, true)); - assert_varbinview_shape(&exported.array.array, 3, 1)?; + assert_eq!(field, Field::new("", DataType::Binary, true)); + let sliced_out_of_line = b"this out-of-line binary value remains in the slice"; + assert_binary_layout( + &exported.array.array, + 3, + 1, + &[0, 0, 2, i32::try_from(2 + sliced_out_of_line.len())?], + &[b"\x00\xff".as_slice(), sliced_out_of_line].concat(), + )?; assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); unsafe { release_exported_array(&raw mut exported.array.array) }; diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 6bef6352fcb..5bba90f03a9 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -405,6 +405,7 @@ fn arrow_device_export_field( .to_arrow_field(name.as_ref(), dtype)?; let data_type = match dtype { + DType::Binary(_) => DataType::Binary, DType::Decimal(decimal_dtype, _) => arrow_device_export_decimal_data_type(*decimal_dtype), DType::Struct(struct_dtype, _) => { DataType::Struct(arrow_device_export_struct_fields(struct_dtype, ctx)?.into()) diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index fe07d71726d..7fa001c9639 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -242,6 +242,9 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev "strings", "sliced_utf8", "multi_buffer_utf8", + // Arrow Binary is intentionally omitted from the cuDF harness for now: cuDF's + // Arrow Device import path rejects NANOARROW_TYPE_BINARY, and treating arbitrary + // bytes as strings would be semantically incorrect. "dates", "dictionary", "lists",