Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ set(PAIMON_COMMON_SRCS
common/types/data_type_json_parser.cpp
common/types/row_kind.cpp
common/types/row_type.cpp
common/utils/arrow/arrow_utils.cpp
common/utils/arrow/mem_utils.cpp
common/utils/binary_row_partition_computer.cpp
common/utils/bit_set.cpp
Expand Down
12 changes: 7 additions & 5 deletions src/paimon/common/data/columnar/columnar_batch_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,20 @@ class MemoryPool;

struct ColumnarBatchContext {
ColumnarBatchContext(const std::shared_ptr<arrow::StructArray>& struct_array_in,
const arrow::ArrayVector& array_vec_holder_in,
const arrow::ArrayVector& array_vec_in,
const std::shared_ptr<MemoryPool>& pool_in)
: struct_array(struct_array_in), pool(pool_in), array_vec_holder(array_vec_holder_in) {
array_ptrs.reserve(array_vec_holder.size());
for (const auto& array : array_vec_holder) {
: struct_array(struct_array_in), pool(pool_in) {
array_ptrs.reserve(array_vec_in.size());
for (const auto& array : array_vec_in) {
array_ptrs.push_back(array.get());
}
}

/// @note `struct_array` is the data holder for the columnar row; it ensures that the data's life
/// cycle is consistent with that of the columnar row. `array_ptrs` may be a subset of the fields
/// of `struct_array`, so `struct_array` itself cannot be used for `GetXXX()`.
std::shared_ptr<arrow::StructArray> struct_array;
std::shared_ptr<MemoryPool> pool;
arrow::ArrayVector array_vec_holder;
std::vector<const arrow::Array*> array_ptrs;
};
} // namespace paimon
118 changes: 118 additions & 0 deletions src/paimon/common/utils/arrow/arrow_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/common/utils/arrow/arrow_utils.h"
namespace paimon {
/// Builds an Arrow schema from the child fields of a struct data type.
///
/// @param data_type Data type to convert; it must have type id `arrow::Type::STRUCT`.
/// @return A schema made of the struct's fields, or `Status::Invalid` naming the actual type
///         when `data_type` is not a struct.
Result<std::shared_ptr<arrow::Schema>> ArrowUtils::DataTypeToSchema(
    const std::shared_ptr<arrow::DataType>& data_type) {
    const bool is_struct = (data_type->id() == arrow::Type::STRUCT);
    if (is_struct) {
        // The type id was verified above, so the downcast is safe.
        auto as_struct = std::static_pointer_cast<arrow::StructType>(data_type);
        return std::make_shared<arrow::Schema>(as_struct->fields());
    }
    return Status::Invalid(
        fmt::format("Expected struct data type, actual data type: {}", data_type->ToString()));
}

/// Maps each field to read onto its position in the file schema, matching by field name.
///
/// @param file_schema Schema of the file being read.
/// @param read_fields Fields requested by the reader, in target order.
/// @return A vector whose i-th entry is the index in `file_schema` of `read_fields[i]`, or
///         `Status::Invalid` as soon as a name cannot be resolved to a single schema field
///         (`GetFieldIndex` reports a negative index in that case).
Result<std::vector<int32_t>> ArrowUtils::CreateProjection(
    const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
    std::vector<int32_t> projection;
    projection.reserve(read_fields.size());
    for (size_t pos = 0; pos < read_fields.size(); ++pos) {
        const auto& name = read_fields[pos]->name();
        const auto index_in_file = file_schema->GetFieldIndex(name);
        if (index_in_file >= 0) {
            projection.push_back(index_in_file);
            continue;
        }
        return Status::Invalid(
            fmt::format("Field '{}' not found or duplicate in file schema", name));
    }
    return projection;
}

/// Verifies that a struct array respects the nullability declared by a schema.
///
/// Each top-level schema field is paired with the child array at the same position and
/// checked recursively through `InnerCheckNullabilityMatch`.
///
/// @param schema Schema declaring the expected fields and their nullability.
/// @param data   Array to validate; it must be a struct array with one child per schema field.
/// @return `Status::OK()` when every non-nullable field holds no null values; otherwise
///         `Status::Invalid` when `data` is null or not a struct array, when the field counts
///         differ, or when a non-nullable field contains nulls.
Status ArrowUtils::CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
                                         const std::shared_ptr<arrow::Array>& data) {
    // `checked_pointer_cast` only verifies the cast in debug builds, so validate the array
    // explicitly before downcasting it to a struct array.
    if (data == nullptr) {
        return Status::Invalid("CheckNullabilityMatch failed, data is null");
    }
    if (data->type()->id() != arrow::Type::STRUCT) {
        return Status::Invalid(
            fmt::format("CheckNullabilityMatch failed, expected struct array, actual data type: {}",
                        data->type()->ToString()));
    }
    auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
    if (struct_array->num_fields() != schema->num_fields()) {
        return Status::Invalid(fmt::format(
            "CheckNullabilityMatch failed, data field count {} mismatch schema field count {}",
            struct_array->num_fields(), schema->num_fields()));
    }
    for (int32_t i = 0; i < schema->num_fields(); i++) {
        PAIMON_RETURN_NOT_OK(InnerCheckNullabilityMatch(schema->field(i), struct_array->field(i)));
    }
    return Status::OK();
}

/// Walks an array and its nested children, calling the child accessors once up front.
///
/// The accessors touched are `dictionary()` on dictionary arrays, `fields()` on struct arrays,
/// `keys()`/`items()` on map arrays and `values()` on list arrays. Struct, map and list
/// children are visited recursively; any other array type is left untouched.
///
/// @param array Array to traverse.
void ArrowUtils::TraverseArray(const std::shared_ptr<arrow::Array>& array) {
    const arrow::Type::type kind = array->type()->id();
    if (kind == arrow::Type::type::DICTIONARY) {
        auto* as_dict = arrow::internal::checked_cast<arrow::DictionaryArray*>(array.get());
        // Only the access itself matters; the returned dictionary is intentionally unused.
        [[maybe_unused]] auto dictionary = as_dict->dictionary();
    } else if (kind == arrow::Type::type::STRUCT) {
        auto* as_struct = arrow::internal::checked_cast<arrow::StructArray*>(array.get());
        for (const auto& child : as_struct->fields()) {
            TraverseArray(child);
        }
    } else if (kind == arrow::Type::type::MAP) {
        auto* as_map = arrow::internal::checked_cast<arrow::MapArray*>(array.get());
        TraverseArray(as_map->keys());
        TraverseArray(as_map->items());
    } else if (kind == arrow::Type::type::LIST) {
        auto* as_list = arrow::internal::checked_cast<arrow::ListArray*>(array.get());
        TraverseArray(as_list->values());
    }
}

/// Recursively checks that `data` contains no null values wherever `field`, or any field
/// nested inside it, is declared non-nullable.
///
/// Struct, list and map fields are descended into; every other type is only checked at its
/// own level. Before descending, the array's type id and (for structs) its child count are
/// validated against the declared type, because `checked_pointer_cast` only verifies the
/// cast in debug builds and `StructArray::field(i)` must not be called out of range.
///
/// @param field Declared field (name, nullability and type) that `data` must conform to.
/// @param data  Array holding the values of `field`.
/// @return `Status::OK()` when the data conforms; `Status::Invalid` when a non-nullable field
///         holds nulls, or when the nested array does not match the declared nested type.
Status ArrowUtils::InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
                                              const std::shared_ptr<arrow::Array>& data) {
    if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) {
        return Status::Invalid(fmt::format(
            "CheckNullabilityMatch failed, field {} not nullable while data have null value",
            field->name()));
    }
    const auto& type = field->type();
    const arrow::Type::type field_type_id = type->id();
    const bool is_nested = field_type_id == arrow::Type::STRUCT ||
                           field_type_id == arrow::Type::LIST ||
                           field_type_id == arrow::Type::MAP;
    if (!is_nested) {
        return Status::OK();
    }

    // arrow::MapArray derives from arrow::ListArray, so map data is still accepted for a
    // field declared as a list, exactly as the unchecked downcast allowed before.
    const arrow::Type::type data_type_id = data->type()->id();
    const bool layout_matches =
        data_type_id == field_type_id ||
        (field_type_id == arrow::Type::LIST && data_type_id == arrow::Type::MAP);
    if (PAIMON_UNLIKELY(!layout_matches)) {
        return Status::Invalid(fmt::format(
            "CheckNullabilityMatch failed, field {} has type {} while data has type {}",
            field->name(), type->ToString(), data->type()->ToString()));
    }

    if (field_type_id == arrow::Type::STRUCT) {
        auto struct_type = arrow::internal::checked_pointer_cast<arrow::StructType>(type);
        auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
        // Mirror the top-level field count check so that field(i) is never out of range.
        if (PAIMON_UNLIKELY(struct_array->num_fields() != struct_type->num_fields())) {
            return Status::Invalid(fmt::format(
                "CheckNullabilityMatch failed, field {} data field count {} mismatch type field "
                "count {}",
                field->name(), struct_array->num_fields(), struct_type->num_fields()));
        }
        for (int32_t i = 0; i < struct_type->num_fields(); ++i) {
            PAIMON_RETURN_NOT_OK(
                InnerCheckNullabilityMatch(struct_type->field(i), struct_array->field(i)));
        }
    } else if (field_type_id == arrow::Type::LIST) {
        auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(type);
        auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(data);
        PAIMON_RETURN_NOT_OK(
            InnerCheckNullabilityMatch(list_type->value_field(), list_array->values()));
    } else {
        // Only MAP remains here: keys and items are checked against their own fields.
        auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(type);
        auto map_array = arrow::internal::checked_pointer_cast<arrow::MapArray>(data);
        PAIMON_RETURN_NOT_OK(InnerCheckNullabilityMatch(map_type->key_field(), map_array->keys()));
        PAIMON_RETURN_NOT_OK(
            InnerCheckNullabilityMatch(map_type->item_field(), map_array->items()));
    }
    return Status::OK();
}
} // namespace paimon
77 changes: 11 additions & 66 deletions src/paimon/common/utils/arrow/arrow_utils.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024-present Alibaba Inc.
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,83 +24,28 @@

namespace paimon {

class ArrowUtils {
class PAIMON_EXPORT ArrowUtils {
public:
ArrowUtils() = delete;
~ArrowUtils() = delete;

static Result<std::shared_ptr<arrow::Schema>> DataTypeToSchema(
const std::shared_ptr<arrow::DataType>& data_type) {
if (data_type->id() != arrow::Type::STRUCT) {
return Status::Invalid(fmt::format("Expected struct data type, actual data type: {}",
data_type->ToString()));
}
const auto& struct_type = std::static_pointer_cast<arrow::StructType>(data_type);
return std::make_shared<arrow::Schema>(struct_type->fields());
}
const std::shared_ptr<arrow::DataType>& data_type);

static Result<std::vector<int32_t>> CreateProjection(
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
std::vector<int32_t> target_to_src_mapping;
target_to_src_mapping.reserve(read_fields.size());
for (const auto& field : read_fields) {
auto src_field_idx = file_schema->GetFieldIndex(field->name());
if (src_field_idx < 0) {
return Status::Invalid(
fmt::format("Field '{}' not found or duplicate in file schema", field->name()));
}
target_to_src_mapping.push_back(src_field_idx);
}
return target_to_src_mapping;
}
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields);

static Status CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::Array>& data) {
auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
if (struct_array->num_fields() != schema->num_fields()) {
return Status::Invalid(fmt::format(
"CheckNullabilityMatch failed, data field count {} mismatch schema field count {}",
struct_array->num_fields(), schema->num_fields()));
}
for (int32_t i = 0; i < schema->num_fields(); i++) {
PAIMON_RETURN_NOT_OK(
InnerCheckNullabilityMatch(schema->field(i), struct_array->field(i)));
}
return Status::OK();
}
const std::shared_ptr<arrow::Array>& data);

// For struct array, arrow is unsafe for fields() and field(); for dict array, arrow is unsafe
// for dictionary(). Therefore, access array in advance before merge sort and projection to
// avoid subsequent multi-threading problems.
static void TraverseArray(const std::shared_ptr<arrow::Array>& array);

private:
static Status InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
const std::shared_ptr<arrow::Array>& data) {
if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) {
return Status::Invalid(fmt::format(
"CheckNullabilityMatch failed, field {} not nullable while data have null value",
field->name()));
}
auto type = field->type();
if (type->id() == arrow::Type::STRUCT) {
auto struct_type =
arrow::internal::checked_pointer_cast<arrow::StructType>(field->type());
auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
for (int32_t i = 0; i < struct_type->num_fields(); ++i) {
PAIMON_RETURN_NOT_OK(
InnerCheckNullabilityMatch(struct_type->field(i), struct_array->field(i)));
}
} else if (type->id() == arrow::Type::LIST) {
auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(field->type());
auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(data);
PAIMON_RETURN_NOT_OK(
InnerCheckNullabilityMatch(list_type->value_field(), list_array->values()));
} else if (type->id() == arrow::Type::MAP) {
auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(field->type());
auto map_array = arrow::internal::checked_pointer_cast<arrow::MapArray>(data);
PAIMON_RETURN_NOT_OK(
InnerCheckNullabilityMatch(map_type->key_field(), map_array->keys()));
PAIMON_RETURN_NOT_OK(
InnerCheckNullabilityMatch(map_type->item_field(), map_array->items()));
}
return Status::OK();
}
const std::shared_ptr<arrow::Array>& data);
};

} // namespace paimon
36 changes: 2 additions & 34 deletions src/paimon/core/io/key_value_data_file_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
#include "paimon/common/data/columnar/columnar_row_ref.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/arrow/arrow_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/status.h"

namespace paimon {
class MemoryPool;

Expand Down Expand Up @@ -137,7 +137,7 @@ Result<std::unique_ptr<KeyValueRecordReader::Iterator>> KeyValueDataFileRecordRe
value_fields_ = value_struct_array_->fields();
key_ctx_ = std::make_shared<ColumnarBatchContext>(nullptr, key_fields_, pool_);
value_ctx_ = std::make_shared<ColumnarBatchContext>(value_struct_array_, value_fields_, pool_);
TraverseArray(value_struct_array_);
ArrowUtils::TraverseArray(value_struct_array_);
return std::make_unique<KeyValueDataFileRecordReader::Iterator>(this);
}

Expand All @@ -151,36 +151,4 @@ void KeyValueDataFileRecordReader::Reset() {
sequence_number_array_.reset();
row_kind_array_.reset();
}

void KeyValueDataFileRecordReader::TraverseArray(const std::shared_ptr<arrow::Array>& array) {
arrow::Type::type type = array->type()->id();
switch (type) {
case arrow::Type::type::DICTIONARY: {
auto* dict_array = arrow::internal::checked_cast<arrow::DictionaryArray*>(array.get());
[[maybe_unused]] auto dict = dict_array->dictionary();
return;
}
case arrow::Type::type::STRUCT: {
auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(array.get());
for (const auto& field : struct_array->fields()) {
TraverseArray(field);
}
return;
}
case arrow::Type::type::MAP: {
auto* map_array = arrow::internal::checked_cast<arrow::MapArray*>(array.get());
TraverseArray(map_array->keys());
TraverseArray(map_array->items());
return;
}
case arrow::Type::type::LIST: {
auto* list_array = arrow::internal::checked_cast<arrow::ListArray*>(array.get());
TraverseArray(list_array->values());
return;
}
default:
return;
}
}

} // namespace paimon
6 changes: 0 additions & 6 deletions src/paimon/core/io/key_value_data_file_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader {
// virtual for test
virtual void Reset();

private:
// For struct array, arrow is unsafe for fields() and field(); for dict array, arrow is unsafe
// for dictionary(). Therefore, access array in advance before merge sort and projection to
// avoid subsequent multi-threading problems.
static void TraverseArray(const std::shared_ptr<arrow::Array>& array);

private:
int32_t key_arity_;
int32_t level_;
Expand Down
3 changes: 2 additions & 1 deletion src/paimon/core/io/key_value_in_memory_record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
#include "paimon/common/data/columnar/columnar_row.h"
#include "paimon/common/data/internal_row.h"
#include "paimon/common/types/row_kind.h"
#include "paimon/common/utils/arrow/arrow_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/core/mergetree/compact/merge_function_wrapper.h"
#include "paimon/core/utils/fields_comparator.h"
#include "paimon/status.h"

namespace paimon {
class MemoryPool;

Expand Down Expand Up @@ -85,6 +85,7 @@ KeyValueInMemoryRecordReader::KeyValueInMemoryRecordReader(
key_comparator_(key_comparator),
merge_function_wrapper_(merge_function_wrapper) {
assert(value_struct_array_);
ArrowUtils::TraverseArray(value_struct_array_);
}

Result<std::unique_ptr<KeyValueRecordReader::Iterator>> KeyValueInMemoryRecordReader::NextBatch() {
Expand Down
2 changes: 0 additions & 2 deletions test/inte/scan_and_read_inte_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,6 @@ TEST_P(ScanAndReadInteTest, TestCastTimestampType) {
ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
}

#ifdef PAIMON_ENABLE_AVRO
TEST_F(ScanAndReadInteTest, TestAvroWithAppendTable) {
auto read_data = [](int64_t snapshot_id, const std::string& result_json) {
std::string table_path = GetDataDir() + "/avro/append_multiple.db/append_multiple";
Expand Down Expand Up @@ -2721,6 +2720,5 @@ TEST_F(ScanAndReadInteTest, TestAvroWithPkTable) {
[0, true, 10, 1, 1, 1000, 1.5, 2.5, "Tony", "abcdef", 100, "123.45", [[["key",123]],[1,2,3]]]
])");
}
#endif

} // namespace paimon::test
Loading