From 580aa909d6054fe19efe69d4ebe8632ebe08d86f Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 09:06:06 +0800 Subject: [PATCH 1/7] feat: support write for deletion vector --- src/paimon/CMakeLists.txt | 4 + .../bitmap_deletion_vector.cpp | 79 ++++++++++++ .../deletionvectors/bitmap_deletion_vector.h | 55 +++----- .../bitmap_deletion_vector_test.cpp | 122 ++++++++++++++++++ .../deletionvectors/deletion_file_writer.h | 85 ++++++++++++ .../core/deletionvectors/deletion_vector.h | 8 ++ .../deletion_vector_index_file_writer.cpp | 36 ++++++ .../deletion_vector_index_file_writer.h | 51 ++++++++ .../deletion_vectors_index_file.cpp | 33 +++++ .../deletion_vectors_index_file.h | 41 +++++- src/paimon/core/index/index_file.h | 66 ++++++++++ 11 files changed, 540 insertions(+), 40 deletions(-) create mode 100644 src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp create mode 100644 src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp create mode 100644 src/paimon/core/deletionvectors/deletion_file_writer.h create mode 100644 src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp create mode 100644 src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h create mode 100644 src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp create mode 100644 src/paimon/core/index/index_file.h diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index f180aae3..0824a76f 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -169,6 +169,9 @@ set(PAIMON_CORE_SRCS core/catalog/identifier.cpp core/core_options.cpp core/deletionvectors/deletion_vector.cpp + core/deletionvectors/bitmap_deletion_vector.cpp + core/deletionvectors/deletion_vectors_index_file.cpp + core/deletionvectors/deletion_vector_index_file_writer.cpp core/global_index/global_index_evaluator_impl.cpp core/global_index/global_index_scan.cpp core/global_index/global_index_scan_impl.cpp @@ -480,6 +483,7 @@ if(PAIMON_BUILD_TESTS) core/catalog/identifier_test.cpp core/core_options_test.cpp core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp + core/deletionvectors/bitmap_deletion_vector_test.cpp core/deletionvectors/deletion_vector_test.cpp core/index/index_in_data_file_dir_path_factory_test.cpp core/index/deletion_vector_meta_test.cpp diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp new file mode 100644 index 00000000..649d3490 --- /dev/null +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp @@ -0,0 +1,79 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" + +#include "arrow/util/crc32.h" +#include "fmt/format.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" + +namespace paimon { + +Result BitmapDeletionVector::SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data, SerializeToBytes(pool)); + int64_t size = data->size(); + PAIMON_RETURN_NOT_OK(out->WriteValue(static_cast(size))); + PAIMON_RETURN_NOT_OK(out->WriteBytes(data)); + uint32_t crc32 = 0; + crc32 = arrow::internal::crc32(crc32, data->data(), size); + PAIMON_RETURN_NOT_OK(out->WriteValue(static_cast(crc32))); + return static_cast(size); +} + +Result> BitmapDeletionVector::SerializeToBytes( + const std::shared_ptr& pool) { + std::shared_ptr bitmap_bytes = roaring_bitmap_.Serialize(pool.get()); + if (bitmap_bytes == nullptr) { + assert(bitmap_bytes); + return Status::Invalid("roaring bitmap serialize failed"); + } + MemorySegmentOutputStream output(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + output.WriteValue(MAGIC_NUMBER); + output.WriteBytes(bitmap_bytes); + return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, output.CurrentSize(), + pool.get()); +} + +Status BitmapDeletionVector::CheckPosition(int64_t position) const { + if (position > RoaringBitmap32::MAX_VALUE) { + return Status::Invalid(fmt::format( + "The file has too many rows, RoaringBitmap32 only supports files with row count " + "not exceeding {}.", + RoaringBitmap32::MAX_VALUE)); + } + return Status::OK(); +} + +Result> BitmapDeletionVector::Deserialize(const char* buffer, + int32_t length, + MemoryPool* pool) { + auto in = std::make_shared(buffer, length); + DataInputStream input(in); + PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue()); + if (magic_num != MAGIC_NUMBER) { + return Status::Invalid(fmt::format( + "Unable to deserialize deletion vector, invalid magic number: {}", magic_num)); + } + RoaringBitmap32 roaring_bitmap; + PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + MAGIC_NUMBER_SIZE_BYTES, + length - MAGIC_NUMBER_SIZE_BYTES)); + return pool->AllocateUnique(roaring_bitmap); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.h b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h index ee9ff317..21361dba 100644 --- a/src/paimon/core/deletionvectors/bitmap_deletion_vector.h +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.h @@ -18,16 +18,19 @@ #include -#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/io/data_output_stream.h" #include "paimon/core/deletionvectors/deletion_vector.h" -#include "paimon/io/byte_array_input_stream.h" -#include "paimon/io/data_input_stream.h" #include "paimon/utils/roaring_bitmap32.h" + namespace paimon { + /// A `DeletionVector` based on `RoaringBitmap32`, it only supports files with row count /// not exceeding `RoaringBitmap32::MAX_VALUE`. class BitmapDeletionVector : public DeletionVector { public: + static constexpr int32_t MAGIC_NUMBER = 1581511376; + static constexpr int32_t MAGIC_NUMBER_SIZE_BYTES = 4; + explicit BitmapDeletionVector(const RoaringBitmap32& roaring_bitmap) : roaring_bitmap_(roaring_bitmap) {} @@ -51,51 +54,27 @@ class BitmapDeletionVector : public DeletionVector { return roaring_bitmap_.IsEmpty(); } - Result> SerializeToBytes( - const std::shared_ptr& pool) override { - std::shared_ptr bitmap_bytes = roaring_bitmap_.Serialize(pool.get()); - if (bitmap_bytes == nullptr) { - assert(bitmap_bytes); - return Status::Invalid("roaring bitmap serialize failed"); - } - MemorySegmentOutputStream output(/*segment_size=*/1024, pool); - output.WriteValue(MAGIC_NUMBER); - output.WriteBytes(bitmap_bytes); - return MemorySegmentUtils::CopyToBytes(output.Segments(), /*offset=*/0, - output.CurrentSize(), pool.get()); + int64_t GetCardinality() const override { + return roaring_bitmap_.Cardinality(); } + Result SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) override; + + Result> SerializeToBytes( + const std::shared_ptr& pool) override; + const RoaringBitmap32* GetBitmap() const { return &roaring_bitmap_; } static Result> Deserialize(const char* buffer, int32_t length, - MemoryPool* pool) { - auto in = std::make_shared(buffer, length); - DataInputStream input(in); - PAIMON_ASSIGN_OR_RAISE(int32_t magic_num, input.ReadValue()); - if (magic_num != MAGIC_NUMBER) { - return Status::Invalid("Invalid magic number: ", std::to_string(magic_num)); - } - RoaringBitmap32 roaring_bitmap; - PAIMON_RETURN_NOT_OK(roaring_bitmap.Deserialize(buffer + sizeof(MAGIC_NUMBER), - length - sizeof(MAGIC_NUMBER))); - return pool->AllocateUnique(roaring_bitmap); - } - - static constexpr int32_t MAGIC_NUMBER = 1581511376; + MemoryPool* pool); private: - Status CheckPosition(int64_t position) const { - if (position > RoaringBitmap32::MAX_VALUE) { - return Status::Invalid( - "The file has too many rows, RoaringBitmap32 only supports files with row count " - "not exceeding 2147483647."); - } - return Status::OK(); - } + Status CheckPosition(int64_t position) const; - private: RoaringBitmap32 roaring_bitmap_; }; + } // namespace paimon diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp new file mode 100644 index 00000000..318817fb --- /dev/null +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector_test.cpp @@ -0,0 +1,122 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(BitmapDeletionVectorTest, BasicOperations) { + RoaringBitmap32 roaring; + BitmapDeletionVector dv(roaring); + ASSERT_TRUE(dv.IsEmpty()); + for (int32_t i = 0; i < 2000; i += 2) { + ASSERT_OK(dv.Delete(i)); + } + ASSERT_EQ(dv.GetCardinality(), 1000); + for (int32_t i = 0; i < 2000; ++i) { + if (i % 2 == 0) { + ASSERT_TRUE(dv.IsDeleted(i).value()); + } else { + ASSERT_FALSE(dv.IsDeleted(i).value()); + } + } +} + +TEST(BitmapDeletionVectorTest, CheckedDelete) { + RoaringBitmap32 roaring; + BitmapDeletionVector dv(roaring); + ASSERT_TRUE(dv.CheckedDelete(42).value()); + ASSERT_FALSE(dv.CheckedDelete(42).value()); + ASSERT_TRUE(dv.IsDeleted(42).value()); +} + +TEST(BitmapDeletionVectorTest, SerializeAndDeserialize) { + RoaringBitmap32 roaring; + for (int32_t i = 0; i < 100; i += 3) { + roaring.Add(i); + } + BitmapDeletionVector dv(roaring); + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(auto bytes, dv.SerializeToBytes(pool)); + ASSERT_OK_AND_ASSIGN( + auto dv2, BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(), pool.get())); + for (int32_t i = 0; i < 100; ++i) { + ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value()); + } +} + +TEST(BitmapDeletionVectorTest, SerializeToOutputStream) { + RoaringBitmap32 roaring; + for (int32_t i = 0; i < 50; i += 2) { + roaring.Add(i); + } + BitmapDeletionVector dv(roaring); + auto pool = GetDefaultPool(); + auto dir = UniqueTestDirectory::Create(); + auto path = PathUtil::JoinPath(dir->Str(), "dv"); + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", path, {})); + ASSERT_OK_AND_ASSIGN(std::shared_ptr output_stream, + fs->Create(path, /*overwrite=*/false)); + DataOutputStream out(output_stream); + ASSERT_OK_AND_ASSIGN(int64_t size, dv.SerializeTo(pool, &out)); + ASSERT_OK(output_stream->Flush()); + ASSERT_OK(output_stream->Close()); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, fs->Open(path)); + DataInputStream in(input_stream); + ASSERT_OK_AND_ASSIGN(int32_t byte_len, in.ReadValue()); + auto bytes = Bytes::AllocateBytes(byte_len, pool.get()); + ASSERT_OK(in.Read(bytes->data(), bytes->size())); + const char* data = bytes->data(); + ASSERT_EQ(bytes->size(), size); + ASSERT_OK_AND_ASSIGN(auto dv2, + BitmapDeletionVector::Deserialize(data, bytes->size(), pool.get())); + for (int32_t i = 0; i < 50; ++i) { + ASSERT_EQ(dv.IsDeleted(i).value(), dv2->IsDeleted(i).value()); + } +} + +TEST(BitmapDeletionVectorTest, GetCardinality) { + RoaringBitmap32 empty; + BitmapDeletionVector dv_empty(empty); + ASSERT_EQ(dv_empty.GetCardinality(), 0); + + RoaringBitmap32 cont; + for (int i = 0; i < 100; ++i) cont.Add(i); + BitmapDeletionVector dv_cont(cont); + ASSERT_EQ(dv_cont.GetCardinality(), 100); + + RoaringBitmap32 gap; + for (int i = 0; i < 1000; i += 10) gap.Add(i); + BitmapDeletionVector dv_gap(gap); + ASSERT_EQ(dv_gap.GetCardinality(), 100); + + RoaringBitmap32 del; + for (int i = 0; i < 10; ++i) del.Add(i); + BitmapDeletionVector dv_del(del); + ASSERT_EQ(dv_del.GetCardinality(), 10); + ASSERT_OK(dv_del.Delete(100)); + ASSERT_EQ(dv_del.GetCardinality(), 11); +} + +} // namespace paimon::test diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h new file mode 100644 index 00000000..3ef4bf8c --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -0,0 +1,85 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/io/data_output_stream.h" +#include "paimon/common/utils/linked_hash_map.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/index/deletion_vector_meta.h" +#include "paimon/fs/file_system.h" + +namespace paimon { + +/// Writer to write deletion file. +class DeletionFileWriter { + public: + static Result> Create( + const std::shared_ptr& path_factory, + const std::shared_ptr& fs, const std::shared_ptr& pool) { + std::string path = path_factory->NewPath(); + bool is_external_path = path_factory->IsExternalPath(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, + fs->Create(path, /*overwrite=*/true)); + DataOutputStream output_stream(out); + PAIMON_RETURN_NOT_OK( + output_stream.WriteValue(DeletionVectorsIndexFile::VERSION_ID_V1)); + return std::unique_ptr( + new DeletionFileWriter(path, is_external_path, out, pool)); + } + + Result GetPos() const { + return out_->GetPos(); + } + + Status Write(const std::string& key, const std::shared_ptr& deletion_vector) { + PAIMON_ASSIGN_OR_RAISE(int32_t start, out_->GetPos()); + DataOutputStream output_stream(out_); + PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); + dv_metas_.insert(key, + DeletionVectorMeta(key, start, length, deletion_vector->GetCardinality())); + return Status::OK(); + } + + Status Close() { + return out_->Close(); + } + + Result> Result() const { + PAIMON_ASSIGN_OR_RAISE(int32_t length, GetPos()); + return std::make_unique( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, + dv_metas_.size(), dv_metas_, + is_external_path_ ? std::optional(path_) : std::optional()); + } + + private: + DeletionFileWriter(const std::string& path, bool is_external_path, + std::shared_ptr& out, const std::shared_ptr& pool) + : path_(path), is_external_path_(is_external_path), out_(std::move(out)), pool_(pool) {} + + std::string path_; + bool is_external_path_; + std::shared_ptr out_; + std::shared_ptr pool_; + LinkedHashMap dv_metas_; +}; + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vector.h b/src/paimon/core/deletionvectors/deletion_vector.h index c443a37f..43a1d2aa 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.h +++ b/src/paimon/core/deletionvectors/deletion_vector.h @@ -31,6 +31,7 @@ namespace paimon { class FileSystem; +class DataOutputStream; struct DeletionFile; /// The DeletionVector can efficiently record the positions of rows that are deleted in a file, @@ -72,6 +73,13 @@ class DeletionVector { /// @return true if the deletion vector is empty, false if it contains deletions. virtual bool IsEmpty() const = 0; + /// @return the number of distinct integers added to the DeletionVector. */ + virtual int64_t GetCardinality() const = 0; + + /// Serializes the deletion vector. + virtual Result SerializeTo(const std::shared_ptr& pool, + DataOutputStream* out) = 0; + /// Serializes the deletion vector to a byte array for storage or transmission. /// /// @return A byte array representing the serialized deletion vector. diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp new file mode 100644 index 00000000..35534910 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp @@ -0,0 +1,36 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h" + +#include "paimon/core/deletionvectors/deletion_file_writer.h" + +namespace paimon { + +Result> DeletionVectorIndexFileWriter::WriteSingleFile( + const std::map>& input) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr writer, + DeletionFileWriter::Create(index_path_factory_, fs_, pool_)); + for (const auto& [key, value] : input) { + PAIMON_RETURN_NOT_OK(writer->Write(key, value)); + } + // TODO(yonghao.fyh): java close in try final + PAIMON_RETURN_NOT_OK(writer->Close()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result, writer->Result()); + return result; +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h new file mode 100644 index 00000000..e441a8de --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h @@ -0,0 +1,51 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +/// Writer for deletion vector index file. +class DeletionVectorIndexFileWriter { + public: + DeletionVectorIndexFileWriter(const std::shared_ptr& fs, + const std::shared_ptr& path_factory, + int64_t target_size_per_index_file, + const std::shared_ptr& pool) + : index_path_factory_(path_factory), + fs_(fs), + // target_size_in_bytes_(target_size_per_index_file), + pool_(pool) {} + + Result> WriteSingleFile( + const std::map>& input); + + private: + std::shared_ptr index_path_factory_; + std::shared_ptr fs_; + // const int64_t target_size_in_bytes_; + std::shared_ptr pool_; +}; + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp new file mode 100644 index 00000000..a4323baa --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.cpp @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +#include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h" + +namespace paimon { + +Result> DeletionVectorsIndexFile::WriteSingleFile( + const std::map>& input) { + return CreateWriter()->WriteSingleFile(input); +} + +std::shared_ptr DeletionVectorsIndexFile::CreateWriter() const { + return std::make_shared(fs_, path_factory_, + target_size_per_index_file_, pool_); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file.h b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h index f715f957..18b3da35 100644 --- a/src/paimon/core/deletionvectors/deletion_vectors_index_file.h +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file.h @@ -15,12 +15,49 @@ */ #pragma once + +#include +#include #include + +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/index/index_file.h" +#include "paimon/core/index/index_file_meta.h" + namespace paimon { + +class DeletionVectorIndexFileWriter; + /// DeletionVectors index file. -// TODO(xinyu.lxy): add IndexFile -class DeletionVectorsIndexFile { +class DeletionVectorsIndexFile : public IndexFile { public: static constexpr char DELETION_VECTORS_INDEX[] = "DELETION_VECTORS"; + static constexpr int8_t VERSION_ID_V1 = 1; + + DeletionVectorsIndexFile(const std::shared_ptr& fs, + const std::shared_ptr& path_factory, + int64_t target_size_per_index_file, bool bitmap64, + const std::shared_ptr& pool) + : IndexFile(fs, path_factory), + target_size_per_index_file_(target_size_per_index_file), + bitmap64_(bitmap64), + pool_(pool) {} + + ~DeletionVectorsIndexFile() override = default; + + bool Bitmap64() const { + return bitmap64_; + } + + Result> WriteSingleFile( + const std::map>& input); + + private: + std::shared_ptr CreateWriter() const; + + const int64_t target_size_per_index_file_; + const bool bitmap64_; + std::shared_ptr pool_; }; + } // namespace paimon diff --git a/src/paimon/core/index/index_file.h b/src/paimon/core/index/index_file.h new file mode 100644 index 00000000..6dbdacc2 --- /dev/null +++ b/src/paimon/core/index/index_file.h @@ -0,0 +1,66 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +// Base index file +class IndexFile { + public: + IndexFile(const std::shared_ptr& fs, + const std::shared_ptr& path_factory) + : fs_(fs), path_factory_(path_factory) {} + virtual ~IndexFile() = default; + + virtual std::string Path(const std::shared_ptr& file) const { + return path_factory_->ToPath(file); + } + + virtual Result FileSize(const std::shared_ptr& file) const { + return FileSize(Path(file)); + } + + virtual Result FileSize(const std::string& file) const { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, fs_->GetFileStatus(file)); + return file_status->GetLen(); + } + + virtual void Delete(const std::shared_ptr& file) const { + [[maybe_unused]] auto s = fs_->Delete(Path(file), /*recursive=*/false); + } + + virtual Result Exists(const std::shared_ptr& file) const { + return fs_->Exists(Path(file)); + } + + virtual bool IsExternalPath() const { + return path_factory_->IsExternalPath(); + } + + protected: + std::shared_ptr fs_; + std::shared_ptr path_factory_; +}; + +} // namespace paimon From 13a2420bf3ab4540136c9cafc8f45321b42cb4e3 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 09:29:55 +0800 Subject: [PATCH 2/7] fix --- src/paimon/core/deletionvectors/deletion_file_writer.h | 2 +- src/paimon/core/deletionvectors/deletion_vector.h | 2 +- .../deletion_vector_index_file_writer.cpp | 10 ++++++++-- src/paimon/core/index/index_file.h | 5 ++++- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h index 3ef4bf8c..579641b9 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer.h +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -62,7 +62,7 @@ class DeletionFileWriter { return out_->Close(); } - Result> Result() const { + Result> GetResult() const { PAIMON_ASSIGN_OR_RAISE(int32_t length, GetPos()); return std::make_unique( DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, diff --git a/src/paimon/core/deletionvectors/deletion_vector.h b/src/paimon/core/deletionvectors/deletion_vector.h index 43a1d2aa..48368ba6 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.h +++ b/src/paimon/core/deletionvectors/deletion_vector.h @@ -73,7 +73,7 @@ class DeletionVector { /// @return true if the deletion vector is empty, false if it contains deletions. virtual bool IsEmpty() const = 0; - /// @return the number of distinct integers added to the DeletionVector. */ + /// @return the number of distinct integers added to the DeletionVector. virtual int64_t GetCardinality() const = 0; /// Serializes the deletion vector. diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp index 35534910..7f533520 100644 --- a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.cpp @@ -16,6 +16,7 @@ #include "paimon/core/deletionvectors/deletion_vector_index_file_writer.h" +#include "paimon/common/utils/scope_guard.h" #include "paimon/core/deletionvectors/deletion_file_writer.h" namespace paimon { @@ -24,12 +25,17 @@ Result> DeletionVectorIndexFileWriter::WriteSingl const std::map>& input) { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr writer, DeletionFileWriter::Create(index_path_factory_, fs_, pool_)); + ScopeGuard guard([&]() { + if (writer) { + (void)writer->Close(); + } + }); for (const auto& [key, value] : input) { PAIMON_RETURN_NOT_OK(writer->Write(key, value)); } - // TODO(yonghao.fyh): java close in try final + guard.Release(); PAIMON_RETURN_NOT_OK(writer->Close()); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result, writer->Result()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result, writer->GetResult()); return result; } diff --git a/src/paimon/core/index/index_file.h b/src/paimon/core/index/index_file.h index 6dbdacc2..a48082c4 100644 --- a/src/paimon/core/index/index_file.h +++ b/src/paimon/core/index/index_file.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include "paimon/core/index/index_path_factory.h" @@ -47,7 +48,9 @@ class IndexFile { } virtual void Delete(const std::shared_ptr& file) const { - [[maybe_unused]] auto s = fs_->Delete(Path(file), /*recursive=*/false); + // Deletion is best-effort + auto status = fs_->Delete(Path(file), /*recursive=*/false); + (void)status; } virtual Result Exists(const std::shared_ptr& file) const { From abcd03c9ba27a0bf929871f329ac182899313cf4 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 09:39:16 +0800 Subject: [PATCH 3/7] fix --- src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp | 3 +++ .../deletionvectors/deletion_vector_index_file_writer.h | 6 +----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp index 649d3490..66be892d 100644 --- a/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp +++ b/src/paimon/core/deletionvectors/bitmap_deletion_vector.cpp @@ -28,6 +28,9 @@ Result BitmapDeletionVector::SerializeTo(const std::shared_ptr data, SerializeToBytes(pool)); int64_t size = data->size(); + if (size < 0 || size > std::numeric_limits::max()) { + return Status::Invalid("BitmapDeletionVector serialize size out of range: ", size); + } PAIMON_RETURN_NOT_OK(out->WriteValue(static_cast(size))); PAIMON_RETURN_NOT_OK(out->WriteBytes(data)); uint32_t crc32 = 0; diff --git a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h index e441a8de..687d16e9 100644 --- a/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h +++ b/src/paimon/core/deletionvectors/deletion_vector_index_file_writer.h @@ -33,10 +33,7 @@ class DeletionVectorIndexFileWriter { const std::shared_ptr& path_factory, int64_t target_size_per_index_file, const std::shared_ptr& pool) - : index_path_factory_(path_factory), - fs_(fs), - // target_size_in_bytes_(target_size_per_index_file), - pool_(pool) {} + : index_path_factory_(path_factory), fs_(fs), pool_(pool) {} Result> WriteSingleFile( const std::map>& input); @@ -44,7 +41,6 @@ class DeletionVectorIndexFileWriter { private: std::shared_ptr index_path_factory_; std::shared_ptr fs_; - // const int64_t target_size_in_bytes_; std::shared_ptr pool_; }; From f808c975a89cc23f36dc45fbdd20cfd52d3ddd52 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 09:44:34 +0800 Subject: [PATCH 4/7] fix --- src/paimon/core/deletionvectors/deletion_file_writer.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h index 579641b9..b8c233ca 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer.h +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -50,11 +50,14 @@ class DeletionFileWriter { } Status Write(const std::string& key, const std::shared_ptr& deletion_vector) { - PAIMON_ASSIGN_OR_RAISE(int32_t start, out_->GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos()); + if (start < 0 || start > std::numeric_limits::max()) { + return Status::Invalid("Output position out of int32 range: ", start); + } DataOutputStream output_stream(out_); PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); - dv_metas_.insert(key, - DeletionVectorMeta(key, start, length, deletion_vector->GetCardinality())); + dv_metas_.insert(key, DeletionVectorMeta(key, static_cast(start), length, + deletion_vector->GetCardinality())); return Status::OK(); } From 6f0ca5cbeb6e271abe779c3ff4b069c20c832b64 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 09:48:27 +0800 Subject: [PATCH 5/7] fix --- src/paimon/core/deletionvectors/deletion_file_writer.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h index b8c233ca..5f598616 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer.h +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -19,6 +19,7 @@ #include #include +#include "fmt/format.h" #include "paimon/common/io/data_output_stream.h" #include "paimon/common/utils/linked_hash_map.h" #include "paimon/common/utils/path_util.h" @@ -52,7 +53,7 @@ class DeletionFileWriter { Status Write(const std::string& key, const std::shared_ptr& deletion_vector) { PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos()); if (start < 0 || start > std::numeric_limits::max()) { - return Status::Invalid("Output position out of int32 range: ", start); + return Status::Invalid(fmt::format("Output position {} out of int32 range.", start)); } DataOutputStream output_stream(out_); PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); @@ -66,7 +67,11 @@ class DeletionFileWriter { } Result> GetResult() const { - PAIMON_ASSIGN_OR_RAISE(int32_t length, GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t length, GetPos()); + if (length < 0 || length > std::numeric_limits::max()) { + return Status::Invalid( + fmt::format("Deletion file result length {} out of int32 range.", length)); + } return std::make_unique( DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, dv_metas_.size(), dv_metas_, From 164bcd400ed8f1772a7f61257dc4b4aceeaf65cc Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 14:24:22 +0800 Subject: [PATCH 6/7] fix --- src/paimon/CMakeLists.txt | 2 +- .../deletion_vectors_index_file_test.cpp | 73 +++++++++++++++++++ .../testing/mock/mock_index_path_factory.h | 48 ++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp create mode 100644 src/paimon/testing/mock/mock_index_path_factory.h diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 0824a76f..c0aafdd4 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -471,7 +471,6 @@ if(PAIMON_BUILD_TESTS) add_paimon_test(core_test SOURCES core/append/append_only_writer_test.cpp - core/append/bucketed_append_compact_manager_test.cpp core/casting/cast_executor_factory_test.cpp core/casting/cast_executor_test.cpp core/casting/casted_row_test.cpp @@ -485,6 +484,7 @@ if(PAIMON_BUILD_TESTS) core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp core/deletionvectors/bitmap_deletion_vector_test.cpp core/deletionvectors/deletion_vector_test.cpp + core/deletionvectors/deletion_vectors_index_file_test.cpp core/index/index_in_data_file_dir_path_factory_test.cpp core/index/deletion_vector_meta_test.cpp core/index/index_file_meta_serializer_test.cpp diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp new file mode 100644 index 00000000..77aa5c49 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp @@ -0,0 +1,73 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/testing/mock/mock_file_system.h" +#include "paimon/testing/mock/mock_index_path_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(DeletionVectorsIndexFileTest, Basic) { + auto fs = std::make_shared(); + auto path_factory = std::make_shared(); + auto pool = GetDefaultPool(); + DeletionVectorsIndexFile index_file(fs, path_factory, 1024 * 1024, false, pool); + + std::map> input; + RoaringBitmap32 roaring_1; + for (int i = 0; i < 10; ++i) { + roaring_1.Add(i) + } + input["dv1"] = std::make_shared(roaring_1); + RoaringBitmap32 roaring_2; + for (int i = 100; i < 110; ++i) { + roaring_2.Add(i); + } + input["dv2"] = std::make_shared(roaring_2); + + ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); + ASSERT_GT(meta->FileSize(), 0); + ASSERT_EQ(meta->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); + ASSERT_EQ(meta->FileName(), "mock_path"); + ASSERT_EQ(meta->ExternalPath(), std::nullopt); +} + +TEST(DeletionVectorsIndexFileTest, ExternalPathAndIndexFileMeta) { + auto fs = std::make_shared(); + auto path_factory = std::make_shared(); + path_factory->SetExternal(true); + auto pool = GetDefaultPool(); + DeletionVectorsIndexFile index_file(fs, path_factory, 1024 * 1024, false, pool); + + std::map> input; + RoaringBitmap32 roaring; + for (int i = 0; i < 5; ++i) roaring.Add(i); + input["dv_ext"] = std::make_shared(roaring); + + ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); + ASSERT_EQ(meta->ExternalPath().value(), "mock_path"); +} + +} // namespace paimon::test diff --git a/src/paimon/testing/mock/mock_index_path_factory.h b/src/paimon/testing/mock/mock_index_path_factory.h new file mode 100644 index 00000000..8495ee91 --- /dev/null +++ b/src/paimon/testing/mock/mock_index_path_factory.h @@ -0,0 +1,48 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/index/index_path_factory.h" + +namespace paimon::test { + +class MockIndexPathFactory : public IndexPathFactory { + public: + std::string NewPath() const override { + return "mock_path"; + } + std::string ToPath(const std::string& file_name) const override { + return file_name; + } + std::string ToPath(const std::shared_ptr& file) const override { + return file ? file->FileName() : "mock_file"; + } + bool IsExternalPath() const override { + return external_; + } + void SetExternal(bool v) { + external_ = v; + } + + private: + bool external_ = false; +}; + +} // namespace paimon::test From d08ed9f0191f98c1838647f51346a17433b61245 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Sat, 28 Feb 2026 15:28:23 +0800 Subject: [PATCH 7/7] fix --- src/paimon/CMakeLists.txt | 1 + src/paimon/common/io/data_output_stream.cpp | 19 ------ src/paimon/common/io/data_output_stream.h | 20 +++++- .../deletionvectors/deletion_file_writer.cpp | 61 +++++++++++++++++++ .../deletionvectors/deletion_file_writer.h | 43 +++---------- .../deletion_vectors_index_file_test.cpp | 36 +++++++---- .../testing/mock/mock_index_path_factory.h | 14 ++++- 7 files changed, 123 insertions(+), 71 deletions(-) create mode 100644 src/paimon/core/deletionvectors/deletion_file_writer.cpp diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index c0aafdd4..a46d3763 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -169,6 +169,7 @@ set(PAIMON_CORE_SRCS core/catalog/identifier.cpp core/core_options.cpp core/deletionvectors/deletion_vector.cpp + core/deletionvectors/deletion_file_writer.cpp core/deletionvectors/bitmap_deletion_vector.cpp core/deletionvectors/deletion_vectors_index_file.cpp core/deletionvectors/deletion_vector_index_file_writer.cpp diff --git a/src/paimon/common/io/data_output_stream.cpp b/src/paimon/common/io/data_output_stream.cpp index 5252ecea..6d8c82b5 100644 --- a/src/paimon/common/io/data_output_stream.cpp +++ b/src/paimon/common/io/data_output_stream.cpp @@ -16,12 +16,7 @@ #include "paimon/common/io/data_output_stream.h" -#include -#include - #include "fmt/format.h" -#include "paimon/common/utils/math.h" -#include "paimon/fs/file_system.h" #include "paimon/memory/bytes.h" #include "paimon/result.h" @@ -30,20 +25,6 @@ DataOutputStream::DataOutputStream(const std::shared_ptr& output_s : output_stream_(output_stream) { assert(output_stream_); } -template -Status DataOutputStream::WriteValue(const T& value) { - static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); - T write_value = value; - if (NeedSwap()) { - write_value = EndianSwapValue(value); - } - int32_t write_length = sizeof(T); - PAIMON_ASSIGN_OR_RAISE( - int32_t actual_write_length, - output_stream_->Write(reinterpret_cast(&write_value), write_length)); - PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); - return Status::OK(); -} Status DataOutputStream::WriteBytes(const std::shared_ptr& bytes) { int32_t write_length = bytes->size(); diff --git a/src/paimon/common/io/data_output_stream.h b/src/paimon/common/io/data_output_stream.h index 6d383a11..6c22b47c 100644 --- a/src/paimon/common/io/data_output_stream.h +++ b/src/paimon/common/io/data_output_stream.h @@ -15,11 +15,17 @@ */ #pragma once + +#include #include #include #include +#include +#include "paimon/common/utils/math.h" +#include "paimon/fs/file_system.h" #include "paimon/io/byte_order.h" +#include "paimon/result.h" #include "paimon/status.h" namespace paimon { @@ -33,7 +39,19 @@ class PAIMON_EXPORT DataOutputStream { explicit DataOutputStream(const std::shared_ptr& output_stream); template - Status WriteValue(const T& value); + Status WriteValue(const T& value) { + static_assert(std::is_trivially_copyable_v, "T must be trivially copyable"); + T write_value = value; + if (NeedSwap()) { + write_value = EndianSwapValue(value); + } + int32_t write_length = sizeof(T); + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_write_length, + output_stream_->Write(reinterpret_cast(&write_value), write_length)); + PAIMON_RETURN_NOT_OK(AssertWriteLength(write_length, actual_write_length)); + return Status::OK(); + } Status WriteBytes(const std::shared_ptr& bytes); diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_file_writer.cpp new file mode 100644 index 00000000..13963bc1 --- /dev/null +++ b/src/paimon/core/deletionvectors/deletion_file_writer.cpp @@ -0,0 +1,61 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/deletionvectors/deletion_file_writer.h" + +#include "paimon/common/io/data_output_stream.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" + +namespace paimon { + +Result> DeletionFileWriter::Create( + const std::shared_ptr& path_factory, const std::shared_ptr& fs, + const std::shared_ptr& pool) { + std::string path = path_factory->NewPath(); + bool is_external_path = path_factory->IsExternalPath(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, fs->Create(path, /*overwrite=*/true)); + DataOutputStream output_stream(out); + PAIMON_RETURN_NOT_OK(output_stream.WriteValue(DeletionVectorsIndexFile::VERSION_ID_V1)); + return std::unique_ptr( + new DeletionFileWriter(path, is_external_path, out, pool)); +} + +Status DeletionFileWriter::Write(const std::string& key, + const std::shared_ptr& deletion_vector) { + PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos()); + if (start < 0 || start > std::numeric_limits::max()) { + return Status::Invalid(fmt::format("Output position {} out of int32 range.", start)); + } + DataOutputStream output_stream(out_); + PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); + dv_metas_.insert(key, DeletionVectorMeta(key, static_cast(start), length, + deletion_vector->GetCardinality())); + return Status::OK(); +} + +Result> DeletionFileWriter::GetResult() const { + int64_t length = output_bytes_; + if (length < 0 || length > std::numeric_limits::max()) { + return Status::Invalid( + fmt::format("Deletion file result length {} out of int32 range.", length)); + } + return std::make_unique( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, + dv_metas_.size(), dv_metas_, + is_external_path_ ? std::optional(path_) : std::optional()); +} + +} // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.h b/src/paimon/core/deletionvectors/deletion_file_writer.h index 5f598616..064c4aec 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer.h +++ b/src/paimon/core/deletionvectors/deletion_file_writer.h @@ -20,11 +20,11 @@ #include #include "fmt/format.h" -#include "paimon/common/io/data_output_stream.h" #include "paimon/common/utils/linked_hash_map.h" #include "paimon/common/utils/path_util.h" -#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/deletionvectors/deletion_vector.h" #include "paimon/core/index/deletion_vector_meta.h" +#include "paimon/core/index/index_path_factory.h" #include "paimon/fs/file_system.h" namespace paimon { @@ -34,49 +34,21 @@ class DeletionFileWriter { public: static Result> Create( const std::shared_ptr& path_factory, - const std::shared_ptr& fs, const std::shared_ptr& pool) { - std::string path = path_factory->NewPath(); - bool is_external_path = path_factory->IsExternalPath(); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, - fs->Create(path, /*overwrite=*/true)); - DataOutputStream output_stream(out); - PAIMON_RETURN_NOT_OK( - output_stream.WriteValue(DeletionVectorsIndexFile::VERSION_ID_V1)); - return std::unique_ptr( - new DeletionFileWriter(path, is_external_path, out, pool)); - } + const std::shared_ptr& fs, const std::shared_ptr& pool); Result GetPos() const { return out_->GetPos(); } - Status Write(const std::string& key, const std::shared_ptr& deletion_vector) { - PAIMON_ASSIGN_OR_RAISE(int64_t start, out_->GetPos()); - if (start < 0 || start > std::numeric_limits::max()) { - return Status::Invalid(fmt::format("Output position {} out of int32 range.", start)); - } - DataOutputStream output_stream(out_); - PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); - dv_metas_.insert(key, DeletionVectorMeta(key, static_cast(start), length, - deletion_vector->GetCardinality())); - return Status::OK(); - } + Status Write(const std::string& key, const std::shared_ptr& deletion_vector); Status Close() { + PAIMON_RETURN_NOT_OK(out_->Flush()); + PAIMON_ASSIGN_OR_RAISE(output_bytes_, out_->GetPos()); return out_->Close(); } - Result> GetResult() const { - PAIMON_ASSIGN_OR_RAISE(int64_t length, GetPos()); - if (length < 0 || length > std::numeric_limits::max()) { - return Status::Invalid( - fmt::format("Deletion file result length {} out of int32 range.", length)); - } - return std::make_unique( - DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, PathUtil::GetName(path_), length, - dv_metas_.size(), dv_metas_, - is_external_path_ ? std::optional(path_) : std::optional()); - } + Result> GetResult() const; private: DeletionFileWriter(const std::string& path, bool is_external_path, @@ -88,6 +60,7 @@ class DeletionFileWriter { std::shared_ptr out_; std::shared_ptr pool_; LinkedHashMap dv_metas_; + int64_t output_bytes_ = -1; }; } // namespace paimon diff --git a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp index 77aa5c49..292d2585 100644 --- a/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp +++ b/src/paimon/core/deletionvectors/deletion_vectors_index_file_test.cpp @@ -23,51 +23,61 @@ #include "gtest/gtest.h" #include "paimon/core/deletionvectors/bitmap_deletion_vector.h" #include "paimon/core/index/index_file_meta.h" -#include "paimon/testing/mock/mock_file_system.h" +#include "paimon/fs/file_system_factory.h" #include "paimon/testing/mock/mock_index_path_factory.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { TEST(DeletionVectorsIndexFileTest, Basic) { - auto fs = std::make_shared(); - auto path_factory = std::make_shared(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); auto pool = GetDefaultPool(); - DeletionVectorsIndexFile index_file(fs, path_factory, 1024 * 1024, false, pool); + DeletionVectorsIndexFile index_file( + fs, path_factory, /*target_size_per_index_file=*/1024 * 1024, /*bitmap64=*/false, pool); std::map> input; RoaringBitmap32 roaring_1; - for (int i = 0; i < 10; ++i) { - roaring_1.Add(i) + for (int32_t i = 0; i < 10; ++i) { + roaring_1.Add(i); } input["dv1"] = std::make_shared(roaring_1); RoaringBitmap32 roaring_2; - for (int i = 100; i < 110; ++i) { + for (int32_t i = 100; i < 110; ++i) { roaring_2.Add(i); } input["dv2"] = std::make_shared(roaring_2); + ASSERT_FALSE(index_file.Bitmap64()); ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); ASSERT_GT(meta->FileSize(), 0); ASSERT_EQ(meta->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); - ASSERT_EQ(meta->FileName(), "mock_path"); + ASSERT_EQ(meta->FileName(), "index-0"); ASSERT_EQ(meta->ExternalPath(), std::nullopt); } TEST(DeletionVectorsIndexFileTest, ExternalPathAndIndexFileMeta) { - auto fs = std::make_shared(); - auto path_factory = std::make_shared(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); path_factory->SetExternal(true); auto pool = GetDefaultPool(); - DeletionVectorsIndexFile index_file(fs, path_factory, 1024 * 1024, false, pool); + DeletionVectorsIndexFile index_file(fs, path_factory, + /*target_size_per_index_file=*/1024 * 1024, + /*bitmap64=*/false, pool); std::map> input; RoaringBitmap32 roaring; - for (int i = 0; i < 5; ++i) roaring.Add(i); + for (int32_t i = 0; i < 5; ++i) { + roaring.Add(i); + } input["dv_ext"] = std::make_shared(roaring); ASSERT_OK_AND_ASSIGN(auto meta, index_file.WriteSingleFile(input)); - ASSERT_EQ(meta->ExternalPath().value(), "mock_path"); + ASSERT_EQ(meta->ExternalPath().value(), PathUtil::JoinPath(dir->Str(), "index-0")); } } // namespace paimon::test diff --git a/src/paimon/testing/mock/mock_index_path_factory.h b/src/paimon/testing/mock/mock_index_path_factory.h index 8495ee91..7d5c979d 100644 --- a/src/paimon/testing/mock/mock_index_path_factory.h +++ b/src/paimon/testing/mock/mock_index_path_factory.h @@ -16,23 +16,28 @@ #pragma once +#include #include #include +#include "paimon/common/utils/path_util.h" #include "paimon/core/index/index_path_factory.h" namespace paimon::test { class MockIndexPathFactory : public IndexPathFactory { public: + explicit MockIndexPathFactory(const std::string& index_path) : index_path_(index_path) {} + std::string NewPath() const override { - return "mock_path"; + return PathUtil::JoinPath(index_path_, std::string(IndexPathFactory::INDEX_PREFIX) + + std::to_string(index_file_count_->fetch_add(1))); } std::string ToPath(const std::string& file_name) const override { - return file_name; + return PathUtil::JoinPath(index_path_, file_name); } std::string ToPath(const std::shared_ptr& file) const override { - return file ? file->FileName() : "mock_file"; + return PathUtil::JoinPath(index_path_, file->FileName()); } bool IsExternalPath() const override { return external_; @@ -42,7 +47,10 @@ class MockIndexPathFactory : public IndexPathFactory { } private: + std::string index_path_; bool external_ = false; + std::shared_ptr> index_file_count_ = + std::make_shared>(0); }; } // namespace paimon::test