Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions src/paimon/core/operation/append_only_file_store_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,30 +157,4 @@ Result<bool> AppendOnlyFileStoreScan::TestFileIndex(
return index_result->IsRemain();
}

Result<std::shared_ptr<Predicate>> AppendOnlyFileStoreScan::ReconstructPredicateWithNonCastedFields(
const std::shared_ptr<Predicate>& predicate,
const std::shared_ptr<SimpleStatsEvolution>& evolution) {
const auto& id_to_data_fields = evolution->GetFieldIdToDataField();
const auto& name_to_table_fields = evolution->GetFieldNameToTableField();

std::set<std::string> field_names_in_predicate;
PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate));
std::set<std::string> excluded_field_names;
for (const auto& field_name : field_names_in_predicate) {
auto table_iter = name_to_table_fields.find(field_name);
if (table_iter == name_to_table_fields.end()) {
return Status::Invalid(
fmt::format("field {} in predicate is not included in table schema", field_name));
}
auto data_iter = id_to_data_fields.find(table_iter->second.Id());
if (data_iter != id_to_data_fields.end()) {
// TODO(liancheng.lsz): isnull/notnull predicates trimming might not be required
if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) {
excluded_field_names.insert(field_name);
}
}
}
return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names);
}

} // namespace paimon
5 changes: 0 additions & 5 deletions src/paimon/core/operation/append_only_file_store_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ class AppendOnlyFileStoreScan : public FileStoreScan {
Result<bool> FilterByStats(const ManifestEntry& entry) const override;

private:
// TODO(liancheng.lsz): to be moved in class FileStoreScan
static Result<std::shared_ptr<Predicate>> ReconstructPredicateWithNonCastedFields(
const std::shared_ptr<Predicate>& predicate,
const std::shared_ptr<SimpleStatsEvolution>& evolution);

Result<bool> TestFileIndex(const std::shared_ptr<DataFileMeta>& meta,
const std::shared_ptr<SimpleStatsEvolution>& evolution,
const std::shared_ptr<TableSchema>& data_schema) const;
Expand Down
29 changes: 29 additions & 0 deletions src/paimon/core/operation/file_store_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <future>
#include <list>
#include <numeric>
#include <set>
#include <unordered_map>
#include <unordered_set>

Expand All @@ -28,6 +29,7 @@
#include "paimon/common/data/binary_array.h"
#include "paimon/common/executor/future.h"
#include "paimon/common/predicate/literal_converter.h"
#include "paimon/common/predicate/predicate_utils.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/field_type_utils.h"
#include "paimon/core/io/data_file_meta.h"
Expand All @@ -39,6 +41,7 @@
#include "paimon/core/operation/metrics/scan_metrics.h"
#include "paimon/core/partition/partition_info.h"
#include "paimon/core/stats/simple_stats.h"
#include "paimon/core/stats/simple_stats_evolution.h"
#include "paimon/core/utils/duration.h"
#include "paimon/core/utils/field_mapping.h"
#include "paimon/core/utils/snapshot_manager.h"
Expand All @@ -50,6 +53,32 @@
namespace paimon {
enum class FieldType;

Result<std::shared_ptr<Predicate>> FileStoreScan::ReconstructPredicateWithNonCastedFields(
const std::shared_ptr<Predicate>& predicate,
const std::shared_ptr<SimpleStatsEvolution>& evolution) {
const auto& id_to_data_fields = evolution->GetFieldIdToDataField();
const auto& name_to_table_fields = evolution->GetFieldNameToTableField();

std::set<std::string> field_names_in_predicate;
PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate));
std::set<std::string> excluded_field_names;
for (const auto& field_name : field_names_in_predicate) {
auto table_iter = name_to_table_fields.find(field_name);
if (table_iter == name_to_table_fields.end()) {
return Status::Invalid(
fmt::format("field {} in predicate is not included in table schema", field_name));
}
auto data_iter = id_to_data_fields.find(table_iter->second.Id());
if (data_iter != id_to_data_fields.end()) {
// Exclude fields requiring casting to avoid false negatives in stats filtering.
if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) {
excluded_field_names.insert(field_name);
}
}
}
return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names);
}

std::vector<ManifestEntry> FileStoreScan::RawPlan::Files(const FileKind& kind) {
std::vector<ManifestEntry> entries = Files();
std::vector<ManifestEntry> filtered_entries;
Expand Down
8 changes: 8 additions & 0 deletions src/paimon/core/operation/file_store_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class ManifestFile;
class ManifestFileMeta;
class ManifestList;
class MemoryPool;
class Predicate;
class ScanFilter;
class SchemaManager;
class SimpleStatsEvolution;
class SnapshotManager;
class TableSchema;

Expand Down Expand Up @@ -214,6 +216,12 @@ class FileStoreScan {
const std::shared_ptr<arrow::Schema>& arrow_schema,
const std::shared_ptr<ScanFilter>& scan_filters);

// When schema evolves, predicates might contain fields requiring casting. To avoid false
// negatives when filtering by stats, we exclude those fields from predicate.
static Result<std::shared_ptr<Predicate>> ReconstructPredicateWithNonCastedFields(
const std::shared_ptr<Predicate>& predicate,
const std::shared_ptr<SimpleStatsEvolution>& evolution);

private:
Status ReadManifests(std::optional<Snapshot>* snapshot_ptr,
std::vector<ManifestFileMeta>* all_manifests_ptr,
Expand Down
54 changes: 48 additions & 6 deletions src/paimon/core/operation/key_value_file_store_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
#include "paimon/core/operation/key_value_file_store_scan.h"

#include <cstdint>
#include <exception>
#include <map>
#include <optional>
#include <set>
#include <utility>

#include "fmt/format.h"
#include "paimon/common/data/binary_array.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/predicate/predicate_filter.h"
Expand All @@ -32,6 +34,8 @@
#include "paimon/core/options/merge_engine.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/core/stats/simple_stats.h"
#include "paimon/core/stats/simple_stats_evolution.h"
#include "paimon/core/stats/simple_stats_evolutions.h"
#include "paimon/predicate/predicate.h"

namespace arrow {
Expand All @@ -44,7 +48,6 @@ class ManifestFile;
class ManifestList;
class MemoryPool;
class ScanFilter;
class SchemaManager;
class SnapshotManager;

Result<std::unique_ptr<KeyValueFileStoreScan>> KeyValueFileStoreScan::Create(
Expand Down Expand Up @@ -154,15 +157,41 @@ Result<bool> KeyValueFileStoreScan::IsValueFilterEnabled() const {
}

Result<bool> KeyValueFileStoreScan::FilterByValueFilter(const ManifestEntry& entry) const {
if (entry.File()->value_stats_cols != std::nullopt) {
return Status::NotImplemented("do not support value stats cols in DataFileMeta");
if (!value_filter_) {
return true;
}
if (entry.File()->embedded_index != nullptr) {
return Status::NotImplemented("do not support embedded index in DataFileMeta");
}
const auto& stats = entry.File()->value_stats;
return value_filter_->Test(schema_, entry.File()->row_count, stats.MinValues(),
stats.MaxValues(), stats.NullCounts());

const auto& meta = entry.File();

// Primary key table currently does not support schema evolution for value filtering.
// Here we only handle `value_stats_cols` (dense stats) projection.
if (meta->schema_id != table_schema_->Id()) {
return Status::NotImplemented(
"Primary key table does not support schema evolution in FilterByValueFilter");
}

auto evolution = evolutions_->GetOrCreate(table_schema_);

PAIMON_ASSIGN_OR_RAISE(
SimpleStatsEvolution::EvolutionStats new_stats,
evolution->Evolution(meta->value_stats, meta->row_count, meta->value_stats_cols));

try {
PAIMON_ASSIGN_OR_RAISE(
bool predicate_result,
value_filter_->Test(schema_, meta->row_count, *(new_stats.min_values),
*(new_stats.max_values), *(new_stats.null_counts)));
return predicate_result;
} catch (const std::exception& e) {
return Status::Invalid(fmt::format("FilterByValueFilter failed for file {}, with {} error",
meta->file_name, e.what()));
} catch (...) {
return Status::Invalid(fmt::format(
"FilterByValueFilter failed for file {}, with unknown error", meta->file_name));
}
}

bool KeyValueFileStoreScan::NoOverlapping(const std::vector<ManifestEntry>& entries) {
Expand Down Expand Up @@ -212,4 +241,17 @@ Result<std::vector<ManifestEntry>> KeyValueFileStoreScan::FilterWholeBucketAllFi
return std::vector<ManifestEntry>();
}

KeyValueFileStoreScan::KeyValueFileStoreScan(
const std::shared_ptr<SnapshotManager>& snapshot_manager,
const std::shared_ptr<SchemaManager>& schema_manager,
const std::shared_ptr<ManifestList>& manifest_list,
const std::shared_ptr<ManifestFile>& manifest_file,
const std::shared_ptr<TableSchema>& table_schema, const std::shared_ptr<arrow::Schema>& schema,
const CoreOptions& core_options, const std::shared_ptr<Executor>& executor,
const std::shared_ptr<MemoryPool>& pool)
: FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema,
schema, core_options, executor, pool) {
evolutions_ = std::make_shared<SimpleStatsEvolutions>(table_schema, pool);
}

} // namespace paimon
7 changes: 3 additions & 4 deletions src/paimon/core/operation/key_value_file_store_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <string>
#include <vector>

#include "paimon/common/predicate/predicate_utils.h"
#include "paimon/core/manifest/manifest_entry.h"
#include "paimon/core/operation/file_store_scan.h"
#include "paimon/core/table/source/scan_mode.h"
Expand All @@ -40,6 +39,7 @@ class MemoryPool;
class PredicateFilter;
class ScanFilter;
class SchemaManager;
class SimpleStatsEvolutions;
class SnapshotManager;
class TableSchema;

Expand Down Expand Up @@ -105,13 +105,12 @@ class KeyValueFileStoreScan : public FileStoreScan {
const std::shared_ptr<arrow::Schema>& schema,
const CoreOptions& core_options,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<MemoryPool>& pool)
: FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file,
table_schema, schema, core_options, executor, pool) {}
const std::shared_ptr<MemoryPool>& pool);

private:
bool value_filter_force_enabled_ = false;
std::shared_ptr<PredicateFilter> key_filter_;
std::shared_ptr<PredicateFilter> value_filter_;
std::shared_ptr<SimpleStatsEvolutions> evolutions_;
};
} // namespace paimon
78 changes: 78 additions & 0 deletions src/paimon/core/operation/key_value_file_store_scan_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "paimon/predicate/literal.h"
#include "paimon/predicate/predicate_builder.h"
#include "paimon/scan_context.h"
#include "paimon/testing/utils/binary_row_generator.h"
#include "paimon/testing/utils/testharness.h"

namespace arrow {
Expand Down Expand Up @@ -323,4 +324,81 @@ TEST_F(KeyValueFileStoreScanTest, TestNoOverlapping) {
ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({0, 1, 1})));
ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({2, 1, 1})));
}

TEST_F(KeyValueFileStoreScanTest, TestFilterByValueFilterWithValueStatsCols) {
std::string table_path =
paimon::test::GetDataDir() + "orc/pk_table_with_mor.db/pk_table_with_mor";
std::vector<std::map<std::string, std::string>> partition_filters = {};

// `v0` is at index 6 in schema-0 of pk_table_with_mor.
auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/6, /*field_name=*/"v0",
FieldType::DOUBLE, Literal(30.1));
auto scan_filter = std::make_shared<ScanFilter>(/*predicate=*/greater_than,
/*partition_filters=*/partition_filters,
/*bucket_filter=*/0,
/*vector_search=*/nullptr);
ASSERT_OK_AND_ASSIGN(std::unique_ptr<KeyValueFileStoreScan> scan,
CreateFileStoreScan(table_path, scan_filter,
/*table_schema_id=*/0, /*snapshot_id=*/1));
scan->EnableValueFilter();

// Build dense stats for only one column `v0`.
auto pool = GetDefaultPool();
SimpleStats value_stats = BinaryRowGenerator::GenerateStats(
/*min=*/{10.0}, /*max=*/{20.0}, /*null=*/{0}, pool.get());
std::vector<std::string> value_stats_cols = {"v0"};
ManifestEntry entry(
/*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0,
/*total_buckets=*/1,
std::make_shared<DataFileMeta>(
/*file_name=*/"name", /*file_size=*/1024, /*row_count=*/10,
/*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(),
/*key_stats=*/SimpleStats::EmptyStats(),
/*value_stats=*/value_stats,
/*min_sequence_number=*/0,
/*max_sequence_number=*/10,
/*schema_id=*/0,
/*level=*/1,
/*extra_files=*/std::vector<std::optional<std::string>>(),
/*creation_time=*/Timestamp(0, 0),
/*delete_row_count=*/std::nullopt,
/*embedded_index=*/nullptr,
/*file_source=*/FileSource::Append(),
/*value_stats_cols=*/value_stats_cols,
/*external_path=*/std::nullopt,
/*first_row_id=*/std::nullopt,
/*write_cols=*/std::nullopt));

// max(v0)=50 > 30.1, should be kept.
SimpleStats value_stats_keep = BinaryRowGenerator::GenerateStats(
/*min=*/{40.0}, /*max=*/{50.0}, /*null=*/{0}, pool.get());
ManifestEntry entry_keep(
/*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0,
/*total_buckets=*/1,
std::make_shared<DataFileMeta>(
/*file_name=*/"name_keep", /*file_size=*/1024, /*row_count=*/10,
/*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(),
/*key_stats=*/SimpleStats::EmptyStats(),
/*value_stats=*/value_stats_keep,
/*min_sequence_number=*/0,
/*max_sequence_number=*/10,
/*schema_id=*/0,
/*level=*/1,
/*extra_files=*/std::vector<std::optional<std::string>>(),
/*creation_time=*/Timestamp(0, 0),
/*delete_row_count=*/std::nullopt,
/*embedded_index=*/nullptr,
/*file_source=*/FileSource::Append(),
/*value_stats_cols=*/value_stats_cols,
/*external_path=*/std::nullopt,
/*first_row_id=*/std::nullopt,
/*write_cols=*/std::nullopt));

// max(v0)=20 <= 30.1, should be filtered out.
ASSERT_OK_AND_ASSIGN(bool keep, scan->FilterByStats(entry));
ASSERT_FALSE(keep);

ASSERT_OK_AND_ASSIGN(keep, scan->FilterByStats(entry_keep));
ASSERT_TRUE(keep);
}
} // namespace paimon::test
Loading