From 54149f60fa5a0f6fd096217cecce66ca8e02d10c Mon Sep 17 00:00:00 2001 From: daidai Date: Sat, 17 Jan 2026 10:37:31 +0800 Subject: [PATCH] [fix](iceberg)fix core dump when reading an iceberg table that has schema change and equality delete. --- be/src/vec/exec/format/parquet/schema_desc.h | 2 +- .../vec/exec/format/table/equality_delete.cpp | 94 +-- .../vec/exec/format/table/equality_delete.h | 36 +- .../vec/exec/format/table/iceberg_reader.cpp | 622 ++++++++++++++---- be/src/vec/exec/format/table/iceberg_reader.h | 49 +- .../exec/format/table/table_format_reader.cpp | 223 ++++--- .../exec/format/table/table_format_reader.h | 21 +- .../apache/doris/datasource/ExternalUtil.java | 109 +++ .../iceberg/source/IcebergScanNode.java | 52 +- gensrc/thrift/ExternalTableSchema.thrift | 1 + 10 files changed, 871 insertions(+), 338 deletions(-) diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h index 23cdee50ddf3c0..7d05ac978680d1 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.h +++ b/be/src/vec/exec/format/parquet/schema_desc.h @@ -134,7 +134,7 @@ class FieldDescriptor { * Get the column(the first level schema element, maybe nested field) by index. * @param index Column index in _fields */ - const FieldSchema* get_column(int index) const { return &_fields[index]; } + const FieldSchema* get_column(size_t index) const { return &_fields[index]; } /** * Get the column(the first level schema element, maybe nested field) by name. 
diff --git a/be/src/vec/exec/format/table/equality_delete.cpp b/be/src/vec/exec/format/table/equality_delete.cpp index 20162e082f1bc9..f7409c527e8543 100644 --- a/be/src/vec/exec/format/table/equality_delete.cpp +++ b/be/src/vec/exec/format/table/equality_delete.cpp @@ -22,69 +22,69 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" -std::unique_ptr EqualityDeleteBase::get_delete_impl(Block* delete_block) { +std::unique_ptr EqualityDeleteBase::get_delete_impl( + Block* delete_block, const std::vector& delete_col_ids) { + DCHECK_EQ(delete_block->columns(), delete_col_ids.size()); if (delete_block->columns() == 1) { - return std::make_unique(delete_block); + return std::make_unique(delete_block, delete_col_ids); } else { - return std::make_unique(delete_block); + return std::make_unique(delete_block, delete_col_ids); } } Status SimpleEqualityDelete::_build_set() { COUNTER_UPDATE(num_delete_rows, _delete_block->rows()); - if (_delete_block->columns() != 1) { + if (_delete_block->columns() != 1) [[unlikely]] { return Status::InternalError("Simple equality delete can be only applied with one column"); } auto& column_and_type = _delete_block->get_by_position(0); - _delete_column_name = column_and_type.name; - _delete_column_type = remove_nullable(column_and_type.type)->get_primitive_type(); - _hybrid_set.reset(create_set(_delete_column_type, _delete_block->rows(), false)); + auto delete_column_type = remove_nullable(column_and_type.type)->get_primitive_type(); + _hybrid_set.reset(create_set(delete_column_type, _delete_block->rows(), false)); _hybrid_set->insert_fixed_len(column_and_type.column, 0); return Status::OK(); } Status SimpleEqualityDelete::filter_data_block( - Block* data_block, const std::unordered_map* col_name_to_block_idx) { + Block* data_block, const std::unordered_map* col_name_to_block_idx, + const std::unordered_map& id_to_block_column_name, + IColumn::Filter& filter) { SCOPED_TIMER(equality_delete_time); - auto column_and_type = - 
data_block->get_by_position(col_name_to_block_idx->at(_delete_column_name)); - if (column_and_type.type->get_primitive_type() != _delete_column_type) { - return Status::InternalError( - "Not support type change in column '{}', src type: {}, target type: {}", - _delete_column_name, column_and_type.type->get_name(), (int)_delete_column_type); - } + DCHECK(_delete_col_ids.size() == 0); + auto column_field_id = _delete_col_ids[0]; + + auto column_and_type = data_block->get_by_position( + col_name_to_block_idx->at(id_to_block_column_name.at(column_field_id))); + size_t rows = data_block->rows(); - // _filter: 1 => in _hybrid_set; 0 => not in _hybrid_set - if (_filter == nullptr) { - _filter = std::make_unique(rows, 0); + // _filter: 1 => in _hybrid_set; 0 => not in _hybrid_set + if (_single_filter == nullptr) { + _single_filter = std::make_unique(rows, 0); } else { // reset the array capacity and fill all elements using the 0 - _filter->assign(rows, UInt8(0)); + _single_filter->assign(rows, UInt8(0)); } - if (column_and_type.column->is_nullable()) { const NullMap& null_map = reinterpret_cast(column_and_type.column.get()) ->get_null_map_data(); _hybrid_set->find_batch_nullable( remove_nullable(column_and_type.column)->assume_mutable_ref(), rows, null_map, - *_filter); + *_single_filter); if (_hybrid_set->contain_null()) { - auto* filter_data = _filter->data(); + auto* filter_data = _single_filter->data(); for (size_t i = 0; i < rows; ++i) { filter_data[i] = filter_data[i] || null_map[i]; } } } else { - _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(), rows, *_filter); + _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(), rows, + *_single_filter); } // should reverse _filter - auto* filter_data = _filter->data(); + auto* filter_data = filter.data(); for (size_t i = 0; i < rows; ++i) { - filter_data[i] = !filter_data[i]; + filter_data[i] &= !_single_filter->data()[i]; } - - Block::filter_block_internal(data_block, *_filter, 
data_block->columns()); return Status::OK(); } @@ -104,24 +104,32 @@ Status MultiEqualityDelete::_build_set() { } Status MultiEqualityDelete::filter_data_block( - Block* data_block, const std::unordered_map* col_name_to_block_idx) { + Block* data_block, const std::unordered_map* col_name_to_block_idx, + const std::unordered_map& id_to_block_column_name, + IColumn::Filter& filter) { SCOPED_TIMER(equality_delete_time); + DCHECK_EQ(_delete_block->get_columns_with_type_and_name().size(), _delete_col_ids.size()); size_t column_index = 0; - for (auto delete_col : _delete_block->get_columns_with_type_and_name()) { - const std::string& column_name = delete_col.name; - if (!col_name_to_block_idx->contains(column_name)) { - return Status::InternalError("Column '{}' not found in data block: {}", column_name, - data_block->dump_structure()); + for (size_t idx = 0; idx < _delete_block->get_columns_with_type_and_name().size(); ++idx) { + auto delete_col = _delete_block->get_columns_with_type_and_name()[idx]; + auto delete_col_id = _delete_col_ids[idx]; + + DCHECK(id_to_block_column_name.contains(delete_col_id)); + const auto& block_column_name = id_to_block_column_name.at(delete_col_id); + if (!col_name_to_block_idx->contains(block_column_name)) [[unlikely]] { + return Status::InternalError("Column '{}' not found in data block: {}", + block_column_name, data_block->dump_structure()); } auto column_and_type = - data_block->safe_get_by_position(col_name_to_block_idx->at(column_name)); - if (!delete_col.type->equals(*column_and_type.type)) { + data_block->safe_get_by_position(col_name_to_block_idx->at(block_column_name)); + if (!delete_col.type->equals(*column_and_type.type)) [[unlikely]] { return Status::InternalError( "Not support type change in column '{}', src type: {}, target type: {}", - column_name, delete_col.type->get_name(), column_and_type.type->get_name()); + block_column_name, delete_col.type->get_name(), + column_and_type.type->get_name()); } - 
_data_column_index[column_index++] = col_name_to_block_idx->at(column_name); + _data_column_index[column_index++] = col_name_to_block_idx->at(block_column_name); } size_t rows = data_block->rows(); _data_hashes.clear(); @@ -130,26 +138,18 @@ Status MultiEqualityDelete::filter_data_block( data_block->get_by_position(index).column->update_hashes_with_value(_data_hashes.data(), nullptr); } - - if (_filter == nullptr) { - _filter = std::make_unique(rows, 1); - } else { - //reset the array capacity and fill all elements using the 0 - _filter->assign(rows, UInt8(1)); - } - auto* filter_data = _filter->data(); + auto* filter_data = filter.data(); for (size_t i = 0; i < rows; ++i) { for (auto beg = _delete_hash_map.lower_bound(_data_hashes[i]), end = _delete_hash_map.upper_bound(_data_hashes[i]); beg != end; ++beg) { - if (_equal(data_block, i, beg->second)) { + if (filter[i] && _equal(data_block, i, beg->second)) { filter_data[i] = 0; break; } } } - Block::filter_block_internal(data_block, *_filter, data_block->columns()); return Status::OK(); } diff --git a/be/src/vec/exec/format/table/equality_delete.h b/be/src/vec/exec/format/table/equality_delete.h index 1c9f91a01cb7e9..c16b25e193b100 100644 --- a/be/src/vec/exec/format/table/equality_delete.h +++ b/be/src/vec/exec/format/table/equality_delete.h @@ -37,11 +37,13 @@ class EqualityDeleteBase { RuntimeProfile::Counter* equality_delete_time; Block* _delete_block; + std::vector _delete_col_ids; virtual Status _build_set() = 0; public: - EqualityDeleteBase(Block* delete_block) : _delete_block(delete_block) {} + EqualityDeleteBase(Block* delete_block, const std::vector delete_col_ids) + : _delete_block(delete_block), _delete_col_ids(delete_col_ids) {} virtual ~EqualityDeleteBase() = default; Status init(RuntimeProfile* profile) { @@ -58,26 +60,29 @@ class EqualityDeleteBase { virtual Status filter_data_block( Block* data_block, - const std::unordered_map* col_name_to_block_idx) = 0; + const std::unordered_map* 
col_name_to_block_idx, + const std::unordered_map& id_to_block_column_name, + IColumn::Filter& filter) = 0; - static std::unique_ptr get_delete_impl(Block* delete_block); + static std::unique_ptr get_delete_impl( + Block* delete_block, const std::vector& delete_col_ids); }; class SimpleEqualityDelete : public EqualityDeleteBase { protected: std::shared_ptr _hybrid_set; - std::string _delete_column_name; - PrimitiveType _delete_column_type; - std::unique_ptr _filter; + std::unique_ptr _single_filter; Status _build_set() override; public: - SimpleEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {} + SimpleEqualityDelete(Block* delete_block, const std::vector& delete_col_ids) + : EqualityDeleteBase(delete_block, delete_col_ids) {} - Status filter_data_block( - Block* data_block, - const std::unordered_map* col_name_to_block_idx) override; + Status filter_data_block(Block* data_block, + const std::unordered_map* col_name_to_block_idx, + const std::unordered_map& id_to_block_column_name, + IColumn::Filter& filter) override; }; /** @@ -95,18 +100,19 @@ class MultiEqualityDelete : public EqualityDeleteBase { std::multimap _delete_hash_map; // the delete column indexes in data block std::vector _data_column_index; - std::unique_ptr _filter; Status _build_set() override; bool _equal(Block* data_block, size_t data_row_index, size_t delete_row_index); public: - MultiEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {} + MultiEqualityDelete(Block* delete_block, const std::vector& delete_col_ids) + : EqualityDeleteBase(delete_block, delete_col_ids) {} - Status filter_data_block( - Block* data_block, - const std::unordered_map* col_name_to_block_idx) override; + Status filter_data_block(Block* data_block, + const std::unordered_map* col_name_to_block_idx, + const std::unordered_map& id_to_block_column_name, + IColumn::Filter& filter) override; }; #include "common/compile_check_end.h" diff --git 
a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 7bda260f4c9501..7f2814f0930243 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" @@ -100,13 +101,19 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_expand_block_if_need(block)); - RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); - if (_equality_delete_impl != nullptr) { - RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block, _col_name_to_block_idx)); - *read_rows = block->rows(); + if (_equality_delete_impls.size() > 0) { + std::unique_ptr filter = + std::make_unique(block->rows(), 1); + for (auto& equality_delete_impl : _equality_delete_impls) { + RETURN_IF_ERROR(equality_delete_impl->filter_data_block( + block, _col_name_to_block_idx, _id_to_block_column_name, *filter)); + } + Block::filter_block_internal(block, *filter, block->columns()); } + + *read_rows = block->rows(); return _shrink_block_if_need(block); } @@ -136,7 +143,7 @@ Status IcebergTableReader::init_row_filters() { } if (!equality_delete_files.empty()) { - RETURN_IF_ERROR(_equality_delete_base(equality_delete_files)); + RETURN_IF_ERROR(_process_equality_delete(equality_delete_files)); _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); } @@ -164,82 +171,6 @@ Status IcebergTableReader::init_row_filters() { return Status::OK(); } -Status IcebergTableReader::_equality_delete_base( - const std::vector& delete_files) { - bool init_schema = false; - std::vector equality_delete_col_names; - std::unordered_map delete_col_name_to_block_idx; - std::vector equality_delete_col_types; - std::unordered_map> - partition_columns; - 
std::unordered_map missing_columns; - - for (const auto& delete_file : delete_files) { - TFileRangeDesc delete_desc; - // must use __set() method to make sure __isset is true - delete_desc.__set_fs_name(_range.fs_name); - delete_desc.path = delete_file.path; - delete_desc.start_offset = 0; - delete_desc.size = -1; - delete_desc.file_size = -1; - std::unique_ptr delete_reader = _create_equality_reader(delete_desc); - if (!init_schema) { - RETURN_IF_ERROR(delete_reader->init_schema_reader()); - RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names, - &equality_delete_col_types)); - _generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names, - equality_delete_col_types); - for (uint32_t idx = 0; idx < equality_delete_col_names.size(); ++idx) { - delete_col_name_to_block_idx[equality_delete_col_names[idx]] = idx; - } - init_schema = true; - } - if (auto* parquet_reader = typeid_cast(delete_reader.get())) { - phmap::flat_hash_map>> tmp; - RETURN_IF_ERROR(parquet_reader->init_reader( - equality_delete_col_names, &delete_col_name_to_block_idx, {}, tmp, nullptr, - nullptr, nullptr, nullptr, nullptr, - TableSchemaChangeHelper::ConstNode::get_instance(), false)); - } else if (auto* orc_reader = typeid_cast(delete_reader.get())) { - RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, - &delete_col_name_to_block_idx, {}, false, {}, - {}, nullptr, nullptr)); - } else { - return Status::InternalError("Unsupported format of delete file"); - } - - RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns)); - - bool eof = false; - while (!eof) { - Block block; - _generate_equality_delete_block(&block, equality_delete_col_names, - equality_delete_col_types); - size_t read_rows = 0; - RETURN_IF_ERROR(delete_reader->get_next_block(&block, &read_rows, &eof)); - if (read_rows > 0) { - MutableBlock mutable_block(&_equality_delete_block); - RETURN_IF_ERROR(mutable_block.merge(block)); - } - } - } - for 
(int i = 0; i < equality_delete_col_names.size(); ++i) { - const std::string& delete_col = equality_delete_col_names[i]; - if (std::find(_all_required_col_names.begin(), _all_required_col_names.end(), delete_col) == - _all_required_col_names.end()) { - _expand_col_names.emplace_back(delete_col); - DataTypePtr data_type = make_nullable(equality_delete_col_types[i]); - MutableColumnPtr data_column = data_type->create_column(); - _expand_columns.emplace_back(std::move(data_column), data_type, delete_col); - } - } - for (const std::string& delete_col : _expand_col_names) { - _all_required_col_names.emplace_back(delete_col); - } - _equality_delete_impl = EqualityDeleteBase::get_delete_impl(&_equality_delete_block); - return _equality_delete_impl->init(_profile); -} - void IcebergTableReader::_generate_equality_delete_block( Block* block, const std::vector& equality_delete_col_names, const std::vector& equality_delete_col_types) { @@ -493,41 +424,112 @@ Status IcebergParquetReader::init_reader( _file_format = Fileformat::PARQUET; _col_name_to_block_idx = col_name_to_block_idx; auto* parquet_reader = static_cast(_file_format_reader.get()); - const FieldDescriptor* field_desc = nullptr; - RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&field_desc)); - DCHECK(field_desc != nullptr); + RETURN_IF_ERROR(parquet_reader->get_file_metadata_schema(&_data_file_field_desc)); + DCHECK(_data_file_field_desc != nullptr); + + auto column_id_result = _create_column_ids(_data_file_field_desc, tuple_descriptor); + auto& column_ids = column_id_result.column_ids; + const auto& filter_column_ids = column_id_result.filter_column_ids; + + RETURN_IF_ERROR(init_row_filters()); if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] { - RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(tuple_descriptor, *field_desc, - table_info_node_ptr)); + RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name( + tuple_descriptor, *_data_file_field_desc, 
table_info_node_ptr)); } else { bool exist_field_id = true; - // Iceberg will record the field id in the parquet file and find the column to read by matching it with the field id of the table (from fe). - RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id( - _params.history_schema_info.front().root_field, *field_desc, table_info_node_ptr, - exist_field_id)); - if (!exist_field_id) { - // For early iceberg version, field id may not be available, so name matching is used here. - RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_name(tuple_descriptor, *field_desc, - table_info_node_ptr)); + for (int idx = 0; idx < _data_file_field_desc->size(); idx++) { + if (_data_file_field_desc->get_column(idx)->field_id == -1) { + // the data file may be from hive table migrated to iceberg, field id is missing + exist_field_id = false; + break; + } } - } + const auto& table_schema = _params.history_schema_info.front().root_field; - _all_required_col_names = file_col_names; + table_info_node_ptr = std::make_shared(); + if (exist_field_id) { + _all_required_col_names = file_col_names; - auto column_id_result = _create_column_ids(field_desc, tuple_descriptor); - auto& column_ids = column_id_result.column_ids; - const auto& filter_column_ids = column_id_result.filter_column_ids; + // id -> table column name. columns that need read data file. 
+ std::unordered_map> id_to_table_field; + for (const auto& table_field : table_schema.fields) { + auto field = table_field.field_ptr; + id_to_table_field.emplace(field->id, field); + } - RETURN_IF_ERROR(init_row_filters()); - for (int i = 0; i < field_desc->size(); ++i) { - auto field_schema = field_desc->get_column(i); - std::string col_name = field_schema->name; - if (std::find(_expand_col_names.begin(), _expand_col_names.end(), col_name) != - _expand_col_names.end()) { - column_ids.insert(field_schema->get_column_id()); + for (int idx = 0; idx < _data_file_field_desc->size(); idx++) { + const auto& data_file_field = _data_file_field_desc->get_column(idx); + auto data_file_column_id = _data_file_field_desc->get_column(idx)->field_id; + + if (id_to_table_field.contains(data_file_column_id)) { + const auto& table_field = id_to_table_field[data_file_column_id]; + + std::shared_ptr field_node = nullptr; + RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id( + *table_field, *data_file_field, exist_field_id, field_node)); + table_info_node_ptr->add_children(table_field->name, data_file_field->name, + field_node); + + _id_to_block_column_name.emplace(data_file_column_id, table_field->name); + } else if (_equality_delete_col_ids.contains(data_file_column_id)) { + // Columns that need to be read for equality delete. + const static std::string EQ_DELETE_PRE = "__equality_delete_column__"; + + // Construct table column names that avoid duplication with current table schema. + // As the columns currently being read may have been deleted in the latest + // table structure or have undergone a series of schema changes... 
+ std::string table_column_name = EQ_DELETE_PRE + data_file_field->name; + table_info_node_ptr->add_children( + table_column_name, data_file_field->name, + std::make_shared()); + + _id_to_block_column_name.emplace(data_file_column_id, table_column_name); + _expand_col_names.emplace_back(table_column_name); + auto expand_data_type = make_nullable(data_file_field->data_type); + _expand_columns.emplace_back( + ColumnWithTypeAndName {expand_data_type->create_column(), + expand_data_type, table_column_name}); + + _all_required_col_names.emplace_back(table_column_name); + column_ids.insert(data_file_field->get_column_id()); + } + } + } else { + if (!_equality_delete_col_ids.empty()) [[unlikely]] { + return Status::InternalError( + "Can not read missing field id data file when have equality delete"); + } + _all_required_col_names = file_col_names; + std::map file_column_idx_map; + for (size_t idx = 0; idx < _data_file_field_desc->size(); idx++) { + file_column_idx_map.emplace(_data_file_field_desc->get_column(idx)->name, idx); + } + + for (const auto& table_field : table_schema.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (table_field.field_ptr->__isset.name_mapping || + table_field.field_ptr->name_mapping.size() == 0) { + return Status::DataQualityError( + "name_mapping must be set when read missing field id data file."); + } + for (const auto& mapped_name : table_field.field_ptr->name_mapping) { + if (file_column_idx_map.contains(mapped_name)) { + std::shared_ptr field_node = nullptr; + const auto& file_field = _data_file_field_desc->get_column( + file_column_idx_map.at(mapped_name)); + RETURN_IF_ERROR(BuildTableInfoUtil::by_parquet_field_id( + *table_field.field_ptr, *file_field, exist_field_id, field_node)); + table_info_node_ptr->add_children(table_column_name, file_field->name, + field_node); + } else { + table_info_node_ptr->add_not_exist_children(table_column_name); + } + } + } } } + return parquet_reader->init_reader( 
_all_required_col_names, _col_name_to_block_idx, conjuncts, slot_id_to_predicates, tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, @@ -655,39 +657,120 @@ Status IcebergOrcReader::init_reader( _file_format = Fileformat::ORC; _col_name_to_block_idx = col_name_to_block_idx; auto* orc_reader = static_cast(_file_format_reader.get()); - const orc::Type* orc_type_ptr = nullptr; - RETURN_IF_ERROR(orc_reader->get_file_type(&orc_type_ptr)); - _all_required_col_names = file_col_names; + RETURN_IF_ERROR(orc_reader->get_file_type(&_data_file_type_desc)); + std::vector data_file_col_names; + std::vector data_file_col_types; + RETURN_IF_ERROR(orc_reader->get_parsed_schema(&data_file_col_names, &data_file_col_types)); + + auto column_id_result = _create_column_ids(_data_file_type_desc, tuple_descriptor); + auto& column_ids = column_id_result.column_ids; + const auto& filter_column_ids = column_id_result.filter_column_ids; + + RETURN_IF_ERROR(init_row_filters()); if (!_params.__isset.history_schema_info || _params.history_schema_info.empty()) [[unlikely]] { - RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, orc_type_ptr, + RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, _data_file_type_desc, table_info_node_ptr)); } else { bool exist_field_id = true; - // Iceberg will record the field id in the parquet file and find the column to read by matching it with the field id of the table (from fe). - RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id( - _params.history_schema_info.front().root_field, orc_type_ptr, ICEBERG_ORC_ATTRIBUTE, - table_info_node_ptr, exist_field_id)); - if (!exist_field_id) { - // For early iceberg version, field id may not be available, so name matching is used here. 
- RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_name(tuple_descriptor, orc_type_ptr, - table_info_node_ptr)); + for (size_t idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { + if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { + exist_field_id = false; + break; + } } - } - auto column_id_result = _create_column_ids(orc_type_ptr, tuple_descriptor); - auto& column_ids = column_id_result.column_ids; - const auto& filter_column_ids = column_id_result.filter_column_ids; + const auto& table_schema = _params.history_schema_info.front().root_field; + table_info_node_ptr = std::make_shared(); + if (exist_field_id) { + _all_required_col_names = file_col_names; - RETURN_IF_ERROR(init_row_filters()); - for (uint64_t i = 0; i < orc_type_ptr->getSubtypeCount(); ++i) { - const orc::Type* sub_type = orc_type_ptr->getSubtype(i); - std::string col_name = orc_type_ptr->getFieldName(i); - if (std::find(_expand_col_names.begin(), _expand_col_names.end(), col_name) != - _expand_col_names.end()) { - column_ids.insert(sub_type->getColumnId()); + // id -> table column name. columns that need read data file. 
+ std::unordered_map> id_to_table_field; + for (const auto& table_field : table_schema.fields) { + auto field = table_field.field_ptr; + id_to_table_field.emplace(field->id, field); + } + + for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { + const auto& data_file_field = _data_file_type_desc->getSubtype(idx); + auto data_file_column_id = + std::stoi(data_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); + auto const& file_column_name = _data_file_type_desc->getFieldName(idx); + + if (id_to_table_field.contains(data_file_column_id)) { + const auto& table_field = id_to_table_field[data_file_column_id]; + + std::shared_ptr field_node = nullptr; + RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id( + *table_field, data_file_field, ICEBERG_ORC_ATTRIBUTE, exist_field_id, + field_node)); + table_info_node_ptr->add_children(table_field->name, file_column_name, + field_node); + + _id_to_block_column_name.emplace(data_file_column_id, table_field->name); + } else if (_equality_delete_col_ids.contains(data_file_column_id)) { + // Columns that need to be read for equality delete. + const static std::string EQ_DELETE_PRE = "__equality_delete_column__"; + + // Construct table column names that avoid duplication with current table schema. + // As the columns currently being read may have been deleted in the latest + // table structure or have undergone a series of schema changes... 
+ std::string table_column_name = EQ_DELETE_PRE + file_column_name; + table_info_node_ptr->add_children( + table_column_name, file_column_name, + std::make_shared()); + + _id_to_block_column_name.emplace(data_file_column_id, table_column_name); + _expand_col_names.emplace_back(table_column_name); + + auto expand_data_type = make_nullable(data_file_col_types[idx]); + _expand_columns.emplace_back( + ColumnWithTypeAndName {expand_data_type->create_column(), + expand_data_type, table_column_name}); + + _all_required_col_names.emplace_back(table_column_name); + column_ids.insert(data_file_field->getColumnId()); + } + } + } else { + if (!_equality_delete_col_ids.empty()) [[unlikely]] { + return Status::InternalError( + "Can not read missing field id data file when have equality delete"); + } + _all_required_col_names = file_col_names; + std::map file_column_idx_map; + for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); idx++) { + auto const& file_column_name = _data_file_type_desc->getFieldName(idx); + file_column_idx_map.emplace(file_column_name, idx); + } + + for (const auto& table_field : table_schema.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (table_field.field_ptr->__isset.name_mapping || + table_field.field_ptr->name_mapping.size() == 0) { + return Status::DataQualityError( + "name_mapping must be set when read missing field id data file."); + } + for (const auto& mapped_name : table_field.field_ptr->name_mapping) { + if (file_column_idx_map.contains(mapped_name)) { + auto file_column_idx = file_column_idx_map.at(mapped_name); + std::shared_ptr field_node = nullptr; + const auto& file_field = _data_file_type_desc->getSubtype(file_column_idx); + RETURN_IF_ERROR(BuildTableInfoUtil::by_orc_field_id( + *table_field.field_ptr, file_field, ICEBERG_ORC_ATTRIBUTE, + exist_field_id, field_node)); + table_info_node_ptr->add_children( + table_column_name, + _data_file_type_desc->getFieldName(file_column_idx), field_node); + } 
else { + table_info_node_ptr->add_not_exist_children(table_column_name); + } + } + } } } + return orc_reader->init_reader(&_all_required_col_names, _col_name_to_block_idx, conjuncts, false, tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, @@ -872,5 +955,286 @@ Status IcebergTableReader::read_deletion_vector(const std::string& data_file_pat return Status::OK(); } +Status IcebergParquetReader::_process_equality_delete( + const std::vector& delete_files) { + std::unordered_map> + partition_columns; + std::unordered_map missing_columns; + + std::map data_file_id_to_field_schema; + for (int idx = 0; idx < _data_file_field_desc->size(); ++idx) { + auto field_schema = _data_file_field_desc->get_column(idx); + if (_data_file_field_desc->get_column(idx)->field_id == -1) { + return Status::DataQualityError("Iceberg equality delete data file missing field id."); + } + data_file_id_to_field_schema[_data_file_field_desc->get_column(idx)->field_id] = + field_schema; + } + + for (const auto& delete_file : delete_files) { + TFileRangeDesc delete_desc; + // must use __set() method to make sure __isset is true + delete_desc.__set_fs_name(_range.fs_name); + delete_desc.path = delete_file.path; + delete_desc.start_offset = 0; + delete_desc.size = -1; + delete_desc.file_size = -1; + + if (!delete_file.__isset.field_ids) [[unlikely]] { + return Status::InternalError( + "missing delete field ids when reading equality delete file"); + } + auto& read_column_field_ids = delete_file.field_ids; + std::set read_column_field_ids_set; + for (const auto& field_id : read_column_field_ids) { + read_column_field_ids_set.insert(field_id); + _equality_delete_col_ids.insert(field_id); + } + + auto delete_reader = ParquetReader::create_unique( + _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE, + &_state->timezone_obj(), _io_ctx, _state, _meta_cache); + RETURN_IF_ERROR(delete_reader->init_schema_reader()); + + // the column that to read equality 
delete file. + // (delete file may be have extra columns that don't need to read) + std::vector delete_col_names; + std::vector delete_col_types; + std::vector delete_col_ids; + std::unordered_map delete_col_name_to_block_idx; + + const FieldDescriptor* delete_field_desc = nullptr; + RETURN_IF_ERROR(delete_reader->get_file_metadata_schema(&delete_field_desc)); + DCHECK(delete_field_desc != nullptr); + + auto eq_file_node = std::make_shared(); + for (const auto& delete_file_field : delete_field_desc->get_fields_schema()) { + if (delete_file_field.field_id == -1) [[unlikely]] { // missing delete_file_field id + // equality delete file must have delete_file_field id to match column. + return Status::DataQualityError( + "missing delete_file_field id when reading equality delete file"); + } else if (read_column_field_ids_set.contains(delete_file_field.field_id)) { + // the column that need to read. + if (delete_file_field.children.size() > 0) [[unlikely]] { // complex column + return Status::InternalError( + "can not support read complex column in equality delete file"); + } else if (!data_file_id_to_field_schema.contains(delete_file_field.field_id)) + [[unlikely]] { + return Status::DataQualityError( + "can not find delete field id in data file schema when reading " + "equality delete file"); + } + auto data_file_field = data_file_id_to_field_schema[delete_file_field.field_id]; + if (data_file_field->data_type->get_primitive_type() != + delete_file_field.data_type->get_primitive_type()) [[unlikely]] { + return Status::NotSupported( + "Not Support type change in equality delete, field: {}, delete " + "file type: {}, data file type: {}", + delete_file_field.field_id, delete_file_field.data_type->get_name(), + data_file_field->data_type->get_name()); + } + + std::string filed_lower_name = to_lower(delete_file_field.name); + eq_file_node->add_children(filed_lower_name, delete_file_field.name, + std::make_shared()); + + 
delete_col_ids.emplace_back(delete_file_field.field_id); + delete_col_names.emplace_back(filed_lower_name); + delete_col_types.emplace_back(make_nullable(delete_file_field.data_type)); + + read_column_field_ids_set.erase(delete_file_field.field_id); + } else { + // delete file may be have extra columns that don't need to read + } + } + if (!read_column_field_ids_set.empty()) [[unlikely]] { + return Status::DataQualityError("some field ids not found in equality delete file."); + } + + for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { + delete_col_name_to_block_idx[delete_col_names[idx]] = idx; + } + phmap::flat_hash_map>> tmp; + RETURN_IF_ERROR(delete_reader->init_reader(delete_col_names, &delete_col_name_to_block_idx, + tmp, {}, nullptr, nullptr, nullptr, nullptr, nullptr, + eq_file_node, false)); + RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns)); + + if (!_equality_delete_block_map.contains(delete_col_ids)) { + _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size()); + Block block; + _generate_equality_delete_block(&block, delete_col_names, delete_col_types); + _equality_delete_blocks.emplace_back(block); + } + Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]]; + bool eof = false; + while (!eof) { + Block tmp_block; + _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types); + size_t read_rows = 0; + RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof)); + if (read_rows > 0) { + MutableBlock mutable_block(&eq_file_block); + RETURN_IF_ERROR(mutable_block.merge(tmp_block)); + } + } + } + + for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) { + auto& eq_file_block = _equality_delete_blocks[block_idx]; + auto equality_delete_impl = + EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids); + RETURN_IF_ERROR(equality_delete_impl->init(_profile)); + 
_equality_delete_impls.emplace_back(std::move(equality_delete_impl)); + } + return Status::OK(); +} + +Status IcebergOrcReader::_process_equality_delete( + const std::vector& delete_files) { + std::unordered_map> + partition_columns; + std::unordered_map missing_columns; + + std::map data_file_id_to_field_idx; + for (int idx = 0; idx < _data_file_type_desc->getSubtypeCount(); ++idx) { + if (!_data_file_type_desc->getSubtype(idx)->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) { + return Status::DataQualityError("Iceberg equality delete data file missing field id."); + } + auto field_id = std::stoi( + _data_file_type_desc->getSubtype(idx)->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); + data_file_id_to_field_idx[field_id] = idx; + } + + for (const auto& delete_file : delete_files) { + TFileRangeDesc delete_desc; + // must use __set() method to make sure __isset is true + delete_desc.__set_fs_name(_range.fs_name); + delete_desc.path = delete_file.path; + delete_desc.start_offset = 0; + delete_desc.size = -1; + delete_desc.file_size = -1; + + if (!delete_file.__isset.field_ids) [[unlikely]] { + return Status::InternalError( + "missing delete field ids when reading equality delete file"); + } + auto& read_column_field_ids = delete_file.field_ids; + std::set read_column_field_ids_set; + for (const auto& field_id : read_column_field_ids) { + read_column_field_ids_set.insert(field_id); + _equality_delete_col_ids.insert(field_id); + } + + auto delete_reader = OrcReader::create_unique(_profile, _state, _params, delete_desc, + READ_DELETE_FILE_BATCH_SIZE, + _state->timezone(), _io_ctx, _meta_cache); + RETURN_IF_ERROR(delete_reader->init_schema_reader()); + // delete file schema + std::vector delete_file_col_names; + std::vector delete_file_col_types; + RETURN_IF_ERROR( + delete_reader->get_parsed_schema(&delete_file_col_names, &delete_file_col_types)); + + // the column that to read equality delete file. 
+ // (delete file maybe have extra columns that don't need to read) + std::vector delete_col_names; + std::vector delete_col_types; + std::vector delete_col_ids; + std::unordered_map delete_col_name_to_block_idx; + + const orc::Type* delete_field_desc = nullptr; + RETURN_IF_ERROR(delete_reader->get_file_type(&delete_field_desc)); + DCHECK(delete_field_desc != nullptr); + + auto eq_file_node = std::make_shared(); + + for (size_t idx = 0; idx < delete_field_desc->getSubtypeCount(); idx++) { + auto delete_file_field = delete_field_desc->getSubtype(idx); + + if (!delete_file_field->hasAttributeKey(ICEBERG_ORC_ATTRIBUTE)) + [[unlikely]] { // missing delete_file_field id + // equality delete file must have delete_file_field id to match column. + return Status::DataQualityError( + "missing delete_file_field id when reading equality delete file"); + } else { + auto delete_field_id = + std::stoi(delete_file_field->getAttributeValue(ICEBERG_ORC_ATTRIBUTE)); + if (read_column_field_ids_set.contains(delete_field_id)) { + // the column that need to read. 
+ if (is_complex_type(delete_file_col_types[idx]->get_primitive_type())) + [[unlikely]] { + return Status::InternalError( + "can not support read complex column in equality delete file."); + } else if (!data_file_id_to_field_idx.contains(delete_field_id)) [[unlikely]] { + return Status::DataQualityError( + "can not find delete field id in data file schema when reading " + "equality delete file"); + } + + auto data_file_field = _data_file_type_desc->getSubtype( + data_file_id_to_field_idx[delete_field_id]); + + if (delete_file_field->getKind() != data_file_field->getKind()) [[unlikely]] { + return Status::NotSupported( + "Not Support type change in equality delete, field: {}, delete " + "file type: {}, data file type: {}", + delete_field_id, delete_file_field->getKind(), + data_file_field->getKind()); + } + std::string filed_lower_name = to_lower(delete_file_field->getFieldName(idx)); + eq_file_node->add_children( + filed_lower_name, delete_file_field->getFieldName(idx), + std::make_shared()); + + delete_col_ids.emplace_back(delete_field_id); + delete_col_names.emplace_back(filed_lower_name); + delete_col_types.emplace_back(make_nullable(delete_file_col_types[idx])); + read_column_field_ids_set.erase(delete_field_id); + } + } + } + if (!read_column_field_ids_set.empty()) [[unlikely]] { + return Status::DataQualityError("some field ids not found in equality delete file."); + } + + for (uint32_t idx = 0; idx < delete_col_names.size(); ++idx) { + delete_col_name_to_block_idx[delete_col_names[idx]] = idx; + } + + RETURN_IF_ERROR(delete_reader->init_reader(&delete_col_names, &delete_col_name_to_block_idx, + {}, false, nullptr, nullptr, nullptr, nullptr, + eq_file_node)); + RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns)); + + if (!_equality_delete_block_map.contains(delete_col_ids)) { + _equality_delete_block_map.emplace(delete_col_ids, _equality_delete_blocks.size()); + Block block; + _generate_equality_delete_block(&block, 
delete_col_names, delete_col_types); + _equality_delete_blocks.emplace_back(block); + } + Block& eq_file_block = _equality_delete_blocks[_equality_delete_block_map[delete_col_ids]]; + bool eof = false; + while (!eof) { + Block tmp_block; + _generate_equality_delete_block(&tmp_block, delete_col_names, delete_col_types); + size_t read_rows = 0; + RETURN_IF_ERROR(delete_reader->get_next_block(&tmp_block, &read_rows, &eof)); + if (read_rows > 0) { + MutableBlock mutable_block(&eq_file_block); + RETURN_IF_ERROR(mutable_block.merge(tmp_block)); + } + } + } + + for (const auto& [delete_col_ids, block_idx] : _equality_delete_block_map) { + auto& eq_file_block = _equality_delete_blocks[block_idx]; + auto equality_delete_impl = + EqualityDeleteBase::get_delete_impl(&eq_file_block, delete_col_ids); + RETURN_IF_ERROR(equality_delete_impl->init(_profile)); + _equality_delete_impls.emplace_back(std::move(equality_delete_impl)); + } + return Status::OK(); +} #include "common/compile_check_end.h" } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index ffbb18d1255ea6..5a9555a66975c6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -118,9 +118,8 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel Status _position_delete_base(const std::string data_file_path, const std::vector& delete_files); - Status _equality_delete_base(const std::vector& delete_files); - virtual std::unique_ptr _create_equality_reader( - const TFileRangeDesc& delete_desc) = 0; + virtual Status _process_equality_delete( + const std::vector& delete_files) = 0; void _generate_equality_delete_block(Block* block, const std::vector& equality_delete_col_names, const std::vector& equality_delete_col_types); @@ -134,9 +133,6 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel IcebergProfile 
_iceberg_profile;
// _iceberg_delete_rows from kv_cache
const std::vector* _iceberg_delete_rows = nullptr;
- std::vector _expand_col_names;
- std::vector _expand_columns;
- std::vector _all_required_col_names;
// Pointer to external column name to block index mapping (from FileScanner)
// Used to dynamically add expand columns for equality delete
@@ -160,9 +156,22 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel
void _gen_position_delete_file_range(Block& block, DeleteFile* const position_delete,
size_t read_rows, bool file_path_column_dictionary_coded);
- // equality delete
- Block _equality_delete_block;
- std::unique_ptr _equality_delete_impl;
+ // read table column + extra equality delete columns
+ std::vector _all_required_col_names;
+
+ // extra equality delete name and type
+ std::vector _expand_col_names;
+ std::vector _expand_columns;
+
+ // all column ids that need to be read for eq delete (from all eq delete files.)
+ std::set _equality_delete_col_ids;
+ // eq delete column ids -> location of _equality_delete_blocks / _equality_delete_impls
+ std::map, int> _equality_delete_block_map;
+ std::vector _equality_delete_blocks;
+ std::vector> _equality_delete_impls;
+
+ // id -> block column name. 
+ std::unordered_map _id_to_block_column_name; }; class IcebergParquetReader final : public IcebergTableReader { @@ -191,20 +200,15 @@ class IcebergParquetReader final : public IcebergTableReader { parquet_reader->set_delete_rows(_iceberg_delete_rows); } -protected: - std::unique_ptr _create_equality_reader( - const TFileRangeDesc& delete_desc) final { - return ParquetReader::create_unique(_profile, _params, delete_desc, - READ_DELETE_FILE_BATCH_SIZE, &_state->timezone_obj(), - _io_ctx, _state, _meta_cache); - } - private: static ColumnIdResult _create_column_ids(const FieldDescriptor* field_desc, const TupleDescriptor* tuple_descriptor); Status _read_position_delete_file(const TFileRangeDesc* delete_range, DeleteFile* position_delete) final; + Status _process_equality_delete(const std::vector& delete_files) final; + + const FieldDescriptor* _data_file_field_desc = nullptr; }; class IcebergOrcReader final : public IcebergTableReader { public: @@ -234,20 +238,15 @@ class IcebergOrcReader final : public IcebergTableReader { const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts); -protected: - std::unique_ptr _create_equality_reader( - const TFileRangeDesc& delete_desc) override { - return OrcReader::create_unique(_profile, _state, _params, delete_desc, - READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx, - _meta_cache); - } - private: + Status _process_equality_delete(const std::vector& delete_files) final; + static ColumnIdResult _create_column_ids(const orc::Type* orc_type, const TupleDescriptor* tuple_descriptor); private: static const std::string ICEBERG_ORC_ATTRIBUTE; + const orc::Type* _data_file_type_desc = nullptr; }; } // namespace vectorized diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp index 2c614777a890f3..e7f937f0b96d2c 100644 --- a/be/src/vec/exec/format/table/table_format_reader.cpp +++ 
b/be/src/vec/exec/format/table/table_format_reader.cpp @@ -376,45 +376,9 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_table_field_id( return Status::OK(); } -Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( - const schema::external::TStructField& table_schema, - const FieldDescriptor& parquet_field_desc, - std::shared_ptr& node, bool& exist_field_id) { - auto struct_node = std::make_shared(); - auto parquet_fields_schema = parquet_field_desc.get_fields_schema(); - std::map file_column_id_idx_map; - for (size_t idx = 0; idx < parquet_fields_schema.size(); idx++) { - if (parquet_fields_schema[idx].field_id == -1) { - exist_field_id = false; - return Status::OK(); - } else { - file_column_id_idx_map.emplace(parquet_fields_schema[idx].field_id, idx); - } - } - - for (const auto& table_field : table_schema.fields) { - const auto& table_column_name = table_field.field_ptr->name; - - if (file_column_id_idx_map.contains(table_field.field_ptr->id)) { - auto file_column_idx = file_column_id_idx_map[table_field.field_ptr->id]; - std::shared_ptr field_node = nullptr; - RETURN_IF_ERROR(by_parquet_field_id(*table_field.field_ptr, - parquet_fields_schema[file_column_idx], field_node, - exist_field_id)); - struct_node->add_children(table_column_name, - parquet_fields_schema[file_column_idx].name, field_node); - } else { - struct_node->add_not_exist_children(table_column_name); - } - } - - node = struct_node; - return Status::OK(); -} - Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( const schema::external::TField& table_schema, const FieldSchema& parquet_field, - std::shared_ptr& node, bool& exist_field_id) { + const bool exist_field_id, std::shared_ptr& node) { switch (table_schema.type.type) { case TPrimitiveType::MAP: { if (parquet_field.data_type->get_primitive_type() != TYPE_MAP) [[unlikely]] { @@ -433,11 +397,11 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( std::shared_ptr value_node 
= nullptr; RETURN_IF_ERROR(by_parquet_field_id(*table_schema.nestedField.map_field.key_field.field_ptr, - parquet_field.children[0], key_node, exist_field_id)); + parquet_field.children[0], exist_field_id, key_node)); RETURN_IF_ERROR( by_parquet_field_id(*table_schema.nestedField.map_field.value_field.field_ptr, - parquet_field.children[1], value_node, exist_field_id)); + parquet_field.children[1], exist_field_id, value_node)); node = std::make_shared(key_node, value_node); break; @@ -456,7 +420,7 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( std::shared_ptr element_node = nullptr; RETURN_IF_ERROR( by_parquet_field_id(*table_schema.nestedField.array_field.item_field.field_ptr, - parquet_field.children[0], element_node, exist_field_id)); + parquet_field.children[0], exist_field_id, element_node)); node = std::make_shared(element_node); break; @@ -470,27 +434,56 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( auto struct_node = std::make_shared(); - std::map file_column_id_idx_map; - for (size_t idx = 0; idx < parquet_field.children.size(); idx++) { - if (parquet_field.children[idx].field_id == -1) { - exist_field_id = false; - return Status::OK(); - } else { + if (exist_field_id) { + std::map file_column_id_idx_map; + for (size_t idx = 0; idx < parquet_field.children.size(); idx++) { + DCHECK_NE(parquet_field.children[idx].field_id, -1); file_column_id_idx_map.emplace(parquet_field.children[idx].field_id, idx); } - } - for (const auto& table_field : table_schema.nestedField.struct_field.fields) { - const auto& table_column_name = table_field.field_ptr->name; - if (file_column_id_idx_map.contains(table_field.field_ptr->id)) { - const auto& file_field = parquet_field.children.at( - file_column_id_idx_map[table_field.field_ptr->id]); - std::shared_ptr field_node = nullptr; - RETURN_IF_ERROR(by_parquet_field_id(*table_field.field_ptr, file_field, field_node, - exist_field_id)); - 
struct_node->add_children(table_column_name, file_field.name, field_node); - } else { - struct_node->add_not_exist_children(table_column_name); + for (const auto& table_field : table_schema.nestedField.struct_field.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (file_column_id_idx_map.contains(table_field.field_ptr->id)) { + const auto& file_field = parquet_field.children.at( + file_column_id_idx_map[table_field.field_ptr->id]); + std::shared_ptr field_node = nullptr; + RETURN_IF_ERROR(by_parquet_field_id(*table_field.field_ptr, file_field, + exist_field_id, field_node)); + struct_node->add_children(table_column_name, file_field.name, field_node); + } else { + struct_node->add_not_exist_children(table_column_name); + } + } + } else { + std::map file_column_idx_map; + for (size_t idx = 0; idx < parquet_field.children.size(); idx++) { + file_column_idx_map.emplace(parquet_field.children[idx].name, idx); + } + + for (const auto& table_field : table_schema.nestedField.struct_field.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (!table_field.field_ptr->__isset.name_mapping || + table_field.field_ptr->name_mapping.size() == 0) { + return Status::DataQualityError( + "name_mapping must be set when read missing field id data file."); + } + + auto have_mapping = false; + for (const auto& mapped_name : table_field.field_ptr->name_mapping) { + if (file_column_idx_map.contains(mapped_name)) { + std::shared_ptr field_node = nullptr; + const auto& file_field = + parquet_field.children.at(file_column_idx_map.at(mapped_name)); + RETURN_IF_ERROR(by_parquet_field_id(*table_field.field_ptr, file_field, + exist_field_id, field_node)); + struct_node->add_children(table_column_name, file_field.name, field_node); + have_mapping = true; + break; + } + } + if (!have_mapping) { + struct_node->add_not_exist_children(table_column_name); + } } } node = struct_node; @@ -504,46 +497,10 @@ Status 
TableSchemaChangeHelper::BuildTableInfoUtil::by_parquet_field_id( return Status::OK(); } -Status TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_field_id( - const schema::external::TStructField& table_schema, const orc::Type* orc_root, - const std::string& field_id_attribute_key, - std::shared_ptr& node, bool& exist_field_id) { - auto struct_node = std::make_shared(); - - std::map file_column_id_idx_map; - for (size_t idx = 0; idx < orc_root->getSubtypeCount(); idx++) { - if (orc_root->getSubtype(idx)->hasAttributeKey(field_id_attribute_key)) { - auto field_id = - std::stoi(orc_root->getSubtype(idx)->getAttributeValue(field_id_attribute_key)); - file_column_id_idx_map.emplace(field_id, idx); - } else { - exist_field_id = false; - return Status::OK(); - } - } - - for (const auto& table_field : table_schema.fields) { - const auto& table_column_name = table_field.field_ptr->name; - if (file_column_id_idx_map.contains(table_field.field_ptr->id)) { - auto file_field_idx = file_column_id_idx_map[table_field.field_ptr->id]; - const auto& file_field = orc_root->getSubtype(file_field_idx); - std::shared_ptr field_node = nullptr; - RETURN_IF_ERROR(by_orc_field_id(*table_field.field_ptr, file_field, - field_id_attribute_key, field_node, exist_field_id)); - struct_node->add_children(table_column_name, orc_root->getFieldName(file_field_idx), - field_node); - } else { - struct_node->add_not_exist_children(table_column_name); - } - } - node = struct_node; - return Status::OK(); -} - Status TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_field_id( const schema::external::TField& table_schema, const orc::Type* orc_root, - const std::string& field_id_attribute_key, - std::shared_ptr& node, bool& exist_field_id) { + const std::string& field_id_attribute_key, const bool exist_field_id, + std::shared_ptr& node) { switch (table_schema.type.type) { case TPrimitiveType::MAP: { if (orc_root->getKind() != orc::TypeKind::MAP) [[unlikely]] { @@ -562,12 +519,12 @@ Status 
TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_field_id( std::shared_ptr value_node = nullptr; RETURN_IF_ERROR(by_orc_field_id(*table_schema.nestedField.map_field.key_field.field_ptr, - orc_root->getSubtype(0), field_id_attribute_key, key_node, - exist_field_id)); + orc_root->getSubtype(0), field_id_attribute_key, + exist_field_id, key_node)); RETURN_IF_ERROR(by_orc_field_id(*table_schema.nestedField.map_field.value_field.field_ptr, - orc_root->getSubtype(1), field_id_attribute_key, value_node, - exist_field_id)); + orc_root->getSubtype(1), field_id_attribute_key, + exist_field_id, value_node)); node = std::make_shared(key_node, value_node); break; @@ -586,7 +543,7 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_field_id( std::shared_ptr element_node = nullptr; RETURN_IF_ERROR(by_orc_field_id(*table_schema.nestedField.array_field.item_field.field_ptr, orc_root->getSubtype(0), field_id_attribute_key, - element_node, exist_field_id)); + exist_field_id, element_node)); node = std::make_shared(element_node); break; @@ -597,9 +554,69 @@ Status TableSchemaChangeHelper::BuildTableInfoUtil::by_orc_field_id( } MOCK_REMOVE(DCHECK(table_schema.__isset.nestedField)); MOCK_REMOVE(DCHECK(table_schema.nestedField.__isset.struct_field)); - RETURN_IF_ERROR(by_orc_field_id(table_schema.nestedField.struct_field, orc_root, - field_id_attribute_key, node, exist_field_id)); + auto struct_node = std::make_shared(); + if (exist_field_id) { + std::map file_column_id_idx_map; + for (size_t idx = 0; idx < orc_root->getSubtypeCount(); idx++) { + auto field_id = std::stoi( + orc_root->getSubtype(idx)->getAttributeValue(field_id_attribute_key)); + file_column_id_idx_map.emplace(field_id, idx); + } + + for (const auto& table_field : table_schema.nestedField.struct_field.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (file_column_id_idx_map.contains(table_field.field_ptr->id)) { + auto file_field_idx = 
file_column_id_idx_map[table_field.field_ptr->id]; + const auto& file_field = orc_root->getSubtype(file_field_idx); + std::shared_ptr field_node = nullptr; + RETURN_IF_ERROR(by_orc_field_id(*table_field.field_ptr, file_field, + field_id_attribute_key, exist_field_id, + field_node)); + struct_node->add_children(table_column_name, + orc_root->getFieldName(file_field_idx), field_node); + } else { + struct_node->add_not_exist_children(table_column_name); + } + } + } else { + std::map file_column_idx_map; + + for (size_t idx = 0; idx < orc_root->getSubtypeCount(); idx++) { + file_column_idx_map.emplace(orc_root->getFieldName(idx), idx); + } + + for (const auto& table_field : table_schema.nestedField.struct_field.fields) { + const auto& table_column_name = table_field.field_ptr->name; + if (!table_field.field_ptr->__isset.name_mapping || + table_field.field_ptr->name_mapping.size() == 0) { + return Status::DataQualityError( + "name_mapping must be set when read missing field id data file."); + } + auto have_mapping = false; + for (const auto& mapped_name : table_field.field_ptr->name_mapping) { + if (file_column_idx_map.contains(mapped_name)) { + std::shared_ptr field_node = nullptr; + auto file_field_idx = file_column_idx_map.at(mapped_name); + const auto& file_field = orc_root->getSubtype(file_field_idx); + + RETURN_IF_ERROR(by_orc_field_id(*table_field.field_ptr, file_field, + field_id_attribute_key, exist_field_id, + field_node)); + struct_node->add_children(table_column_name, + orc_root->getFieldName(file_field_idx), + field_node); + have_mapping = true; + break; + } + } + if (!have_mapping) { + struct_node->add_not_exist_children(table_column_name); + } + } + } + + node = struct_node; break; } default: { diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 00442dc45f992f..d32170c0afa370 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ 
b/be/src/vec/exec/format/table/table_format_reader.h @@ -385,31 +385,18 @@ class TableSchemaChangeHelper { const schema::external::TStructField& file_schema, std::shared_ptr& node); - //for iceberg parquet: Use the field id in the `table schema` and the parquet file to match columns. - static Status by_parquet_field_id(const schema::external::TStructField& table_schema, - const FieldDescriptor& parquet_field_desc, - std::shared_ptr& node, - bool& exist_field_id); - // for iceberg parquet static Status by_parquet_field_id(const schema::external::TField& table_schema, const FieldSchema& parquet_field, - std::shared_ptr& node, - bool& exist_field_id); - - // for iceberg orc : Use the field id in the `table schema` and the orc file to match columns. - static Status by_orc_field_id(const schema::external::TStructField& table_schema, - const orc::Type* orc_root, - const std::string& field_id_attribute_key, - std::shared_ptr& node, - bool& exist_field_id); + const bool exist_field_id, + std::shared_ptr& node); // for iceberg orc static Status by_orc_field_id(const schema::external::TField& table_schema, const orc::Type* orc_root, const std::string& field_id_attribute_key, - std::shared_ptr& node, - bool& exist_field_id); + const bool exist_field_id, + std::shared_ptr& node); }; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalUtil.java index ed3b34db3273b8..17261e1ca247f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalUtil.java @@ -17,7 +17,13 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import 
org.apache.doris.catalog.Type; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.schema.external.TArrayField; import org.apache.doris.thrift.schema.external.TField; @@ -27,7 +33,10 @@ import org.apache.doris.thrift.schema.external.TSchema; import org.apache.doris.thrift.schema.external.TStructField; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class ExternalUtil { private static TField getExternalSchema(Column column) { @@ -80,4 +89,104 @@ public static void initSchemaInfo(TFileScanRangeParams params, Long schemaId, Li tSchema.setRootField(getExternalSchema(columns)); params.addToHistorySchemaInfo(tSchema); } + + + /** + * Initialize schema info based on SlotDescriptors, only including columns that are actually needed. + * For nested columns, only include sub-columns that are accessed according to pruned type. + * + * @param params TFileScanRangeParams to fill + * @param schemaId Schema ID + * @param slots List of SlotDescriptors that are actually needed + * @param nameMapping NameMapping from Iceberg table properties (can be null and empty.) 
+ */ + public static void initSchemaInfo(TFileScanRangeParams params, Long schemaId, + List slots, Map> nameMapping) { + params.setCurrentSchemaId(schemaId); + TSchema tSchema = new TSchema(); + tSchema.setSchemaId(schemaId); + tSchema.setRootField(getExternalSchema(slots, nameMapping)); + params.addToHistorySchemaInfo(tSchema); + } + + private static TStructField getExternalSchema(List slots, + Map> nameMapping) { + TStructField structField = new TStructField(); + for (SlotDescriptor slot : slots) { + String colName = slot.getColumn().getName(); + if (colName.startsWith(Column.GLOBAL_ROWID_COL)) { + continue; + } + + TFieldPtr fieldPtr = new TFieldPtr(); + TField field = getExternalSchema(slot.getType(), slot.getColumn(), nameMapping); + fieldPtr.setFieldPtr(field); + structField.addToFields(fieldPtr); + } + return structField; + } + + private static TField getExternalSchema(Type columnType, Column dorisColumn, + Map> nameMapping) { + TField root = new TField(); + root.setName(dorisColumn.getName()); + root.setId(dorisColumn.getUniqueId()); + root.setIsOptional(dorisColumn.isAllowNull()); + root.setType(dorisColumn.getType().toColumnTypeThrift()); + + if (nameMapping != null && nameMapping.containsKey(dorisColumn.getUniqueId())) { + // for iceberg set name mapping. 
+ root.setNameMapping(new ArrayList<>(nameMapping.get(dorisColumn.getUniqueId()))); + } + + TNestedField nestedField = new TNestedField(); + + if (columnType.isStructType()) { + StructType dorisStructType = (StructType) columnType; + TStructField structField = new TStructField(); + + Map subNameToSubColumn = new HashMap<>(); + for (int i = 0; i < dorisColumn.getChildren().size(); i++) { + Column subColumn = dorisColumn.getChildren().get(i); + subNameToSubColumn.put(subColumn.getName(), subColumn); + } + + for (StructField subField : dorisStructType.getFields()) { + TFieldPtr fieldPtr = new TFieldPtr(); + Column subColumn = subNameToSubColumn.get(subField.getName()); + fieldPtr.setFieldPtr(getExternalSchema(subField.getType(), subColumn, nameMapping)); + structField.addToFields(fieldPtr); + } + + nestedField.setStructField(structField); + root.setNestedField(nestedField); + } else if (columnType.isArrayType()) { + ArrayType dorisArrayType = (ArrayType) columnType; + + TArrayField listField = new TArrayField(); + TFieldPtr fieldPtr = new TFieldPtr(); + fieldPtr.setFieldPtr(getExternalSchema( + dorisArrayType.getItemType(), dorisColumn.getChildren().get(0), nameMapping)); + listField.setItemField(fieldPtr); + nestedField.setArrayField(listField); + root.setNestedField(nestedField); + } else if (columnType.isMapType()) { + MapType dorisMapType = (MapType) columnType; + + TMapField mapField = new TMapField(); + TFieldPtr keyPtr = new TFieldPtr(); + keyPtr.setFieldPtr(getExternalSchema( + dorisMapType.getKeyType(), dorisColumn.getChildren().get(0), nameMapping)); + + mapField.setKeyField(keyPtr); + TFieldPtr valuePtr = new TFieldPtr(); + valuePtr.setFieldPtr(getExternalSchema( + dorisMapType.getKeyType(), dorisColumn.getChildren().get(1), nameMapping)); + mapField.setValueField(valuePtr); + nestedField.setMapField(mapField); + root.setNestedField(nestedField); + } + return root; + } } + diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index f1dc6cd4eb9fd2..4d2961e3f30dc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -77,6 +77,7 @@ import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -85,6 +86,10 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.MappedFields; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.logging.log4j.LogManager; @@ -201,7 +206,40 @@ protected void doInitialize() throws UserException { ); backendStorageProperties = CredentialUtils.getBackendPropertiesFromStorageMap(storagePropertiesMap); super.doInitialize(); - ExternalUtil.initSchemaInfo(params, -1L, source.getTargetTable().getColumns()); + } + + /** + * Extract name mapping from Iceberg table properties. + * Returns a map from field ID to list of mapped names. 
+ */ + private Map> extractNameMapping() { + Map> result = new HashMap<>(); + try { + String nameMappingJson = icebergTable.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + if (nameMappingJson != null && !nameMappingJson.isEmpty()) { + NameMapping mapping = NameMappingParser.fromJson(nameMappingJson); + if (mapping != null) { + // Extract mappings from NameMapping + // NameMapping contains field mappings, we need to convert them to our format + extractMappingsFromNameMapping(mapping.asMappedFields(), result); + } + } + } catch (Exception e) { + // If name mapping parsing fails, continue without it + LOG.warn("Failed to parse name mapping from Iceberg table properties", e); + } + return result; + } + + private void extractMappingsFromNameMapping(MappedFields mappingFields, Map> result) { + if (mappingFields == null) { + return; + } + for (MappedField mappedField : mappingFields.fields()) { + result.put(mappedField.id(), new ArrayList<>(mappedField.names())); + extractMappingsFromNameMapping(mappedField.nestedMapping(), result); + } + } @Override @@ -277,8 +315,19 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli rangeDesc.setTableFormatParams(tableFormatFileDesc); } + @Override + public void createScanRangeLocations() throws UserException { + super.createScanRangeLocations(); + // Extract name mapping from Iceberg table properties + Map> nameMapping = extractNameMapping(); + + // Use new initSchemaInfo method that only includes needed columns based on slots and pruned type + ExternalUtil.initSchemaInfo(params, -1L, desc.getSlots(), nameMapping); + } + @Override public List getSplits(int numBackends) throws UserException { + try { return preExecutionAuthenticator.execute(() -> doGetSplits(numBackends)); } catch (Exception e) { @@ -807,6 +856,7 @@ private List getDeleteFileFilters(FileScanTask spitTask if (delete.content() == FileContent.POSITION_DELETES) { filters.add(IcebergDeleteFileFilter.createPositionDelete(delete)); } 
else if (delete.content() == FileContent.EQUALITY_DELETES) { + // spitTask.schema().findField() filters.add(IcebergDeleteFileFilter.createEqualityDelete( delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes(), delete.format())); diff --git a/gensrc/thrift/ExternalTableSchema.thrift b/gensrc/thrift/ExternalTableSchema.thrift index aff93b811431d1..14972f8d434784 100644 --- a/gensrc/thrift/ExternalTableSchema.thrift +++ b/gensrc/thrift/ExternalTableSchema.thrift @@ -49,6 +49,7 @@ struct TField { 3: optional string name, // Field name 4: optional Types.TColumnType type, // Corresponding Doris column type 5: optional TNestedField nestedField // Nested field definition (for array, struct, or map types) + 6: optional list name_mapping // iceberg : schema.name-mapping.default, for missing column id. }