diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index d02fa95f..17731af2 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -170,6 +170,10 @@ set(PAIMON_CORE_SRCS core/catalog/identifier.cpp core/core_options.cpp core/deletionvectors/deletion_vector.cpp + core/deletionvectors/deletion_file_writer.cpp + core/deletionvectors/bitmap_deletion_vector.cpp + core/deletionvectors/deletion_vectors_index_file.cpp + core/deletionvectors/deletion_vector_index_file_writer.cpp core/global_index/global_index_evaluator_impl.cpp core/global_index/global_index_scan.cpp core/global_index/global_index_scan_impl.cpp @@ -469,7 +473,6 @@ if(PAIMON_BUILD_TESTS) add_paimon_test(core_test SOURCES core/append/append_only_writer_test.cpp - core/append/bucketed_append_compact_manager_test.cpp core/casting/cast_executor_factory_test.cpp core/casting/cast_executor_test.cpp core/casting/casted_row_test.cpp @@ -481,7 +484,9 @@ if(PAIMON_BUILD_TESTS) core/catalog/identifier_test.cpp core/core_options_test.cpp core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp + core/deletionvectors/bitmap_deletion_vector_test.cpp core/deletionvectors/deletion_vector_test.cpp + core/deletionvectors/deletion_vectors_index_file_test.cpp core/index/index_in_data_file_dir_path_factory_test.cpp core/index/deletion_vector_meta_test.cpp core/index/index_file_meta_serializer_test.cpp diff --git a/src/paimon/common/io/data_output_stream.cpp b/src/paimon/common/io/data_output_stream.cpp index 5252ecea..6d8c82b5 100644 --- a/src/paimon/common/io/data_output_stream.cpp +++ b/src/paimon/common/io/data_output_stream.cpp @@ -16,12 +16,7 @@ #include "paimon/common/io/data_output_stream.h" -#include -#include - #include "fmt/format.h" -#include "paimon/common/utils/math.h" -#include "paimon/fs/file_system.h" #include "paimon/memory/bytes.h" #include "paimon/result.h" @@ -30,20 +25,6 @@ DataOutputStream::DataOutputStream(const std::shared_ptr& output_s : output_stream_(output_stream) { assert(output_stream_); } -template -Status DataOutputStream::WriteValue(const T& value) { - static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); - T write_value = value; - if (NeedSwap()) { - write_value = EndianSwapValue(value); - } - int32_t write_length = sizeof(T); - PAIMON_ASSIGN_OR_RAISE( - int32_t actual_write_length, - output_stream_->Write(reinterpret_cast(&write_value), write_length)); - PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); - return Status::OK(); -} Status DataOutputStream::WriteBytes(const std::shared_ptr& bytes) { int32_t write_length = bytes->size(); diff --git a/src/paimon/common/io/data_output_stream.h b/src/paimon/common/io/data_output_stream.h index 6d383a11..6c22b47c 100644 --- a/src/paimon/common/io/data_output_stream.h +++ b/src/paimon/common/io/data_output_stream.h @@ -15,11 +15,17 @@ */ #pragma once + +#include #include #include #include +#include +#include "paimon/common/utils/math.h" +#include "paimon/fs/file_system.h" #include "paimon/io/byte_order.h" +#include "paimon/result.h" #include "paimon/status.h" namespace paimon { @@ -33,7 +39,19 @@ class PAIMON_EXPORT DataOutputStream { explicit DataOutputStream(const std::shared_ptr& output_stream); template - Status WriteValue(const T& value); + Status WriteValue(const T& value) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + T write_value = value; + if (NeedSwap()) { + write_value = EndianSwapValue(value); + } + int32_t write_length = sizeof(T); + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_write_length, + output_stream_->Write(reinterpret_cast(&write_value), write_length)); + PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); + return Status::OK(); + } Status WriteBytes(const std::shared_ptr& bytes); diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp new file mode 100644 index 00000000..66be892d --- /dev/null +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp @@ -0,0 +1,82 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" + +#include "arrow/util/crc32.h" +#include "fmt/format.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" + +namespace paimon { + +Result BitmapDeletionVector::SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data, SerializeToBytes(pool)); + int64_t size = data->size(); + if (size < 0 || size > std::numeric_limits::max()) { + return Status::Invalid("BitmapDeletionVector serialize size out of range: ", size); + } + PAIMON_RETURN_NOT_OK(out->WriteValue(static_cast(size))); + PAIMON_RETURN_NOT_OK(out->WriteBytes(data)); + uint32_t crc32 = 0; + crc32 = arrow::internal::crc32(crc32, data->data(), size); + PAIMON_RETURN_NOT_OK(out->WriteValue(static_cast(crc32))); + return static_cast(size); +} + +Result> BitmapDeletionVector::SerializeToBytes( + const std::shared_ptr& pool) { + std::shared_ptr bitmap_bytes = roaring_bitmap_.Serialize(pool.get()); + if (bitmap_bytes == nullptr) { + assert(bitmap_bytes); + return Status::Invalid("roaring bitmap serialize failed"); + } + MemorySegmentOutputStream output(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + output.WriteValue(MAGIC_NUMBER); + output.WriteBytes(bitmap_bytes); + return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, output.CurrentSize(), + pool.get()); +} + +Status BitmapDeletionVector::CheckPosition(int64_t position) const { + if (position > RoaringBitmap32::MAX_VALUE) { + return Status::Invalid(fmt::format( + "The file has too many rows, RoaringBitmap32 only supports files with row count " + "not exceeding {}.", + RoaringBitmap32::MAX_VALUE)); + } + return Status::OK(); +} + +Result> BitmapDeletionVector::Deserialize(const char* buffer, + int32_t length, + MemoryPool* pool) { + auto in = std::make_shared(buffer, length); + DataInputStream input(in); + PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue()); + if (magic_num != MAGIC_NUMBER) { + return Status::Invalid(fmt::format( + "Unable to deserialize deletion vector, invalid magic number: {}", magic_num)); + } + RoaringBitmap32 roaring_bitmap; + PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + MAGIC_NUMBER_SIZE_BYTES, + length - MAGIC_NUMBER_SIZE_BYTES)); + return pool->AllocateUnique(roaring_bitmap); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.h b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h index ee9ff317..21361dba 100644 --- a/src/paimon/core/deletionvectors/bitmap_deletion_vector.h +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h @@ -18,16 +18,19 @@ #include -#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/io/data_output_stream.h" #include "paimon/core/deletionvectors/deletion_vector.h" -#include "paimon/io/byte_array_input_stream.h" -#include "paimon/io/data_input_stream.h" #include "paimon/utils/roaring_bitmap32.h" + namespace paimon { + /// A `DeletionVector` based on `RoaringBitmap32`, it only supports files with row count /// not exceeding `RoaringBitmap32::MAX_VALUE`. class BitmapDeletionVector : public DeletionVector { public: + static constexpr int32_t MAGIC_NUMBER = 1581511376; + static constexpr int32_t MAGIC_NUMBER_SIZE_BYTES = 4; + explicit BitmapDeletionVector(const RoaringBitmap32& roaring_bitmap) : roaring_bitmap_(roaring_bitmap) {} @@ -51,51 +54,27 @@ class BitmapDeletionVector : public DeletionVector { return roaring_bitmap_.IsEmpty(); } - Result> SerializeToBytes( - const std::shared_ptr& pool) override { - std::shared_ptr bitmap_bytes = roaring_bitmap_.Serialize(pool.get()); - if (bitmap_bytes == nullptr) { - assert(bitmap_bytes); - return Status::Invalid("roaring bitmap serialize failed"); - } - MemorySegmentOutputStream output(/*segment_size=*/1024, pool); - output.WriteValue(MAGIC_NUMBER); - output.WriteBytes(bitmap_bytes); - return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, - output.CurrentSize(), pool.get()); + int64_t GetCardinality() const override { + return roaring_bitmap_.Cardinality(); } + Result SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) override; + + Result> SerializeToBytes( + const std::shared_ptr& pool) override; + const RoaringBitmap32* GetBitmap() const { return &roaring_bitmap_; } static Result> Deserialize(const char* buffer, int32_t length, - MemoryPool* pool) { - auto in = std::make_shared(buffer, length); - DataInputStream input(in); - PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue()); - if (magic_num != MAGIC_NUMBER) { - return Status::Invalid("Invalid magic number: ", std::to_string(magic_num)); - } - RoaringBitmap32 roaring_bitmap; - PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + sizeof(MAGIC_NUMBER), - length - sizeof(MAGIC_NUMBER))); - return pool->AllocateUnique(roaring_bitmap); - } - - static constexpr int32_t MAGIC_NUMBER = 1581511376; + MemoryPool* pool); private: - Status CheckPosition(int64_t position) const { - if (position > RoaringBitmap32::MAX_VALUE) { - return Status::Invalid( - "The file has too many rows, RoaringBitmap32 only supports files with row count " - "not exceeding 2147483647."); - } - return Status::OK(); - } + Status CheckPosition(int64_t position) const; - private: RoaringBitmap32 roaring_bitmap_; }; + } // namespace paimon diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp new file mode 100644 index 00000000..318817fb --- /dev/null +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp @@ -0,0 +1,122 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(BitmapDeletionVectorTest, BasicOperations) { + RoaringBitmap32 roaring; + BitmapDeletionVector dv(roaring); + ASSERT_TRUE(dv.IsEmpty()); + for (int32_t i = 0; i < 2000; i += 2) { + ASSERT_OK(dv.Delete(i)); + } + ASSERT_EQ(dv.GetCardinality(), 1000); + for (int32_t i = 0; i < 2000; ++i) { + if (i % 2 == 0) { + ASSERT_TRUE(dv.IsDeleted(i).value()); + } else { + ASSERT_FALSE(dv.IsDeleted(i).value()); + } + } +} + +TEST(BitmapDeletionVectorTest, CheckedDelete) { + RoaringBitmap32 roaring; + BitmapDeletionVector dv(roaring); + ASSERT_TRUE(dv.CheckedDelete(42).value()); + ASSERT_FALSE(dv.CheckedDelete(42).value()); + ASSERT_TRUE(dv.IsDeleted(42).value()); +} + +TEST(BitmapDeletionVectorTest, SerializeAndDeserialize) { + RoaringBitmap32 roaring; + for (int32_t i = 0; i < 100; i += 3) { + roaring.Add(i); + } + BitmapDeletionVector dv(roaring); + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(auto bytes, dv.SerializeToBytes(pool)); + ASSERT_OK_AND_ASSIGN( + auto dv2, BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(), pool.get())); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value()); + } +} + +TEST(BitmapDeletionVectorTest, SerializeToOutputStream) { + RoaringBitmap32 roaring; + for (int32_t i = 0; i < 50; i += 2) { + roaring.Add(i); + } + BitmapDeletionVector dv(roaring); + auto pool = GetDefaultPool(); + auto dir = UniqueTestDirectory::Create(); + auto path = PathUtil::JoinPath(dir->Str(), "dv"); + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", path, {})); + ASSERT_OK_AND_ASSIGN(std::shared_ptr output_stream, + fs->Create(path, /*overwrite=*/false)); + DataOutputStream out(output_stream); + ASSERT_OK_AND_ASSIGN(int64_t size, dv.SerializeTo(pool, &out)); + ASSERT_OK(output_stream->Flush()); + ASSERT_OK(output_stream->Close()); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, fs->Open(path)); + DataInputStream in(input_stream); + ASSERT_OK_AND_ASSIGN(int32_t byte_len, in.ReadValue()); + auto bytes = Bytes::AllocateBytes(byte_len, pool.get()); + ASSERT_OK(in.Read(bytes->data(), bytes->size())); + const char* data = bytes->data(); + ASSERT_EQ(bytes->size(), size); + ASSERT_OK_AND_ASSIGN(auto dv2, + BitmapDeletionVector::Deserialize(data, bytes->size(), pool.get())); + for (int32_t i = 0; i < 50; ++i) { + ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value()); + } +} + +TEST(BitmapDeletionVectorTest, GetCardinality) { + RoaringBitmap32 empty; + BitmapDeletionVector dv_empty(empty); + ASSERT_EQ(dv_empty.GetCardinality(), 0); + + RoaringBitmap32 cont; + for (int i = 0; i < 100; ++i) cont.Add(i); + BitmapDeletionVector dv_cont(cont); + ASSERT_EQ(dv_cont.GetCardinality(), 100); + + RoaringBitmap32 gap; + for (int i = 0; i < 1000; i += 10) gap.Add(i); + BitmapDeletionVector dv_gap(gap); + ASSERT_EQ(dv_gap.GetCardinality(), 100); + + RoaringBitmap32 del; + for (int i = 0; i < 10; ++i) del.Add(i); + BitmapDeletionVector dv_del(del); + ASSERT_EQ(dv_del.GetCardinality(), 10); + ASSERT_OK(dv_del.Delete(100)); + ASSERT_EQ(dv_del.GetCardinality(), 11); +} + +} // namespace paimon::test diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_file_writer.cpp new file mode 100644 index 00000000..13963bc1 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_file_writer.cpp @@ -0,0 +1,61 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_file_writer.h" + +#include "paimon/common/io/data_output_stream.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +namespace paimon { + +Result> DeletionFileWriter::Create( + const std::shared_ptr& path_factory, const std::shared_ptr& fs, + const std::shared_ptr& pool) { + std::string path = path_factory->NewPath(); + bool is_external_path = path_factory->IsExternalPath(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, fs->Create(path, /*overwrite=*/true)); + DataOutputStream output_stream(out); + PAIMON_RETURN_NOT_OK(output_stream.WriteValue(DeletionVectorsIndexFile::VERSION_ID_V1)); + return std::unique_ptr( + new DeletionFileWriter(path, is_external_path, out, pool)); +} + +Status DeletionFileWriter::Write(const std::string& key, + const std::shared_ptr& deletion_vector) { + PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos()); + if (start < 0 || start > std::numeric_limits::max()) { + return Status::Invalid(fmt::format("Output position {} out of int32 range.", start)); + } + DataOutputStream output_stream(out_); + PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); + dv_metas_.insert(key, DeletionVectorMeta(key, static_cast(start), length, + deletion_vector->GetCardinality())); + return Status::OK(); +} + +Result> DeletionFileWriter::GetResult() const { + int64_t length = output_bytes_; + if (length < 0 || length > std::numeric_limits::max()) { + return Status::Invalid( + fmt::format("Deletion file result length {} out of int32 range.", length)); + } + return std::make_unique( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, + dv_metas_.size(), dv_metas_, + is_external_path_ ? std::optional(path_) : std::optional()); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h new file mode 100644 index 00000000..064c4aec --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -0,0 +1,66 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "fmt/format.h" +#include "paimon/common/utils/linked_hash_map.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/index/deletion_vector_meta.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" + +namespace paimon { + +/// Writer to write deletion file. +class DeletionFileWriter { + public: + static Result> Create( + const std::shared_ptr& path_factory, + const std::shared_ptr& fs, const std::shared_ptr& pool); + + Result GetPos() const { + return out_->GetPos(); + } + + Status Write(const std::string& key, const std::shared_ptr& deletion_vector); + + Status Close() { + PAIMON_RETURN_NOT_OK(out_->Flush()); + PAIMON_ASSIGN_OR_RAISE(output_bytes_, out_->GetPos()); + return out_->Close(); + } + + Result> GetResult() const; + + private: + DeletionFileWriter(const std::string& path, bool is_external_path, + std::shared_ptr& out, const std::shared_ptr& pool) + : path_(path), is_external_path_(is_external_path), out_(std::move(out)), pool_(pool) {} + + std::string path_; + bool is_external_path_; + std::shared_ptr out_; + std::shared_ptr pool_; + LinkedHashMap dv_metas_; + int64_t output_bytes_ = -1; +}; + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vector.h b/src/paimon/core/deletionvectors/deletion_vector.h index c443a37f..48368ba6 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.h +++ b/src/paimon/core/deletionvectors/deletion_vector.h @@ -31,6 +31,7 @@ namespace paimon { class FileSystem; +class DataOutputStream; struct DeletionFile; /// The DeletionVector can efficiently record the positions of rows that are deleted in a file, @@ -72,6 +73,13 @@ class DeletionVector { /// @return true if the deletion vector is empty, false if it contains deletions. virtual bool IsEmpty() const = 0; + /// @return the number of distinct integers added to the DeletionVector. + virtual int64_t GetCardinality() const = 0; + + /// Serializes the deletion vector. + virtual Result SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) = 0; + /// Serializes the deletion vector to a byte array for storage or transmission. /// /// @return A byte array representing the serialized deletion vector. diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp new file mode 100644 index 00000000..7f533520 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp @@ -0,0 +1,42 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h" + +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/deletionvectors/deletion_file_writer.h" + +namespace paimon { + +Result> DeletionVectorIndexFileWriter::WriteSingleFile( + const std::map>& input) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr writer, + DeletionFileWriter::Create(index_path_factory_, fs_, pool_)); + ScopeGuard guard([&]() { + if (writer) { + (void)writer->Close(); + } + }); + for (const auto& [key, value] : input) { + PAIMON_RETURN_NOT_OK(writer->Write(key, value)); + } + guard.Release(); + PAIMON_RETURN_NOT_OK(writer->Close()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result, writer->GetResult()); + return result; +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h new file mode 100644 index 00000000..687d16e9 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +/// Writer for deletion vector index file. +class DeletionVectorIndexFileWriter { + public: + DeletionVectorIndexFileWriter(const std::shared_ptr& fs, + const std::shared_ptr& path_factory, + int64_t target_size_per_index_file, + const std::shared_ptr& pool) + : index_path_factory_(path_factory), fs_(fs), pool_(pool) {} + + Result> WriteSingleFile( + const std::map>& input); + + private: + std::shared_ptr index_path_factory_; + std::shared_ptr fs_; + std::shared_ptr pool_; +}; + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp new file mode 100644 index 00000000..a4323baa --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h" + +namespace paimon { + +Result> DeletionVectorsIndexFile::WriteSingleFile( + const std::map>& input) { + return CreateWriter()->WriteSingleFile(input); +} + +std::shared_ptr DeletionVectorsIndexFile::CreateWriter() const { + return std::make_shared(fs_, path_factory_, + target_size_per_index_file_, pool_); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.h b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h index f715f957..18b3da35 100644 --- a/src/paimon/core/deletionvectors/deletion_vectors_index_file.h +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h @@ -15,12 +15,49 @@ */ #pragma once + +#include +#include #include + +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/index/index_file.h" +#include "paimon/core/index/index_file_meta.h" + namespace paimon { + +class DeletionVectorIndexFileWriter; + /// DeletionVectors index file. -// TODO(xinyu.lxy): add IndexFile -class DeletionVectorsIndexFile { +class DeletionVectorsIndexFile : public IndexFile { public: static constexpr char DELETION_VECTORS_INDEX[] = "DELETION_VECTORS"; + static constexpr int8_t VERSION_ID_V1 = 1; + + DeletionVectorsIndexFile(const std::shared_ptr& fs, + const std::shared_ptr& path_factory, + int64_t target_size_per_index_file, bool bitmap64, + const std::shared_ptr& pool) + : IndexFile(fs, path_factory), + target_size_per_index_file_(target_size_per_index_file), + bitmap64_(bitmap64), + pool_(pool) {} + + ~DeletionVectorsIndexFile() override = default; + + bool Bitmap64() const { + return bitmap64_; + } + + Result> WriteSingleFile( + const std::map>& input); + + private: + std::shared_ptr CreateWriter() const; + + const int64_t target_size_per_index_file_; + const bool bitmap64_; + std::shared_ptr pool_; }; + } // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp new file mode 100644 index 00000000..292d2585 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp @@ -0,0 +1,83 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/testing/mock/mock_index_path_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(DeletionVectorsIndexFileTest, Basic) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + DeletionVectorsIndexFile index_file( + fs, path_factory, /*target_size_per_index_file=*/1024 * 1024, /*bitmap64=*/false, pool); + + std::map> input; + RoaringBitmap32 roaring_1; + for (int32_t i = 0; i < 10; ++i) { + roaring_1.Add(i); + } + input["dv1"] = std::make_shared(roaring_1); + RoaringBitmap32 roaring_2; + for (int32_t i = 100; i < 110; ++i) { + roaring_2.Add(i); + } + input["dv2"] = std::make_shared(roaring_2); + + ASSERT_FALSE(index_file.Bitmap64()); + ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); + ASSERT_GT(meta->FileSize(), 0); + ASSERT_EQ(meta->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); + ASSERT_EQ(meta->FileName(), "index-0"); + ASSERT_EQ(meta->ExternalPath(), std::nullopt); +} + +TEST(DeletionVectorsIndexFileTest, ExternalPathAndIndexFileMeta) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + path_factory->SetExternal(true); + auto pool = GetDefaultPool(); + DeletionVectorsIndexFile index_file(fs, path_factory, + /*target_size_per_index_file=*/1024 * 1024, + /*bitmap64=*/false, pool); + + std::map> input; + RoaringBitmap32 roaring; + for (int32_t i = 0; i < 5; ++i) { + roaring.Add(i); + } + input["dv_ext"] = std::make_shared(roaring); + + ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); + ASSERT_EQ(meta->ExternalPath().value(), PathUtil::JoinPath(dir->Str(), "index-0")); +} + +} // namespace paimon::test diff --git a/src/paimon/core/index/index_file.h b/src/paimon/core/index/index_file.h new file mode 100644 index 00000000..a48082c4 --- /dev/null +++ b/src/paimon/core/index/index_file.h @@ -0,0 +1,69 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +// Base index file +class IndexFile { + public: + IndexFile(const std::shared_ptr& fs, + const std::shared_ptr& path_factory) + : fs_(fs), path_factory_(path_factory) {} + virtual ~IndexFile() = default; + + virtual std::string Path(const std::shared_ptr& file) const { + return path_factory_->ToPath(file); + } + + virtual Result FileSize(const std::shared_ptr& file) const { + return FileSize(Path(file)); + } + + virtual Result FileSize(const std::string& file) const { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, fs_->GetFileStatus(file)); + return file_status->GetLen(); + } + + virtual void Delete(const std::shared_ptr& file) const { + // Deletion is best-effort + auto status = fs_->Delete(Path(file), /*recursive=*/false); + (void)status; + } + + virtual Result Exists(const std::shared_ptr& file) const { + return fs_->Exists(Path(file)); + } + + virtual bool IsExternalPath() const { + return path_factory_->IsExternalPath(); + } + + protected: + std::shared_ptr fs_; + std::shared_ptr path_factory_; +}; + +} // namespace paimon diff --git a/src/paimon/testing/mock/mock_index_path_factory.h b/src/paimon/testing/mock/mock_index_path_factory.h new file mode 100644 index 00000000..7d5c979d --- /dev/null +++ b/src/paimon/testing/mock/mock_index_path_factory.h @@ -0,0 +1,56 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/core/index/index_path_factory.h" + +namespace paimon::test { + +class MockIndexPathFactory : public IndexPathFactory { + public: + explicit MockIndexPathFactory(const std::string& index_path) : index_path_(index_path) {} + + std::string NewPath() const override { + return PathUtil::JoinPath(index_path_, std::string(IndexPathFactory::INDEX_PREFIX) + + std::to_string(index_file_count_->fetch_add(1))); + } + std::string ToPath(const std::string& file_name) const override { + return PathUtil::JoinPath(index_path_, file_name); + } + std::string ToPath(const std::shared_ptr& file) const override { + return PathUtil::JoinPath(index_path_, file->FileName()); + } + bool IsExternalPath() const override { + return external_; + } + void SetExternal(bool v) { + external_ = v; + } + + private: + std::string index_path_; + bool external_ = false; + std::shared_ptr> index_file_count_ = + std::make_shared>(0); +}; + +} // namespace paimon::test