Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ set(PAIMON_CORE_SRCS
core/catalog/identifier.cpp
core/core_options.cpp
core/deletionvectors/deletion_vector.cpp
core/deletionvectors/deletion_file_writer.cpp
core/deletionvectors/bitmap_deletion_vector.cpp
core/deletionvectors/deletion_vectors_index_file.cpp
core/deletionvectors/deletion_vector_index_file_writer.cpp
core/global_index/global_index_evaluator_impl.cpp
core/global_index/global_index_scan.cpp
core/global_index/global_index_scan_impl.cpp
Expand Down Expand Up @@ -469,7 +473,6 @@ if(PAIMON_BUILD_TESTS)
add_paimon_test(core_test
SOURCES
core/append/append_only_writer_test.cpp
core/append/bucketed_append_compact_manager_test.cpp
core/casting/cast_executor_factory_test.cpp
core/casting/cast_executor_test.cpp
core/casting/casted_row_test.cpp
Expand All @@ -481,7 +484,9 @@ if(PAIMON_BUILD_TESTS)
core/catalog/identifier_test.cpp
core/core_options_test.cpp
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
core/deletionvectors/bitmap_deletion_vector_test.cpp
core/deletionvectors/deletion_vector_test.cpp
core/deletionvectors/deletion_vectors_index_file_test.cpp
core/index/index_in_data_file_dir_path_factory_test.cpp
core/index/deletion_vector_meta_test.cpp
core/index/index_file_meta_serializer_test.cpp
Expand Down
19 changes: 0 additions & 19 deletions src/paimon/common/io/data_output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

#include "paimon/common/io/data_output_stream.h"

#include <cassert>
#include <type_traits>

#include "fmt/format.h"
#include "paimon/common/utils/math.h"
#include "paimon/fs/file_system.h"
#include "paimon/memory/bytes.h"
#include "paimon/result.h"

Expand All @@ -30,20 +25,6 @@ DataOutputStream::DataOutputStream(const std::shared_ptr<OutputStream>& output_s
: output_stream_(output_stream) {
assert(output_stream_);
}
template <typename T>
Status DataOutputStream::WriteValue(const T& value) {
static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
T write_value = value;
if (NeedSwap()) {
write_value = EndianSwapValue(value);
}
int32_t write_length = sizeof(T);
PAIMON_ASSIGN_OR_RAISE(
int32_t actual_write_length,
output_stream_->Write(reinterpret_cast<char*>(&write_value), write_length));
PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length));
return Status::OK();
}

Status DataOutputStream::WriteBytes(const std::shared_ptr<Bytes>& bytes) {
int32_t write_length = bytes->size();
Expand Down
20 changes: 19 additions & 1 deletion src/paimon/common/io/data_output_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
*/

#pragma once

#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>

#include "paimon/common/utils/math.h"
#include "paimon/fs/file_system.h"
#include "paimon/io/byte_order.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace paimon {
Expand All @@ -33,7 +39,19 @@ class PAIMON_EXPORT DataOutputStream {
explicit DataOutputStream(const std::shared_ptr<OutputStream>& output_stream);

template <typename T>
Status WriteValue(const T& value);
Status WriteValue(const T& value) {
static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
T write_value = value;
if (NeedSwap()) {
write_value = EndianSwapValue(value);
}
int32_t write_length = sizeof(T);
PAIMON_ASSIGN_OR_RAISE(
int32_t actual_write_length,
output_stream_->Write(reinterpret_cast<char*>(&write_value), write_length));
PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length));
return Status::OK();
}

Status WriteBytes(const std::shared_ptr<Bytes>& bytes);

Expand Down
82 changes: 82 additions & 0 deletions src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"

#include "arrow/util/crc32.h"
#include "fmt/format.h"
#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/io/data_input_stream.h"

namespace paimon {

Result<int32_t> BitmapDeletionVector::SerializeTo(const std::shared_ptr<MemoryPool>& pool,
DataOutputStream* out) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Bytes> data, SerializeToBytes(pool));
int64_t size = data->size();
if (size < 0 || size > std::numeric_limits<int32_t>::max()) {
return Status::Invalid("BitmapDeletionVector serialize size out of range: ", size);
}
PAIMON_RETURN_NOT_OK(out->WriteValue<int32_t>(static_cast<int32_t>(size)));
PAIMON_RETURN_NOT_OK(out->WriteBytes(data));
uint32_t crc32 = 0;
crc32 = arrow::internal::crc32(crc32, data->data(), size);
PAIMON_RETURN_NOT_OK(out->WriteValue<int32_t>(static_cast<int32_t>(crc32)));
return static_cast<int32_t>(size);
}

Result<PAIMON_UNIQUE_PTR<Bytes>> BitmapDeletionVector::SerializeToBytes(
const std::shared_ptr<MemoryPool>& pool) {
std::shared_ptr<Bytes> bitmap_bytes = roaring_bitmap_.Serialize(pool.get());
if (bitmap_bytes == nullptr) {
assert(bitmap_bytes);
return Status::Invalid("roaring bitmap serialize failed");
}
MemorySegmentOutputStream output(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
output.WriteValue<int32_t>(MAGIC_NUMBER);
output.WriteBytes(bitmap_bytes);
return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, output.CurrentSize(),
pool.get());
}

Status BitmapDeletionVector::CheckPosition(int64_t position) const {
if (position > RoaringBitmap32::MAX_VALUE) {
return Status::Invalid(fmt::format(
"The file has too many rows, RoaringBitmap32 only supports files with row count "
"not exceeding {}.",
RoaringBitmap32::MAX_VALUE));
}
return Status::OK();
}

Result<PAIMON_UNIQUE_PTR<DeletionVector>> BitmapDeletionVector::Deserialize(const char* buffer,
int32_t length,
MemoryPool* pool) {
auto in = std::make_shared<ByteArrayInputStream>(buffer, length);
DataInputStream input(in);
PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue<int32_t>());
if (magic_num != MAGIC_NUMBER) {
return Status::Invalid(fmt::format(
"Unable to deserialize deletion vector, invalid magic number: {}", magic_num));
}
RoaringBitmap32 roaring_bitmap;
PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + MAGIC_NUMBER_SIZE_BYTES,
length - MAGIC_NUMBER_SIZE_BYTES));
return pool->AllocateUnique<BitmapDeletionVector>(roaring_bitmap);
}

} // namespace paimon
55 changes: 17 additions & 38 deletions src/paimon/core/deletionvectors/bitmap_deletion_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

#include <memory>

#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/io/data_output_stream.h"
#include "paimon/core/deletionvectors/deletion_vector.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/io/data_input_stream.h"
#include "paimon/utils/roaring_bitmap32.h"

namespace paimon {

/// A `DeletionVector` based on `RoaringBitmap32`, it only supports files with row count
/// not exceeding `RoaringBitmap32::MAX_VALUE`.
class BitmapDeletionVector : public DeletionVector {
public:
static constexpr int32_t MAGIC_NUMBER = 1581511376;
static constexpr int32_t MAGIC_NUMBER_SIZE_BYTES = 4;

explicit BitmapDeletionVector(const RoaringBitmap32& roaring_bitmap)
: roaring_bitmap_(roaring_bitmap) {}

Expand All @@ -51,51 +54,27 @@ class BitmapDeletionVector : public DeletionVector {
return roaring_bitmap_.IsEmpty();
}

Result<PAIMON_UNIQUE_PTR<Bytes>> SerializeToBytes(
const std::shared_ptr<MemoryPool>& pool) override {
std::shared_ptr<Bytes> bitmap_bytes = roaring_bitmap_.Serialize(pool.get());
if (bitmap_bytes == nullptr) {
assert(bitmap_bytes);
return Status::Invalid("roaring bitmap serialize failed");
}
MemorySegmentOutputStream output(/*segment_size=*/1024, pool);
output.WriteValue<int32_t>(MAGIC_NUMBER);
output.WriteBytes(bitmap_bytes);
return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0,
output.CurrentSize(), pool.get());
int64_t GetCardinality() const override {
return roaring_bitmap_.Cardinality();
}

Result<int32_t> SerializeTo(const std::shared_ptr<MemoryPool>& pool,
DataOutputStream* out) override;

Result<PAIMON_UNIQUE_PTR<Bytes>> SerializeToBytes(
const std::shared_ptr<MemoryPool>& pool) override;

const RoaringBitmap32* GetBitmap() const {
return &roaring_bitmap_;
}

static Result<PAIMON_UNIQUE_PTR<DeletionVector>> Deserialize(const char* buffer, int32_t length,
MemoryPool* pool) {
auto in = std::make_shared<ByteArrayInputStream>(buffer, length);
DataInputStream input(in);
PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue<int32_t>());
if (magic_num != MAGIC_NUMBER) {
return Status::Invalid("Invalid magic number: ", std::to_string(magic_num));
}
RoaringBitmap32 roaring_bitmap;
PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + sizeof(MAGIC_NUMBER),
length - sizeof(MAGIC_NUMBER)));
return pool->AllocateUnique<BitmapDeletionVector>(roaring_bitmap);
}

static constexpr int32_t MAGIC_NUMBER = 1581511376;
MemoryPool* pool);

private:
Status CheckPosition(int64_t position) const {
if (position > RoaringBitmap32::MAX_VALUE) {
return Status::Invalid(
"The file has too many rows, RoaringBitmap32 only supports files with row count "
"not exceeding 2147483647.");
}
return Status::OK();
}
Status CheckPosition(int64_t position) const;

private:
RoaringBitmap32 roaring_bitmap_;
};

} // namespace paimon
122 changes: 122 additions & 0 deletions src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"

#include <set>

#include "gtest/gtest.h"
#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/fs/file_system_factory.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

TEST(BitmapDeletionVectorTest, BasicOperations) {
RoaringBitmap32 roaring;
BitmapDeletionVector dv(roaring);
ASSERT_TRUE(dv.IsEmpty());
for (int32_t i = 0; i < 2000; i += 2) {
ASSERT_OK(dv.Delete(i));
}
ASSERT_EQ(dv.GetCardinality(), 1000);
for (int32_t i = 0; i < 2000; ++i) {
if (i % 2 == 0) {
ASSERT_TRUE(dv.IsDeleted(i).value());
} else {
ASSERT_FALSE(dv.IsDeleted(i).value());
}
}
}

TEST(BitmapDeletionVectorTest, CheckedDelete) {
RoaringBitmap32 roaring;
BitmapDeletionVector dv(roaring);
ASSERT_TRUE(dv.CheckedDelete(42).value());
ASSERT_FALSE(dv.CheckedDelete(42).value());
ASSERT_TRUE(dv.IsDeleted(42).value());
}

TEST(BitmapDeletionVectorTest, SerializeAndDeserialize) {
RoaringBitmap32 roaring;
for (int32_t i = 0; i < 100; i += 3) {
roaring.Add(i);
}
BitmapDeletionVector dv(roaring);
auto pool = GetDefaultPool();
ASSERT_OK_AND_ASSIGN(auto bytes, dv.SerializeToBytes(pool));
ASSERT_OK_AND_ASSIGN(
auto dv2, BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(), pool.get()));
for (int32_t i = 0; i < 100; ++i) {
ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value());
}
}

TEST(BitmapDeletionVectorTest, SerializeToOutputStream) {
RoaringBitmap32 roaring;
for (int32_t i = 0; i < 50; i += 2) {
roaring.Add(i);
}
BitmapDeletionVector dv(roaring);
auto pool = GetDefaultPool();
auto dir = UniqueTestDirectory::Create();
auto path = PathUtil::JoinPath(dir->Str(), "dv");
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", path, {}));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<OutputStream> output_stream,
fs->Create(path, /*overwrite=*/false));
DataOutputStream out(output_stream);
ASSERT_OK_AND_ASSIGN(int64_t size, dv.SerializeTo(pool, &out));
ASSERT_OK(output_stream->Flush());
ASSERT_OK(output_stream->Close());
ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream, fs->Open(path));
DataInputStream in(input_stream);
ASSERT_OK_AND_ASSIGN(int32_t byte_len, in.ReadValue<int32_t>());
auto bytes = Bytes::AllocateBytes(byte_len, pool.get());
ASSERT_OK(in.Read(bytes->data(), bytes->size()));
const char* data = bytes->data();
ASSERT_EQ(bytes->size(), size);
ASSERT_OK_AND_ASSIGN(auto dv2,
BitmapDeletionVector::Deserialize(data, bytes->size(), pool.get()));
for (int32_t i = 0; i < 50; ++i) {
ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value());
}
}

TEST(BitmapDeletionVectorTest, GetCardinality) {
RoaringBitmap32 empty;
BitmapDeletionVector dv_empty(empty);
ASSERT_EQ(dv_empty.GetCardinality(), 0);

RoaringBitmap32 cont;
for (int i = 0; i < 100; ++i) cont.Add(i);
BitmapDeletionVector dv_cont(cont);
ASSERT_EQ(dv_cont.GetCardinality(), 100);

RoaringBitmap32 gap;
for (int i = 0; i < 1000; i += 10) gap.Add(i);
BitmapDeletionVector dv_gap(gap);
ASSERT_EQ(dv_gap.GetCardinality(), 100);

RoaringBitmap32 del;
for (int i = 0; i < 10; ++i) del.Add(i);
BitmapDeletionVector dv_del(del);
ASSERT_EQ(dv_del.GetCardinality(), 10);
ASSERT_OK(dv_del.Delete(100));
ASSERT_EQ(dv_del.GetCardinality(), 11);
}

} // namespace paimon::test
Loading
Loading