Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vortex-cuda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ harness = false
[[bench]]
name = "list_view_cuda"
harness = false

[[bench]]
name = "arrow_validity_cuda"
harness = false
85 changes: 85 additions & 0 deletions vortex-cuda/benches/arrow_validity_cuda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! CUDA benchmarks for Arrow validity bitmap repacking.

mod bench_config;
mod timed_launch_strategy;

use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::Throughput;
use futures::executor::block_on;
use vortex::array::buffer::BufferHandle;
use vortex::buffer::BitBuffer;
use vortex::error::VortexExpect;
use vortex::session::VortexSession;
use vortex_cuda::CudaSession;
use vortex_cuda::arrow::test_harness;
use vortex_cuda_macros::cuda_available;
use vortex_cuda_macros::cuda_not_available;

use crate::timed_launch_strategy::TimedLaunchStrategy;

const INPUT_OFFSET: usize = 5;
const ARROW_OFFSET: usize = 3;

fn benchmark_arrow_validity_repack(c: &mut Criterion) {
let mut group = c.benchmark_group("cuda");

for &(len, len_label) in bench_config::BENCH_SIZES {
group.throughput(Throughput::Elements(len as u64));
group.bench_with_input(
BenchmarkId::new("cuda/arrow_validity/repack", len_label),
&len,
|b, &len| {
b.iter_custom(|iters| {
let timed = TimedLaunchStrategy::default();
let timer = timed.timer();

let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())
.vortex_expect("failed to create execution context")
.with_launch_strategy(Arc::new(timed));
let source = BitBuffer::collect_bool(len + INPUT_OFFSET, |idx| idx % 3 != 0);
let sliced = source.slice(INPUT_OFFSET..INPUT_OFFSET + len);
let (input_offset, _, input_buffer) = sliced.into_inner();
let input_buffer =
block_on(cuda_ctx.ensure_on_device(BufferHandle::new_host(input_buffer)))
.vortex_expect("failed to copy validity input to device");

for _ in 0..iters {
let output = test_harness::repack_arrow_validity_buffer(
&input_buffer,
input_offset,
len,
ARROW_OFFSET,
&mut cuda_ctx,
)
.vortex_expect("failed to repack Arrow validity");
std::hint::black_box(output);
}

Duration::from_nanos(timer.load(Ordering::Relaxed))
});
},
);
}

group.finish();
}

criterion::criterion_group! {
name = benches;
config = bench_config::cuda_bench_config();
targets = benchmark_arrow_validity_repack
}

#[cuda_available]
criterion::criterion_main!(benches);

#[cuda_not_available]
fn main() {}
110 changes: 110 additions & 0 deletions vortex-cuda/kernels/src/arrow_validity.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#include "config.cuh"

#include <stdint.h>

namespace {

// Load the `word_idx`-th little-endian u64 of `input`, treating bytes outside
// `[0, input_bytes)` as zero. `input` must be 8-byte aligned.
__device__ uint64_t load_input_word(const uint8_t *const input, int64_t word_idx, uint64_t input_bytes) {
if (word_idx < 0) {
return 0;
}
const uint64_t byte_idx = static_cast<uint64_t>(word_idx) * sizeof(uint64_t);
if (byte_idx >= input_bytes) {
return 0;
}
if (byte_idx + sizeof(uint64_t) <= input_bytes) {
return reinterpret_cast<const uint64_t *>(input)[word_idx];
}
// Trailing partial word: assemble byte-by-byte to avoid reading past the buffer.
uint64_t word = 0;
for (uint64_t i = byte_idx; i < input_bytes; i++) {
word |= static_cast<uint64_t>(input[i]) << ((i - byte_idx) * 8);
}
return word;
}

// Build one 64-bit word of the Arrow validity bitmap.
//
// Output bit `b` for `b` in `[arrow_offset, validity_bits)` equals input bit `b + shift`;
// all other bits are zero. Two adjacent input words are funnel-shifted to align the input
// bits with the output word, then the leading/trailing edges are masked.
__device__ uint64_t repack_word(const uint8_t *const input,
uint64_t word_idx,
int64_t shift,
uint64_t arrow_offset,
uint64_t validity_bits,
uint64_t input_bytes) {
const uint64_t word_start = word_idx * 64;

// Bits before Arrow's array offset are padding from the consumer's point of view.
// Tail bits beyond len + offset stay zero so word-at-a-time mask readers are safe.
uint64_t mask = ~uint64_t {0};
if (word_start < arrow_offset) {
const uint64_t lead = arrow_offset - word_start;
mask = lead >= 64 ? 0 : mask << lead;
}
const uint64_t remaining = validity_bits - word_start;
if (remaining < 64) {
mask &= (uint64_t {1} << remaining) - 1;
}
if (mask == 0) {
return 0;
}

// `>> 6` floors also for negative bit positions, unlike `/ 64` which truncates toward zero.
const int64_t input_bit = static_cast<int64_t>(word_start) + shift;
const int64_t input_word = input_bit >> 6;
const uint32_t bit = static_cast<uint32_t>(input_bit & 63);

const uint64_t lo = load_input_word(input, input_word, input_bytes);
if (bit == 0) {
return lo & mask;
}
const uint64_t hi = load_input_word(input, input_word + 1, input_bytes);
return ((lo >> bit) | (hi << (64 - bit))) & mask;
}

// Rebuild a possibly bit-offset Vortex validity bitmap into an Arrow-compatible bitmap.
//
// `input_offset` is the bit offset into `input`; `arrow_offset` is the logical Arrow array offset
// to preserve in the output. Bits outside `[arrow_offset, arrow_offset + len)` are left unset.
// The output allocation must hold `ceil((len + arrow_offset) / 64)` full 64-bit words; every
// word is written, so no zero-initialization of the output is required.
__device__ void arrow_validity_repack_device(const uint8_t *const input,
uint64_t *const output,
uint64_t len,
uint64_t input_offset,
uint64_t arrow_offset,
uint64_t input_bytes) {
// One worker owns a contiguous range of output words. Each word is rebuilt locally so
// there are no cross-thread bit writes or atomics.
const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x;
const uint64_t validity_bits = len + arrow_offset;
const uint64_t output_words = (validity_bits + 63) / 64;
const uint64_t stride = static_cast<uint64_t>(gridDim.x) * blockDim.x;

// Translate Arrow-visible output bits back to source bitmap bits. The source bitmap may
// start at any bit offset, while Arrow's buffer pointer is byte-addressed.
const int64_t shift = static_cast<int64_t>(input_offset) - static_cast<int64_t>(arrow_offset);

for (uint64_t word_idx = worker; word_idx < output_words; word_idx += stride) {
output[word_idx] = repack_word(input, word_idx, shift, arrow_offset, validity_bits, input_bytes);
}
}

} // namespace

// CUDA entry point for validity bitmap repacking used by Arrow Device export.
extern "C" __global__ void arrow_validity_repack(const uint8_t *const input,
uint64_t *const output,
uint64_t len,
uint64_t input_offset,
uint64_t arrow_offset,
uint64_t input_bytes) {
arrow_validity_repack_device(input, output, len, input_offset, arrow_offset, input_bytes);
}
Loading
Loading