diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index f180aae3..d02fa95f 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -118,6 +118,7 @@ set(PAIMON_COMMON_SRCS common/types/data_type_json_parser.cpp common/types/row_kind.cpp common/types/row_type.cpp + common/utils/arrow/arrow_utils.cpp common/utils/arrow/mem_utils.cpp common/utils/binary_row_partition_computer.cpp common/utils/bit_set.cpp diff --git a/src/paimon/common/data/columnar/columnar_batch_context.h b/src/paimon/common/data/columnar/columnar_batch_context.h index 5267df66..6da8cd30 100644 --- a/src/paimon/common/data/columnar/columnar_batch_context.h +++ b/src/paimon/common/data/columnar/columnar_batch_context.h @@ -30,18 +30,20 @@ class MemoryPool; struct ColumnarBatchContext { ColumnarBatchContext(const std::shared_ptr& struct_array_in, - const arrow::ArrayVector& array_vec_holder_in, + const arrow::ArrayVector& array_vec_in, const std::shared_ptr& pool_in) - : struct_array(struct_array_in), pool(pool_in), array_vec_holder(array_vec_holder_in) { - array_ptrs.reserve(array_vec_holder.size()); - for (const auto& array : array_vec_holder) { + : struct_array(struct_array_in), pool(pool_in) { + array_ptrs.reserve(array_vec_in.size()); + for (const auto& array : array_vec_in) { array_ptrs.push_back(array.get()); } } + /// @note `struct_array` is the data holder for columnar row, ensure that the data life + /// cycle is consistent with the columnar row, `array_ptrs` maybe a subset of + /// `struct_array`, so `struct_array` cannot be used for `GetXXX()` std::shared_ptr struct_array; std::shared_ptr pool; - arrow::ArrayVector array_vec_holder; std::vector array_ptrs; }; } // namespace paimon diff --git a/src/paimon/common/utils/arrow/arrow_utils.cpp b/src/paimon/common/utils/arrow/arrow_utils.cpp new file mode 100644 index 00000000..ebd270f6 --- /dev/null +++ b/src/paimon/common/utils/arrow/arrow_utils.cpp @@ -0,0 +1,118 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/common/utils/arrow/arrow_utils.h" +namespace paimon { +Result> ArrowUtils::DataTypeToSchema( + const std::shared_ptr& data_type) { + if (data_type->id() != arrow::Type::STRUCT) { + return Status::Invalid( + fmt::format("Expected struct data type, actual data type: {}", data_type->ToString())); + } + const auto& struct_type = std::static_pointer_cast(data_type); + return std::make_shared(struct_type->fields()); +} + +Result> ArrowUtils::CreateProjection( + const std::shared_ptr& file_schema, const arrow::FieldVector& read_fields) { + std::vector target_to_src_mapping; + target_to_src_mapping.reserve(read_fields.size()); + for (const auto& field : read_fields) { + auto src_field_idx = file_schema->GetFieldIndex(field->name()); + if (src_field_idx < 0) { + return Status::Invalid( + fmt::format("Field '{}' not found or duplicate in file schema", field->name())); + } + target_to_src_mapping.push_back(src_field_idx); + } + return target_to_src_mapping; +} + +Status ArrowUtils::CheckNullabilityMatch(const std::shared_ptr& schema, + const std::shared_ptr& data) { + auto struct_array = arrow::internal::checked_pointer_cast(data); + if (struct_array->num_fields() != schema->num_fields()) { + return Status::Invalid(fmt::format( + "CheckNullabilityMatch failed, data field count {} mismatch schema field count {}", + struct_array->num_fields(), schema->num_fields())); + } + for (int32_t i = 0; i < schema->num_fields(); i++) { + PAIMON_RETURN_NOT_OK(InnerCheckNullabilityMatch(schema->field(i), struct_array->field(i))); + } + return Status::OK(); +} + +void ArrowUtils::TraverseArray(const std::shared_ptr& array) { + arrow::Type::type type = array->type()->id(); + switch (type) { + case arrow::Type::type::DICTIONARY: { + auto* dict_array = arrow::internal::checked_cast(array.get()); + [[maybe_unused]] auto dict = dict_array->dictionary(); + return; + } + case arrow::Type::type::STRUCT: { + auto* struct_array = arrow::internal::checked_cast(array.get()); + for (const auto& field : struct_array->fields()) { + TraverseArray(field); + } + return; + } + case arrow::Type::type::MAP: { + auto* map_array = arrow::internal::checked_cast(array.get()); + TraverseArray(map_array->keys()); + TraverseArray(map_array->items()); + return; + } + case arrow::Type::type::LIST: { + auto* list_array = arrow::internal::checked_cast(array.get()); + TraverseArray(list_array->values()); + return; + } + default: + return; + } +} + +Status ArrowUtils::InnerCheckNullabilityMatch(const std::shared_ptr& field, + const std::shared_ptr& data) { + if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) { + return Status::Invalid(fmt::format( + "CheckNullabilityMatch failed, field {} not nullable while data have null value", + field->name())); + } + auto type = field->type(); + if (type->id() == arrow::Type::STRUCT) { + auto struct_type = arrow::internal::checked_pointer_cast(field->type()); + auto struct_array = arrow::internal::checked_pointer_cast(data); + for (int32_t i = 0; i < struct_type->num_fields(); ++i) { + PAIMON_RETURN_NOT_OK( + InnerCheckNullabilityMatch(struct_type->field(i), struct_array->field(i))); + } + } else if (type->id() == arrow::Type::LIST) { + auto list_type = arrow::internal::checked_pointer_cast(field->type()); + auto list_array = arrow::internal::checked_pointer_cast(data); + PAIMON_RETURN_NOT_OK( + InnerCheckNullabilityMatch(list_type->value_field(), list_array->values())); + } else if (type->id() == arrow::Type::MAP) { + auto map_type = arrow::internal::checked_pointer_cast(field->type()); + auto map_array = arrow::internal::checked_pointer_cast(data); + PAIMON_RETURN_NOT_OK(InnerCheckNullabilityMatch(map_type->key_field(), map_array->keys())); + PAIMON_RETURN_NOT_OK( + InnerCheckNullabilityMatch(map_type->item_field(), map_array->items())); + } + return Status::OK(); +} +} // namespace paimon diff --git a/src/paimon/common/utils/arrow/arrow_utils.h b/src/paimon/common/utils/arrow/arrow_utils.h index 2fc1d1d8..abb26cf9 100644 --- a/src/paimon/common/utils/arrow/arrow_utils.h +++ b/src/paimon/common/utils/arrow/arrow_utils.h @@ -1,5 +1,5 @@ /* - * Copyright 2024-present Alibaba Inc. + * Copyright 2026-present Alibaba Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,83 +24,28 @@ namespace paimon { -class ArrowUtils { +class PAIMON_EXPORT ArrowUtils { public: ArrowUtils() = delete; ~ArrowUtils() = delete; static Result> DataTypeToSchema( - const std::shared_ptr& data_type) { - if (data_type->id() != arrow::Type::STRUCT) { - return Status::Invalid(fmt::format("Expected struct data type, actual data type: {}", - data_type->ToString())); - } - const auto& struct_type = std::static_pointer_cast(data_type); - return std::make_shared(struct_type->fields()); - } + const std::shared_ptr& data_type); static Result> CreateProjection( - const std::shared_ptr& file_schema, const arrow::FieldVector& read_fields) { - std::vector target_to_src_mapping; - target_to_src_mapping.reserve(read_fields.size()); - for (const auto& field : read_fields) { - auto src_field_idx = file_schema->GetFieldIndex(field->name()); - if (src_field_idx < 0) { - return Status::Invalid( - fmt::format("Field '{}' not found or duplicate in file schema", field->name())); - } - target_to_src_mapping.push_back(src_field_idx); - } - return target_to_src_mapping; - } + const std::shared_ptr& file_schema, const arrow::FieldVector& read_fields); static Status CheckNullabilityMatch(const std::shared_ptr& schema, - const std::shared_ptr& data) { - auto struct_array = arrow::internal::checked_pointer_cast(data); - if (struct_array->num_fields() != schema->num_fields()) { - return Status::Invalid(fmt::format( - "CheckNullabilityMatch failed, data field count {} mismatch schema field count {}", - struct_array->num_fields(), schema->num_fields())); - } - for (int32_t i = 0; i < schema->num_fields(); i++) { - PAIMON_RETURN_NOT_OK( - InnerCheckNullabilityMatch(schema->field(i), struct_array->field(i))); - } - return Status::OK(); - } + const std::shared_ptr& data); + + // For struct array, arrow is unsafe for fields() and field(); for dict array, arrow is unsafe + // for dictionary(). Therefore, access array in advance before merge sort and projection to + // avoid subsequent multi-threading problems. + static void TraverseArray(const std::shared_ptr& array); private: static Status InnerCheckNullabilityMatch(const std::shared_ptr& field, - const std::shared_ptr& data) { - if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) { - return Status::Invalid(fmt::format( - "CheckNullabilityMatch failed, field {} not nullable while data have null value", - field->name())); - } - auto type = field->type(); - if (type->id() == arrow::Type::STRUCT) { - auto struct_type = - arrow::internal::checked_pointer_cast(field->type()); - auto struct_array = arrow::internal::checked_pointer_cast(data); - for (int32_t i = 0; i < struct_type->num_fields(); ++i) { - PAIMON_RETURN_NOT_OK( - InnerCheckNullabilityMatch(struct_type->field(i), struct_array->field(i))); - } - } else if (type->id() == arrow::Type::LIST) { - auto list_type = arrow::internal::checked_pointer_cast(field->type()); - auto list_array = arrow::internal::checked_pointer_cast(data); - PAIMON_RETURN_NOT_OK( - InnerCheckNullabilityMatch(list_type->value_field(), list_array->values())); - } else if (type->id() == arrow::Type::MAP) { - auto map_type = arrow::internal::checked_pointer_cast(field->type()); - auto map_array = arrow::internal::checked_pointer_cast(data); - PAIMON_RETURN_NOT_OK( - InnerCheckNullabilityMatch(map_type->key_field(), map_array->keys())); - PAIMON_RETURN_NOT_OK( - InnerCheckNullabilityMatch(map_type->item_field(), map_array->items())); - } - return Status::OK(); - } + const std::shared_ptr& data); }; } // namespace paimon diff --git a/src/paimon/core/io/key_value_data_file_record_reader.cpp b/src/paimon/core/io/key_value_data_file_record_reader.cpp index 76e8bcb4..ecff5095 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.cpp +++ b/src/paimon/core/io/key_value_data_file_record_reader.cpp @@ -31,9 +31,9 @@ #include "paimon/common/data/columnar/columnar_row_ref.h" #include "paimon/common/table/special_fields.h" #include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/arrow_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/status.h" - namespace paimon { class MemoryPool; @@ -137,7 +137,7 @@ Result> KeyValueDataFileRecordRe value_fields_ = value_struct_array_->fields(); key_ctx_ = std::make_shared(nullptr, key_fields_, pool_); value_ctx_ = std::make_shared(value_struct_array_, value_fields_, pool_); - TraverseArray(value_struct_array_); + ArrowUtils::TraverseArray(value_struct_array_); return std::make_unique(this); } @@ -151,36 +151,4 @@ void KeyValueDataFileRecordReader::Reset() { sequence_number_array_.reset(); row_kind_array_.reset(); } - -void KeyValueDataFileRecordReader::TraverseArray(const std::shared_ptr& array) { - arrow::Type::type type = array->type()->id(); - switch (type) { - case arrow::Type::type::DICTIONARY: { - auto* dict_array = arrow::internal::checked_cast(array.get()); - [[maybe_unused]] auto dict = dict_array->dictionary(); - return; - } - case arrow::Type::type::STRUCT: { - auto* struct_array = arrow::internal::checked_cast(array.get()); - for (const auto& field : struct_array->fields()) { - TraverseArray(field); - } - return; - } - case arrow::Type::type::MAP: { - auto* map_array = arrow::internal::checked_cast(array.get()); - TraverseArray(map_array->keys()); - TraverseArray(map_array->items()); - return; - } - case arrow::Type::type::LIST: { - auto* list_array = arrow::internal::checked_cast(array.get()); - TraverseArray(list_array->values()); - return; - } - default: - return; - } -} - } // namespace paimon diff --git a/src/paimon/core/io/key_value_data_file_record_reader.h b/src/paimon/core/io/key_value_data_file_record_reader.h index d950e25d..3a1a298b 100644 --- a/src/paimon/core/io/key_value_data_file_record_reader.h +++ b/src/paimon/core/io/key_value_data_file_record_reader.h @@ -76,12 +76,6 @@ class KeyValueDataFileRecordReader : public KeyValueRecordReader { // virtual for test virtual void Reset(); - private: - // For struct array, arrow is unsafe for fields() and field(); for dict array, arrow is unsafe - // for dictionary(). Therefore, access array in advance before merge sort and projection to - // avoid subsequent multi-threading problems. - static void TraverseArray(const std::shared_ptr& array); - private: int32_t key_arity_; int32_t level_; diff --git a/src/paimon/core/io/key_value_in_memory_record_reader.cpp b/src/paimon/core/io/key_value_in_memory_record_reader.cpp index 1297fed1..1042d792 100644 --- a/src/paimon/core/io/key_value_in_memory_record_reader.cpp +++ b/src/paimon/core/io/key_value_in_memory_record_reader.cpp @@ -30,11 +30,11 @@ #include "paimon/common/data/columnar/columnar_row.h" #include "paimon/common/data/internal_row.h" #include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/arrow_utils.h" #include "paimon/common/utils/arrow/status_utils.h" #include "paimon/core/mergetree/compact/merge_function_wrapper.h" #include "paimon/core/utils/fields_comparator.h" #include "paimon/status.h" - namespace paimon { class MemoryPool; @@ -85,6 +85,7 @@ KeyValueInMemoryRecordReader::KeyValueInMemoryRecordReader( key_comparator_(key_comparator), merge_function_wrapper_(merge_function_wrapper) { assert(value_struct_array_); + ArrowUtils::TraverseArray(value_struct_array_); } Result> KeyValueInMemoryRecordReader::NextBatch() { diff --git a/test/inte/scan_and_read_inte_test.cpp b/test/inte/scan_and_read_inte_test.cpp index 8e867acf..603a5979 100644 --- a/test/inte/scan_and_read_inte_test.cpp +++ b/test/inte/scan_and_read_inte_test.cpp @@ -2584,7 +2584,6 @@ TEST_P(ScanAndReadInteTest, TestCastTimestampType) { ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); } -#ifdef PAIMON_ENABLE_AVRO TEST_F(ScanAndReadInteTest, TestAvroWithAppendTable) { auto read_data = [](int64_t snapshot_id, const std::string& result_json) { std::string table_path = GetDataDir() + "/avro/append_multiple.db/append_multiple"; @@ -2721,6 +2720,5 @@ TEST_F(ScanAndReadInteTest, TestAvroWithPkTable) { [0, true, 10, 1, 1, 1000, 1.5, 2.5, "Tony", "abcdef", 100, "123.45", [[["key",123]],[1,2,3]]] ])"); } -#endif } // namespace paimon::test