From 3e7d7728194164a29ee000c4779122e40e665c90 Mon Sep 17 00:00:00 2001 From: zhangwei Date: Fri, 27 Feb 2026 19:34:07 +0800 Subject: [PATCH 1/2] feat(core): pk table read support data manifest value_stats_cols filter --- .../operation/append_only_file_store_scan.cpp | 26 --------- .../operation/append_only_file_store_scan.h | 5 -- src/paimon/core/operation/file_store_scan.cpp | 29 ++++++++++ src/paimon/core/operation/file_store_scan.h | 8 +++ .../operation/key_value_file_store_scan.cpp | 54 ++++++++++++++++--- .../operation/key_value_file_store_scan.h | 7 ++- .../key_value_file_store_scan_test.cpp | 50 +++++++++++++++++ 7 files changed, 138 insertions(+), 41 deletions(-) diff --git a/src/paimon/core/operation/append_only_file_store_scan.cpp b/src/paimon/core/operation/append_only_file_store_scan.cpp index 00ea0a04..679223d9 100644 --- a/src/paimon/core/operation/append_only_file_store_scan.cpp +++ b/src/paimon/core/operation/append_only_file_store_scan.cpp @@ -157,30 +157,4 @@ Result AppendOnlyFileStoreScan::TestFileIndex( return index_result->IsRemain(); } -Result> AppendOnlyFileStoreScan::ReconstructPredicateWithNonCastedFields( - const std::shared_ptr& predicate, - const std::shared_ptr& evolution) { - const auto& id_to_data_fields = evolution->GetFieldIdToDataField(); - const auto& name_to_table_fields = evolution->GetFieldNameToTableField(); - - std::set field_names_in_predicate; - PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate)); - std::set excluded_field_names; - for (const auto& field_name : field_names_in_predicate) { - auto table_iter = name_to_table_fields.find(field_name); - if (table_iter == name_to_table_fields.end()) { - return Status::Invalid( - fmt::format("field {} in predicate is not included in table schema", field_name)); - } - auto data_iter = id_to_data_fields.find(table_iter->second.Id()); - if (data_iter != id_to_data_fields.end()) { - // TODO(liancheng.lsz): isnull/notnull predicates trimming might not be required - if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) { - excluded_field_names.insert(field_name); - } - } - } - return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names); -} - } // namespace paimon diff --git a/src/paimon/core/operation/append_only_file_store_scan.h b/src/paimon/core/operation/append_only_file_store_scan.h index 5b88272d..4053f70a 100644 --- a/src/paimon/core/operation/append_only_file_store_scan.h +++ b/src/paimon/core/operation/append_only_file_store_scan.h @@ -61,11 +61,6 @@ class AppendOnlyFileStoreScan : public FileStoreScan { Result FilterByStats(const ManifestEntry& entry) const override; private: - // TODO(liancheng.lsz): to be moved in class FileStoreScan - static Result> ReconstructPredicateWithNonCastedFields( - const std::shared_ptr& predicate, - const std::shared_ptr& evolution); - Result TestFileIndex(const std::shared_ptr& meta, const std::shared_ptr& evolution, const std::shared_ptr& data_schema) const; diff --git a/src/paimon/core/operation/file_store_scan.cpp b/src/paimon/core/operation/file_store_scan.cpp index 9fb9ed3b..d7f3a679 100644 --- a/src/paimon/core/operation/file_store_scan.cpp +++ b/src/paimon/core/operation/file_store_scan.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -28,6 +29,7 @@ #include "paimon/common/data/binary_array.h" #include "paimon/common/executor/future.h" #include "paimon/common/predicate/literal_converter.h" +#include "paimon/common/predicate/predicate_utils.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/field_type_utils.h" #include "paimon/core/io/data_file_meta.h" @@ -39,6 +41,7 @@ #include "paimon/core/operation/metrics/scan_metrics.h" #include "paimon/core/partition/partition_info.h" #include "paimon/core/stats/simple_stats.h" +#include "paimon/core/stats/simple_stats_evolution.h" #include "paimon/core/utils/duration.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/snapshot_manager.h" @@ -50,6 +53,32 @@ namespace paimon { enum class FieldType; +Result> FileStoreScan::ReconstructPredicateWithNonCastedFields( + const std::shared_ptr& predicate, + const std::shared_ptr& evolution) { + const auto& id_to_data_fields = evolution->GetFieldIdToDataField(); + const auto& name_to_table_fields = evolution->GetFieldNameToTableField(); + + std::set field_names_in_predicate; + PAIMON_RETURN_NOT_OK(PredicateUtils::GetAllNames(predicate, &field_names_in_predicate)); + std::set excluded_field_names; + for (const auto& field_name : field_names_in_predicate) { + auto table_iter = name_to_table_fields.find(field_name); + if (table_iter == name_to_table_fields.end()) { + return Status::Invalid( + fmt::format("field {} in predicate is not included in table schema", field_name)); + } + auto data_iter = id_to_data_fields.find(table_iter->second.Id()); + if (data_iter != id_to_data_fields.end()) { + // Exclude fields requiring casting to avoid false negatives in stats filtering. + if (!data_iter->second.second.Type()->Equals(table_iter->second.Type())) { + excluded_field_names.insert(field_name); + } + } + } + return PredicateUtils::ExcludePredicateWithFields(predicate, excluded_field_names); +} + std::vector FileStoreScan::RawPlan::Files(const FileKind& kind) { std::vector entries = Files(); std::vector filtered_entries; diff --git a/src/paimon/core/operation/file_store_scan.h b/src/paimon/core/operation/file_store_scan.h index 4e387024..a8c60405 100644 --- a/src/paimon/core/operation/file_store_scan.h +++ b/src/paimon/core/operation/file_store_scan.h @@ -62,8 +62,10 @@ class ManifestFile; class ManifestFileMeta; class ManifestList; class MemoryPool; +class Predicate; class ScanFilter; class SchemaManager; +class SimpleStatsEvolution; class SnapshotManager; class TableSchema; @@ -214,6 +216,12 @@ class FileStoreScan { const std::shared_ptr& arrow_schema, const std::shared_ptr& scan_filters); + // When schema evolves, predicates might contain fields requiring casting. To avoid false + // negatives when filtering by stats, we exclude those fields from predicate. + static Result> ReconstructPredicateWithNonCastedFields( + const std::shared_ptr& predicate, + const std::shared_ptr& evolution); + private: Status ReadManifests(std::optional* snapshot_ptr, std::vector* all_manifests_ptr, diff --git a/src/paimon/core/operation/key_value_file_store_scan.cpp b/src/paimon/core/operation/key_value_file_store_scan.cpp index de84c017..fbd0dc66 100644 --- a/src/paimon/core/operation/key_value_file_store_scan.cpp +++ b/src/paimon/core/operation/key_value_file_store_scan.cpp @@ -17,11 +17,13 @@ #include "paimon/core/operation/key_value_file_store_scan.h" #include +#include #include #include #include #include +#include "fmt/format.h" #include "paimon/common/data/binary_array.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/predicate/predicate_filter.h" @@ -32,6 +34,8 @@ #include "paimon/core/options/merge_engine.h" #include "paimon/core/schema/table_schema.h" #include "paimon/core/stats/simple_stats.h" +#include "paimon/core/stats/simple_stats_evolution.h" +#include "paimon/core/stats/simple_stats_evolutions.h" #include "paimon/predicate/predicate.h" namespace arrow { @@ -44,7 +48,6 @@ class ManifestFile; class ManifestList; class MemoryPool; class ScanFilter; -class SchemaManager; class SnapshotManager; Result> KeyValueFileStoreScan::Create( @@ -154,15 +157,41 @@ Result KeyValueFileStoreScan::IsValueFilterEnabled() const { } Result KeyValueFileStoreScan::FilterByValueFilter(const ManifestEntry& entry) const { - if (entry.File()->value_stats_cols != std::nullopt) { - return Status::NotImplemented("do not support value stats cols in DataFileMeta"); + if (!value_filter_) { + return true; } if (entry.File()->embedded_index != nullptr) { return Status::NotImplemented("do not support embedded index in DataFileMeta"); } - const auto& stats = entry.File()->value_stats; - return value_filter_->Test(schema_, entry.File()->row_count, stats.MinValues(), - stats.MaxValues(), stats.NullCounts()); + + const auto& meta = entry.File(); + + // Primary key table currently does not support schema evolution for value filtering. + // Here we only handle `value_stats_cols` (dense stats) projection. + if (meta->schema_id != table_schema_->Id()) { + return Status::NotImplemented( + "Primary key table does not support schema evolution in FilterByValueFilter"); + } + + auto evolution = evolutions_->GetOrCreate(table_schema_); + + PAIMON_ASSIGN_OR_RAISE( + SimpleStatsEvolution::EvolutionStats new_stats, + evolution->Evolution(meta->value_stats, meta->row_count, meta->value_stats_cols)); + + try { + PAIMON_ASSIGN_OR_RAISE( + bool predicate_result, + value_filter_->Test(schema_, meta->row_count, *(new_stats.min_values), + *(new_stats.max_values), *(new_stats.null_counts))); + return predicate_result; + } catch (const std::exception& e) { + return Status::Invalid(fmt::format("FilterByValueFilter failed for file {}, with {} error", + meta->file_name, e.what())); + } catch (...) { + return Status::Invalid(fmt::format( + "FilterByValueFilter failed for file {}, with unknown error", meta->file_name)); + } } bool KeyValueFileStoreScan::NoOverlapping(const std::vector& entries) { @@ -212,4 +241,17 @@ Result> KeyValueFileStoreScan::FilterWholeBucketAllFi return std::vector(); } +KeyValueFileStoreScan::KeyValueFileStoreScan( + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, + const std::shared_ptr& manifest_list, + const std::shared_ptr& manifest_file, + const std::shared_ptr& table_schema, const std::shared_ptr& schema, + const CoreOptions& core_options, const std::shared_ptr& executor, + const std::shared_ptr& pool) + : FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, + schema, core_options, executor, pool) { + evolutions_ = std::make_shared(table_schema, pool); +} + } // namespace paimon diff --git a/src/paimon/core/operation/key_value_file_store_scan.h b/src/paimon/core/operation/key_value_file_store_scan.h index 44f1db1c..9bd21ef3 100644 --- a/src/paimon/core/operation/key_value_file_store_scan.h +++ b/src/paimon/core/operation/key_value_file_store_scan.h @@ -20,7 +20,6 @@ #include #include -#include "paimon/common/predicate/predicate_utils.h" #include "paimon/core/manifest/manifest_entry.h" #include "paimon/core/operation/file_store_scan.h" #include "paimon/core/table/source/scan_mode.h" @@ -40,6 +39,7 @@ class MemoryPool; class PredicateFilter; class ScanFilter; class SchemaManager; +class SimpleStatsEvolutions; class SnapshotManager; class TableSchema; @@ -105,13 +105,12 @@ class KeyValueFileStoreScan : public FileStoreScan { const std::shared_ptr& schema, const CoreOptions& core_options, const std::shared_ptr& executor, - const std::shared_ptr& pool) - : FileStoreScan(snapshot_manager, schema_manager, manifest_list, manifest_file, - table_schema, schema, core_options, executor, pool) {} + const std::shared_ptr& pool); private: bool value_filter_force_enabled_ = false; std::shared_ptr key_filter_; std::shared_ptr value_filter_; + std::shared_ptr evolutions_; }; } // namespace paimon diff --git a/src/paimon/core/operation/key_value_file_store_scan_test.cpp b/src/paimon/core/operation/key_value_file_store_scan_test.cpp index 5b653746..9996379d 100644 --- a/src/paimon/core/operation/key_value_file_store_scan_test.cpp +++ b/src/paimon/core/operation/key_value_file_store_scan_test.cpp @@ -47,6 +47,7 @@ #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/scan_context.h" +#include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/testharness.h" namespace arrow { @@ -323,4 +324,53 @@ TEST_F(KeyValueFileStoreScanTest, TestNoOverlapping) { ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({0, 1, 1}))); ASSERT_FALSE(KeyValueFileStoreScan::NoOverlapping(generate_manifest_entries({2, 1, 1}))); } + +TEST_F(KeyValueFileStoreScanTest, TestFilterByValueFilterWithValueStatsCols) { + std::string table_path = + paimon::test::GetDataDir() + "orc/pk_table_with_mor.db/pk_table_with_mor"; + std::vector> partition_filters = {}; + + // `v0` is at index 6 in schema-0 of pk_table_with_mor. + auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/6, /*field_name=*/"v0", + FieldType::DOUBLE, Literal(30.1)); + auto scan_filter = std::make_shared(/*predicate=*/greater_than, + /*partition_filters=*/partition_filters, + /*bucket_filter=*/0, + /*vector_search=*/nullptr); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, + CreateFileStoreScan(table_path, scan_filter, + /*table_schema_id=*/0, /*snapshot_id=*/1)); + scan->EnableValueFilter(); + + // Build dense stats for only one column `v0`. + auto pool = GetDefaultPool(); + SimpleStats value_stats = BinaryRowGenerator::GenerateStats( + /*min=*/{10.0}, /*max=*/{20.0}, /*null=*/{0}, pool.get()); + std::vector value_stats_cols = {"v0"}; + ManifestEntry entry( + /*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, + std::make_shared( + /*file_name=*/"name", /*file_size=*/1024, /*row_count=*/10, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/value_stats, + /*min_sequence_number=*/0, + /*max_sequence_number=*/10, + /*schema_id=*/0, + /*level=*/1, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), + /*delete_row_count=*/std::nullopt, + /*embedded_index=*/nullptr, + /*file_source=*/FileSource::Append(), + /*value_stats_cols=*/value_stats_cols, + /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + + // max(v0)=20 <= 30.1, should be filtered out. + ASSERT_OK_AND_ASSIGN(bool keep, scan->FilterByStats(entry)); + ASSERT_FALSE(keep); +} } // namespace paimon::test From c4e7b0755a72968c051de2896654a449c7abacb2 Mon Sep 17 00:00:00 2001 From: zhangwei Date: Fri, 27 Feb 2026 22:22:43 +0800 Subject: [PATCH 2/2] add entry keep ut --- .../key_value_file_store_scan_test.cpp | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/paimon/core/operation/key_value_file_store_scan_test.cpp b/src/paimon/core/operation/key_value_file_store_scan_test.cpp index 9996379d..16a6eff5 100644 --- a/src/paimon/core/operation/key_value_file_store_scan_test.cpp +++ b/src/paimon/core/operation/key_value_file_store_scan_test.cpp @@ -369,8 +369,36 @@ TEST_F(KeyValueFileStoreScanTest, TestFilterByValueFilterWithValueStatsCols) { /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt)); + // max(v0)=50 > 30.1, should be kept. + SimpleStats value_stats_keep = BinaryRowGenerator::GenerateStats( + /*min=*/{40.0}, /*max=*/{50.0}, /*null=*/{0}, pool.get()); + ManifestEntry entry_keep( + /*kind=*/FileKind::Add(), /*partition=*/BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, + std::make_shared( + /*file_name=*/"name_keep", /*file_size=*/1024, /*row_count=*/10, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/value_stats_keep, + /*min_sequence_number=*/0, + /*max_sequence_number=*/10, + /*schema_id=*/0, + /*level=*/1, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), + /*delete_row_count=*/std::nullopt, + /*embedded_index=*/nullptr, + /*file_source=*/FileSource::Append(), + /*value_stats_cols=*/value_stats_cols, + /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + // max(v0)=20 <= 30.1, should be filtered out. ASSERT_OK_AND_ASSIGN(bool keep, scan->FilterByStats(entry)); ASSERT_FALSE(keep); + + ASSERT_OK_AND_ASSIGN(keep, scan->FilterByStats(entry_keep)); + ASSERT_TRUE(keep); } } // namespace paimon::test