Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ if(PAIMON_BUILD_TESTS)
common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp
common/file_index/bloomfilter/bloom_filter_file_index_test.cpp
common/file_index/bloomfilter/fast_hash_test.cpp
common/file_index/rangebitmap/range_bitmap_file_index_test.cpp
common/global_index/complete_index_score_batch_reader_test.cpp
common/global_index/global_index_result_test.cpp
common/global_index/global_indexer_factory_test.cpp
Expand Down
10 changes: 9 additions & 1 deletion src/paimon/common/file_index/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ set(PAIMON_FILE_INDEX_SRC
bsi/bit_slice_index_roaring_bitmap.cpp
bloomfilter/bloom_filter_file_index.cpp
bloomfilter/bloom_filter_file_index_factory.cpp
bloomfilter/fast_hash.cpp)
bloomfilter/fast_hash.cpp
rangebitmap/range_bitmap_file_index.cpp
rangebitmap/range_bitmap_file_index_factory.cpp
rangebitmap/range_bitmap.cpp
rangebitmap/bit_slice_index_bitmap.cpp
rangebitmap/dictionary/chunked_dictionary.cpp
rangebitmap/dictionary/fixed_length_chunk.cpp
rangebitmap/dictionary/key_factory.cpp
rangebitmap/utils/literal_serialization_utils.cpp)

add_paimon_lib(paimon_file_index
SOURCES
Expand Down
49 changes: 47 additions & 2 deletions src/paimon/common/file_index/file_index_format_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
#include "paimon/common/file_index/empty/empty_file_index_reader.h"
#include "paimon/common/file_index/rangebitmap/range_bitmap.h"
#include "paimon/common/file_index/rangebitmap/range_bitmap_file_index.h"
#include "paimon/data/timestamp.h"
#include "paimon/defs.h"
#include "paimon/file_index/file_index_result.h"
#include "paimon/file_index/file_indexer_factory.h"
#include "paimon/fs/local/local_file_system.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/memory/memory_pool.h"
Expand Down Expand Up @@ -149,6 +151,50 @@ TEST_F(FileIndexFormatTest, TestSimple) {
}
}

// index file generated by paimon Java implementation
// type: int32
// data: 17,3,5,7,9,null,null,10
TEST_F(FileIndexFormatTest, TestRangeBitmapCompatibleWithJava) {
const auto schema = arrow::schema({arrow::field("data", arrow::int32())});
const auto index_file_bytes =
std::make_unique<std::vector<uint8_t>>(std::initializer_list<uint8_t>{
0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 56, 0, 0, 0,
1, 0, 4, 100, 97, 116, 97, 0, 0, 0, 1, 0, 12, 114, 97, 110, 103, 101, 45,
98, 105, 116, 109, 97, 112, 0, 0, 0, 56, 0, 0, 0, 210, 0, 0, 0, 0, 0,
0, 0, 21, 1, 0, 0, 0, 8, 0, 0, 0, 6, 0, 0, 0, 3, 0, 0, 0,
17, 0, 0, 0, 66, 0, 0, 0, 13, 1, 0, 0, 0, 1, 0, 0, 0, 4, 0,
0, 0, 25, 0, 0, 0, 0, 1, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 5, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0,
0, 7, 0, 0, 0, 9, 0, 0, 0, 10, 0, 0, 0, 17, 0, 0, 0, 34, 1,
3, 0, 0, 0, 19, 0, 0, 0, 24, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0,
0, 22, 0, 0, 0, 20, 0, 0, 0, 42, 0, 0, 0, 20, 59, 48, 0, 0, 1,
0, 0, 5, 0, 2, 0, 0, 0, 4, 0, 7, 0, 0, 0, 58, 48, 0, 0, 1,
0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, 0, 0, 2, 0, 4, 0, 58, 48,
0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, 4, 0, 58,
48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0,
});
const auto input_stream = std::make_shared<ByteArrayInputStream>(
reinterpret_cast<char*>(index_file_bytes->data()), index_file_bytes->size());
ASSERT_OK_AND_ASSIGN(const auto reader, FileIndexFormat::CreateReader(input_stream, pool_));
ASSERT_OK_AND_ASSIGN(const auto index_file_readers,
reader->ReadColumnIndex("data", CreateArrowSchema(schema).get()));
ASSERT_EQ(1, index_file_readers.size());
auto* range_bitmap_reader =
dynamic_cast<RangeBitmapFileIndexReader*>(index_file_readers[0].get());
ASSERT_TRUE(range_bitmap_reader);

ASSERT_OK_AND_ASSIGN(const auto eq_result, range_bitmap_reader->VisitEqual(Literal(3)));
ASSERT_TRUE(eq_result);
ASSERT_EQ(eq_result->ToString(), "{1}");

ASSERT_OK_AND_ASSIGN(const auto lt_result, range_bitmap_reader->VisitLessThan(Literal(10)));
ASSERT_TRUE(lt_result);
ASSERT_EQ(lt_result->ToString(), "{1,2,3,4}");

ASSERT_OK_AND_ASSIGN(const auto gt_result, range_bitmap_reader->VisitIsNull());
ASSERT_EQ(gt_result->ToString(), "{5,6}");
}

// NOLINTNEXTLINE(google-readability-function-size)
TEST_F(FileIndexFormatTest, TestBitmapIndexWithTimestamp) {
auto schema = arrow::schema({
Expand Down Expand Up @@ -816,5 +862,4 @@ TEST_F(FileIndexFormatTest, TestBitmapIndexWithTimestamp) {
check_nano("ts_nano");
check_nano("ts_tz_nano");
}

} // namespace paimon::test
281 changes: 281 additions & 0 deletions src/paimon/common/file_index/rangebitmap/bit_slice_index_bitmap.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/file_index/rangebitmap/bit_slice_index_bitmap.h"

#include <fmt/format.h>

#include <algorithm>
#include <cmath>
#include <string>

#include "paimon/common/io/memory_segment_output_stream.h"
#include "paimon/common/memory/memory_segment_utils.h"
#include "paimon/io/data_input_stream.h"
#include "paimon/result.h"
#include "paimon/status.h"

namespace paimon {

Result<std::unique_ptr<BitSliceIndexBitmap>> BitSliceIndexBitmap::Create(
const std::shared_ptr<MemoryPool>& pool, const std::shared_ptr<InputStream>& input_stream,
const int32_t offset) {
auto data_in = std::make_unique<DataInputStream>(input_stream);
PAIMON_RETURN_NOT_OK(data_in->Seek(offset));
PAIMON_ASSIGN_OR_RAISE(int32_t header_length, data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int8_t version, data_in->ReadValue<int8_t>());
if (version != CURRENT_VERSION) {
return Status::Invalid(fmt::format("Unknown BitSliceBitmap version: {}, Expected: {}",
version, CURRENT_VERSION));
}
PAIMON_ASSIGN_OR_RAISE(int8_t slices_size, data_in->ReadValue<int8_t>());
auto slices = std::vector<std::unique_ptr<RoaringBitmap32>>();
slices.resize(slices_size);
PAIMON_ASSIGN_OR_RAISE(int32_t ebm_size, data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t indexes_length, data_in->ReadValue<int32_t>());
auto indexes = Bytes::AllocateBytes(indexes_length, pool.get());
PAIMON_RETURN_NOT_OK(data_in->Read(indexes->data(), indexes_length));
auto body_offset = offset + sizeof(int32_t) + header_length;
return std::make_unique<BitSliceIndexBitmap>(pool, indexes_length, std::move(indexes), ebm_size,
slices_size, input_stream, body_offset);
}

static int32_t NumberOfLeadingZeros(const int64_t value) {
if (value == 0) {
return 64;
}
return __builtin_clzll(static_cast<uint64_t>(value));
}

static int32_t NumberOfTrailingZeros(const int64_t value) {
if (value == 0) {
return 64;
}
return __builtin_ctzll(static_cast<uint64_t>(value));
}

BitSliceIndexBitmap::BitSliceIndexBitmap(const std::shared_ptr<MemoryPool>& pool,
const int32_t indexes_length,
PAIMON_UNIQUE_PTR<Bytes> indexes, const int32_t ebm_length,
const int32_t slices_size,
const std::shared_ptr<InputStream>& input_stream,
const int32_t body_offset)
: pool_(pool),
initialized_(false),
bit_slices_(std::vector<std::optional<RoaringBitmap32>>(slices_size, std::nullopt)),
ebm(std::nullopt),
input_stream_(input_stream),
body_offset_(body_offset),
indexes_(std::move(indexes)),
ebm_length_(ebm_length),
indexes_length_(indexes_length) {}

Result<const RoaringBitmap32*> BitSliceIndexBitmap::GetEmptyBitmap() {
if (!ebm.has_value()) {
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_, FS_SEEK_SET));
const auto bytes = Bytes::AllocateBytes(ebm_length_, pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), ebm_length_));
RoaringBitmap32 bitmap;
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), ebm_length_));
ebm = bitmap;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ebm = std::move(bitmap);
bit_slices_[idx] = std::move(bitmap);

}
return &ebm.value();
}

Result<const RoaringBitmap32*> BitSliceIndexBitmap::GetSliceBitmap(const int32_t idx) {
if (!bit_slices_[idx].has_value()) {
auto data_in = std::make_unique<DataInputStream>(
std::make_shared<ByteArrayInputStream>(indexes_->data(), indexes_length_));
const int position = static_cast<int32_t>(2 * sizeof(int32_t) * idx);
PAIMON_RETURN_NOT_OK(data_in->Seek(position));
PAIMON_ASSIGN_OR_RAISE(int32_t offset, data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t length, data_in->ReadValue<int32_t>());
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_ + ebm_length_ + offset, FS_SEEK_SET));
RoaringBitmap32 bitmap;
const auto bytes = Bytes::AllocateBytes(length, pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), length));
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), length));
bit_slices_[idx] = bitmap;
}
return &bit_slices_[idx].value();
}

/// Batch load slices from start to end to avoid unnecessary IO
Status BitSliceIndexBitmap::LoadSlices(const int32_t start, const int32_t end) {
if (initialized_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like GetSliceBitmap and LoadSlices have significant code duplication. Consider having GetSliceBitmap delegate to LoadSlices(idx, idx + 1) instead of reimplementing similar logic. Could you evaluate if this refactoring improves clarity and simplify?

return Status::OK();
}
auto indexes_stream = std::make_shared<ByteArrayInputStream>(indexes_->data(), indexes_length_);
auto data_in = std::make_unique<DataInputStream>(indexes_stream);
auto position = static_cast<int32_t>(2 * sizeof(int32_t) * start);
PAIMON_RETURN_NOT_OK(data_in->Seek(position));
PAIMON_ASSIGN_OR_RAISE(int32_t offset, data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t length, data_in->ReadValue<int32_t>());
std::vector<int32_t> lengths(end);
lengths[start] = length;

for (int32_t i = start + 1; i < end; ++i) {
PAIMON_RETURN_NOT_OK(data_in->ReadValue<int32_t>());
PAIMON_ASSIGN_OR_RAISE(int32_t slice_length, data_in->ReadValue<int32_t>());
lengths[i] = slice_length;
length += slice_length;
}
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_ + ebm_length_ + offset, FS_SEEK_SET));
const auto bytes = Bytes::AllocateBytes(length, pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), length));
int32_t byte_position = 0;
for (int32_t i = start; i < end; ++i) {
const int32_t slice_length = lengths[i];
RoaringBitmap32 bitmap;
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data() + byte_position, slice_length));
bit_slices_[i] = std::move(bitmap);
byte_position += slice_length;
}
initialized_ = true;
return Status::OK();
}

Result<RoaringBitmap32> BitSliceIndexBitmap::Eq(const int32_t code) {
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* empty_bitmap, GetEmptyBitmap());
auto state = RoaringBitmap32(*empty_bitmap);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetEmptyBitmap() might be a bit confusing — the term "empty" could imply null, but from the code it looks like it returns a non-null bitmap. Would GetNotNullBitmap() be clearer?

if (state.IsEmpty()) {
return RoaringBitmap32();
}
PAIMON_RETURN_NOT_OK(LoadSlices(0, static_cast<int32_t>(bit_slices_.size())));
for (int32_t i = 0; i < static_cast<int32_t>(bit_slices_.size()); i++) {
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* slice_bitmap, GetSliceBitmap(i));
if ((code >> i & 1) == 1) {
state &= *slice_bitmap;
} else {
state -= *slice_bitmap;
}
}
return state;
}

Result<RoaringBitmap32> BitSliceIndexBitmap::Gt(const int32_t code) {
if (code < 0) {
return IsNotNull({});
}
PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 found_set, IsNotNull({}));
if (found_set.IsEmpty()) {
return RoaringBitmap32();
}
auto state = RoaringBitmap32{};
auto state_inited = false;
const auto start = NumberOfTrailingZeros(~code);
PAIMON_RETURN_NOT_OK(LoadSlices(start, static_cast<int32_t>(bit_slices_.size())));
for (int i = start; i < static_cast<int32_t>(bit_slices_.size()); ++i) {
if (!state_inited) {
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* slice_ptr, GetSliceBitmap(i));
state = *slice_ptr;
state_inited = true;
continue;
}
const auto bit = code >> i & 1;
PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* slice_ptr, GetSliceBitmap(i));
if (bit == 1) {
state &= *slice_ptr;
} else {
state |= *slice_ptr;
}
}
if (!state_inited) {
return RoaringBitmap32();
}
return state &= found_set;
}

Result<RoaringBitmap32> BitSliceIndexBitmap::Gte(const int32_t code) {
return Gt(code - 1);
}

Result<RoaringBitmap32> BitSliceIndexBitmap::IsNotNull(const RoaringBitmap32& found_set) {
if (!ebm.has_value()) {
PAIMON_RETURN_NOT_OK(input_stream_->Seek(body_offset_, FS_SEEK_SET));
const auto bytes = Bytes::AllocateBytes(ebm_length_, pool_.get());
PAIMON_RETURN_NOT_OK(input_stream_->Read(bytes->data(), ebm_length_));
RoaringBitmap32 bitmap;
PAIMON_RETURN_NOT_OK(bitmap.Deserialize(bytes->data(), ebm_length_));
ebm = bitmap;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like IsNotNull and GetEmptyBitmatp have some code duplication. Refactor is better.

return found_set.IsEmpty() ? *ebm : *ebm &= found_set;
}

BitSliceIndexBitmap::Appender::Appender(const std::shared_ptr<MemoryPool>& pool, const int32_t min,
const int32_t max)
: pool_(pool), min_(min), max_(max) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is const really necessary for the min and max variables?

ebm_ = RoaringBitmap32{};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, we place memory pool parameters at the end of function parameter lists, and memory pool member variables at the beginning of class member declarations (which helps ensure safe destruction order).

int32_t slices_size = std::max(64 - NumberOfLeadingZeros(max), 1);
slices_.resize(slices_size);
}

Status BitSliceIndexBitmap::Appender::Append(const int32_t key, const int32_t value) {
if (key < 0) {
return Status::Invalid(fmt::format("Invalid key: {}", key));
}
if (value < min_ || value > max_) {
return Status::Invalid(fmt::format("value not in range [{}, {}]", min_, max_));
}
int bits = value;
while (bits != 0) {
slices_[NumberOfTrailingZeros(bits)].Add(key);
bits &= (bits - 1);
}
ebm_.Add(key);
return Status::OK();
}

Result<PAIMON_UNIQUE_PTR<Bytes>> BitSliceIndexBitmap::Appender::Serialize() const {
auto indexes_length = static_cast<int32_t>(2 * sizeof(int32_t) * slices_.size());
PAIMON_UNIQUE_PTR<Bytes> ebm_bytes = ebm_.Serialize(pool_.get());
auto ebm_length = static_cast<int32_t>(ebm_bytes->size());
int32_t header_size = 0;
header_size += sizeof(int8_t); // version
header_size += sizeof(int8_t); // slices size
header_size += sizeof(int32_t); // ebm length
header_size += sizeof(int32_t); // indexes length
header_size += indexes_length;
int32_t offset = 0;
auto data_output_stream = std::make_unique<MemorySegmentOutputStream>(
MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_);
std::vector<PAIMON_UNIQUE_PTR<Bytes>> slices_bytes_vector{};
std::vector<std::pair<int32_t, int32_t>> indexes_vector{};
for (const auto& slice : slices_) {
auto slice_bytes = slice.Serialize(pool_.get());
auto length = static_cast<int32_t>(slice_bytes->size());
indexes_vector.emplace_back(offset, length);
offset += length;
slices_bytes_vector.emplace_back(std::move(slice_bytes));
}
data_output_stream->WriteValue<int32_t>(header_size);
data_output_stream->WriteValue<int8_t>(CURRENT_VERSION);
data_output_stream->WriteValue<int8_t>(static_cast<int8_t>(slices_.size()));
data_output_stream->WriteValue<int32_t>(ebm_length);
data_output_stream->WriteValue<int32_t>(indexes_length);
for (const auto& [slice_offset, length] : indexes_vector) {
data_output_stream->WriteValue<int32_t>(slice_offset);
data_output_stream->WriteValue<int32_t>(length);
}
data_output_stream->Write(ebm_bytes->data(), ebm_length);
for (const auto& slice_bytes : slices_bytes_vector) {
data_output_stream->Write(slice_bytes->data(), slice_bytes->size());
}
return MemorySegmentUtils::CopyToBytes(data_output_stream->Segments(), 0,
static_cast<int32_t>(data_output_stream->CurrentSize()),
pool_.get());
}
} // namespace paimon
Loading