Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ harness = false
[[bench]]
name = "arrow_validity_cuda"
harness = false

[[bench]]
name = "arrow_binary_cuda"
harness = false
127 changes: 127 additions & 0 deletions vortex-cuda/benches/arrow_binary_cuda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! CUDA kernel-time benchmark for Arrow Device export of binary view arrays as Arrow Binary.

#![expect(clippy::cast_possible_truncation)]

#[allow(dead_code)]
mod bench_config;
mod timed_launch_strategy;

use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::Throughput;
use futures::executor::block_on;
use vortex::array::ArrayRef;
use vortex::array::IntoArray;
use vortex::array::arrays::VarBinViewArray;
use vortex::array::arrays::varbinview::BinaryView;
use vortex::array::buffer::BufferHandle;
use vortex::array::validity::Validity;
use vortex::buffer::Buffer;
use vortex::buffer::ByteBuffer;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::session::VortexSession;
use vortex_cuda::CudaExecutionCtx;
use vortex_cuda::CudaSession;
use vortex_cuda::arrow::ArrowDeviceArray;
use vortex_cuda::arrow::DeviceArrayExt;
use vortex_cuda_macros::cuda_available;
use vortex_cuda_macros::cuda_not_available;

use crate::timed_launch_strategy::TimedLaunchStrategy;

const BINARY_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")];
const VALUE_BYTES: usize = 16;

/// Build a non-nullable binary view array of `len` out-of-line values on the device.
async fn out_of_line_binary(len: usize, ctx: &mut CudaExecutionCtx) -> VortexResult<ArrayRef> {
let values = ByteBuffer::copy_from(vec![b'x'; len * VALUE_BYTES]);
let views = Buffer::from_iter((0..len).map(|idx| {
let offset = idx * VALUE_BYTES;
BinaryView::make_view(
&values.slice(offset..offset + VALUE_BYTES),
0,
offset as u32,
)
}));

let views = ctx
.ensure_on_device(BufferHandle::new_host(views.into_byte_buffer()))
.await?;
let values = ctx.ensure_on_device(BufferHandle::new_host(values)).await?;

Ok(VarBinViewArray::new_handle(
views,
Arc::from([values]),
DType::Binary(Nullability::NonNullable),
Validity::NonNullable,
)
.into_array())
}

unsafe fn release_arrow_device_array(array: &mut ArrowDeviceArray) {
unsafe {
if let Some(release) = array.array.release {
release(&raw mut array.array);
}
}
}

fn benchmark_arrow_binary_export(c: &mut Criterion) {
let mut group = c.benchmark_group("cuda");

for &(len, len_label) in BINARY_BENCH_SIZES {
// Kernels read views and value bytes and write offsets plus gathered values.
group.throughput(Throughput::Bytes(
(len * (size_of::<BinaryView>() + VALUE_BYTES + size_of::<i32>())) as u64,
));
group.bench_with_input(
BenchmarkId::new("cuda/arrow_binary_kernel_time/out_of_line", len_label),
&len,
|b, &len| {
b.iter_custom(|iters| {
let timed = TimedLaunchStrategy::default();
let timer = timed.timer();

let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())
.vortex_expect("failed to create execution context")
.with_launch_strategy(Arc::new(timed));
let array = block_on(out_of_line_binary(len, &mut cuda_ctx))
.vortex_expect("failed to create binary fixture");

for _ in 0..iters {
let mut exported =
block_on(array.clone().export_device_array(&mut cuda_ctx))
.vortex_expect("failed to export device array");
unsafe { release_arrow_device_array(&mut exported) };
}

Duration::from_nanos(timer.load(Ordering::Relaxed))
});
},
);
}

group.finish();
}

criterion::criterion_group! {
name = benches;
config = bench_config::cuda_bench_config();
targets = benchmark_arrow_binary_export
}

#[cuda_available]
criterion::criterion_main!(benches);

#[cuda_not_available]
fn main() {}
221 changes: 221 additions & 0 deletions vortex-cuda/kernels/src/arrow_binary.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#include "config.cuh"

#include <limits.h>
#include <stdint.h>

namespace {

constexpr uint32_t MAX_INLINED_SIZE = 12;

struct BinaryView {
uint32_t size;
uint8_t inline_data[MAX_INLINED_SIZE];
};

struct BinaryViewRef {
uint32_t size;
uint8_t prefix[4];
uint32_t buffer_index;
uint32_t offset;
};

// Return whether a row is valid in a little-endian Arrow/Vortex bitmap, treating a missing (null)
// validity bitmap as all-valid.
__device__ bool is_valid(const uint8_t *const validity, uint64_t idx) {
return validity == nullptr || ((validity[idx / 8] >> (idx % 8)) & 1);
}

// Initialize scan input from BinaryView sizes. Null rows contribute zero bytes so the gather kernel
// never needs to read their view payload.
//
// Threads stride by blockDim within the block's element range so warp accesses to views and scan
// stay coalesced.
__device__ void init_scan_device(const BinaryView *const __restrict views,
const uint8_t *const __restrict validity,
const uint64_t *const __restrict data_buffer_lens,
int32_t *const __restrict scan,
uint32_t *const status,
uint64_t data_buffer_count,
uint64_t len) {
const uint64_t scan_len = len + 1;
const uint64_t elements_per_block = blockDim.x * ELEMENTS_PER_THREAD;
const uint64_t block_start = blockIdx.x * elements_per_block;
const uint64_t block_stop = min(block_start + elements_per_block, scan_len);

for (uint64_t idx = block_start + threadIdx.x; idx < block_stop; idx += blockDim.x) {
if (idx >= len || !is_valid(validity, idx)) {
scan[idx] = 0;
continue;
}

const BinaryView view = views[idx];
const uint32_t size = view.size;
if (size > static_cast<uint32_t>(INT32_MAX)) {
scan[idx] = 0;
atomicMax(status, 2u);
continue;
}

if (size > MAX_INLINED_SIZE) {
const BinaryViewRef *const view_ref = reinterpret_cast<const BinaryViewRef *>(&view);
const uint64_t buffer_index = static_cast<uint64_t>(view_ref->buffer_index);
const uint64_t offset = static_cast<uint64_t>(view_ref->offset);
const uint64_t end = offset + static_cast<uint64_t>(size);
if (buffer_index >= data_buffer_count || end < offset || end > data_buffer_lens[buffer_index]) {
scan[idx] = 0;
atomicMax(status, 1u);
continue;
}
}

scan[idx] = static_cast<int32_t>(size);
}
}

// Detect i32 overflow of the CUB exclusive-sum offsets by checking signs. init_scan rejects
// per-row sizes above i32::MAX, so consecutive true prefix sums differ by less than 2^31 and the
// first overflowing prefix lands in [2^31, 2^32), which wraps to a negative offset in the scan
// output. No negative offset therefore proves no prefix overflowed.
__device__ void
validate_offsets_device(const int32_t *const __restrict offsets, uint32_t *const status, uint64_t scan_len) {
const uint64_t elements_per_block = blockDim.x * ELEMENTS_PER_THREAD;
const uint64_t block_start = blockIdx.x * elements_per_block;
const uint64_t block_stop = min(block_start + elements_per_block, scan_len);

for (uint64_t idx = block_start + threadIdx.x; idx < block_stop; idx += blockDim.x) {
if (offsets[idx] < 0) {
atomicMax(status, 2u);
}
}
}

__device__ uint64_t upper_bound_offsets(const int32_t *const offsets, uint64_t len, uint64_t value) {
uint64_t first = 0;
while (len > 0) {
const uint64_t half = len / 2;
const uint64_t mid = first + half;
if (static_cast<uint64_t>(offsets[mid]) <= value) {
first = mid + 1;
len -= half + 1;
} else {
len = half;
}
}
return first;
}

// Resolve the payload pointer for one view, pointing inline views at their global view bytes.
__device__ const uint8_t *
input_ptr(const BinaryView *const views, uint64_t row, const uint64_t *const data_buffer_ptrs) {
const BinaryView *const view = views + row;
if (view->size <= MAX_INLINED_SIZE) {
return view->inline_data;
}

const BinaryViewRef *const view_ref = reinterpret_cast<const BinaryViewRef *>(view);
return reinterpret_cast<const uint8_t *>(data_buffer_ptrs[view_ref->buffer_index]) + view_ref->offset;
}

// Copy BinaryView payload bytes into one contiguous Arrow Binary values buffer.
//
// Each thread owns a contiguous ELEMENTS_PER_THREAD output byte range, so the row lookup runs once
// per range and then advances sequentially. Byte stores from such strided ranges waste almost the
// whole memory transaction, so threads stage 16 output bytes in registers and write them with one
// vector store. Full chunks start at a multiple of ELEMENTS_PER_THREAD, keeping the stores aligned.
__device__ void gather_device(const BinaryView *const __restrict views,
const uint64_t *const __restrict data_buffer_ptrs,
const int32_t *const __restrict offsets,
uint8_t *const __restrict output,
uint64_t len,
uint64_t total_bytes) {
const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x;
const uint64_t start = start_elem(worker, total_bytes);
const uint64_t stop = stop_elem(worker, total_bytes);
if (start == stop) {
return;
}

uint64_t row = upper_bound_offsets(offsets, len + 1, start) - 1;
uint64_t row_start = static_cast<uint64_t>(offsets[row]);
uint64_t row_end = static_cast<uint64_t>(offsets[row + 1]);
const uint8_t *input = input_ptr(views, row, data_buffer_ptrs);

const auto next_byte = [&](uint64_t byte_idx) -> uint8_t {
while (byte_idx >= row_end) {
row++;
row_start = static_cast<uint64_t>(offsets[row]);
row_end = static_cast<uint64_t>(offsets[row + 1]);
input = input_ptr(views, row, data_buffer_ptrs);
}
return input[byte_idx - row_start];
};

uint64_t byte_idx = start;
for (; byte_idx + 16 <= stop; byte_idx += 16) {
while (byte_idx >= row_end) {
row++;
row_start = static_cast<uint64_t>(offsets[row]);
row_end = static_cast<uint64_t>(offsets[row + 1]);
input = input_ptr(views, row, data_buffer_ptrs);
}

// Fast path: the group sits inside one row and the source allows word loads.
const uint8_t *const src = input + (byte_idx - row_start);
if (byte_idx + 16 <= row_end && (reinterpret_cast<uintptr_t>(src) & 3) == 0) {
const uint32_t *const src_words = reinterpret_cast<const uint32_t *>(src);
*reinterpret_cast<uint4 *>(output + byte_idx) =
make_uint4(src_words[0], src_words[1], src_words[2], src_words[3]);
continue;
}

uint32_t words[4];
#pragma unroll
for (uint32_t word = 0; word < 4; word++) {
uint32_t value = 0;
#pragma unroll
for (uint32_t byte = 0; byte < 4; byte++) {
const uint64_t lane = byte_idx + word * 4 + byte;
value |= static_cast<uint32_t>(next_byte(lane)) << (byte * 8);
}
words[word] = value;
}
*reinterpret_cast<uint4 *>(output + byte_idx) = make_uint4(words[0], words[1], words[2], words[3]);
}

for (; byte_idx < stop; byte_idx++) {
output[byte_idx] = next_byte(byte_idx);
}
}

} // namespace

// Fill the CUB scan input with per-row binary lengths plus a final zero sentinel. A null validity
// pointer marks an all-valid array.
extern "C" __global__ void arrow_binary_init_scan(const BinaryView *const views,
const uint8_t *const validity,
const uint64_t *const data_buffer_lens,
int32_t *const scan,
uint32_t *const status,
uint64_t data_buffer_count,
uint64_t len) {
init_scan_device(views, validity, data_buffer_lens, scan, status, data_buffer_count, len);
}

// Check that the scanned Arrow Binary offsets never overflowed the i32 range.
extern "C" __global__ void
arrow_binary_validate_offsets(const int32_t *const offsets, uint32_t *const status, uint64_t scan_len) {
validate_offsets_device(offsets, status, scan_len);
}

// Gather inline and referenced BinaryView payloads into Arrow Binary's contiguous values buffer.
extern "C" __global__ void arrow_binary_gather(const BinaryView *const views,
const uint64_t *const data_buffer_ptrs,
const int32_t *const offsets,
uint8_t *const output,
uint64_t len,
uint64_t total_bytes) {
gather_device(views, data_buffer_ptrs, offsets, output, len, total_bytes);
}
Loading
Loading