Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class FieldDescriptor {
* Get the column(the first level schema element, maybe nested field) by index.
* @param index Column index in _fields
*/
// Takes an unsigned index; no bounds check is performed before indexing _fields.
const FieldSchema* get_column(size_t index) const { return &_fields[index]; }

/**
* Get the column(the first level schema element, maybe nested field) by name.
Expand Down
94 changes: 47 additions & 47 deletions be/src/vec/exec/format/table/equality_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,69 +22,69 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"

std::unique_ptr<EqualityDeleteBase> EqualityDeleteBase::get_delete_impl(Block* delete_block) {
std::unique_ptr<EqualityDeleteBase> EqualityDeleteBase::get_delete_impl(
Block* delete_block, const std::vector<int>& delete_col_ids) {
DCHECK_EQ(delete_block->columns(), delete_col_ids.size());
if (delete_block->columns() == 1) {
return std::make_unique<SimpleEqualityDelete>(delete_block);
return std::make_unique<SimpleEqualityDelete>(delete_block, delete_col_ids);
} else {
return std::make_unique<MultiEqualityDelete>(delete_block);
return std::make_unique<MultiEqualityDelete>(delete_block, delete_col_ids);
}
}

/**
 * Builds `_hybrid_set` from the single column of `_delete_block`.
 * @return Status::OK() on success; InternalError when the delete block does not have
 *         exactly one column.
 */
Status SimpleEqualityDelete::_build_set() {
    COUNTER_UPDATE(num_delete_rows, _delete_block->rows());
    if (_delete_block->columns() != 1) [[unlikely]] {
        return Status::InternalError("Simple equality delete can be only applied with one column");
    }
    auto& column_and_type = _delete_block->get_by_position(0);
    // The set is created for the primitive type with the nullable wrapper removed.
    auto delete_column_type = remove_nullable(column_and_type.type)->get_primitive_type();
    _hybrid_set.reset(create_set(delete_column_type, _delete_block->rows(), false));
    _hybrid_set->insert_fixed_len(column_and_type.column, 0);
    return Status::OK();
}

/**
 * Clears the entries of `filter` for rows of `data_block` whose delete column value is
 * present in `_hybrid_set`. The block itself is not filtered here; only `filter` is updated.
 *
 * @param data_block Block whose rows are tested against the delete set.
 * @param col_name_to_block_idx Maps a block column name to its position in `data_block`.
 * @param id_to_block_column_name Maps a delete-column id to the block column name.
 * @param filter In/out row filter; must hold at least `data_block->rows()` entries.
 *        Each entry is AND-ed with "row is not in the delete set".
 * @return Status::OK() on success; InternalError when the delete column cannot be
 *         resolved to a column of `data_block`.
 */
Status SimpleEqualityDelete::filter_data_block(
        Block* data_block, const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const std::unordered_map<int, std::string>& id_to_block_column_name,
        IColumn::Filter& filter) {
    SCOPED_TIMER(equality_delete_time);
    // Exactly one delete column is expected; element 0 is read right below.
    DCHECK(_delete_col_ids.size() == 1);
    const auto column_field_id = _delete_col_ids[0];

    // Resolve id -> block column name -> block position, reporting a Status instead of
    // letting unordered_map::at() throw when a mapping is missing.
    const auto name_it = id_to_block_column_name.find(column_field_id);
    if (name_it == id_to_block_column_name.end()) [[unlikely]] {
        return Status::InternalError("Column id {} not found in id_to_block_column_name",
                                     column_field_id);
    }
    const auto& block_column_name = name_it->second;
    const auto pos_it = col_name_to_block_idx->find(block_column_name);
    if (pos_it == col_name_to_block_idx->end()) [[unlikely]] {
        return Status::InternalError("Column '{}' not found in data block: {}",
                                     block_column_name, data_block->dump_structure());
    }
    const auto& column_and_type = data_block->get_by_position(pos_it->second);

    const size_t rows = data_block->rows();
    DCHECK_GE(filter.size(), rows);

    // _single_filter: 1 => in _hybrid_set; 0 => not in _hybrid_set
    if (_single_filter == nullptr) {
        _single_filter = std::make_unique<IColumn::Filter>(rows, 0);
    } else {
        // resize to the current row count and fill every element with 0
        _single_filter->assign(rows, UInt8(0));
    }

    if (column_and_type.column->is_nullable()) {
        const NullMap& null_map =
                reinterpret_cast<const ColumnNullable*>(column_and_type.column.get())
                        ->get_null_map_data();
        _hybrid_set->find_batch_nullable(
                remove_nullable(column_and_type.column)->assume_mutable_ref(), rows, null_map,
                *_single_filter);
        if (_hybrid_set->contain_null()) {
            // The delete set contains NULL, so NULL rows count as matches too.
            auto* matched = _single_filter->data();
            for (size_t i = 0; i < rows; ++i) {
                matched[i] = matched[i] || null_map[i];
            }
        }
    } else {
        _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(), rows,
                                *_single_filter);
    }

    // Invert the match flags while merging: a row stays set in `filter` only if it was
    // already set and is not in the delete set.
    const auto* matched = _single_filter->data();
    auto* filter_data = filter.data();
    for (size_t i = 0; i < rows; ++i) {
        filter_data[i] &= !matched[i];
    }
    return Status::OK();
}

Expand All @@ -104,24 +104,32 @@ Status MultiEqualityDelete::_build_set() {
}

Status MultiEqualityDelete::filter_data_block(
Block* data_block, const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) {
Block* data_block, const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const std::unordered_map<int, std::string>& id_to_block_column_name,
IColumn::Filter& filter) {
SCOPED_TIMER(equality_delete_time);
DCHECK_EQ(_delete_block->get_columns_with_type_and_name().size(), _delete_col_ids.size());
size_t column_index = 0;

for (auto delete_col : _delete_block->get_columns_with_type_and_name()) {
const std::string& column_name = delete_col.name;
if (!col_name_to_block_idx->contains(column_name)) {
return Status::InternalError("Column '{}' not found in data block: {}", column_name,
data_block->dump_structure());
for (size_t idx = 0; idx < _delete_block->get_columns_with_type_and_name().size(); ++idx) {
auto delete_col = _delete_block->get_columns_with_type_and_name()[idx];
auto delete_col_id = _delete_col_ids[idx];

DCHECK(id_to_block_column_name.contains(delete_col_id));
const auto& block_column_name = id_to_block_column_name.at(delete_col_id);
if (!col_name_to_block_idx->contains(block_column_name)) [[unlikely]] {
return Status::InternalError("Column '{}' not found in data block: {}",
block_column_name, data_block->dump_structure());
}
auto column_and_type =
data_block->safe_get_by_position(col_name_to_block_idx->at(column_name));
if (!delete_col.type->equals(*column_and_type.type)) {
data_block->safe_get_by_position(col_name_to_block_idx->at(block_column_name));
if (!delete_col.type->equals(*column_and_type.type)) [[unlikely]] {
return Status::InternalError(
"Not support type change in column '{}', src type: {}, target type: {}",
column_name, delete_col.type->get_name(), column_and_type.type->get_name());
block_column_name, delete_col.type->get_name(),
column_and_type.type->get_name());
}
_data_column_index[column_index++] = col_name_to_block_idx->at(column_name);
_data_column_index[column_index++] = col_name_to_block_idx->at(block_column_name);
}
size_t rows = data_block->rows();
_data_hashes.clear();
Expand All @@ -130,26 +138,18 @@ Status MultiEqualityDelete::filter_data_block(
data_block->get_by_position(index).column->update_hashes_with_value(_data_hashes.data(),
nullptr);
}

if (_filter == nullptr) {
_filter = std::make_unique<IColumn::Filter>(rows, 1);
} else {
//reset the array capacity and fill all elements using the 0
_filter->assign(rows, UInt8(1));
}
auto* filter_data = _filter->data();
auto* filter_data = filter.data();
for (size_t i = 0; i < rows; ++i) {
for (auto beg = _delete_hash_map.lower_bound(_data_hashes[i]),
end = _delete_hash_map.upper_bound(_data_hashes[i]);
beg != end; ++beg) {
if (_equal(data_block, i, beg->second)) {
if (filter[i] && _equal(data_block, i, beg->second)) {
filter_data[i] = 0;
break;
}
}
}

Block::filter_block_internal(data_block, *_filter, data_block->columns());
return Status::OK();
}

Expand Down
36 changes: 21 additions & 15 deletions be/src/vec/exec/format/table/equality_delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ class EqualityDeleteBase {
RuntimeProfile::Counter* equality_delete_time;

Block* _delete_block;
std::vector<int> _delete_col_ids;

virtual Status _build_set() = 0;

public:
// Stores the delete block pointer and a copy of the delete-column ids.
// The ids are taken by const reference so the only copy made is the one into the member.
EqualityDeleteBase(Block* delete_block, const std::vector<int>& delete_col_ids)
        : _delete_block(delete_block), _delete_col_ids(delete_col_ids) {}
virtual ~EqualityDeleteBase() = default;

Status init(RuntimeProfile* profile) {
Expand All @@ -58,26 +60,29 @@ class EqualityDeleteBase {

/**
 * Applies this equality delete to `data_block` by updating `filter`.
 * @param data_block Block whose rows are tested against the delete rows.
 * @param col_name_to_block_idx Maps a block column name to its position in `data_block`.
 * @param id_to_block_column_name Maps a delete-column id to the block column name.
 * @param filter In/out per-row filter updated by the implementation.
 */
virtual Status filter_data_block(
        Block* data_block,
        const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
        const std::unordered_map<int, std::string>& id_to_block_column_name,
        IColumn::Filter& filter) = 0;

static std::unique_ptr<EqualityDeleteBase> get_delete_impl(Block* delete_block);
static std::unique_ptr<EqualityDeleteBase> get_delete_impl(
Block* delete_block, const std::vector<int>& delete_col_ids);
};

/**
 * Equality delete over a single delete column, backed by a hybrid set of the delete values.
 */
class SimpleEqualityDelete : public EqualityDeleteBase {
protected:
    // Set of delete values, built by _build_set().
    std::shared_ptr<HybridSetBase> _hybrid_set;
    // Scratch per-row match flags reused across filter_data_block() calls.
    std::unique_ptr<IColumn::Filter> _single_filter;

    Status _build_set() override;

public:
    SimpleEqualityDelete(Block* delete_block, const std::vector<int>& delete_col_ids)
            : EqualityDeleteBase(delete_block, delete_col_ids) {}

    Status filter_data_block(Block* data_block,
                             const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
                             const std::unordered_map<int, std::string>& id_to_block_column_name,
                             IColumn::Filter& filter) override;
};

/**
Expand All @@ -95,18 +100,19 @@ class MultiEqualityDelete : public EqualityDeleteBase {
std::multimap<uint64_t, size_t> _delete_hash_map;
// the delete column indexes in data block
std::vector<size_t> _data_column_index;
std::unique_ptr<IColumn::Filter> _filter;

Status _build_set() override;

bool _equal(Block* data_block, size_t data_row_index, size_t delete_row_index);

public:
// Forwards the delete block and its per-column ids to the base class.
MultiEqualityDelete(Block* delete_block, const std::vector<int>& delete_col_ids)
        : EqualityDeleteBase(delete_block, delete_col_ids) {}

// Multi-column override of EqualityDeleteBase::filter_data_block; updates `filter` in place.
Status filter_data_block(Block* data_block,
                         const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
                         const std::unordered_map<int, std::string>& id_to_block_column_name,
                         IColumn::Filter& filter) override;
};

#include "common/compile_check_end.h"
Expand Down
Loading