From 6ce4bd77e3eebf7d7b39689b2b5aed56750d85fc Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Tue, 9 Jun 2026 14:56:56 +0000 Subject: [PATCH 1/3] test[gpu]: cover sliced utf8 Arrow device export Add cuDF e2e coverage for sliced and multi-buffer Utf8View arrays, including non-ASCII values and sliced null validity. Keep bit-offset validity repacking on the CUDA stream for Arrow Device export, with focused tests and a CUDA benchmark for the repack path. Signed-off-by: Alexander Droste --- vortex-cuda/Cargo.toml | 4 + vortex-cuda/benches/arrow_validity_cuda.rs | 87 ++++++++ vortex-cuda/kernels/src/arrow_validity.cu | 64 ++++++ vortex-cuda/src/arrow/canonical.rs | 223 +++++++++++++++++++-- vortex-cuda/src/arrow/mod.rs | 29 +++ vortex-cuda/src/device_buffer.rs | 21 ++ vortex-test/e2e-cuda/src/lib.rs | 32 ++- 7 files changed, 441 insertions(+), 19 deletions(-) create mode 100644 vortex-cuda/benches/arrow_validity_cuda.rs create mode 100644 vortex-cuda/kernels/src/arrow_validity.cu diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 25241cc9a02..f528ee62d8d 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -108,3 +108,7 @@ harness = false [[bench]] name = "list_view_cuda" harness = false + +[[bench]] +name = "arrow_validity_cuda" +harness = false diff --git a/vortex-cuda/benches/arrow_validity_cuda.rs b/vortex-cuda/benches/arrow_validity_cuda.rs new file mode 100644 index 00000000000..a26e9f1a332 --- /dev/null +++ b/vortex-cuda/benches/arrow_validity_cuda.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA benchmarks for Arrow validity bitmap repacking. 
+ +mod bench_config; +mod timed_launch_strategy; + +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::Throughput; +use futures::executor::block_on; +use vortex::array::buffer::BufferHandle; +use vortex::buffer::BitBuffer; +use vortex::error::VortexExpect; +use vortex::session::VortexSession; +use vortex_cuda::CudaSession; +use vortex_cuda::arrow::test_harness; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +use crate::timed_launch_strategy::TimedLaunchStrategy; + +const INPUT_OFFSET: usize = 5; +const ARROW_OFFSET: usize = 3; + +fn benchmark_arrow_validity_repack(c: &mut Criterion) { + let mut group = c.benchmark_group("cuda"); + + for &(len, len_label) in bench_config::BENCH_SIZES { + group.throughput(Throughput::Elements(len as u64)); + group.bench_with_input( + BenchmarkId::new("cuda/arrow_validity/repack", len_label), + &len, + |b, &len| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = timed.timer(); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + let source = BitBuffer::collect_bool(len + INPUT_OFFSET, |idx| idx % 3 != 0); + let sliced = source.slice(INPUT_OFFSET..INPUT_OFFSET + len); + let (input_offset, _, input_buffer) = sliced.into_inner(); + let input_buffer = + block_on(cuda_ctx.ensure_on_device(BufferHandle::new_host(input_buffer))) + .vortex_expect("failed to copy validity input to device"); + let output_bytes = (len + ARROW_OFFSET).div_ceil(8); + + for _ in 0..iters { + let output = test_harness::repack_arrow_validity_buffer( + &input_buffer, + input_offset, + len, + ARROW_OFFSET, + output_bytes, + &mut cuda_ctx, + ) + .vortex_expect("failed to repack Arrow validity"); + std::hint::black_box(output); + } + + 
Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + group.finish(); +} + +criterion::criterion_group! { + name = benches; + config = bench_config::cuda_bench_config(); + targets = benchmark_arrow_validity_repack +} + +#[cuda_available] +criterion::criterion_main!(benches); + +#[cuda_not_available] +fn main() {} diff --git a/vortex-cuda/kernels/src/arrow_validity.cu b/vortex-cuda/kernels/src/arrow_validity.cu new file mode 100644 index 00000000000..faa424efa14 --- /dev/null +++ b/vortex-cuda/kernels/src/arrow_validity.cu @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "config.cuh" + +#include + +namespace { + +// Read one validity bit from a little-endian Arrow/Vortex bitmap. +__device__ bool get_bit(const uint8_t *const input, uint64_t bit_idx) { + return (input[bit_idx / 8] >> (bit_idx % 8)) & 1; +} + +// Rebuild a possibly bit-offset Vortex validity bitmap into an Arrow-compatible bitmap. +// +// `input_offset` is the bit offset into `input`; `arrow_offset` is the logical Arrow array offset +// to preserve in the output. Bits outside `[arrow_offset, arrow_offset + len)` are left unset. +__device__ void arrow_validity_repack_device(const uint8_t *const input, + uint8_t *const output, + uint64_t len, + uint64_t input_offset, + uint64_t arrow_offset, + uint64_t validity_bits) { + // One worker owns a contiguous byte range. Each byte is rebuilt locally so there are no + // cross-thread bit writes or atomics. 
+ const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t output_bytes = (validity_bits + 7) / 8; + const uint64_t start_byte = start_elem(worker, output_bytes); + const uint64_t stop_byte = stop_elem(worker, output_bytes); + + for (uint64_t byte_idx = start_byte; byte_idx < stop_byte; byte_idx++) { + uint8_t byte = 0; + for (uint64_t bit_idx = 0; bit_idx < 8; bit_idx++) { + const uint64_t output_bit = byte_idx * 8 + bit_idx; + + // Bits before Arrow's array offset are padding from the consumer's point of view. + // Tail bits beyond len + offset stay zero so word-at-a-time mask readers are safe. + if (output_bit >= validity_bits || output_bit < arrow_offset) { + continue; + } + + // Translate the Arrow-visible output bit back to the source bitmap bit. The source + // bitmap may start at any bit offset, while Arrow's buffer pointer is byte-addressed. + const uint64_t input_bit = input_offset + output_bit - arrow_offset; + if (input_bit < input_offset + len && get_bit(input, input_bit)) { + byte |= static_cast(1u << bit_idx); + } + } + output[byte_idx] = byte; + } +} + +} // namespace + +// CUDA entry point for validity bitmap repacking used by Arrow Device export. 
+extern "C" __global__ void arrow_validity_repack(const uint8_t *const input, + uint8_t *const output, + uint64_t len, + uint64_t input_offset, + uint64_t arrow_offset, + uint64_t validity_bits) { + arrow_validity_repack_device(input, output, len, input_offset, arrow_offset, validity_bits); +} diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 558fdd9e96e..34b7b85ffd0 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::mem; +use std::mem::size_of; use std::ptr; use std::sync::Arc; @@ -39,7 +40,6 @@ use vortex::array::builtins::ArrayBuiltins; use vortex::array::match_each_decimal_value_type; use vortex::array::validity::Validity; use vortex::buffer::Buffer; -use vortex::buffer::ByteBuffer; use vortex::dtype::DType; use vortex::dtype::DecimalType; use vortex::dtype::NativeDecimalType; @@ -53,7 +53,6 @@ use vortex::error::vortex_err; use vortex::extension::datetime::AnyTemporal; use vortex::mask::Mask; -use super::list_view::export_device_list_view; use crate::CudaBufferExt; use crate::CudaDeviceBuffer; use crate::CudaExecutionCtx; @@ -65,6 +64,7 @@ use crate::arrow::PrivateData; use crate::arrow::SyncEvent; use crate::arrow::arrow_device_export_dictionary_codes_dtype; use crate::arrow::cuda_decimal_value_type; +use crate::arrow::list_view::export_device_list_view; use crate::executor::CudaArrayExt; /// An implementation of `ExportDeviceArray` that exports Vortex arrays to `ArrowDeviceArray` by @@ -437,7 +437,7 @@ where Ok(BufferHandle::new_device(Arc::new(output_device))) } -/// Export Vortex validity as an Arrow validity byte buffer. +/// Export Vortex validity as an Arrow validity byte buffer on the CUDA device. /// /// Returns `None` for the buffer when Arrow can omit validity because all rows are valid. 
pub(super) async fn export_arrow_validity_buffer( @@ -453,14 +453,91 @@ pub(super) async fn export_arrow_validity_buffer( let validity_buffer = match mask { Mask::AllTrue(_) => return Ok((None, 0)), - Mask::AllFalse(_) => ByteBuffer::zeroed(validity_bytes), - values @ Mask::Values(_) => values.into_bit_buffer().into_inner().2, + Mask::AllFalse(_) => device_zeroed_byte_buffer(validity_bytes, ctx)?, + Mask::Values(values) => { + let bits = values.bit_buffer(); + if arrow_offset == 0 && bits.offset() == 0 { + // Fast path: the Vortex bitmap already matches Arrow's byte-addressed layout. + let (_, _, buffer) = bits.clone().into_inner(); + ctx.ensure_on_device(BufferHandle::new_host(buffer)).await? + } else { + // Slow path: bit offsets cannot be represented by the Arrow buffer pointer. + // Repack on the GPU so compact/sliced exports keep Arrow offset semantics. + let (input_offset, _, input_buffer) = bits.clone().into_inner(); + let input_buffer = ctx + .ensure_on_device(BufferHandle::new_host(input_buffer)) + .await?; + repack_arrow_validity_buffer( + &input_buffer, + input_offset, + len, + arrow_offset, + validity_bytes, + ctx, + )? + } + } }; - let validity = ctx - .ensure_on_device(BufferHandle::new_host(validity_buffer)) - .await?; - Ok((Some(validity), null_count)) + Ok((Some(validity_buffer), null_count)) +} + +/// Allocate a zeroed device buffer with cuDF-safe padding for Arrow validity masks. +fn device_zeroed_byte_buffer( + byte_len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let allocation_len = byte_len.next_multiple_of(size_of::()).max(1); + let mut buffer = ctx.device_alloc::(allocation_len)?; + ctx.stream() + .memset_zeros(&mut buffer) + .map_err(|err| vortex_err!("Failed to zero Arrow validity buffer: {err}"))?; + Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(buffer))).slice(0..byte_len)) +} + +/// Repack a validity bitmap into Arrow layout without copying bitmap bits back to the CPU. 
+/// +/// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer +/// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a +/// bit-level offset. +pub(super) fn repack_arrow_validity_buffer( + input_buffer: &BufferHandle, + input_offset: usize, + len: usize, + arrow_offset: usize, + output_bytes: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let output_allocation_len = output_bytes.next_multiple_of(size_of::()).max(1); + let mut output = ctx.device_alloc::(output_allocation_len)?; + ctx.stream() + .memset_zeros(&mut output) + .map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?; + let output_buffer = + BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(output))).slice(0..output_bytes); + + // Arrow validity buffers are byte-addressed. Repack on-device when either layout has a + // bit-level offset so logical row 0 lands at the expected Arrow bit position. + let input_view = input_buffer.cuda_view::()?; + let output_view = output_buffer.cuda_view::()?; + let len = u64::try_from(len)?; + let input_offset = u64::try_from(input_offset)?; + let arrow_offset = u64::try_from(arrow_offset)?; + let validity_bits = len + .checked_add(arrow_offset) + .ok_or_else(|| vortex_err!("Arrow validity bit length overflows u64"))?; + + let kernel = ctx.load_function_with_suffixes("arrow_validity", &["repack"])?; + ctx.launch_kernel(&kernel, output_bytes, |args| { + args.arg(&input_view) + .arg(&output_view) + .arg(&len) + .arg(&input_offset) + .arg(&arrow_offset) + .arg(&validity_bits); + })?; + + Ok(output_buffer) } /// Export a standard Vortex list as Arrow `List`: validity, offsets, and one child array. 
@@ -834,6 +911,7 @@ mod tests { use vortex::array::arrays::varbinview::BinaryView; use vortex::array::buffer::BufferHandle; use vortex::array::validity::Validity; + use vortex::buffer::BitBuffer; use vortex::buffer::Buffer; use vortex::buffer::ByteBuffer; use vortex::dtype::DType; @@ -857,6 +935,9 @@ mod tests { use crate::arrow::ArrowDeviceArray; use crate::arrow::DeviceArrayExt; use crate::arrow::PrivateData; + use crate::arrow::canonical::export_arrow_validity_buffer; + use crate::arrow::canonical::repack_arrow_validity_buffer; + use crate::device_buffer::cuda_backing_allocation; use crate::session::CudaSession; unsafe fn release_exported_array(array: *mut ArrowArray) { @@ -2163,19 +2244,28 @@ mod tests { let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) .vortex_expect("failed to create execution context"); - let utf8 = VarBinViewArray::from_iter_str([ - "skip this out-of-line value before the slice", - "hello", - "こんにちは", - "this out-of-line value remains in the slice", + let utf8 = VarBinViewArray::from_iter_nullable_str([ + Some("skip this out-of-line value before the slice"), + Some("hello"), + None, + Some("this out-of-line value remains in the slice"), ]) .into_array() .slice(1..4)?; let mut exported = utf8.export_device_array_with_schema(&mut ctx).await?; let field = Field::try_from(&exported.schema)?; - assert_eq!(field, Field::new("", DataType::Utf8View, false)); - assert_varbinview_shape(&exported.array.array, 3, 0)?; + assert_eq!(field, Field::new("", DataType::Utf8View, true)); + assert_varbinview_shape(&exported.array.array, 3, 1)?; assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + let private_data = unsafe { &*exported.array.array.private_data.cast::() }; + let null_buffer = private_data.buffers[0] + .as_ref() + .vortex_expect("sliced null buffer should be present"); + let null_bits = BitBuffer::new(null_buffer.to_host_sync(), 3) + .iter() + .collect::>(); + assert_eq!(null_bits, [true, false, true]); unsafe { 
release_exported_array(&raw mut exported.array.array) }; let binary = VarBinViewArray::from_iter_nullable_bin([ @@ -2196,6 +2286,107 @@ mod tests { Ok(()) } + #[crate::test] + async fn test_repack_arrow_validity_buffer_offsets() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let input_offset = 5; + let arrow_offset = 3; + let logical_bits = [true, false, true, true, false, false, true, false, true]; + let source = BitBuffer::from_iter( + std::iter::repeat_n(false, input_offset).chain(logical_bits.iter().copied()), + ); + let sliced = source.slice(input_offset..input_offset + logical_bits.len()); + let (actual_input_offset, _, input_buffer) = sliced.into_inner(); + assert_eq!(actual_input_offset, input_offset); + + let input_buffer = ctx + .ensure_on_device(BufferHandle::new_host(input_buffer)) + .await?; + let output_bits = logical_bits.len() + arrow_offset; + let output = repack_arrow_validity_buffer( + &input_buffer, + actual_input_offset, + logical_bits.len(), + arrow_offset, + output_bits.div_ceil(8), + &mut ctx, + )?; + ctx.synchronize_stream()?; + + let actual = BitBuffer::new(output.to_host_sync(), output_bits) + .iter() + .collect::>(); + let expected = std::iter::repeat_n(false, arrow_offset) + .chain(logical_bits) + .collect::>(); + assert_eq!(actual, expected); + + Ok(()) + } + + #[crate::test] + async fn test_repack_arrow_validity_buffer_zeroes_padding() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let len = 9; + let arrow_offset = 3; + let source = BitBuffer::from_iter(std::iter::repeat_n(true, len)); + let (input_offset, _, input_buffer) = source.into_inner(); + let input_buffer = ctx + .ensure_on_device(BufferHandle::new_host(input_buffer)) + .await?; + let output_bytes = (len + arrow_offset).div_ceil(8); + + let output = 
repack_arrow_validity_buffer( + &input_buffer, + input_offset, + len, + arrow_offset, + output_bytes, + &mut ctx, + )?; + ctx.synchronize_stream()?; + + assert_eq!(output.len(), output_bytes); + let backing = cuda_backing_allocation(&output)?; + let backing_bytes = backing.to_host_sync(); + assert_eq!( + backing_bytes.len(), + output_bytes.next_multiple_of(size_of::()) + ); + assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0)); + + Ok(()) + } + + #[crate::test] + async fn test_export_all_false_validity_buffer_is_zeroed_on_device() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let len = 4; + let arrow_offset = 5; + let (buffer, null_count) = export_arrow_validity_buffer( + Validity::from(BitBuffer::from_iter(std::iter::repeat_n(false, len))), + len, + arrow_offset, + &mut ctx, + ) + .await?; + ctx.synchronize_stream()?; + + assert_eq!(null_count, i64::try_from(len)?); + let buffer = buffer.vortex_expect("all-false validity should export a null buffer"); + let bytes = buffer.to_host_sync(); + assert_eq!(bytes.len(), (len + arrow_offset).div_ceil(8)); + assert!(bytes.iter().all(|byte| *byte == 0)); + + Ok(()) + } + // Check nullable primitives export Arrow null bitmaps on device. 
#[crate::test] async fn test_export_nullable_primitive() -> VortexResult<()> { diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 956b6a6d2ed..7cadf80d2c0 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -63,6 +63,35 @@ pub use arrow_c_abi::ArrowArray; pub use arrow_c_abi::ArrowDeviceArray; pub use arrow_c_abi::ArrowDeviceType; +#[cfg(feature = "_test-harness")] +#[doc(hidden)] +pub mod test_harness { + use vortex::array::buffer::BufferHandle; + use vortex::error::VortexResult; + + use crate::CudaExecutionCtx; + use crate::arrow::canonical::repack_arrow_validity_buffer as repack_arrow_validity_buffer_impl; + + /// Repack a validity bitmap into Arrow's byte-addressed bitmap layout on the active stream. + pub fn repack_arrow_validity_buffer( + input_buffer: &BufferHandle, + input_offset: usize, + len: usize, + arrow_offset: usize, + output_bytes: usize, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + repack_arrow_validity_buffer_impl( + input_buffer, + input_offset, + len, + arrow_offset, + output_bytes, + ctx, + ) + } +} + /// CUDA device memory. 
pub const ARROW_DEVICE_CUDA: ArrowDeviceType = arrow_c_abi::ARROW_DEVICE_CUDA as ArrowDeviceType; diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index e0f6477a9e3..bc07cd5daaa 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -131,6 +131,27 @@ impl CudaDeviceBuffer { } } +#[cfg(test)] +pub(crate) fn cuda_backing_allocation(handle: &BufferHandle) -> VortexResult { + let device_buffer = handle + .as_device_opt() + .ok_or_else(|| vortex_err!("Buffer is not on device"))?; + + let cuda_buf = device_buffer + .as_any() + .downcast_ref::() + .ok_or_else(|| vortex_err!("expected CudaDeviceBuffer, was {device_buffer:?}"))?; + let len = cuda_buf.allocation.as_bytes_view().len(); + + Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer { + allocation: Arc::clone(&cuda_buf.allocation), + offset: 0, + len, + device_ptr: cuda_buf.device_ptr, + alignment: cuda_buf.alignment, + }))) +} + // TODO(aduffy): we should add cuda_view_mut and enforce the borrow rules. This is a bit tricky // because many executions are async, we should lean into that with ownership and having any // async context actions take ownership of the buffer and return ownership when they're done. 
diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 049cedeacab..fe07d71726d 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -130,6 +130,21 @@ fn fixed_size_list_as_list_array() -> VortexArrayRef { .into_array() } +fn sliced_utf8_array() -> VortexArrayRef { + VarBinViewArray::from_iter_nullable_str([ + Some("skip this out-of-line value before the slice"), + Some("hello"), + Some("こんにちは"), + None, + Some("this out-of-line value remains in the slice"), + Some("é"), + Some("skip this out-of-line value after the slice"), + ]) + .into_array() + .slice(1..6) + .expect("sliced utf8 array") +} + fn multi_buffer_varbinview(dtype: DType) -> VortexArrayRef { let first = ByteBuffer::copy_from("first value stored out-of-line".as_bytes()); let second = ByteBuffer::copy_from("second value stored out-of-line".as_bytes()); @@ -225,6 +240,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev "decimal64", "decimal128", "strings", + "sliced_utf8", "multi_buffer_utf8", "dates", "dictionary", @@ -237,6 +253,7 @@ fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDev decimal64.into_array(), decimal128.into_array(), strings.into_array(), + sliced_utf8_array(), multi_buffer_utf8_array(), dates.into_array(), dictionary_array(), @@ -324,6 +341,13 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA Some("four"), None, ]); + let sliced_utf8 = StringArray::from_iter([ + Some("hello"), + Some("こんにちは"), + None, + Some("this out-of-line value remains in the slice"), + Some("é"), + ]); let multi_buffer_utf8 = StringArray::from_iter([ Some("inline"), Some("first value stored out-of-line"), @@ -362,6 +386,7 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA Field::new("decimal64", decimal64.data_type().clone(), true), Field::new("decimal128", decimal128.data_type().clone(), true), Field::new("strings", 
string.data_type().clone(), true), + Field::new("sliced_utf8", sliced_utf8.data_type().clone(), true), Field::new( "multi_buffer_utf8", multi_buffer_utf8.data_type().clone(), @@ -379,12 +404,13 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA return 1; } - let expected_arrays: [ArrowArrayRef; 8] = [ + let expected_arrays: [ArrowArrayRef; 9] = [ primitive, Arc::new(decimal32), Arc::new(decimal64), Arc::new(decimal128), Arc::new(string), + Arc::new(sliced_utf8), Arc::new(multi_buffer_utf8), Arc::new(date), dictionary, @@ -401,11 +427,11 @@ fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowA } } - if !list_values_eq(list.as_ref(), struct_array.column(8).as_ref()) { + if !list_values_eq(list.as_ref(), struct_array.column(9).as_ref()) { eprintln!("wrong values for lists column"); return 1; } - if !list_values_eq(fixed_size_list.as_ref(), struct_array.column(9).as_ref()) { + if !list_values_eq(fixed_size_list.as_ref(), struct_array.column(10).as_ref()) { eprintln!("wrong values for fixed_lists column"); return 1; } From af201ca2cb7b60e5c4c0cabd956128295a9a4ff3 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 10 Jun 2026 14:02:48 +0000 Subject: [PATCH 2/3] perf[gpu]: word-at-a-time Arrow validity repack kernel Rebuild the validity bitmap 64 bits at a time with a funnel shift over two adjacent input words, masking the leading offset bits and the trailing length bits, instead of testing bits one by one. Launch one word per thread with a grid-stride loop so warp accesses coalesce. Repack of 100M bits on GH200 drops from 140us to 21us (6.7x). Also derive the output size from len + arrow_offset instead of taking a redundant output_bytes parameter, drop the now-unneeded output memset (every word is written, edge masks zero the padding), bound the host-to-device copy to the slice's bytes via shrink_offset, and cover negative-shift and multi-word offsets in the repack tests. 
Signed-off-by: Alexander Droste --- vortex-cuda/benches/arrow_validity_cuda.rs | 2 - vortex-cuda/kernels/src/arrow_validity.cu | 111 +++++++++++----- vortex-cuda/src/arrow/canonical.rs | 140 +++++++++++---------- vortex-cuda/src/arrow/mod.rs | 10 +- 4 files changed, 155 insertions(+), 108 deletions(-) diff --git a/vortex-cuda/benches/arrow_validity_cuda.rs b/vortex-cuda/benches/arrow_validity_cuda.rs index a26e9f1a332..9221d7c0698 100644 --- a/vortex-cuda/benches/arrow_validity_cuda.rs +++ b/vortex-cuda/benches/arrow_validity_cuda.rs @@ -50,7 +50,6 @@ fn benchmark_arrow_validity_repack(c: &mut Criterion) { let input_buffer = block_on(cuda_ctx.ensure_on_device(BufferHandle::new_host(input_buffer))) .vortex_expect("failed to copy validity input to device"); - let output_bytes = (len + ARROW_OFFSET).div_ceil(8); for _ in 0..iters { let output = test_harness::repack_arrow_validity_buffer( @@ -58,7 +57,6 @@ fn benchmark_arrow_validity_repack(c: &mut Criterion) { input_offset, len, ARROW_OFFSET, - output_bytes, &mut cuda_ctx, ) .vortex_expect("failed to repack Arrow validity"); diff --git a/vortex-cuda/kernels/src/arrow_validity.cu b/vortex-cuda/kernels/src/arrow_validity.cu index faa424efa14..83bf8092ca0 100644 --- a/vortex-cuda/kernels/src/arrow_validity.cu +++ b/vortex-cuda/kernels/src/arrow_validity.cu @@ -7,47 +7,96 @@ namespace { -// Read one validity bit from a little-endian Arrow/Vortex bitmap. -__device__ bool get_bit(const uint8_t *const input, uint64_t bit_idx) { - return (input[bit_idx / 8] >> (bit_idx % 8)) & 1; +// Load the `word_idx`-th little-endian u64 of `input`, treating bytes outside +// `[0, input_bytes)` as zero. `input` must be 8-byte aligned. 
+__device__ uint64_t load_input_word(const uint8_t *const input, + int64_t word_idx, + uint64_t input_bytes) { + if (word_idx < 0) { + return 0; + } + const uint64_t byte_idx = static_cast(word_idx) * sizeof(uint64_t); + if (byte_idx >= input_bytes) { + return 0; + } + if (byte_idx + sizeof(uint64_t) <= input_bytes) { + return reinterpret_cast(input)[word_idx]; + } + // Trailing partial word: assemble byte-by-byte to avoid reading past the buffer. + uint64_t word = 0; + for (uint64_t i = byte_idx; i < input_bytes; i++) { + word |= static_cast(input[i]) << ((i - byte_idx) * 8); + } + return word; +} + +// Build one 64-bit word of the Arrow validity bitmap. +// +// Output bit `b` for `b` in `[arrow_offset, validity_bits)` equals input bit `b + shift`; +// all other bits are zero. Two adjacent input words are funnel-shifted to align the input +// bits with the output word, then the leading/trailing edges are masked. +__device__ uint64_t repack_word(const uint8_t *const input, + uint64_t word_idx, + int64_t shift, + uint64_t arrow_offset, + uint64_t validity_bits, + uint64_t input_bytes) { + const uint64_t word_start = word_idx * 64; + + // Bits before Arrow's array offset are padding from the consumer's point of view. + // Tail bits beyond len + offset stay zero so word-at-a-time mask readers are safe. + uint64_t mask = ~uint64_t{0}; + if (word_start < arrow_offset) { + const uint64_t lead = arrow_offset - word_start; + mask = lead >= 64 ? 0 : mask << lead; + } + const uint64_t remaining = validity_bits - word_start; + if (remaining < 64) { + mask &= (uint64_t{1} << remaining) - 1; + } + if (mask == 0) { + return 0; + } + + // `>> 6` floors also for negative bit positions, unlike `/ 64` which truncates toward zero. 
+ const int64_t input_bit = static_cast(word_start) + shift; + const int64_t input_word = input_bit >> 6; + const uint32_t bit = static_cast(input_bit & 63); + + const uint64_t lo = load_input_word(input, input_word, input_bytes); + if (bit == 0) { + return lo & mask; + } + const uint64_t hi = load_input_word(input, input_word + 1, input_bytes); + return ((lo >> bit) | (hi << (64 - bit))) & mask; } // Rebuild a possibly bit-offset Vortex validity bitmap into an Arrow-compatible bitmap. // // `input_offset` is the bit offset into `input`; `arrow_offset` is the logical Arrow array offset // to preserve in the output. Bits outside `[arrow_offset, arrow_offset + len)` are left unset. +// The output allocation must hold `ceil((len + arrow_offset) / 64)` full 64-bit words; every +// word is written, so no zero-initialization of the output is required. __device__ void arrow_validity_repack_device(const uint8_t *const input, - uint8_t *const output, + uint64_t *const output, uint64_t len, uint64_t input_offset, uint64_t arrow_offset, - uint64_t validity_bits) { - // One worker owns a contiguous byte range. Each byte is rebuilt locally so there are no - // cross-thread bit writes or atomics. + uint64_t input_bytes) { + // Workers walk the output words in a grid-stride loop. Each word is rebuilt locally so + // there are no cross-thread bit writes or atomics. 
const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; - const uint64_t output_bytes = (validity_bits + 7) / 8; - const uint64_t start_byte = start_elem(worker, output_bytes); - const uint64_t stop_byte = stop_elem(worker, output_bytes); - - for (uint64_t byte_idx = start_byte; byte_idx < stop_byte; byte_idx++) { - uint8_t byte = 0; - for (uint64_t bit_idx = 0; bit_idx < 8; bit_idx++) { - const uint64_t output_bit = byte_idx * 8 + bit_idx; + const uint64_t validity_bits = len + arrow_offset; + const uint64_t output_words = (validity_bits + 63) / 64; + const uint64_t stride = static_cast(gridDim.x) * blockDim.x; - // Bits before Arrow's array offset are padding from the consumer's point of view. - // Tail bits beyond len + offset stay zero so word-at-a-time mask readers are safe. - if (output_bit >= validity_bits || output_bit < arrow_offset) { - continue; - } + // Translate Arrow-visible output bits back to source bitmap bits. The source bitmap may + // start at any bit offset, while Arrow's buffer pointer is byte-addressed. + const int64_t shift = static_cast(input_offset) - static_cast(arrow_offset); - // Translate the Arrow-visible output bit back to the source bitmap bit. The source - // bitmap may start at any bit offset, while Arrow's buffer pointer is byte-addressed. - const uint64_t input_bit = input_offset + output_bit - arrow_offset; - if (input_bit < input_offset + len && get_bit(input, input_bit)) { - byte |= static_cast(1u << bit_idx); - } - } - output[byte_idx] = byte; + for (uint64_t word_idx = worker; word_idx < output_words; word_idx += stride) { + output[word_idx] = + repack_word(input, word_idx, shift, arrow_offset, validity_bits, input_bytes); } } @@ -55,10 +104,10 @@ __device__ void arrow_validity_repack_device(const uint8_t *const input, // CUDA entry point for validity bitmap repacking used by Arrow Device export. 
extern "C" __global__ void arrow_validity_repack(const uint8_t *const input, - uint8_t *const output, + uint64_t *const output, uint64_t len, uint64_t input_offset, uint64_t arrow_offset, - uint64_t validity_bits) { - arrow_validity_repack_device(input, output, len, input_offset, arrow_offset, validity_bits); + uint64_t input_bytes) { + arrow_validity_repack_device(input, output, len, input_offset, arrow_offset, input_bytes); } diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 34b7b85ffd0..d5924819ef9 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use async_trait::async_trait; use cudarc::driver::DeviceRepr; +use cudarc::driver::LaunchConfig; use cudarc::driver::PushKernelArg; use futures::future::BoxFuture; use vortex::array::ArrayRef; @@ -455,26 +456,21 @@ pub(super) async fn export_arrow_validity_buffer( Mask::AllTrue(_) => return Ok((None, 0)), Mask::AllFalse(_) => device_zeroed_byte_buffer(validity_bytes, ctx)?, Mask::Values(values) => { - let bits = values.bit_buffer(); + // Shrinking the offset below 8 bounds the host-to-device copy to the slice's bytes + // instead of the whole backing bitmap. + let bits = values.bit_buffer().clone().shrink_offset(); if arrow_offset == 0 && bits.offset() == 0 { // Fast path: the Vortex bitmap already matches Arrow's byte-addressed layout. - let (_, _, buffer) = bits.clone().into_inner(); + let (_, _, buffer) = bits.into_inner(); ctx.ensure_on_device(BufferHandle::new_host(buffer)).await? } else { // Slow path: bit offsets cannot be represented by the Arrow buffer pointer. // Repack on the GPU so compact/sliced exports keep Arrow offset semantics. 
- let (input_offset, _, input_buffer) = bits.clone().into_inner(); + let (input_offset, _, input_buffer) = bits.into_inner(); let input_buffer = ctx .ensure_on_device(BufferHandle::new_host(input_buffer)) .await?; - repack_arrow_validity_buffer( - &input_buffer, - input_offset, - len, - arrow_offset, - validity_bytes, - ctx, - )? + repack_arrow_validity_buffer(&input_buffer, input_offset, len, arrow_offset, ctx)? } } }; @@ -499,45 +495,59 @@ fn device_zeroed_byte_buffer( /// /// Vortex bitmaps may start at any bit offset. Arrow exposes only a byte-addressed validity buffer /// plus an array offset, so sliced compact exports need a GPU rewrite when either side has a -/// bit-level offset. +/// bit-level offset. The kernel writes the output one 64-bit word at a time, funnel-shifting two +/// adjacent input words, so the allocation is padded to whole words (zeroed by the edge masks). pub(super) fn repack_arrow_validity_buffer( input_buffer: &BufferHandle, input_offset: usize, len: usize, arrow_offset: usize, - output_bytes: usize, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - let output_allocation_len = output_bytes.next_multiple_of(size_of::()).max(1); - let mut output = ctx.device_alloc::(output_allocation_len)?; - ctx.stream() - .memset_zeros(&mut output) - .map_err(|err| vortex_err!("Failed to zero Arrow validity buffer padding: {err}"))?; - let output_buffer = - BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(output))).slice(0..output_bytes); - - // Arrow validity buffers are byte-addressed. Repack on-device when either layout has a - // bit-level offset so logical row 0 lands at the expected Arrow bit position. 
- let input_view = input_buffer.cuda_view::()?; - let output_view = output_buffer.cuda_view::()?; - let len = u64::try_from(len)?; - let input_offset = u64::try_from(input_offset)?; - let arrow_offset = u64::try_from(arrow_offset)?; let validity_bits = len .checked_add(arrow_offset) - .ok_or_else(|| vortex_err!("Arrow validity bit length overflows u64"))?; - - let kernel = ctx.load_function_with_suffixes("arrow_validity", &["repack"])?; - ctx.launch_kernel(&kernel, output_bytes, |args| { - args.arg(&input_view) - .arg(&output_view) - .arg(&len) - .arg(&input_offset) - .arg(&arrow_offset) - .arg(&validity_bits); - })?; + .ok_or_else(|| vortex_err!("Arrow validity bit length overflows usize"))?; + let output_bytes = validity_bits.div_ceil(8); + let output_words = validity_bits.div_ceil(u64::BITS as usize); + + // The kernel loads the input bitmap as 64-bit words. + if !input_buffer + .cuda_device_ptr()? + .is_multiple_of(size_of::() as u64) + { + vortex_bail!("Arrow validity repack requires an 8-byte aligned device buffer"); + } + + let output = ctx.device_alloc::(output_words.max(1))?; + let output_device = CudaDeviceBuffer::new(output); + + if output_words > 0 { + let input_view = input_buffer.cuda_view::()?; + let output_view = output_device.as_view::(); + let len = u64::try_from(len)?; + let input_offset = u64::try_from(input_offset)?; + let arrow_offset = u64::try_from(arrow_offset)?; + let input_bytes = u64::try_from(input_buffer.len())?; + + let kernel = ctx.load_function_with_suffixes("arrow_validity", &["repack"])?; + const REPACK_THREADS_PER_BLOCK: u32 = 256; + let num_blocks = u32::try_from(output_words.div_ceil(REPACK_THREADS_PER_BLOCK as usize))?; + let config = LaunchConfig { + grid_dim: (num_blocks, 1, 1), + block_dim: (REPACK_THREADS_PER_BLOCK, 1, 1), + shared_mem_bytes: 0, + }; + ctx.launch_kernel_config(&kernel, config, output_words, |args| { + args.arg(&input_view) + .arg(&output_view) + .arg(&len) + .arg(&input_offset) + .arg(&arrow_offset) + 
.arg(&input_bytes); + })?; + } - Ok(output_buffer) + Ok(BufferHandle::new_device(Arc::new(output_device)).slice(0..output_bytes)) } /// Export a standard Vortex list as Arrow `List`: validity, offsets, and one child array. @@ -2286,33 +2296,37 @@ mod tests { Ok(()) } + #[rstest::rstest] + #[case::input_ahead_of_arrow(5, 3, 9)] + #[case::arrow_ahead_of_input(3, 70, 9)] + #[case::equal_offsets(7, 7, 9)] + #[case::byte_aligned_input(0, 9, 9)] + #[case::word_aligned_offsets(64, 128, 130)] + #[case::multi_word(13, 0, 301)] #[crate::test] - async fn test_repack_arrow_validity_buffer_offsets() -> VortexResult<()> { + async fn test_repack_arrow_validity_buffer_offsets( + #[case] input_offset: usize, + #[case] arrow_offset: usize, + #[case] len: usize, + ) -> VortexResult<()> { let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) .vortex_expect("failed to create execution context"); - let input_offset = 5; - let arrow_offset = 3; - let logical_bits = [true, false, true, true, false, false, true, false, true]; + let logical_bits = (0..len).map(|idx| idx % 3 != 0).collect::>(); + // All-true filler before the slice would leak into the output if offsets were mishandled. let source = BitBuffer::from_iter( - std::iter::repeat_n(false, input_offset).chain(logical_bits.iter().copied()), + std::iter::repeat_n(true, input_offset).chain(logical_bits.iter().copied()), ); - let sliced = source.slice(input_offset..input_offset + logical_bits.len()); - let (actual_input_offset, _, input_buffer) = sliced.into_inner(); - assert_eq!(actual_input_offset, input_offset); + let sliced = source.slice(input_offset..input_offset + len); + // BitBuffer rebases whole bytes into the backing buffer, keeping the bit offset below 8. 
+ let (input_offset, _, input_buffer) = sliced.into_inner(); let input_buffer = ctx .ensure_on_device(BufferHandle::new_host(input_buffer)) .await?; - let output_bits = logical_bits.len() + arrow_offset; - let output = repack_arrow_validity_buffer( - &input_buffer, - actual_input_offset, - logical_bits.len(), - arrow_offset, - output_bits.div_ceil(8), - &mut ctx, - )?; + let output_bits = len + arrow_offset; + let output = + repack_arrow_validity_buffer(&input_buffer, input_offset, len, arrow_offset, &mut ctx)?; ctx.synchronize_stream()?; let actual = BitBuffer::new(output.to_host_sync(), output_bits) @@ -2340,14 +2354,8 @@ mod tests { .await?; let output_bytes = (len + arrow_offset).div_ceil(8); - let output = repack_arrow_validity_buffer( - &input_buffer, - input_offset, - len, - arrow_offset, - output_bytes, - &mut ctx, - )?; + let output = + repack_arrow_validity_buffer(&input_buffer, input_offset, len, arrow_offset, &mut ctx)?; ctx.synchronize_stream()?; assert_eq!(output.len(), output_bytes); @@ -2355,7 +2363,7 @@ mod tests { let backing_bytes = backing.to_host_sync(); assert_eq!( backing_bytes.len(), - output_bytes.next_multiple_of(size_of::()) + output_bytes.next_multiple_of(size_of::()) ); assert!(backing_bytes[output_bytes..].iter().all(|byte| *byte == 0)); diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 7cadf80d2c0..6bef6352fcb 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -78,17 +78,9 @@ pub mod test_harness { input_offset: usize, len: usize, arrow_offset: usize, - output_bytes: usize, ctx: &mut CudaExecutionCtx, ) -> VortexResult { - repack_arrow_validity_buffer_impl( - input_buffer, - input_offset, - len, - arrow_offset, - output_bytes, - ctx, - ) + repack_arrow_validity_buffer_impl(input_buffer, input_offset, len, arrow_offset, ctx) } } From f8d19e72f24d984230a9a504775f99009183b8f9 Mon Sep 17 00:00:00 2001 From: Alexander Droste Date: Wed, 10 Jun 2026 14:11:22 +0000 Subject: [PATCH 3/3] 
style[gpu]: clang-format arrow_validity kernel Co-Authored-By: Claude Fable 5 Signed-off-by: Alexander Droste --- vortex-cuda/kernels/src/arrow_validity.cu | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/vortex-cuda/kernels/src/arrow_validity.cu b/vortex-cuda/kernels/src/arrow_validity.cu index 83bf8092ca0..906054c86ee 100644 --- a/vortex-cuda/kernels/src/arrow_validity.cu +++ b/vortex-cuda/kernels/src/arrow_validity.cu @@ -9,9 +9,7 @@ namespace { // Load the `word_idx`-th little-endian u64 of `input`, treating bytes outside // `[0, input_bytes)` as zero. `input` must be 8-byte aligned. -__device__ uint64_t load_input_word(const uint8_t *const input, - int64_t word_idx, - uint64_t input_bytes) { +__device__ uint64_t load_input_word(const uint8_t *const input, int64_t word_idx, uint64_t input_bytes) { if (word_idx < 0) { return 0; } @@ -45,14 +43,14 @@ __device__ uint64_t repack_word(const uint8_t *const input, // Bits before Arrow's array offset are padding from the consumer's point of view. // Tail bits beyond len + offset stay zero so word-at-a-time mask readers are safe. - uint64_t mask = ~uint64_t{0}; + uint64_t mask = ~uint64_t {0}; if (word_start < arrow_offset) { const uint64_t lead = arrow_offset - word_start; mask = lead >= 64 ? 0 : mask << lead; } const uint64_t remaining = validity_bits - word_start; if (remaining < 64) { - mask &= (uint64_t{1} << remaining) - 1; + mask &= (uint64_t {1} << remaining) - 1; } if (mask == 0) { return 0; @@ -95,8 +93,7 @@ __device__ void arrow_validity_repack_device(const uint8_t *const input, const int64_t shift = static_cast(input_offset) - static_cast(arrow_offset); for (uint64_t word_idx = worker; word_idx < output_words; word_idx += stride) { - output[word_idx] = - repack_word(input, word_idx, shift, arrow_offset, validity_bits, input_bytes); + output[word_idx] = repack_word(input, word_idx, shift, arrow_offset, validity_bits, input_bytes); } }