Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vortex-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ warn-copy = ["dep:tracing"]

[dependencies]
arrow-buffer = { workspace = true }
bitvec = { workspace = true }
bytes = { workspace = true }
itertools = { workspace = true }
memmap2 = { workspace = true, optional = true }
Expand Down
33 changes: 29 additions & 4 deletions vortex-buffer/src/alignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ impl Alignment {
Self::new(align_of::<T>())
}

/// The largest valid alignment: the greatest power of 2 that fits into a `u16`.
pub const MAX: Alignment = Alignment::new(1 << 15);

/// Check if `self` alignment is a "larger" than `other` alignment.
///
/// ## Example
Expand All @@ -67,10 +70,32 @@ impl Alignment {
/// assert!(!b.is_aligned_to(a));
/// ```
#[inline]
pub fn is_aligned_to(&self, other: Alignment) -> bool {
// Since we know alignments are powers of 2, we can compare them by checking if the number
// of trailing zeros in the binary representation of the alignment is greater or equal.
self.0.trailing_zeros() >= other.0.trailing_zeros()
pub const fn is_aligned_to(&self, other: Alignment) -> bool {
// Since both alignments are powers of 2, divisibility is equivalent to ordering.
self.0 >= other.0
}

/// Check if the given byte offset (or length) is a multiple of this alignment.
///
/// ## Example
///
/// ```
/// use vortex_buffer::Alignment;
///
/// let a = Alignment::new(4);
/// assert!(a.is_offset_aligned(8));
/// assert!(!a.is_offset_aligned(2));
/// ```
#[inline]
pub const fn is_offset_aligned(&self, offset: usize) -> bool {
// Alignment is always a power of 2, so a mask test is equivalent to `offset % self == 0`.
offset & (self.0 - 1) == 0
}

/// Check if the given pointer is aligned to this alignment.
#[inline]
pub fn is_ptr_aligned<T>(&self, ptr: *const T) -> bool {
self.is_offset_aligned(ptr.addr())
}

/// Returns the log2 of the alignment.
Expand Down
23 changes: 20 additions & 3 deletions vortex-buffer/src/bit/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,11 @@ impl BitBuffer {
unsafe { buffer.push_unchecked(packed) }
}

buffer.truncate(len.div_ceil(8));
let mut bytes = buffer.into_byte_buffer();
bytes.truncate(len.div_ceil(8));

Self {
buffer: buffer.freeze().into_byte_buffer(),
buffer: bytes.freeze(),
offset: 0,
len,
}
Expand Down Expand Up @@ -312,7 +313,23 @@ impl BitBuffer {
assert!(end <= self.len);
let len = end - start;

Self::new_with_offset(self.buffer.clone(), len, self.offset + start)
let offset = self.offset + start;
let byte_offset = offset / 8;
let bit_offset = offset % 8;

// Trim whole bytes off the front directly rather than going through `new_with_offset`,
// which would slice (and re-clone) the clone we'd have to pass it.
let buffer = if byte_offset != 0 {
self.buffer.slice_unaligned(byte_offset..)
} else {
self.buffer.clone().aligned(Alignment::none())
};

Self {
buffer,
offset: bit_offset,
len,
}
}

/// Slice any full bytes from the buffer, leaving the offset < 8.
Expand Down
86 changes: 72 additions & 14 deletions vortex-buffer/src/bit/buf_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::ops::Not;

use bitvec::view::BitView;
use arrow_buffer::bit_mask::set_bits;

use crate::BitBuffer;
use crate::BufferMut;
Expand Down Expand Up @@ -267,7 +267,9 @@ impl BitBufferMut {

/// Clears the bit buffer (but keeps any allocated memory).
pub fn clear(&mut self) {
// Since there are no items we need to drop, we simply set the length to 0.
// Also clear the byte buffer (not just `len`) so the "bits beyond len are zero"
// invariant holds; `append_false` and `append_buffer` rely on it.
self.buffer.clear();
self.len = 0;
self.offset = 0;
}
Expand Down Expand Up @@ -518,17 +520,16 @@ impl BitBufferMut {
self.buffer.as_mut_slice()[dst_byte + full_bytes] |= src_bytes[full_bytes] & mask;
}
} else {
// Use bitvec for unaligned bit copying.
let self_slice = self
.buffer
.as_mut_slice()
.view_bits_mut::<bitvec::prelude::Lsb0>();
let other_slice = buffer
.inner()
.as_slice()
.view_bits::<bitvec::prelude::Lsb0>();
let source_range = src_bit_offset..src_bit_offset + bit_len;
self_slice[start_bit_pos..end_bit_pos].copy_from_bitslice(&other_slice[source_range]);
// Word-wise bit copy that handles mismatched source/destination bit offsets.
// `set_bits` ORs into the destination, which is safe because bits beyond `len`
// are guaranteed to be zero.
set_bits(
self.buffer.as_mut_slice(),
buffer.inner().as_slice(),
start_bit_pos,
src_bit_offset,
bit_len,
);
}

self.len += bit_len;
Expand Down Expand Up @@ -649,6 +650,8 @@ impl FromIterator<bool> for BitBufferMut {

#[cfg(test)]
mod tests {
use rstest::rstest;

use crate::BufferMut;
use crate::bit::buf_mut::BitBufferMut;
use crate::bitbuffer;
Expand Down Expand Up @@ -923,7 +926,62 @@ mod tests {
assert!(frozen.value(7));
}

#[cfg_attr(miri, ignore)] // bitvec crate uses a ptr cast that Miri doesn't support
#[test]
fn append_after_clear_reads_back_false() {
// `clear` must not leave stale set bits behind: `append_false` and `append_buffer`
// rely on bits beyond `len` being zero.
let mut bools = BitBufferMut::new_set(16);
bools.clear();
bools.append_false();
bools.append_buffer(&crate::BitBuffer::new_unset(8));

let bools = bools.freeze();
assert_eq!(bools.len(), 9);
assert_eq!(bools.true_count(), 0);
}

#[test]
fn test_append_buffer_after_truncate() {
// Truncating leaves stale set bits in the last partial byte; an append after that
// must overwrite them rather than OR into them.
let mut buf = BitBufferMut::new_set(16);
buf.truncate(3);
buf.append_buffer(&crate::BitBuffer::new_unset(8));

let frozen = buf.freeze();
assert_eq!(frozen.len(), 11);
for i in 0..3 {
assert!(frozen.value(i), "bit {i} should be set");
}
for i in 3..11 {
assert!(!frozen.value(i), "bit {i} should be unset");
}
}

#[rstest]
#[case::both_aligned(0, 0)]
#[case::dst_unaligned(3, 0)]
#[case::src_unaligned(0, 5)]
#[case::mismatched(3, 5)]
#[case::equal_nonzero(5, 5)]
fn test_append_buffer_long(#[case] dst_prefix: usize, #[case] src_start: usize) {
// Exercise every alignment combination across many words.
let source = crate::BitBuffer::from_iter((0..301).map(|i| i % 3 == 0));
let source = source.slice(src_start..301);

let mut dest = BitBufferMut::with_capacity(512);
dest.append_n(true, dst_prefix);
dest.append_buffer(&source);

assert_eq!(dest.len(), dst_prefix + source.len());
for i in 0..dst_prefix {
assert!(dest.value(i), "prefix bit {i}");
}
for i in 0..source.len() {
assert_eq!(dest.value(dst_prefix + i), source.value(i), "bit {i}");
}
}

#[test]
fn test_append_buffer_with_offsets() {
// Create source buffer with offset
Expand Down
72 changes: 61 additions & 11 deletions vortex-buffer/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,20 @@ pub struct Buffer<T> {
pub(crate) _marker: PhantomData<T>,
}

/// Zero-length backing for empty buffers, "aligned" to [`Alignment::MAX`] so it satisfies any
/// valid alignment without allocating. A zero-length slice never reads memory, so it may use a
/// dangling pointer as long as it is non-null and aligned.
const EMPTY_BACKING: &[u8] = {
let addr = 1usize << 15;
assert!(Alignment::MAX.is_offset_aligned(addr));
// SAFETY: the pointer is non-null and aligned, and the slice is zero-length.
unsafe { std::slice::from_raw_parts(std::ptr::without_provenance(addr), 0) }
};

impl<T> Default for Buffer<T> {
fn default() -> Self {
Self {
bytes: Default::default(),
bytes: Bytes::from_static(EMPTY_BACKING),
length: 0,
alignment: Alignment::of::<T>(),
_marker: PhantomData,
Expand Down Expand Up @@ -101,12 +111,27 @@ impl<T> Buffer<T> {

/// Create a new empty `ByteBuffer` with the provided alignment.
pub fn empty() -> Self {
BufferMut::empty().freeze()
Self::empty_aligned(Alignment::of::<T>())
}

/// Create a new empty `ByteBuffer` with the provided alignment.
///
/// This does not allocate: empty buffers are backed by a zero-length `Bytes` that is
/// aligned to [`Alignment::MAX`].
pub fn empty_aligned(alignment: Alignment) -> Self {
BufferMut::empty_aligned(alignment).freeze()
if !alignment.is_aligned_to(Alignment::of::<T>()) {
vortex_panic!(
"Alignment {} must align to the scalar type's alignment {}",
alignment,
Alignment::of::<T>(),
);
}
Self {
bytes: Bytes::from_static(EMPTY_BACKING),
length: 0,
alignment,
_marker: PhantomData,
}
}

/// Create a new full `ByteBuffer` with the given value.
Expand Down Expand Up @@ -152,7 +177,7 @@ impl<T> Buffer<T> {
Alignment::of::<T>(),
);
}
if bytes.as_ptr().align_offset(*alignment) != 0 {
if !alignment.is_ptr_aligned(bytes.as_ptr()) {
vortex_panic!(
"Bytes alignment must align to the requested alignment {}",
alignment,
Expand Down Expand Up @@ -320,7 +345,7 @@ impl<T> Buffer<T> {
let begin_byte = begin * size_of::<T>();
let end_byte = end * size_of::<T>();

if !begin_byte.is_multiple_of(*alignment) {
if !alignment.is_offset_aligned(begin_byte) {
vortex_panic!(
"range start must be aligned to {alignment:?}, byte {}",
begin_byte
Expand Down Expand Up @@ -369,7 +394,7 @@ impl<T> Buffer<T> {
vortex_panic!("slice_ref subset alignment must at least align to the buffer alignment")
}

if subset.as_ptr().align_offset(*alignment) != 0 {
if !alignment.is_ptr_aligned(subset.as_ptr()) {
vortex_panic!("slice_ref subset must be aligned to {:?}", alignment);
}

Expand Down Expand Up @@ -435,17 +460,17 @@ impl<T> Buffer<T> {
/// Convert self into `BufferMut<T>`, cloning the data if there are multiple strong references.
pub fn into_mut(self) -> BufferMut<T> {
self.try_into_mut()
.unwrap_or_else(|buffer| BufferMut::<T>::copy_from(&buffer))
.unwrap_or_else(|buffer| BufferMut::<T>::copy_from_aligned(&buffer, buffer.alignment))
}

/// Returns whether a `Buffer<T>` is aligned to the given alignment.
pub fn is_aligned(&self, alignment: Alignment) -> bool {
self.bytes.as_ptr().align_offset(*alignment) == 0
alignment.is_ptr_aligned(self.bytes.as_ptr())
}

/// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
pub fn aligned(mut self, alignment: Alignment) -> Self {
if self.as_ptr().align_offset(*alignment) == 0 {
if alignment.is_ptr_aligned(self.as_ptr()) {
self.alignment = alignment;
self
} else {
Expand All @@ -462,7 +487,7 @@ impl<T> Buffer<T> {

/// Return a `Buffer<T>` with the given alignment. Panics if the buffer is not aligned.
pub fn ensure_aligned(mut self, alignment: Alignment) -> Self {
if self.as_ptr().align_offset(*alignment) == 0 {
if alignment.is_ptr_aligned(self.as_ptr()) {
self.alignment = alignment;
self
} else {
Expand Down Expand Up @@ -634,7 +659,7 @@ impl Buf for ByteBuffer {

#[inline]
fn advance(&mut self, cnt: usize) {
if !cnt.is_multiple_of(*self.alignment) {
if !self.alignment.is_offset_aligned(cnt) {
vortex_panic!(
"Cannot advance buffer by {} items, resulting alignment is not {}",
cnt,
Expand Down Expand Up @@ -786,6 +811,31 @@ mod test {
assert_eq!(vec, buff.as_ref());
}

#[test]
fn empty_aligned_max_alignment() {
// Empty buffers are backed by a static and must satisfy any valid alignment.
let buf = Buffer::<u8>::empty_aligned(Alignment::MAX);
assert!(buf.is_empty());
assert!(buf.is_aligned(Alignment::MAX));
}

#[test]
fn empty_slice_preserves_alignment() {
let buf = Buffer::<u64>::zeroed_aligned(8, Alignment::new(64));
let sliced = buf.slice(0..0);
assert!(sliced.is_empty());
assert_eq!(sliced.alignment(), Alignment::new(64));
assert!(sliced.is_aligned(Alignment::new(64)));
}

#[test]
fn empty_into_mut_preserves_alignment() {
let buf = Buffer::<u8>::empty_aligned(Alignment::new(64));
let buf_mut = buf.into_mut();
assert_eq!(buf_mut.alignment(), Alignment::new(64));
assert!(buf_mut.is_empty());
}

#[test]
fn test_slice_unaligned_end_pos() {
let data = vec![0u8; 2];
Expand Down
6 changes: 3 additions & 3 deletions vortex-buffer/src/buffer_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl<T> BufferMut<T> {
}

let bytes_at = at * size_of::<T>();
if !bytes_at.is_multiple_of(*self.alignment) {
if !self.alignment.is_offset_aligned(bytes_at) {
vortex_panic!(
"Cannot split buffer at {}, resulting alignment is not {}",
at,
Expand Down Expand Up @@ -742,7 +742,7 @@ impl Buf for ByteBufferMut {
}

fn advance(&mut self, cnt: usize) {
if !cnt.is_multiple_of(*self.alignment) {
if !self.alignment.is_offset_aligned(cnt) {
vortex_panic!(
"Cannot advance buffer by {} items, resulting alignment is not {}",
cnt,
Expand All @@ -765,7 +765,7 @@ unsafe impl BufMut for ByteBufferMut {

#[inline]
unsafe fn advance_mut(&mut self, cnt: usize) {
if !cnt.is_multiple_of(*self.alignment) {
if !self.alignment.is_offset_aligned(cnt) {
vortex_panic!(
"Cannot advance buffer by {} items, resulting alignment is not {}",
cnt,
Expand Down
Loading
Loading