diff --git a/cache/cache_test.cc b/cache/cache_test.cc index 520dd5b997..79de5155c1 100644 --- a/cache/cache_test.cc +++ b/cache/cache_test.cc @@ -115,8 +115,8 @@ class CacheTest : public testing::TestWithParam { &CacheTest::Deleter); } - void Erase(std::shared_ptr cache, int key) { - cache->Erase(EncodeKey(key)); + bool Erase(std::shared_ptr cache, int key) { + return cache->Erase(EncodeKey(key)); } int Lookup(int key) { return Lookup(cache_, key); } @@ -125,7 +125,7 @@ class CacheTest : public testing::TestWithParam { Insert(cache_, key, value, charge); } - void Erase(int key) { Erase(cache_, key); } + bool Erase(int key) { return Erase(cache_, key); } int Lookup2(int key) { return Lookup(cache2_, key); } @@ -133,7 +133,7 @@ class CacheTest : public testing::TestWithParam { Insert(cache2_, key, value, charge); } - void Erase2(int key) { Erase(cache2_, key); } + bool Erase2(int key) { return Erase(cache2_, key); } }; CacheTest* CacheTest::current_; diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc index 7c12055c9c..8f0d500283 100644 --- a/cache/clock_cache.cc +++ b/cache/clock_cache.cc @@ -261,7 +261,7 @@ class ClockCacheShard : public CacheShard { virtual bool Ref(Cache::Handle* handle) override; virtual bool Release(Cache::Handle* handle, bool force_erase = false) override; - virtual void Erase(const Slice& key, uint32_t hash) override; + virtual bool Erase(const Slice& key, uint32_t hash) override; bool EraseAndConfirm(const Slice& key, uint32_t hash, CleanupContext* context); virtual size_t GetUsage() const override; @@ -647,10 +647,11 @@ bool ClockCacheShard::Release(Cache::Handle* h, bool force_erase) { return erased; } -void ClockCacheShard::Erase(const Slice& key, uint32_t hash) { +bool ClockCacheShard::Erase(const Slice& key, uint32_t hash) { CleanupContext context; - EraseAndConfirm(key, hash, &context); + bool ret = EraseAndConfirm(key, hash, &context); Cleanup(context); + return ret; } bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash, diff --git a/cache/lirs_cache.cc b/cache/lirs_cache.cc index f600d13f37..4a83241925 100644 --- a/cache/lirs_cache.cc +++ b/cache/lirs_cache.cc @@ -407,7 +407,7 @@ Status LIRSCacheShard::Insert(const Slice& key, uint32_t hash, void* value, return s; } -void LIRSCacheShard::Erase(const Slice& key, uint32_t hash) { +bool LIRSCacheShard::Erase(const Slice& key, uint32_t hash) { LIRSHandle* e; bool last_reference = false; { @@ -430,6 +430,7 @@ void LIRSCacheShard::Erase(const Slice& key, uint32_t hash) { if (last_reference) { e->Free(); } + return e != nullptr; } size_t LIRSCacheShard::GetUsage() const { diff --git a/cache/lirs_cache.h b/cache/lirs_cache.h index 86c3afb911..13582377a3 100644 --- a/cache/lirs_cache.h +++ b/cache/lirs_cache.h @@ -106,7 +106,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LIRSCacheShard : public CacheShard { virtual bool Ref(Cache::Handle* handle) override; virtual bool Release(Cache::Handle* handle, bool force_erase = false) override; - virtual void Erase(const Slice& key, uint32_t hash) override; + virtual bool Erase(const Slice& key, uint32_t hash) override; virtual size_t GetUsage() const override; virtual size_t GetPinnedUsage() const override; diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index ba4bd2a3a1..e439d62392 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -438,7 +438,7 @@ Status LRUCacheShardTemplate::Insert( } template -void LRUCacheShardTemplate::Erase(const Slice& key, +bool LRUCacheShardTemplate::Erase(const Slice& key, uint32_t hash) { LRUHandle* e; bool last_reference = false; @@ -462,6 +462,7 @@ void LRUCacheShardTemplate::Erase(const Slice& key, if (last_reference) { e->Free(); } + return e != nullptr; } template diff --git a/cache/lru_cache.h b/cache/lru_cache.h index 939a076a78..c5c2ee5a74 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -209,7 +209,7 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShardTemplate : public CacheMonitor, virtual bool Ref(Cache::Handle* handle) override; virtual bool Release(Cache::Handle* handle, bool force_erase = false) override; - virtual void Erase(const Slice& key, uint32_t hash) override; + virtual bool Erase(const Slice& key, uint32_t hash) override; // Although in some platforms the update of size_t is atomic, to make sure // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index 92de17ff2a..b548f2c03a 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -89,7 +89,7 @@ class LRUCacheTest : public testing::Test, return cache_->Lookup(key, 0 /*hash*/); } - void Erase(const std::string& key) { cache_->Erase(key, 0 /*hash*/); } + bool Erase(const std::string& key) { return cache_->Erase(key, 0 /*hash*/); } void ValidateLRUList(std::vector keys, size_t num_high_pri_pool_keys = 0) { diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc index 2b5a539b13..70cb45e62f 100644 --- a/cache/sharded_cache.cc +++ b/cache/sharded_cache.cc @@ -71,9 +71,9 @@ bool ShardedCache::Release(Handle* handle, bool force_erase) { return GetShard(Shard(hash))->Release(handle, force_erase); } -void ShardedCache::Erase(const Slice& key) { +bool ShardedCache::Erase(const Slice& key) { uint32_t hash = HashSlice(key); - GetShard(Shard(hash))->Erase(key, hash); + return GetShard(Shard(hash))->Erase(key, hash); } uint64_t ShardedCache::NewId() { diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h index a8689d2e1f..e236ff8cae 100644 --- a/cache/sharded_cache.h +++ b/cache/sharded_cache.h @@ -32,7 +32,7 @@ class CacheShard { virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0; virtual bool Ref(Cache::Handle* handle) = 0; virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0; - virtual void Erase(const Slice& key, uint32_t hash) = 0; + virtual bool Erase(const Slice& key, uint32_t hash) = 0; virtual void SetCapacity(size_t capacity) = 0; virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; virtual size_t GetUsage() const = 0; @@ -68,7 +68,7 @@ class ShardedCache : public Cache { virtual Handle* Lookup(const Slice& key, Statistics* stats) override; virtual bool Ref(Handle* handle) override; virtual bool Release(Handle* handle, bool force_erase = false) override; - virtual void Erase(const Slice& key) override; + virtual bool Erase(const Slice& key) override; virtual uint64_t NewId() override; virtual size_t GetCapacity() const override; virtual bool HasStrictCapacityLimit() const override; diff --git a/db/builder.cc b/db/builder.cc index 1a85576c72..b773ce5c04 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -21,6 +21,7 @@ #include "db/range_del_aggregator.h" #include "db/table_cache.h" #include "db/version_edit.h" +#include "db/version_set.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_util.h" #include "rocksdb/env.h" @@ -457,7 +458,7 @@ Status BuildTable( ro.fill_cache = false; for (auto& meta : *meta_vec) { std::unique_ptr it(table_cache->NewIterator( - ro, env_options, internal_comparator, meta, empty_dependence_map, + ro, env_options, meta, empty_dependence_map, nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor.get(), nullptr, (internal_stats == nullptr) ? nullptr diff --git a/db/column_family.cc b/db/column_family.cc index 28bd93ba17..ddfe6c4030 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -414,7 +414,7 @@ ColumnFamilyData::ColumnFamilyData( uint32_t id, const std::string& name, Version* _dummy_versions, Cache* _table_cache, WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, - const EnvOptions& env_options, ColumnFamilySet* column_family_set) + const EnvOptions* env_options, ColumnFamilySet* column_family_set) : id_(id), name_(name), dummy_versions_(_dummy_versions), @@ -424,9 +424,10 @@ ColumnFamilyData::ColumnFamilyData( dropped_(false), optimize_filters_for_hits_(cf_options.optimize_filters_for_hits), internal_comparator_(cf_options.comparator), - initial_cf_options_(SanitizeOptions(db_options, cf_options)), - ioptions_(db_options, initial_cf_options_), - mutable_cf_options_(initial_cf_options_, db_options.env), + table_cache_(new TableCache(SanitizeOptions(db_options, cf_options), + db_options, env_options, _table_cache)), + ioptions_(table_cache_->ioptions()), + mutable_cf_options_(table_cache_->initial_cf_options(), db_options.env), is_delete_range_supported_( cf_options.table_factory->IsDeleteRangeSupported()), write_buffer_manager_(write_buffer_manager), @@ -449,21 +450,21 @@ ColumnFamilyData::ColumnFamilyData( last_memtable_id_(0) { Ref(); + table_cache_->bind_life_cycle(); + // if _dummy_versions is nullptr, then this is a dummy column family. if (_dummy_versions != nullptr) { - internal_stats_.reset( - new InternalStats(ioptions_.num_levels, db_options.env, this)); - table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache)); + table_cache_->InitInternalStats(this); if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset(new LevelCompactionPicker( - table_cache_.get(), env_options, ioptions_, &internal_comparator_)); + table_cache_.get(), *env_options, ioptions_, &internal_comparator_)); #ifndef ROCKSDB_LITE } else if (ioptions_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset(new UniversalCompactionPicker( - table_cache_.get(), env_options, ioptions_, &internal_comparator_)); + table_cache_.get(), *env_options, ioptions_, &internal_comparator_)); } else if (ioptions_.compaction_style == kCompactionStyleNone) { compaction_picker_.reset(new NullCompactionPicker( - table_cache_.get(), env_options, ioptions_, &internal_comparator_)); + table_cache_.get(), *env_options, ioptions_, &internal_comparator_)); ROCKS_LOG_WARN(ioptions_.info_log, "Column family %s does not use any background compaction. " "Compactions can only be done via CompactFiles\n", @@ -475,14 +476,14 @@ ColumnFamilyData::ColumnFamilyData( "Column family %s will use kCompactionStyleLevel.\n", ioptions_.compaction_style, GetName().c_str()); compaction_picker_.reset(new LevelCompactionPicker( - table_cache_.get(), env_options, ioptions_, &internal_comparator_)); + table_cache_.get(), *env_options, ioptions_, &internal_comparator_)); } if (column_family_set_->NumberOfColumnFamilies() < 10) { ROCKS_LOG_INFO(ioptions_.info_log, "--------------- Options for column family [%s]:\n", name.c_str()); - initial_cf_options_.Dump(ioptions_.info_log); + initial_cf_options().Dump(ioptions_.info_log); } else { ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n"); } @@ -561,7 +562,7 @@ void ColumnFamilyData::SetDropped() { } ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const { - return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); + return BuildColumnFamilyOptions(initial_cf_options(), mutable_cf_options_); } uint64_t ColumnFamilyData::OldestLogToKeep() { @@ -754,7 +755,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kMemtableLimit) { write_controller_token_ = write_controller->GetStopToken(); - internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); + internal_stats()->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " @@ -764,9 +765,9 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( } else if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kL0FileCountLimit) { write_controller_token_ = write_controller->GetStopToken(); - internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); + internal_stats()->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { - internal_stats_->AddCFStats( + internal_stats()->AddCFStats( InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1); } ROCKS_LOG_WARN(ioptions_.info_log, @@ -775,7 +776,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( } else if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kPendingCompactionBytes) { write_controller_token_ = write_controller->GetStopToken(); - internal_stats_->AddCFStats( + internal_stats()->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); ROCKS_LOG_WARN( ioptions_.info_log, @@ -785,7 +786,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( } else if (write_stall_condition == WriteStallCondition::kStopped && write_stall_cause == WriteStallCause::kReadAmpLimit) { write_controller_token_ = write_controller->GetStopToken(); - internal_stats_->AddCFStats(InternalStats::READ_AMP_LIMIT_STOPS, 1); + internal_stats()->AddCFStats(InternalStats::READ_AMP_LIMIT_STOPS, 1); ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stopping writes because we have %f times read amplification " @@ -797,7 +798,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped, mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); + internal_stats()->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stalling writes because we have %d immutable memtables " @@ -815,10 +816,10 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped || near_stop, mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, - 1); + internal_stats()->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, + 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { - internal_stats_->AddCFStats( + internal_stats()->AddCFStats( InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); } ROCKS_LOG_WARN(ioptions_.info_log, @@ -844,7 +845,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped || near_stop, mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats( + internal_stats()->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( ioptions_.info_log, @@ -860,7 +861,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( SetupDelay(write_controller, compaction_needed_bytes, prev_compaction_needed_bytes_, was_stopped || near_stop, mutable_cf_options.disable_auto_compactions); - internal_stats_->AddCFStats(InternalStats::READ_AMP_LIMIT_SLOWDOWNS, 1); + internal_stats()->AddCFStats(InternalStats::READ_AMP_LIMIT_SLOWDOWNS, 1); ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stalling writes because we have %f times read amplification " @@ -1262,7 +1263,7 @@ Status ColumnFamilyData::SetOptions( if (s.ok()) { new_mutable_cf_options = MutableCFOptions( SanitizeOptions(db_options, - BuildColumnFamilyOptions(initial_cf_options_, + BuildColumnFamilyOptions(initial_cf_options(), new_mutable_cf_options)), db_options.env); optimize_filters_for_hits_.store( @@ -1277,7 +1278,7 @@ Status ColumnFamilyData::SetOptions( // REQUIRES: DB mutex held Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { - if (initial_cf_options_.compaction_style != kCompactionStyleLevel) { + if (initial_cf_options().compaction_style != kCompactionStyleLevel) { return Env::WLTH_NOT_SET; } if (level == 0) { @@ -1323,7 +1324,7 @@ Directory* ColumnFamilyData::GetDataDir(size_t path_id) const { ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, - const EnvOptions& env_options, + const EnvOptions* env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller) @@ -1334,7 +1335,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), - env_options_(env_options), + env_options_(*env_options), table_cache_(table_cache), write_buffer_manager_(write_buffer_manager), write_controller_(write_controller) { @@ -1343,7 +1344,9 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, dummy_cfd_->next_ = dummy_cfd_; } -ColumnFamilySet::~ColumnFamilySet() { +ColumnFamilySet::~ColumnFamilySet() { assert(column_family_data_.empty()); } + +void ColumnFamilySet::Cleanup() { while (column_family_data_.size() > 0) { // cfd destructor will delete itself from column_family_data_ auto cfd = column_family_data_.begin()->second; @@ -1405,7 +1408,7 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( assert(column_families_.find(name) == column_families_.end()); ColumnFamilyData* new_cfd = new ColumnFamilyData( id, name, dummy_versions, table_cache_, write_buffer_manager_, options, - *db_options_, env_options_, this); + *db_options_, &env_options_, this); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); max_column_family_ = std::max(max_column_family_, id); diff --git a/db/column_family.h b/db/column_family.h index 5330f578f1..11aff494e3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -213,7 +213,9 @@ class ColumnFamilyData { FlushReason GetFlushReason() const { return flush_reason_; } // thread-safe const EnvOptions* soptions() const; - const ImmutableCFOptions* ioptions() const { return &ioptions_; } + const ImmutableCFOptions* ioptions() const { + return &table_cache_->ioptions(); + } // REQUIRES: DB mutex held // This returns the MutableCFOptions used by current SuperVersion // You should use this API to reference MutableCFOptions most of the time. @@ -240,7 +242,7 @@ class ColumnFamilyData { const std::unordered_map& options_map); #endif // ROCKSDB_LITE - InternalStats* internal_stats() { return internal_stats_.get(); } + InternalStats* internal_stats() { return table_cache_->internal_stats(); } MemTableList* imm() { return &imm_; } MemTable* mem() { return mem_; } @@ -268,6 +270,7 @@ class ColumnFamilyData { bool needs_dup_key_check, SequenceNumber earliest_seq); TableCache* table_cache() const { return table_cache_.get(); } + std::shared_ptr& table_cache_shared_ptr() { return table_cache_; } // See documentation in compaction_picker.h // REQUIRES: DB mutex held @@ -409,8 +412,8 @@ class ColumnFamilyData { bool initialized() const { return initialized_.load(); } - const ColumnFamilyOptions& initial_cf_options() { - return initial_cf_options_; + const ColumnFamilyOptions& initial_cf_options() const { + return table_cache_->initial_cf_options(); } Env::WriteLifeTimeHint CalculateSSTWriteHint(int level); @@ -426,7 +429,7 @@ class ColumnFamilyData { WriteBufferManager* write_buffer_manager, const ColumnFamilyOptions& options, const ImmutableDBOptions& db_options, - const EnvOptions& env_options, + const EnvOptions* env_options, ColumnFamilySet* column_family_set); uint32_t id_; @@ -440,16 +443,12 @@ class ColumnFamilyData { std::atomic optimize_filters_for_hits_; // for read output mutex const InternalKeyComparator internal_comparator_; - const ColumnFamilyOptions initial_cf_options_; - const ImmutableCFOptions ioptions_; + std::shared_ptr table_cache_; + const ImmutableCFOptions& ioptions_; MutableCFOptions mutable_cf_options_; const bool is_delete_range_supported_; - std::unique_ptr table_cache_; - - std::unique_ptr internal_stats_; - WriteBufferManager* write_buffer_manager_; MemTable* mem_; @@ -552,11 +551,13 @@ class ColumnFamilySet { ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, - const EnvOptions& env_options, Cache* table_cache, + const EnvOptions* env_options, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller); ~ColumnFamilySet(); + void Cleanup(); + ColumnFamilyData* GetDefault() const; // GetColumnFamily() calls return nullptr if column family is not found ColumnFamilyData* GetColumnFamily(uint32_t id) const; diff --git a/db/compaction_iterator_test.cc b/db/compaction_iterator_test.cc index ec9f1882eb..fcdb02d297 100644 --- a/db/compaction_iterator_test.cc +++ b/db/compaction_iterator_test.cc @@ -171,7 +171,7 @@ class FakeCompaction : public CompactionIterator::CompactionProxy { std::vector* /*level_ptrs*/) const override { return is_bottommost_level || key_not_exists_beyond_output_level; } - virtual SeparationType separation_type() const { + virtual SeparationType separation_type() const override { return kCompactionIgnoreSeparate; } virtual bool need_rebuild(uint64_t fn) { return false; } diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 11b859e4f6..b023670b5b 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -468,8 +468,8 @@ struct CompactionJob::SubcompactionState { const DependenceMap& depend_map, Arena* arena, TableReader** table_reader_ptr) { return table_cache->NewIterator( - ReadOptions(), job->env_options_, icmp, *file_metadata, depend_map, - nullptr, compaction->mutable_cf_options()->prefix_extractor.get(), + ReadOptions(), job->env_options_, *file_metadata, depend_map, nullptr, + compaction->mutable_cf_options()->prefix_extractor.get(), table_reader_ptr, nullptr, false, arena, true, -1); }; InternalKey internal_begin, internal_end; @@ -531,7 +531,7 @@ struct CompactionJob::SubcompactionState { compaction->column_family_data()->table_cache(); Cache::Handle* handle = nullptr; auto s = table_cache->FindTable( - job->env_options_, icmp, meta->fd, &handle, + job->env_options_, meta->fd, &handle, compaction->mutable_cf_options()->prefix_extractor.get()); if (!s.ok()) { return s; @@ -1349,9 +1349,8 @@ Status CompactionJob::VerifyFiles() { ReadOptions ro; ro.fill_cache = false; InternalIterator* iter = cfd->table_cache()->NewIterator( - ro, env_options_, cfd->internal_comparator(), *files_meta[file_idx], - empty_dependence_map, nullptr /* range_del_agg */, prefix_extractor, - nullptr, + ro, env_options_, *files_meta[file_idx], empty_dependence_map, + nullptr /* range_del_agg */, prefix_extractor, nullptr, output_level == -1 ? nullptr : cfd->internal_stats()->GetFileReadHist(output_level), @@ -2705,8 +2704,8 @@ Status CompactionJob::InstallCompactionResults( ReadOptions ro; ro.fill_cache = false; InternalIterator* iter = cfd->table_cache()->NewIterator( - ro, env_options_, cfd->internal_comparator(), o.file_meta, - empty_dependence_map, nullptr /* range_del_agg */, + ro, env_options_, o.file_meta, empty_dependence_map, + nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor.get(), nullptr, cfd->internal_stats()->GetFileReadHist(compaction->output_level()), false, nullptr /* arena */, false /* skip_filters */, @@ -2788,8 +2787,8 @@ Status CompactionJob::InstallCompactionResults( ReadOptions ro; ro.fill_cache = false; InternalIterator* iter = cfd->table_cache()->NewIterator( - ro, env_options_, cfd->internal_comparator(), file_meta, - empty_dependence_map, nullptr /* range_del_agg */, + ro, env_options_, file_meta, empty_dependence_map, + nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor.get(), nullptr, cfd->internal_stats()->GetFileReadHist(compaction->output_level()), false, nullptr /* arena */, false /* skip_filters */, diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index e958fe282d..b935b75509 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -74,7 +74,7 @@ class CompactionJobTest : public testing::Test { mutable_cf_options_(cf_options_, env_), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, + versions_(new VersionSet(dbname_, &db_options_, &env_options_, /* seq_per_batch */ false, table_cache_.get(), &write_buffer_manager_, &write_controller_)), shutting_down_(false), diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index e03a89f208..00b0887ab7 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1007,7 +1007,7 @@ void CompactionPicker::InitFilesBeingCompact( const DependenceMap& depend_map, Arena* arena, TableReader** table_reader_ptr) { return table_cache_->NewIterator( - options, env_options_, *icmp_, *file_metadata, depend_map, nullptr, + options, env_options_, *file_metadata, depend_map, nullptr, mutable_cf_options.prefix_extractor.get(), table_reader_ptr, nullptr, false, arena, true, -1); }; @@ -1341,7 +1341,7 @@ Compaction* CompactionPicker::PickRangeCompaction( const DependenceMap& depend_map, Arena* arena, TableReader** table_reader_ptr) { return table_cache_->NewIterator( - options, env_options_, *icmp_, *file_metadata, depend_map, nullptr, + options, env_options_, *file_metadata, depend_map, nullptr, mutable_cf_options.prefix_extractor.get(), table_reader_ptr, nullptr, false, arena, true, -1); }; @@ -1847,7 +1847,7 @@ Compaction* CompactionPicker::PickCompositeCompaction( const DependenceMap& dependence_map, Arena* arena, TableReader** reader) { return table_cache_->NewIterator( - options, env_options_, *icmp_, *f, dependence_map, nullptr, + options, env_options_, *f, dependence_map, nullptr, mutable_cf_options.prefix_extractor.get(), nullptr, nullptr, false, arena, true, input.level); }; @@ -1889,8 +1889,8 @@ Compaction* CompactionPicker::PickCompositeCompaction( } std::shared_ptr tp; auto s = table_cache_->GetTableProperties( - env_options_, *icmp_, *f, &tp, - mutable_cf_options.prefix_extractor.get(), true); + env_options_, *f, &tp, mutable_cf_options.prefix_extractor.get(), + true); if (s.IsIncomplete()) { if (f->fd.largest_seqno < oldest_snapshot_seqnum) { return false; @@ -2511,8 +2511,7 @@ Compaction* LevelCompactionBuilder::PickLazyCompaction( const DependenceMap& depend_map, Arena* arena, TableReader** table_reader_ptr) { return picker->table_cache()->NewIterator( - options, picker->env_options(), ioptions_.internal_comparator, - *file_metadata, depend_map, nullptr, + options, picker->env_options(), *file_metadata, depend_map, nullptr, mutable_cf_options_.prefix_extractor.get(), table_reader_ptr, nullptr, false, arena, true, -1); }; @@ -2732,8 +2731,7 @@ Compaction* LevelCompactionBuilder::PickLazyCompaction( const DependenceMap& depend_map, Arena* arena, TableReader** table_reader_ptr) { return picker->table_cache()->NewIterator( - options, picker->env_options(), ioptions_.internal_comparator, - *file_metadata, depend_map, nullptr, + options, picker->env_options(), *file_metadata, depend_map, nullptr, mutable_cf_options_.prefix_extractor.get(), table_reader_ptr, nullptr, false, arena, true, -1); }; diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index 082f740e60..fcd359f86d 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -471,6 +471,7 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksCachePriority) { } } +// For this test, we DO want paranoid_file_checks WITHOUT fill_cache TEST_F(DBBlockCacheTest, ParanoidFileChecks) { Options options = CurrentOptions(); options.create_if_missing = true; @@ -487,7 +488,7 @@ TEST_F(DBBlockCacheTest, ParanoidFileChecks) { ASSERT_OK(Put(1, "9_key", "val")); // Create a new table. ASSERT_OK(Flush(1)); - ASSERT_EQ(1, /* read and cache data block */ + ASSERT_EQ(0, /* flush will not cache data block */ TestGetTickerCount(options, BLOCK_CACHE_ADD)); ASSERT_OK(Put(1, "1_key2", "val2")); @@ -496,7 +497,7 @@ TEST_F(DBBlockCacheTest, ParanoidFileChecks) { // and generate another file. ASSERT_OK(Flush(1)); dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(3, /* Totally 3 files created up to now */ + ASSERT_EQ(0, /* flush or compaction will not cache data block */ TestGetTickerCount(options, BLOCK_CACHE_ADD)); // After disabling options.paranoid_file_checks. NO further block @@ -511,7 +512,7 @@ TEST_F(DBBlockCacheTest, ParanoidFileChecks) { ASSERT_OK(Put(1, "9_key4", "val4")); ASSERT_OK(Flush(1)); dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(3, /* Totally 3 files created up to now */ + ASSERT_EQ(0, /* flush or compaction will not cache data block */ TestGetTickerCount(options, BLOCK_CACHE_ADD)); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 612f81244c..58554a4f32 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -185,8 +185,8 @@ class TablePropertiesCollectionIteratorImpl filename_ = TableFileName(cfd_->ioptions()->cf_paths, f->fd.GetNumber(), f->fd.GetPathId()); status_ = cfd_->table_cache()->GetTableProperties( - impl_->env_options(), cfd_->internal_comparator(), *f, &properties_, - prefix_extractor_.get(), false); + impl_->env_options(), *f, &properties_, prefix_extractor_.get(), + false); } if (status_.ok()) { iter_ = where; @@ -453,9 +453,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, table_cache_ = NewLRUCache(table_cache_size, immutable_db_options_.table_cache_numshardbits); - versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_, + versions_.reset(new VersionSet(dbname_, &immutable_db_options_, &env_options_, seq_per_batch, table_cache_.get(), write_buffer_manager_, &write_controller_)); + versions_->set_table_evict_type(mutable_db_options_.table_evict_type); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -1590,6 +1591,7 @@ Status DBImpl::SetDBOptions( new_options.bytes_per_sync = 1024 * 1024; } mutable_db_options_ = new_options; + versions_->set_table_evict_type(mutable_db_options_.table_evict_type); env_options_for_compaction_ = EnvOptions( BuildDBOptions(immutable_db_options_, mutable_db_options_)); env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite( diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 1e61fd5c90..571aa3025a 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -14,11 +14,11 @@ #include #include -#include #include "db/event_helpers.h" #include "db/memtable_list.h" #include "rocksdb/terark_namespace.h" +#include "util/chash_set.h" #include "util/file_util.h" #include "util/sst_file_manager_impl.h" #include "utilities/util/valvec.hpp" @@ -196,7 +196,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // will not add these files to candidate list for purge. for (const auto& sst_to_del : job_context->sst_delete_files) { job_context->files_grabbed_for_purge.emplace_back( - sst_to_del.metadata->fd.GetNumber()); + sst_to_del.second.metadata->fd.GetNumber()); } for (const auto& manifest_to_del : job_context->manifest_delete_files) { uint64_t number; @@ -354,7 +354,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // Now, convert live list to an unordered map, WITHOUT mutex held; // set is slow. - std::unordered_set sst_live; + chash_set sst_live; for (auto v : state.version_ref) { auto vstorage = v->storage_info(); for (int i = -1; i < vstorage->num_levels(); ++i) { @@ -363,8 +363,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } } } - std::unordered_set log_recycle_files_set( - state.log_recycle_files.begin(), state.log_recycle_files.end()); + + chash_set log_recycle_files_set(state.log_recycle_files.begin(), + state.log_recycle_files.end()); auto& candidate_files = state.full_scan_candidate_files; candidate_files.reserve( @@ -372,14 +373,14 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { state.log_delete_files.size() + state.manifest_delete_files.size()); // We may ignore the dbname when generating the file names. const char* kDumbDbName = ""; - for (auto& file : state.sst_delete_files) { + for (auto& pair : state.sst_delete_files) { + auto& file = pair.second; candidate_files.emplace_back(JobContext::CandidateFileInfo{ MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), state.PushPath(file.path)}); if (file.metadata->table_reader_handle) { table_cache_->Release(file.metadata->table_reader_handle); } - file.DeleteMetadata(); } for (auto file_num : state.log_delete_files) { @@ -444,6 +445,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } ++i; } + int table_evict_type = versions_->table_evict_type(); for (const auto& candidate_file : candidate_files) { const std::string& to_delete = candidate_file.file_name; @@ -518,18 +520,30 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { std::string dir_to_sync; if (type == kTableFile) { // evict from cache - TableCache::Evict(table_cache_.get(), number); + auto find = state.sst_delete_files.end(); + if (table_evict_type != kSkipForceEvict && + (find = state.sst_delete_files.find(number)) != + state.sst_delete_files.end() && + find->second.table_cache != nullptr) { + find->second.table_cache->ForceEvict( + number, table_evict_type == kAlwaysForceEvict + ? find->second.metadata + : nullptr); + } else { + TableCache::Evict(table_cache_.get(), number); + } + fname = MakeTableFileName(*candidate_file.file_path, number); dir_to_sync = *candidate_file.file_path; } else { dir_to_sync = (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_; - fname = dir_to_sync + - ((!dir_to_sync.empty() && dir_to_sync.back() == '/') || - (!to_delete.empty() && to_delete.front() == '/') - ? "" - : "/") + - to_delete; + fname = dir_to_sync; + fname += ((!dir_to_sync.empty() && dir_to_sync.back() == '/') || + (!to_delete.empty() && to_delete.front() == '/') + ? "" + : "/"); + fname += to_delete; } // TODO: Workaround for ZNS and Windows. @@ -571,6 +585,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { files_grabbed_for_purge_.erase(&state.files_grabbed_for_purge); } + for (auto& pair : state.sst_delete_files) { + pair.second.DeleteMetadata(); + pair.second.table_cache.reset(); + } + // Delete old info log files. size_t old_info_log_file_count = old_info_log_files.size(); if (old_info_log_file_count != 0 && diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 610f5ec3c6..7e08116a0e 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -40,7 +40,7 @@ class DBOptionsTest : public DBTestBase { std::unordered_map options_map; StringToMap(options_str, &options_map); std::unordered_map mutable_map; - for (const auto opt : db_options_type_info) { + for (const auto& opt : db_options_type_info) { if (opt.second.is_mutable && opt.second.verification != OptionVerificationType::kDeprecated) { mutable_map[opt.first] = options_map[opt.first]; @@ -56,7 +56,7 @@ class DBOptionsTest : public DBTestBase { std::unordered_map options_map; StringToMap(options_str, &options_map); std::unordered_map mutable_map; - for (const auto opt : cf_options_type_info) { + for (const auto& opt : cf_options_type_info) { if (opt.second.is_mutable && opt.second.verification != OptionVerificationType::kDeprecated) { mutable_map[opt.first] = options_map[opt.first]; diff --git a/db/db_test.cc b/db/db_test.cc index e969b6e45a..895f61e9ca 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5741,6 +5741,246 @@ TEST_F(DBTest, SkipValueGet) { s = db_->Get(ReadOptions{}, Key(1), static_cast(nullptr)); ASSERT_TRUE(s.IsNotFound()); } + +TEST_F(DBTest, ForceEvict) { + Options options = CurrentOptions(); + CompressionType compressions[] = {kZlibCompression, kBZip2Compression, + kLZ4Compression, kLZ4HCCompression, + kXpressCompression}; + + BlockBasedTableOptions bbto; + bbto.cache_index_and_filter_blocks = false; + bbto.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch; + bbto.filter_policy.reset(TERARKDB_NAMESPACE::NewBloomFilterPolicy(10)); + bbto.partition_filters = true; + bbto.block_cache = TERARKDB_NAMESPACE::NewLRUCache(64ull << 20, 4); + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + options.statistics = TERARKDB_NAMESPACE::CreateDBStatistics(); + options.optimize_filters_for_hits = false; + options.target_file_size_base = 1; + options.disable_auto_compactions = true; + options.compression = kNoCompression; + + int TC_evict = 0; + int TC_force_evict = 0; + int TC_found = 0; + int TC_open = 0; + int TC_run = 0; + int BC_erase = 0; + int BC_failures = 0; + + int test_sst = 40; + int max_open_files = 20; + + SyncPoint::GetInstance()->SetCallBack("TableCache::Evict", + [&](void* /* arg */) { ++TC_evict; }); + SyncPoint::GetInstance()->SetCallBack( + "TableCache::ForceEvict", [&](void* /* arg */) { ++TC_force_evict; }); + SyncPoint::GetInstance()->SetCallBack("TableCache::ForceEvict:Found", + [&](void* /* arg */) { ++TC_found; }); + SyncPoint::GetInstance()->SetCallBack("TableCache::ForceEvict:Open", + [&](void* /* arg */) { ++TC_open; }); + SyncPoint::GetInstance()->SetCallBack("TableCache::ForceEvict:Run", + [&](void* /* arg */) { ++TC_run; }); + SyncPoint::GetInstance()->EnableProcessing(); + + auto do_something = [&] { + for (int i = 0; i < 40; ++i) { + ASSERT_OK(Put(Key(i), "")); + ASSERT_OK(Flush()); + } + for (int i = 0; i < 40; ++i) { + ASSERT_OK(db_->Get(ReadOptions{}, db_->DefaultColumnFamily(), Key(i))); + } + TC_evict = 0; + TC_force_evict = 0; + TC_found = 0; + TC_open = 0; + TC_run = 0; + TestGetAndResetTickerCount(options, TERARKDB_NAMESPACE::BLOCK_CACHE_ERASE); + TestGetAndResetTickerCount(options, + TERARKDB_NAMESPACE::BLOCK_CACHE_ERASE_FAILURES); + + ASSERT_OK(db_->CompactRange(TERARKDB_NAMESPACE::CompactRangeOptions(), + db_->DefaultColumnFamily(), nullptr, nullptr)); + for (int i = 0; i < 40; ++i) { + ASSERT_OK(db_->Get(ReadOptions{}, db_->DefaultColumnFamily(), Key(i))); + } + + BC_erase = TestGetAndResetTickerCount( + options, TERARKDB_NAMESPACE::BLOCK_CACHE_ERASE); + BC_failures = TestGetAndResetTickerCount( + options, TERARKDB_NAMESPACE::BLOCK_CACHE_ERASE_FAILURES); + + fprintf(stderr, + "TC_evict = %d;" + "TC_force_evict = %d;" + "TC_found = %d;" + "TC_open = %d;" + "TC_run = %d;" + "BC_erase = %d;" + "BC_failures = %d;\n", + TC_evict, TC_force_evict, TC_found, TC_open, TC_run, BC_erase, + BC_failures); + }; + + options.table_evict_type = TERARKDB_NAMESPACE::kSkipForceEvict; + options.max_open_files = -1; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, 0); + ASSERT_EQ(TC_found, 0); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, 0); + // block_cache erase: + // index_block(N) + // filter_block(N) + // filter_block_partitions(N) + ASSERT_EQ(BC_erase, test_sst * 3); + // block_cache failures: + // index_block(N) + // filter_block(N) + ASSERT_EQ(BC_failures, test_sst * 2); + + options.table_evict_type = TERARKDB_NAMESPACE::kForceEvictIfOpen; + options.max_open_files = max_open_files; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_GE(TC_found, max_open_files); + ASSERT_LT(TC_found, test_sst); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, TC_found); + // block_cache erase: + // index_block(N+F) + // index_block_partitions(F) + // filter_block(N) + // filter_block_partitions(N) + // data_blocks(F) + ASSERT_EQ(BC_erase, test_sst * 3 + TC_found * 3); + // block_cache failures: + // index_block(N+F) + // filter_block(N) + ASSERT_EQ(BC_failures, test_sst * 2 + TC_found); + + options.table_evict_type = TERARKDB_NAMESPACE::kForceEvictIfOpen; + options.max_open_files = -1; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_EQ(TC_found, test_sst); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, test_sst); + // block_cache erase: + // index_block(2N) + // index_block_partitions(N) + // filter_block(N) + // filter_block_partitions(N) + // data_blocks(N) + ASSERT_EQ(BC_erase, test_sst * 6); + // block_cache failures: + // index_block(2N) + // filter_block(N) + ASSERT_EQ(BC_failures, test_sst * 3); + + options.table_evict_type = TERARKDB_NAMESPACE::kAlwaysForceEvict; + options.max_open_files = max_open_files; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_GE(TC_found, max_open_files); + ASSERT_LT(TC_found, test_sst); + ASSERT_EQ(TC_open, test_sst - TC_found); + ASSERT_EQ(TC_run, test_sst); + // block_cache erase: + // index_block(2N+O) + // index_block_partitions(N) + // filter_block(N+O) + // filter_block_partitions(N+O) + // data_blocks(N) + ASSERT_EQ(BC_erase, test_sst * 6 + TC_open * 3); + // block_cache failures: + // index_block(2N+O) + // filter_block(N+O) + ASSERT_EQ(BC_failures, test_sst * 3 + TC_open * 2); + + for (auto comp : compressions) { + if (CompressionTypeSupported(comp)) { + options.compression = comp; + break; + } + } + if (options.compression == kNoCompression) { + return; + } + bbto.no_block_cache = true; + bbto.block_cache_compressed.swap(bbto.block_cache); + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + + options.table_evict_type = TERARKDB_NAMESPACE::kSkipForceEvict; + options.max_open_files = -1; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, 0); + ASSERT_EQ(TC_found, 0); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, 0); + ASSERT_EQ(BC_erase, 0); + ASSERT_EQ(BC_failures, 0); + + options.table_evict_type = TERARKDB_NAMESPACE::kForceEvictIfOpen; + options.max_open_files = max_open_files; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_GE(TC_found, max_open_files); + ASSERT_LT(TC_found, test_sst); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, TC_found); + // block_cache_compressed erase: + // index_block_partitions(F) + // data_blocks(F) + ASSERT_EQ(BC_erase, TC_found * 2); + ASSERT_EQ(BC_failures, 0); + + options.table_evict_type = TERARKDB_NAMESPACE::kForceEvictIfOpen; + options.max_open_files = -1; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_EQ(TC_found, test_sst); + ASSERT_EQ(TC_open, 0); + ASSERT_EQ(TC_run, test_sst); + // block_cache_compressed erase: + // index_block_partitions(N) + // data_blocks(N) + ASSERT_EQ(BC_erase, test_sst * 2); + ASSERT_EQ(BC_failures, 0); + + options.table_evict_type = TERARKDB_NAMESPACE::kAlwaysForceEvict; + options.max_open_files = max_open_files; + DestroyAndReopen(options); + do_something(); + ASSERT_EQ(TC_evict, test_sst); + ASSERT_EQ(TC_force_evict, test_sst); + ASSERT_GE(TC_found, max_open_files); + ASSERT_LT(TC_found, test_sst); + ASSERT_EQ(TC_open, test_sst - TC_found); + ASSERT_EQ(TC_run, test_sst); + // block_cache_compressed erase: + // index_block_partitions(N) + // data_blocks(N) + ASSERT_EQ(BC_erase, test_sst * 2); + ASSERT_EQ(BC_failures, 0); +} + } // namespace TERARKDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 9fe4ae9701..98a42a6413 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -518,10 +518,6 @@ Options DBTestBase::GetOptions( set_block_based_table_factory = true; break; } - case kRowCache: { - options.row_cache = NewLRUCache(1024 * 1024); - break; - } case kRecycleLogFiles: { options.recycle_log_file_num = 2; break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 8e4c94cd8c..bc4f71bc78 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -674,7 +674,6 @@ class DBTestBase : public testing::Test { kxxHashChecksum = 24, kFIFOCompaction = 25, kOptimizeFiltersForHits = 26, - kRowCache = 27, kRecycleLogFiles = 28, kConcurrentSkipList = 29, kPipelinedWrite = 30, diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 51c03a527d..c6f56af117 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -815,7 +815,7 @@ class RecoveryTestHelper { std::unique_ptr wal_manager; WriteController write_controller; - versions.reset(new VersionSet(test->dbname_, &db_options, env_options, + versions.reset(new VersionSet(test->dbname_, &db_options, &env_options, /* seq_per_batch */ false, table_cache.get(), &write_buffer_manager, &write_controller)); diff --git a/db/dbformat.h b/db/dbformat.h index 7861edeec6..8c9289414d 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -10,6 +10,7 @@ #pragma once #include +#include #include #include #include @@ -67,6 +68,13 @@ enum ValueType : unsigned char { kMaxValue = 0x7F // Not used for storing records. }; +// Sst purpose +enum SstPurpose { + kEssenceSst, // Actual data storage sst + kLogSst, // Log as sst + kMapSst, // Dummy sst +}; + // Defined in dbformat.cc extern const ValueType kValueTypeForSeek; extern const ValueType kValueTypeForSeekForPrev; @@ -606,6 +614,8 @@ extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, uint32_t* column_family, Slice* key, Slice* value, Slice* blob, Slice* xid); +class LifeCycle : public std::enable_shared_from_this {}; + // When user call DeleteRange() to delete a range of keys, // we will store a serialized RangeTombstone in MemTable and SST. // the struct here is a easy-understood form diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index c26f52f37b..4cd9056ddd 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -14,13 +14,12 @@ #include #include -#include #include +#include "db/compaction_picker.h" #include "db/version_edit.h" #include "rocksdb/terark_namespace.h" #include "table/merging_iterator.h" -#include "table/scoped_arena_iterator.h" #include "table/sst_file_writer_collectors.h" #include "table/table_builder.h" #include "util/file_reader_writer.h" diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index ab0b421ee4..727315e609 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -35,7 +35,7 @@ class FlushJobTest : public testing::Test { column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, + versions_(new VersionSet(dbname_, &db_options_, &env_options_, /* seq_per_batch */ false, table_cache_.get(), &write_buffer_manager_, &write_controller_)), shutting_down_(false), diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 2432353669..9987c63308 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -71,8 +71,8 @@ class ForwardLevelIterator : public InternalIterator { ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */); file_iter_ = cfd_->table_cache()->NewIterator( - read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), - *files_[file_index_], dependence_map_, + read_options_, *(cfd_->soptions()), *files_[file_index_], + dependence_map_, read_options_.ignore_range_deletions ? nullptr : &range_del_agg, prefix_extractor_, nullptr /* table_reader_ptr */, nullptr, false); valid_ = false; @@ -560,8 +560,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { continue; } l0_iters_.push_back(cfd_->table_cache()->NewIterator( - read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0, - vstorage->dependence_map(), + read_options_, *cfd_->soptions(), *l0, vstorage->dependence_map(), read_options_.ignore_range_deletions ? nullptr : &range_del_agg, sv_->mutable_cf_options.prefix_extractor.get())); } @@ -631,8 +630,8 @@ void ForwardIterator::RenewIterators() { continue; } l0_iters_new.push_back(cfd_->table_cache()->NewIterator( - read_options_, *cfd_->soptions(), cfd_->internal_comparator(), - *l0_files_new[inew], vstorage_new->dependence_map(), + read_options_, *cfd_->soptions(), *l0_files_new[inew], + vstorage_new->dependence_map(), read_options_.ignore_range_deletions ? nullptr : &range_del_agg, svnew->mutable_cf_options.prefix_extractor.get())); } @@ -690,8 +689,8 @@ void ForwardIterator::ResetIncompleteIterators() { } DeleteIterator(l0_iters_[i]); l0_iters_[i] = cfd_->table_cache()->NewIterator( - read_options_, *cfd_->soptions(), cfd_->internal_comparator(), - *l0_files[i], vstorage.dependence_map(), nullptr /* range_del_agg */, + read_options_, *cfd_->soptions(), *l0_files[i], + vstorage.dependence_map(), nullptr /* range_del_agg */, sv_->mutable_cf_options.prefix_extractor.get()); } diff --git a/db/internal_stats.h b/db/internal_stats.h index a7395c89cd..7bb1cac517 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -13,14 +13,20 @@ #include #include -#include "db/version_set.h" +#include "monitoring/histogram.h" +#include "rocksdb/cache.h" +#include "rocksdb/env.h" +#include "rocksdb/listener.h" +#include "rocksdb/slice.h" #include "rocksdb/terark_namespace.h" namespace TERARKDB_NAMESPACE { class ColumnFamilyData; class DBImpl; +class InternalStats; class MemTableList; +class Version; // Config for retrieving a property's value. struct DBPropertyInfo { diff --git a/db/job_context.h b/db/job_context.h index 5097088654..2a43fb05ab 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -14,7 +14,9 @@ #include "db/column_family.h" #include "db/log_writer.h" +#include "db/version_set.h" #include "rocksdb/terark_namespace.h" +#include "util/chash_map.h" namespace TERARKDB_NAMESPACE { @@ -150,7 +152,7 @@ struct JobContext { std::vector version_ref; // a list of sst files that we need to delete - std::vector sst_delete_files; + chash_map sst_delete_files; // a list of log files that we need to delete std::vector log_delete_files; diff --git a/db/map_builder.cc b/db/map_builder.cc index 264d699216..ce0f598047 100644 --- a/db/map_builder.cc +++ b/db/map_builder.cc @@ -87,7 +87,7 @@ struct IteratorCacheContext { read_options.total_order_seek = true; return ctx->cfd->table_cache()->NewIterator( - read_options, *ctx->env_options, ctx->cfd->internal_comparator(), *f, + read_options, *ctx->env_options, *f, f->prop.is_map_sst() ? empty_dependence_map : dependence_map, nullptr /* range_del_agg */, ctx->mutable_cf_options->prefix_extractor.get(), reader_ptr, diff --git a/db/map_builder_test.cc b/db/map_builder_test.cc index 8426edd0ab..938edc2566 100644 --- a/db/map_builder_test.cc +++ b/db/map_builder_test.cc @@ -59,7 +59,7 @@ class MapBuilderTest : public testing::Test { ioptions_(db_options_, cf_options_), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, + versions_(new VersionSet(dbname_, &db_options_, &env_options_, /* seq_per_batch */ false, table_cache_.get(), &write_buffer_manager_, &write_controller_)), mock_table_factory_(new mock::MockTableFactory()), @@ -201,10 +201,9 @@ class MapBuilderTest : public testing::Test { Status s; DependenceMap empty_dependence_map; std::unique_ptr iter(cfd_->table_cache()->NewIterator( - ReadOptions(), env_options_, cfd_->internal_comparator(), *f, - empty_dependence_map, nullptr /* range_del_agg */, - mutable_cf_options_.prefix_extractor.get(), nullptr, nullptr, - false /* for_compaction */, nullptr /* arena */, + ReadOptions(), env_options_, *f, empty_dependence_map, + nullptr /* range_del_agg */, mutable_cf_options_.prefix_extractor.get(), + nullptr, nullptr, false /* for_compaction */, nullptr /* arena */, false /* skip_filter */, 1)); s = iter->status(); if (!s.ok()) { diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 19105edcd0..6ca3bbea21 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -101,7 +101,7 @@ class MemTableListTest : public testing::Test { WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); WriteController write_controller(10000000u); - VersionSet versions(dbname, &immutable_db_options, env_options, false, + VersionSet versions(dbname, &immutable_db_options, &env_options, false, table_cache.get(), &write_buffer_manager, &write_controller); std::vector cf_descs; @@ -145,7 +145,7 @@ class MemTableListTest : public testing::Test { WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); WriteController write_controller(10000000u); - VersionSet versions(dbname, &immutable_db_options, env_options, + VersionSet versions(dbname, &immutable_db_options, &env_options, /* seq_per_batch */ false, table_cache.get(), &write_buffer_manager, &write_controller); std::vector cf_descs; diff --git a/db/repair.cc b/db/repair.cc index 8560f810fb..37dd5863a8 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -116,11 +116,11 @@ class Repairer { // TableCache can be small since we expect each table to be opened // once. NewLRUCache(10, db_options_.table_cache_numshardbits)), - table_cache_(new TableCache(default_cf_iopts_, env_options_, - raw_table_cache_.get())), + table_cache_(new TableCache(default_cf_opts_, immutable_db_options_, + &env_options_, raw_table_cache_.get())), wb_(db_options_.db_write_buffer_size), wc_(db_options_.delayed_write_rate), - vset_(dbname_, &immutable_db_options_, env_options_, + vset_(dbname_, &immutable_db_options_, &env_options_, /* seq_per_batch */ false, raw_table_cache_.get(), &wb_, &wc_), next_file_number_(1), db_lock_(nullptr) { @@ -171,7 +171,6 @@ class Repairer { if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); } - delete table_cache_; } Status Run() { @@ -245,7 +244,7 @@ class Repairer { const ColumnFamilyOptions unknown_cf_opts_; const bool create_unknown_cfs_; std::shared_ptr raw_table_cache_; - TableCache* table_cache_; + std::unique_ptr table_cache_; WriteBufferManager wb_; WriteController wc_; VersionSet vset_; @@ -437,7 +436,7 @@ class Repairer { auto& moptions = *cfd->GetLatestMutableCFOptions(); status = BuildTable( dbname_, &vset_, env_, *cfd->ioptions(), moptions, env_options_, - table_cache_, c_style_callback(get_arena_input_iter), + table_cache_.get(), c_style_callback(get_arena_input_iter), &get_arena_input_iter, c_style_callback(get_range_del_iters), &get_range_del_iters, &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(moptions), @@ -580,8 +579,7 @@ class Repairer { file_size); std::shared_ptr props; if (status.ok()) { - status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta, - &props); + status = table_cache_->GetTableProperties(env_options_, t->meta, &props); } if (status.ok()) { t->column_family_id = static_cast(props->column_family_id); @@ -622,8 +620,8 @@ class Repairer { // P.S. depend files in VersionStorage has not build yet ... DependenceMap empty_dependence_map; InternalIterator* iter = table_cache_->NewIterator( - ReadOptions(), env_options_, cfd->internal_comparator(), t->meta, - empty_dependence_map, nullptr /* range_del_agg */, + ReadOptions(), env_options_, t->meta, empty_dependence_map, + nullptr /* range_del_agg */, cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); bool empty = true; ParsedInternalKey parsed; diff --git a/db/table_cache.cc b/db/table_cache.cc index a8c9f5242c..b182a63e55 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -17,7 +17,6 @@ #include "rocksdb/terark_namespace.h" #include "table/get_context.h" #include "table/internal_iterator.h" -#include "table/iterator_wrapper.h" #include "table/table_builder.h" #include "table/table_reader.h" #include "table/two_level_iterator.h" @@ -90,8 +89,7 @@ class LazyCreateIterator : public Snapshot { TableCache* table_cache_; ReadOptions options_; // deep copy SequenceNumber snapshot_; - const EnvOptions& env_options_; - const InternalKeyComparator& icomparator_; + EnvOptions env_options_; RangeDelAggregator* range_del_agg_; const SliceTransform* prefix_extractor_; bool for_compaction_; @@ -101,7 +99,6 @@ class LazyCreateIterator : public Snapshot { public: LazyCreateIterator(TableCache* table_cache, const ReadOptions& options, const EnvOptions& env_options, - const InternalKeyComparator& icomparator, RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor, bool for_compaction, bool skip_filters, @@ -110,7 +107,6 @@ class LazyCreateIterator : public Snapshot { options_(options), snapshot_(0), env_options_(env_options), - icomparator_(icomparator), range_del_agg_(range_del_agg), prefix_extractor_(prefix_extractor), for_compaction_(for_compaction), @@ -132,28 +128,35 @@ class LazyCreateIterator : public Snapshot { const DependenceMap& _dependence_map, Arena* _arena, TableReader** _reader_ptr) { return table_cache_->NewIterator( - options_, env_options_, icomparator_, *_f, _dependence_map, - range_del_agg_, prefix_extractor_, _reader_ptr, nullptr, - for_compaction_, _arena, skip_filters_, level_); + options_, env_options_, *_f, _dependence_map, range_del_agg_, + prefix_extractor_, _reader_ptr, nullptr, for_compaction_, _arena, + skip_filters_, level_); } }; } // namespace -TableCache::TableCache(const ImmutableCFOptions& ioptions, - const EnvOptions& env_options, Cache* const cache) - : ioptions_(ioptions), - env_options_(env_options), +TableCache::TableCache(const ColumnFamilyOptions& initial_cf_options, + const ImmutableDBOptions& db_options, + const EnvOptions* env_options, Cache* const cache) + : initial_cf_options_(initial_cf_options), + ioptions_(db_options, initial_cf_options_), + internal_stats_(nullptr), + env_options_(*env_options), cache_(cache), - immortal_tables_(false) { - if (ioptions_.row_cache) { - // If the same cache is shared by multiple instances, we need to - // disambiguate its entries. - PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId()); + immortal_tables_(false) {} + +TableCache::~TableCache() { + if (internal_stats_ != nullptr) { + internal_stats_->~InternalStats(); } } -TableCache::~TableCache() {} +void TableCache::InitInternalStats(ColumnFamilyData* cfd) { + assert(internal_stats_ == nullptr); + internal_stats_ = new (&internal_stats_storage_) + InternalStats(ioptions_.num_levels, ioptions_.env, cfd); +} TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) { return reinterpret_cast(cache_->Value(handle)); @@ -164,18 +167,16 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { } Status TableCache::GetTableReader( - const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, + const EnvOptions& env_options, const FileDescriptor& fd, bool sequential_mode, size_t readahead, bool record_read_stats, HistogramImpl* file_read_hist, std::unique_ptr* table_reader, const SliceTransform* prefix_extractor, bool skip_filters, int level, bool prefetch_index_and_filter_in_cache, bool for_compaction, bool force_memory) { auto s = GetTableReaderImpl( - env_options, internal_comparator, fd, sequential_mode, readahead, - record_read_stats, file_read_hist, table_reader, prefix_extractor, - skip_filters, level, prefetch_index_and_filter_in_cache, for_compaction, - force_memory); + env_options, fd, sequential_mode, readahead, record_read_stats, + file_read_hist, table_reader, prefix_extractor, skip_filters, level, + prefetch_index_and_filter_in_cache, for_compaction, force_memory); if (s.IsInvalidArgument() && s.subcode() == Status::kRequireMmap) { // this table requires mmap open, make it happy assert(!env_options.use_mmap_reads); @@ -184,17 +185,15 @@ Status TableCache::GetTableReader( mmap_env_options.use_direct_reads = false; mmap_env_options.use_aio_reads = false; s = GetTableReaderImpl( - mmap_env_options, internal_comparator, fd, sequential_mode, readahead, - record_read_stats, file_read_hist, table_reader, prefix_extractor, - skip_filters, level, prefetch_index_and_filter_in_cache, for_compaction, - force_memory); + mmap_env_options, fd, sequential_mode, readahead, record_read_stats, + file_read_hist, table_reader, prefix_extractor, skip_filters, level, + prefetch_index_and_filter_in_cache, for_compaction, force_memory); } return s; } Status TableCache::GetTableReaderImpl( - const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, + const EnvOptions& env_options, const FileDescriptor& fd, bool sequential_mode, size_t readahead, bool record_read_stats, HistogramImpl* file_read_hist, std::unique_ptr* table_reader, const SliceTransform* prefix_extractor, bool skip_filters, int level, @@ -228,8 +227,9 @@ Status TableCache::GetTableReaderImpl( ioptions_.listeners)); s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, prefix_extractor, env_options, - internal_comparator, skip_filters, immortal_tables_, - level, fd.GetNumber(), fd.largest_seqno), + ioptions_.internal_comparator, skip_filters, + immortal_tables_, level, fd.GetNumber(), + fd.largest_seqno), std::move(file_reader), fd.GetFileSize(), table_reader, prefetch_index_and_filter_in_cache); TEST_SYNC_POINT("TableCache::GetTableReader:0"); @@ -245,7 +245,6 @@ void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) { } Status TableCache::FindTable(const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, const SliceTransform* prefix_extractor, const bool no_io, bool record_read_stats, @@ -265,10 +264,9 @@ Status TableCache::FindTable(const EnvOptions& env_options, return Status::Incomplete("Table not found in table_cache, no_io is set"); } std::unique_ptr table_reader; - s = GetTableReader(env_options, internal_comparator, fd, - false /* sequential mode */, 0 /* readahead */, - record_read_stats, file_read_hist, &table_reader, - prefix_extractor, skip_filters, level, + s = GetTableReader(env_options, fd, false /* sequential mode */, + 0 /* readahead */, record_read_stats, file_read_hist, + &table_reader, prefix_extractor, skip_filters, level, prefetch_index_and_filter_in_cache, false /* for_compaction */, force_memory); if (!s.ok()) { @@ -291,11 +289,10 @@ Status TableCache::FindTable(const EnvOptions& env_options, InternalIterator* TableCache::NewIterator( const ReadOptions& options, const EnvOptions& env_options, - const InternalKeyComparator& icomparator, const FileMetaData& file_meta, - const DependenceMap& dependence_map, RangeDelAggregator* range_del_agg, - const SliceTransform* prefix_extractor, TableReader** table_reader_ptr, - HistogramImpl* file_read_hist, bool for_compaction, Arena* arena, - bool skip_filters, int level) { + const FileMetaData& file_meta, const DependenceMap& dependence_map, + RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor, + TableReader** table_reader_ptr, HistogramImpl* file_read_hist, + bool for_compaction, Arena* arena, bool skip_filters, int level) { PERF_TIMER_GUARD(new_table_iterator_nanos); Status s; @@ -332,10 +329,9 @@ InternalIterator* TableCache::NewIterator( auto& fd = file_meta.fd; if (create_new_table_reader) { std::unique_ptr table_reader_unique_ptr; - s = GetTableReader(env_options, icomparator, fd, true /* sequential_mode */, - readahead, record_stats, nullptr, - &table_reader_unique_ptr, prefix_extractor, - false /* skip_filters */, level, + s = GetTableReader(env_options, fd, true /* sequential_mode */, readahead, + record_stats, nullptr, &table_reader_unique_ptr, + prefix_extractor, false /* skip_filters */, level, true /* prefetch_index_and_filter_in_cache */, for_compaction, file_meta.prop.is_map_sst()); if (s.ok()) { @@ -344,7 +340,7 @@ InternalIterator* TableCache::NewIterator( } else { table_reader = fd.table_reader; if (table_reader == nullptr) { - s = FindTable(env_options, icomparator, fd, &handle, prefix_extractor, + s = FindTable(env_options, fd, &handle, prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, record_stats, file_read_hist, skip_filters, level, true /* prefetch_index_and_filter_in_cache */, @@ -379,19 +375,17 @@ InternalIterator* TableCache::NewIterator( if (arena != nullptr) { void* buffer = arena->AllocateAligned(sizeof(LazyCreateIterator)); lazy_create_iter = new (buffer) LazyCreateIterator( - this, options, env_options, icomparator, range_del_agg, - prefix_extractor, for_compaction, skip_filters, - ignore_range_deletions, level); + this, options, env_options, range_del_agg, prefix_extractor, + for_compaction, skip_filters, ignore_range_deletions, level); } else { lazy_create_iter = new LazyCreateIterator( - this, options, env_options, icomparator, range_del_agg, - prefix_extractor, for_compaction, skip_filters, - ignore_range_deletions, level); + this, options, env_options, range_del_agg, prefix_extractor, + for_compaction, skip_filters, ignore_range_deletions, level); } auto map_sst_iter = NewMapSstIterator( - &file_meta, result, dependence_map, icomparator, lazy_create_iter, - c_style_callback(*lazy_create_iter), arena); + &file_meta, result, dependence_map, ioptions_.internal_comparator, + lazy_create_iter, c_style_callback(*lazy_create_iter), arena); if (arena != nullptr) { map_sst_iter->RegisterCleanup( [](void* arg1, void* arg2) { @@ -452,7 +446,6 @@ InternalIterator* TableCache::NewIterator( } Status TableCache::Get(const ReadOptions& options, - const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, const DependenceMap& dependence_map, const Slice& k, GetContext* get_context, @@ -480,8 +473,7 @@ Status TableCache::Get(const ReadOptions& options, TableReader* t = fd.table_reader; Cache::Handle* handle = nullptr; if (t == nullptr) { - s = FindTable(env_options_, internal_comparator, fd, &handle, - prefix_extractor, + s = FindTable(env_options_, fd, &handle, prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */, true /* record_read_stats */, file_read_hist, skip_filters, level, true /* prefetch_index_and_filter_in_cache */, @@ -516,7 +508,7 @@ Status TableCache::Get(const ReadOptions& options, uint64_t link_count; uint64_t flags; Slice find_k = k; - auto& icomp = internal_comparator; + auto& icomp = ioptions_.internal_comparator; if (!GetVarint64(&map_input, &flags) || !GetVarint64(&map_input, &link_count) || @@ -587,9 +579,9 @@ Status TableCache::Get(const ReadOptions& options, return false; } assert(find->second->fd.GetNumber() == file_number); - s = Get(forward_options, internal_comparator, *find->second, - dependence_map, find_k, get_context, prefix_extractor, - file_read_hist, skip_filters, level, inheritance); + s = Get(forward_options, *find->second, dependence_map, find_k, + get_context, prefix_extractor, file_read_hist, skip_filters, + level, inheritance); if (!s.ok() || get_context->is_finished()) { // error or found, recovery min_seq_type_backup is unnecessary @@ -615,9 +607,7 @@ Status TableCache::Get(const ReadOptions& options, } Status TableCache::GetTableProperties( - const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, - const FileMetaData& file_meta, + const EnvOptions& env_options, const FileMetaData& file_meta, std::shared_ptr* properties, const SliceTransform* prefix_extractor, bool no_io) { Status s; @@ -631,10 +621,10 @@ Status TableCache::GetTableProperties( } Cache::Handle* table_handle = nullptr; - s = FindTable(env_options, internal_comparator, fd, &table_handle, - prefix_extractor, no_io, true /* record_read_stats */, - nullptr /* file_read_hist */, false /* skip_filters */, - -1 /* level */, true /* prefetch_index_and_filter_in_cache */, + s = FindTable(env_options, fd, &table_handle, prefix_extractor, no_io, + true /* record_read_stats */, nullptr /* file_read_hist */, + false /* skip_filters */, -1 /* level */, + true /* prefetch_index_and_filter_in_cache */, file_meta.prop.is_map_sst()); if (!s.ok()) { return s; @@ -647,8 +637,7 @@ Status TableCache::GetTableProperties( } size_t TableCache::GetMemoryUsageByTableReader( - const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, + const EnvOptions& env_options, const FileDescriptor& fd, const SliceTransform* prefix_extractor) { Status s; auto table_reader = fd.table_reader; @@ -658,8 +647,7 @@ size_t TableCache::GetMemoryUsageByTableReader( } Cache::Handle* table_handle = nullptr; - s = FindTable(env_options, internal_comparator, fd, &table_handle, - prefix_extractor, true); + s = FindTable(env_options, fd, &table_handle, prefix_extractor, true); if (!s.ok()) { return 0; } @@ -671,9 +659,38 @@ size_t TableCache::GetMemoryUsageByTableReader( } void TableCache::Evict(Cache* cache, uint64_t file_number) { + TEST_SYNC_POINT("TableCache::Evict"); cache->Erase(GetSliceForFileNumber(&file_number)); } +void TableCache::ForceEvict(uint64_t file_number, + const FileMetaData* file_meta) { + TEST_SYNC_POINT("TableCache::ForceEvict"); + std::unique_ptr reader_holder; + TableReader* reader = nullptr; + Slice key = GetSliceForFileNumber(&file_number); + auto handle = cache_->Lookup(key); + if (handle != nullptr) { + TEST_SYNC_POINT("TableCache::ForceEvict:Found"); + reader = GetTableReaderFromHandle(handle); + } else if (file_meta != nullptr) { + TEST_SYNC_POINT("TableCache::ForceEvict:Open"); + auto s = GetTableReader(env_options_, file_meta->fd, false, 0, 0, nullptr, + &reader_holder); + if (s.ok()) { + reader = reader_holder.get(); + } + } + if (reader != nullptr) { + TEST_SYNC_POINT("TableCache::ForceEvict:Run"); + reader->ForceEvict(); + } + if (handle != nullptr) { + cache_->Release(handle); + } + Evict(cache_, file_number); +} + void TableCache::TEST_AddMockTableReader(TableReader* table_reader, FileDescriptor fd) { Status s; diff --git a/db/table_cache.h b/db/table_cache.h index a388eb6254..ca75e2dd7d 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -13,9 +13,11 @@ #include #include +#include #include #include "db/dbformat.h" +#include "db/internal_stats.h" #include "db/range_del_aggregator.h" #include "options/cf_options.h" #include "port/port.h" @@ -29,18 +31,25 @@ namespace TERARKDB_NAMESPACE { -class Env; class Arena; +class ColumnFamilyData; +class Env; struct FileDescriptor; class GetContext; class HistogramImpl; +class InternalStats; -class TableCache { +class TableCache : public LifeCycle { public: - TableCache(const ImmutableCFOptions& ioptions, - const EnvOptions& storage_options, Cache* cache); + TableCache(const ColumnFamilyOptions& initial_cf_options, + const ImmutableDBOptions& db_options, + const EnvOptions* storage_options, Cache* cache); ~TableCache(); + void InitInternalStats(ColumnFamilyData* cfd); + + void bind_life_cycle() { ioptions_.life_cycle = shared_from_this(); } + // Return an iterator for the specified file number (the corresponding // file length must be exactly "file_size" bytes). If "tableptr" is // non-nullptr, also sets "*tableptr" to point to the Table object @@ -54,7 +63,6 @@ class TableCache { // @param level The level this table is at, -1 for "not set / don't know" InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, - const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, const DependenceMap& dependence_map, RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor = nullptr, @@ -70,18 +78,20 @@ class TableCache { // returns non-ok status. // @param skip_filters Disables loading/accessing the filter block // @param level The level this table is at, -1 for "not set / don't know" - Status Get(const ReadOptions& options, - const InternalKeyComparator& internal_comparator, - const FileMetaData& file_meta, const DependenceMap& dependence_map, - const Slice& k, GetContext* get_context, + Status Get(const ReadOptions& options, const FileMetaData& file_meta, + const DependenceMap& dependence_map, const Slice& k, + GetContext* get_context, const SliceTransform* prefix_extractor = nullptr, HistogramImpl* file_read_hist = nullptr, bool skip_filters = false, - int level = -1, - const FileMetaData* inheritance = nullptr); + int level = -1, const FileMetaData* inheritance = nullptr); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); + // Evict any entry for the specified file number + void ForceEvict(uint64_t file_number, + const FileMetaData* file_meta = nullptr); + // Clean table handle and erase it from the table cache // Used in DB close, or the file is not live anymore. void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle); @@ -89,9 +99,8 @@ class TableCache { // Find table reader // @param skip_filters Disables loading/accessing the filter block // @param level == -1 means not specified - Status FindTable(const EnvOptions& toptions, - const InternalKeyComparator& internal_comparator, - const FileDescriptor& file_fd, Cache::Handle**, + Status FindTable(const EnvOptions& toptions, const FileDescriptor& file_fd, + Cache::Handle**, const SliceTransform* prefix_extractor = nullptr, const bool no_io = false, bool record_read_stats = true, HistogramImpl* file_read_hist = nullptr, @@ -109,7 +118,6 @@ class TableCache { // return Status::Incomplete() if table is not present in cache and // we set `no_io` to be true. Status GetTableProperties(const EnvOptions& toptions, - const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, std::shared_ptr* properties, const SliceTransform* prefix_extractor = nullptr, @@ -118,9 +126,7 @@ class TableCache { // Return total memory usage of the table reader of the file. // 0 if table reader of the file is not loaded. size_t GetMemoryUsageByTableReader( - const EnvOptions& toptions, - const InternalKeyComparator& internal_comparator, - const FileDescriptor& fd, + const EnvOptions& toptions, const FileDescriptor& fd, const SliceTransform* prefix_extractor = nullptr); // Release the handle from a cache @@ -138,22 +144,25 @@ class TableCache { } } + const ColumnFamilyOptions& initial_cf_options() const { + return initial_cf_options_; + } + const ImmutableCFOptions& ioptions() const { return ioptions_; } + InternalStats* internal_stats() { return internal_stats_; } + void TEST_AddMockTableReader(TableReader* table_reader, FileDescriptor fd); private: // Build a table reader - Status GetTableReader(const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, - const FileDescriptor& fd, bool sequential_mode, - size_t readahead, bool record_read_stats, - HistogramImpl* file_read_hist, + Status GetTableReader(const EnvOptions& env_options, const FileDescriptor& fd, + bool sequential_mode, size_t readahead, + bool record_read_stats, HistogramImpl* file_read_hist, std::unique_ptr* table_reader, const SliceTransform* prefix_extractor = nullptr, bool skip_filters = false, int level = -1, bool prefetch_index_and_filter_in_cache = true, bool for_compaction = false, bool force_memory = false); Status GetTableReaderImpl(const EnvOptions& env_options, - const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, size_t readahead, bool record_read_stats, HistogramImpl* file_read_hist, @@ -163,10 +172,13 @@ class TableCache { bool prefetch_index_and_filter_in_cache, bool for_compaction, bool force_memory); - const ImmutableCFOptions& ioptions_; + ColumnFamilyOptions initial_cf_options_; + ImmutableCFOptions ioptions_; + InternalStats* internal_stats_; + std::aligned_storage::type + internal_stats_storage_; const EnvOptions& env_options_; Cache* const cache_; - std::string row_cache_id_; bool immortal_tables_; }; diff --git a/db/version_builder.cc b/db/version_builder.cc index 3e260ab95b..1500201792 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -781,10 +781,10 @@ class VersionBuilder::Rep { auto file_read_hist = level >= 0 ? internal_stats->GetFileReadHist(level) : nullptr; table_cache_->FindTable( - env_options_, *base_vstorage_->InternalComparator(), file_meta->fd, - &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, - true /* record_read_stats */, file_read_hist, false, level, - prefetch_index_and_filter_in_cache, file_meta->prop.is_map_sst()); + env_options_, file_meta->fd, &file_meta->table_reader_handle, + prefix_extractor, false /*no_io */, true /* record_read_stats */, + file_read_hist, false, level, prefetch_index_and_filter_in_cache, + file_meta->prop.is_map_sst()); if (file_meta->table_reader_handle != nullptr) { // Load table_reader file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( @@ -831,9 +831,9 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx]; std::shared_ptr properties; - auto s = table_cache_->GetTableProperties( - env_options_, *base_vstorage_->InternalComparator(), *file_meta, - &properties, prefix_extractor, false /*no_io */); + auto s = table_cache_->GetTableProperties(env_options_, *file_meta, + &properties, prefix_extractor, + false /*no_io */); if (s.ok() && properties) { file_meta->prop.num_entries = properties->num_entries; diff --git a/db/version_set.cc b/db/version_set.cc index c222131c39..07c0d7cd19 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -399,8 +399,9 @@ Version::~Version() { assert(cfd_ != nullptr); uint32_t path_id = f->fd.GetPathId(); assert(path_id < cfd_->ioptions()->cf_paths.size()); - vset_->obsolete_files_.push_back( - ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path)); + vset_->obsolete_files_.emplace_back( + f, cfd_->ioptions()->cf_paths[path_id].path, + cfd_->table_cache_shared_ptr()); } } } @@ -577,8 +578,8 @@ class LevelIterator final : public InternalIterator, public Snapshot { sample_file_read_inc(file_meta.file_metadata); } return table_cache_->NewIterator( - read_options_, env_options_, icomparator_, *file_meta.file_metadata, - dependence_map_, range_del_agg_, prefix_extractor_, + read_options_, env_options_, *file_meta.file_metadata, dependence_map_, + range_del_agg_, prefix_extractor_, nullptr /* don't need reference to table */, file_read_hist_, for_compaction_, nullptr /* arena */, skip_filters_, level_); } @@ -762,8 +763,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, auto table_cache = cfd_->table_cache(); auto ioptions = cfd_->ioptions(); Status s = table_cache->GetTableProperties( - env_options_, cfd_->internal_comparator(), *file_meta, tp, - mutable_cf_options_.prefix_extractor.get(), true /* no io */); + env_options_, *file_meta, tp, mutable_cf_options_.prefix_extractor.get(), + true /* no io */); if (s.ok()) { return s; } @@ -928,13 +929,13 @@ size_t Version::GetMemoryUsageByTableReaders() { for (auto& file_level : storage_info_.level_files_brief_) { for (size_t i = 0; i < file_level.num_files; i++) { total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( - env_options_, cfd_->internal_comparator(), file_level.files[i].fd, + env_options_, file_level.files[i].fd, mutable_cf_options_.prefix_extractor.get()); } } for (auto file_meta : storage_info_.LevelFiles(-1)) { total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( - env_options_, cfd_->internal_comparator(), file_meta->fd, + env_options_, file_meta->fd, mutable_cf_options_.prefix_extractor.get()); } return total_usage; @@ -1115,8 +1116,8 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, i++) { const auto& file = storage_info_.LevelFilesBrief(level).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( - read_options, soptions, cfd_->internal_comparator(), - *file.file_metadata, storage_info_.dependence_map(), range_del_agg, + read_options, soptions, *file.file_metadata, + storage_info_.dependence_map(), range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr, cfd_->internal_stats()->GetFileReadHist(level), false, arena, false /* skip_filters */, 0 /* level */)); @@ -1172,8 +1173,8 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options, continue; } ScopedArenaIterator iter(cfd_->table_cache()->NewIterator( - read_options, env_options, cfd_->internal_comparator(), - *file->file_metadata, storage_info_.dependence_map(), &range_del_agg, + read_options, env_options, *file->file_metadata, + storage_info_.dependence_map(), &range_del_agg, mutable_cf_options_.prefix_extractor.get(), nullptr, cfd_->internal_stats()->GetFileReadHist(level), false, &arena, false /* skip_filters */, 0 /* level */)); @@ -1294,8 +1295,8 @@ Status Version::fetch_buffer(LazyBuffer* buffer) const { IterKey iter_key; iter_key.SetInternalKey(user_key, sequence, kValueTypeForSeek); auto s = table_cache_->Get( - ReadOptions(), cfd_->internal_comparator(), *pair.second, - storage_info_.dependence_map(), iter_key.GetInternalKey(), &get_context, + ReadOptions(), *pair.second, storage_info_.dependence_map(), + iter_key.GetInternalKey(), &get_context, mutable_cf_options_.prefix_extractor.get(), nullptr, true); if (!s.ok()) { return s; @@ -1378,9 +1379,8 @@ void Version::Get(const ReadOptions& read_options, const Slice& user_key, get_perf_context()->per_level_perf_context_enabled; StopWatchNano timer(env_, timer_enabled /* auto_start */); *status = table_cache_->Get( - read_options, *internal_comparator(), *f->file_metadata, - storage_info_.dependence_map(), ikey, &get_context, - mutable_cf_options_.prefix_extractor.get(), + read_options, *f->file_metadata, storage_info_.dependence_map(), ikey, + &get_context, mutable_cf_options_.prefix_extractor.get(), cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel()), @@ -1474,11 +1474,10 @@ void Version::GetKey(const Slice& user_key, const Slice& ikey, Status* status, FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = - table_cache_->Get(options, *internal_comparator(), *f->file_metadata, - storage_info_.dependence_map(), ikey, &get_context, - mutable_cf_options_.prefix_extractor.get(), nullptr, - true, fp.GetCurrentLevel(), &blob); + *status = table_cache_->Get( + options, *f->file_metadata, storage_info_.dependence_map(), ikey, + &get_context, mutable_cf_options_.prefix_extractor.get(), nullptr, true, + fp.GetCurrentLevel(), &blob); if (!status->ok()) { return; } @@ -2963,7 +2962,7 @@ struct VersionSet::ManifestWriter { VersionSet::VersionSet(const std::string& dbname, const ImmutableDBOptions* _db_options, - const EnvOptions& storage_options, bool seq_per_batch, + const EnvOptions* storage_options, bool seq_per_batch, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller) @@ -2985,7 +2984,7 @@ VersionSet::VersionSet(const std::string& dbname, manifest_file_size_(0), manifest_edit_count_(0), seq_per_batch_(seq_per_batch), - env_options_(storage_options) {} + env_options_(*storage_options) {} void CloseTables(void* ptr, size_t) { TableReader* table_reader = reinterpret_cast(ptr); @@ -2996,16 +2995,25 @@ VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet Cache* table_cache = column_family_set_->get_table_cache(); - table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */); - column_family_set_.reset(); + column_family_set_->Cleanup(); + int evict_type = table_evict_type(); for (auto& file : obsolete_files_) { if (file.metadata->table_reader_handle) { table_cache->Release(file.metadata->table_reader_handle); + } + if (file.table_cache && evict_type != kSkipForceEvict) { + file.table_cache->ForceEvict( + file.metadata->fd.GetNumber(), + evict_type == kAlwaysForceEvict ? file.metadata : nullptr); + } else { TableCache::Evict(table_cache, file.metadata->fd.GetNumber()); } file.DeleteMetadata(); + file.table_cache.reset(); } + column_family_set_.reset(); obsolete_files_.clear(); + table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */); } void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, @@ -4094,8 +4102,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, WriteController wc(options->delayed_write_rate); WriteBufferManager wb(options->db_write_buffer_size); const bool seq_per_batch = false; - VersionSet versions(dbname, &db_options, env_options, seq_per_batch, tc.get(), - &wb, &wc); + VersionSet versions(dbname, &db_options, &env_options, seq_per_batch, + tc.get(), &wb, &wc); Status status; std::vector dummy; @@ -4568,8 +4576,8 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f, TableCache* table_cache = v->cfd_->table_cache(); Cache::Handle* handle = nullptr; auto s = table_cache->FindTable( - v->env_options_, v->cfd_->internal_comparator(), file_meta->fd, - &handle, v->GetMutableCFOptions().prefix_extractor.get()); + v->env_options_, file_meta->fd, &handle, + v->GetMutableCFOptions().prefix_extractor.get()); if (s.ok()) { table_reader_ptr = table_cache->GetTableReaderFromHandle(handle); result = table_reader_ptr->ApproximateOffsetOf(key); @@ -4671,7 +4679,7 @@ InternalIterator* VersionSet::MakeInputIterator( const LevelFilesBrief* flevel = c->input_levels(which); for (size_t i = 0; i < flevel->num_files; i++) { list[num++] = cfd->table_cache()->NewIterator( - read_options, env_options_compactions, cfd->internal_comparator(), + read_options, env_options_compactions, *flevel->files[i].file_metadata, dependence_map, range_del_agg, c->mutable_cf_options()->prefix_extractor.get(), nullptr /* table_reader_ptr */, @@ -4811,7 +4819,7 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { } } -void VersionSet::GetObsoleteFiles(std::vector* files, +void VersionSet::GetObsoleteFiles(chash_map* files, std::vector* manifest_filenames, uint64_t min_pending_output) { assert(manifest_filenames->empty()); @@ -4819,7 +4827,10 @@ void VersionSet::GetObsoleteFiles(std::vector* files, std::vector pending_files; for (auto& f : obsolete_files_) { if (f.metadata->fd.GetNumber() < min_pending_output) { - files->push_back(std::move(f)); + auto metadata = f.metadata; + if (!files->emplace(f.metadata->fd.GetNumber(), std::move(f)).second) { + delete metadata; + } } else { pending_files.push_back(std::move(f)); } diff --git a/db/version_set.h b/db/version_set.h index badaaed9c3..070e2ba007 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -754,8 +754,6 @@ class Version : public SeparateHelper, private LazyBufferState { ColumnFamilyData* cfd() const { return cfd_; } - void ForEachVersionList(void (*callback)(Version*), void* args); - // Return the next Version in the linked list. Version* Next() const { return next_; } @@ -848,10 +846,14 @@ class Version : public SeparateHelper, private LazyBufferState { struct ObsoleteFileInfo { FileMetaData* metadata; std::string path; + std::shared_ptr table_cache; ObsoleteFileInfo() noexcept : metadata(nullptr) {} ObsoleteFileInfo(FileMetaData* f, const std::string& file_path) : metadata(f), path(file_path) {} + ObsoleteFileInfo(FileMetaData* f, const std::string& file_path, + std::shared_ptr& _table_cache) + : metadata(f), path(file_path), table_cache(_table_cache) {} ObsoleteFileInfo(const ObsoleteFileInfo&) = delete; ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete; @@ -863,6 +865,7 @@ struct ObsoleteFileInfo { ObsoleteFileInfo& operator=(ObsoleteFileInfo&& rhs) noexcept { path = std::move(rhs.path); metadata = rhs.metadata; + table_cache = std::move(rhs.table_cache); rhs.metadata = nullptr; return *this; @@ -881,7 +884,7 @@ class BaseReferencedVersionBuilder; class VersionSet { public: VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options, - const EnvOptions& env_options, bool seq_per_batch, + const EnvOptions* env_options, bool seq_per_batch, Cache* table_cache, WriteBufferManager* write_buffer_manager, WriteController* write_controller); ~VersionSet(); @@ -1107,7 +1110,7 @@ class VersionSet { // This function doesn't support leveldb SST filenames void GetLiveFilesMetaData(std::vector* metadata); - void GetObsoleteFiles(std::vector* files, + void GetObsoleteFiles(chash_map* files, std::vector* manifest_filenames, uint64_t min_pending_output); @@ -1118,6 +1121,9 @@ class VersionSet { new_options.writable_file_max_buffer_size; } + int table_evict_type() const { return table_evict_type_; } + void set_table_evict_type(int type) { table_evict_type_ = type; } + const ImmutableDBOptions* db_options() const { return db_options_; } static uint64_t GetNumLiveVersions(Version* dummy_versions); @@ -1220,6 +1226,7 @@ class VersionSet { std::vector obsolete_manifests_; const bool seq_per_batch_; + std::atomic table_evict_type_{0}; // env options for all reads and writes except compactions EnvOptions env_options_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 842b2f0834..8af649b9ee 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -620,7 +620,7 @@ class VersionSetTestBase { mutable_cf_options_(cf_options_, env_), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), - versions_(new VersionSet(dbname_, &db_options_, env_options_, + versions_(new VersionSet(dbname_, &db_options_, &env_options_, /* needs_dup_key_check */ false, table_cache_.get(), &write_buffer_manager_, &write_controller_)), diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 16edb21456..f120c7c060 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -49,7 +49,7 @@ class WalManagerTest : public testing::Test { db_options_.wal_dir = dbname_; db_options_.env = env_.get(); - versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, + versions_.reset(new VersionSet(dbname_, &db_options_, &env_options_, /* seq_per_batch */ false, table_cache_.get(), &write_buffer_manager_, &write_controller_)); diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index b8f3c5faf5..b4fc57418b 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -221,7 +221,8 @@ class Cache { // If the cache contains entry for key, erase it. Note that the // underlying entry will be kept around until all existing handles // to it have been released. - virtual void Erase(const Slice& key) = 0; + // Return false if erase a non-exists key + virtual bool Erase(const Slice& key) = 0; // Return a new numeric id. May be used by multiple clients who are // sharding the same cache to partition the key space. Typically the // client will allocate a new id at startup and prepend the id to diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 110f5ec52a..dd8ca47295 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -81,15 +81,15 @@ enum CompressionType : unsigned char { kDisableCompressionOption = 0xff, }; -enum WriteBufferFlushPri : unsigned char { kFlushOldest, kFlushLargest }; - -// Sst purpose -enum SstPurpose { - kEssenceSst, // Actual data storage sst - kLogSst, // Log as sst - kMapSst, // Dummy sst +// Shall we do ForceEvict before SST dropped ? +enum TableEvictType : unsigned char { + kSkipForceEvict = 0, + kForceEvictIfOpen = 1, + kAlwaysForceEvict = 2, }; +enum WriteBufferFlushPri : unsigned char { kFlushOldest, kFlushLargest }; + struct Options; struct DbPath; @@ -993,11 +993,6 @@ struct DBOptions { // transaction is encountered in the WAL bool allow_2pc = false; - // A global cache for table-level rows. - // Default: nullptr (disabled) - // Not supported in ROCKSDB_LITE mode! - std::shared_ptr row_cache = nullptr; - std::shared_ptr metrics_reporter_factory = nullptr; #ifndef ROCKSDB_LITE @@ -1092,6 +1087,9 @@ struct DBOptions { double zenfs_low_gc_ratio = 0.25; double zenfs_high_gc_ratio = 0.6; double zenfs_force_gc_ratio = 0.9; + + // + TableEvictType table_evict_type = kSkipForceEvict; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 4331829901..52992dc072 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -217,10 +217,6 @@ enum Tickers : uint32_t { MERGE_OPERATION_TOTAL_TIME, FILTER_OPERATION_TOTAL_TIME, - // Row cache. - ROW_CACHE_HIT, - ROW_CACHE_MISS, - // Read amplification statistics. // Read amplification can be calculated using this formula // (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES) @@ -262,6 +258,11 @@ enum Tickers : uint32_t { READ_BLOB_VALID, READ_BLOB_INVALID, + // # of blocks erased from block cache. + BLOCK_CACHE_ERASE, + // # of failures when erasing blocks from block cache. + BLOCK_CACHE_ERASE_FAILURES, + TICKER_ENUM_MAX }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 118746dacb..c52653e5af 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -3480,8 +3480,12 @@ class TickerTypeJni { return 0x65; case TERARKDB_NAMESPACE::Tickers::READ_BLOB_INVALID: return 0x66; - case TERARKDB_NAMESPACE::Tickers::TICKER_ENUM_MAX: + case TERARKDB_NAMESPACE::Tickers::BLOCK_CACHE_ERASE: return 0x67; + case TERARKDB_NAMESPACE::Tickers::BLOCK_CACHE_ERASE_FAILURES: + return 0x68; + case TERARKDB_NAMESPACE::Tickers::TICKER_ENUM_MAX: + return 0x69; default: // undefined/default return 0x0; @@ -3699,6 +3703,10 @@ class TickerTypeJni { case 0x66: return TERARKDB_NAMESPACE::Tickers::READ_BLOB_INVALID; case 0x67: + return TERARKDB_NAMESPACE::Tickers::BLOCK_CACHE_ERASE; + case 0x68: + return TERARKDB_NAMESPACE::Tickers::BLOCK_CACHE_ERASE_FAILURES; + case 0x69: return TERARKDB_NAMESPACE::Tickers::TICKER_ENUM_MAX; default: diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index b47930345b..87c22e2b2e 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -27,6 +27,8 @@ const std::vector> TickersNameMap = { {BLOCK_CACHE_HIT, "rocksdb.block.cache.hit"}, {BLOCK_CACHE_ADD, "rocksdb.block.cache.add"}, {BLOCK_CACHE_ADD_FAILURES, "rocksdb.block.cache.add.failures"}, + {BLOCK_CACHE_ERASE, "rocksdb.block.cache.erase"}, + {BLOCK_CACHE_ERASE_FAILURES, "rocksdb.block.cache.erase.failures"}, {BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss"}, {BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit"}, {BLOCK_CACHE_INDEX_ADD, "rocksdb.block.cache.index.add"}, @@ -121,8 +123,6 @@ const std::vector> TickersNameMap = { {NUMBER_BLOCK_NOT_COMPRESSED, "rocksdb.number.block.not_compressed"}, {MERGE_OPERATION_TOTAL_TIME, "rocksdb.merge.operation.time.nanos"}, {FILTER_OPERATION_TOTAL_TIME, "rocksdb.filter.operation.time.nanos"}, - {ROW_CACHE_HIT, "rocksdb.row.cache.hit"}, - {ROW_CACHE_MISS, "rocksdb.row.cache.miss"}, {READ_AMP_ESTIMATE_USEFUL_BYTES, "rocksdb.read.amp.estimate.useful.bytes"}, {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"}, {NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"}, diff --git a/options/cf_options.cc b/options/cf_options.cc index 14f081b4f9..e934cff61e 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -31,7 +31,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, : compaction_style(cf_options.compaction_style), compaction_pri(cf_options.compaction_pri), user_comparator(cf_options.comparator), - internal_comparator(InternalKeyComparator(cf_options.comparator)), + internal_comparator(cf_options.comparator), merge_operator(cf_options.merge_operator.get()), value_meta_extractor_factory( cf_options.value_meta_extractor_factory.get()), diff --git a/options/cf_options.h b/options/cf_options.h index 8aa25aae6f..6f9b7f9e15 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -135,6 +135,8 @@ struct ImmutableCFOptions { std::shared_ptr>> int_tbl_prop_collector_factories_for_blob; + + std::weak_ptr life_cycle; }; struct BlobConfig { diff --git a/options/db_options.cc b/options/db_options.cc index 649196ee69..6926b5edfe 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -81,7 +81,6 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) skip_stats_update_on_db_open(options.skip_stats_update_on_db_open), wal_recovery_mode(options.wal_recovery_mode), allow_2pc(options.allow_2pc), - row_cache(options.row_cache), #ifndef ROCKSDB_LITE wal_filter(options.wal_filter), #endif // ROCKSDB_LITE @@ -269,7 +268,8 @@ MutableDBOptions::MutableDBOptions() compaction_readahead_size(0), zenfs_low_gc_ratio(0.25), zenfs_high_gc_ratio(0.6), - zenfs_force_gc_ratio(0.9) {} + zenfs_force_gc_ratio(0.9), + table_evict_type(kSkipForceEvict) {} MutableDBOptions::MutableDBOptions(const DBOptions& options) : max_background_jobs(options.max_background_jobs), @@ -293,7 +293,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) compaction_readahead_size(options.compaction_readahead_size), zenfs_low_gc_ratio(options.zenfs_low_gc_ratio), zenfs_high_gc_ratio(options.zenfs_high_gc_ratio), - zenfs_force_gc_ratio(options.zenfs_force_gc_ratio) {} + zenfs_force_gc_ratio(options.zenfs_force_gc_ratio), + table_evict_type(options.table_evict_type) {} void MutableDBOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.max_background_jobs: %d", @@ -342,6 +343,8 @@ void MutableDBOptions::Dump(Logger* log) const { zenfs_high_gc_ratio); ROCKS_LOG_HEADER(log, " Options.zenfs_force_ratio: %lf", zenfs_force_gc_ratio); + ROCKS_LOG_HEADER(log, " Options.table_evict_type: %d", + static_cast(table_evict_type)); } } // namespace TERARKDB_NAMESPACE diff --git a/options/db_options.h b/options/db_options.h index caa67ab463..1cf05224c7 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -116,6 +116,7 @@ struct MutableDBOptions { double zenfs_low_gc_ratio; double zenfs_high_gc_ratio; double zenfs_force_gc_ratio; + TableEvictType table_evict_type; }; } // namespace TERARKDB_NAMESPACE diff --git a/options/options_helper.cc b/options/options_helper.cc index 93f18d9273..875cbc198c 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -107,6 +107,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.zenfs_low_gc_ratio = mutable_db_options.zenfs_low_gc_ratio; options.zenfs_high_gc_ratio = mutable_db_options.zenfs_high_gc_ratio; options.zenfs_force_gc_ratio = mutable_db_options.zenfs_force_gc_ratio; + options.table_evict_type = mutable_db_options.table_evict_type; options.random_access_max_buffer_size = immutable_db_options.random_access_max_buffer_size; options.writable_file_max_buffer_size = @@ -128,7 +129,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.skip_stats_update_on_db_open; options.wal_recovery_mode = immutable_db_options.wal_recovery_mode; options.allow_2pc = immutable_db_options.allow_2pc; - options.row_cache = immutable_db_options.row_cache; #ifndef ROCKSDB_LITE options.wal_filter = immutable_db_options.wal_filter; #endif // ROCKSDB_LITE @@ -552,6 +552,10 @@ bool ParseOptionHelper(char* opt_address, const OptionType& opt_type, return ParseEnum( wal_recovery_mode_string_map, value, reinterpret_cast(opt_address)); + case OptionType::kTableEviceType: + return ParseEnum( + table_evict_type_string_map, value, + reinterpret_cast(opt_address)); case OptionType::kAccessHint: return ParseEnum( access_hint_string_map, value, @@ -747,6 +751,10 @@ bool SerializeSingleOptionHelper(const char* opt_address, return SerializeEnum( wal_recovery_mode_string_map, *reinterpret_cast(opt_address), value); + case OptionType::kTableEviceType: + return SerializeEnum( + table_evict_type_string_map, + *reinterpret_cast(opt_address), value); case OptionType::kAccessHint: return SerializeEnum( access_hint_string_map, @@ -1592,6 +1600,10 @@ std::unordered_map {offsetof(struct DBOptions, compaction_readahead_size), OptionType::kSizeT, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, compaction_readahead_size)}}, + {"table_evict_type", + {offsetof(struct DBOptions, table_evict_type), + OptionType::kTableEviceType, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, table_evict_type)}}, {"random_access_max_buffer_size", {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}}, @@ -1793,7 +1805,11 @@ std::unordered_map {"zenfs_force_gc_ratio", {offsetof(struct DBOptions, zenfs_force_gc_ratio), OptionType::kDouble, OptionVerificationType::kNormal, true, - offsetof(struct MutableDBOptions, zenfs_force_gc_ratio)}}}; + offsetof(struct MutableDBOptions, zenfs_force_gc_ratio)}}, + {"table_evict_type", + {offsetof(struct DBOptions, table_evict_type), + OptionType::kTableEviceType, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, table_evict_type)}}}; std::unordered_map OptionsHelper::block_base_table_index_type_string_map = { @@ -1839,6 +1855,12 @@ std::unordered_map {"kSkipAnyCorruptedRecords", WALRecoveryMode::kSkipAnyCorruptedRecords}}; +std::unordered_map + OptionsHelper::table_evict_type_string_map = { + {"kSkipForceEvict", TableEvictType::kSkipForceEvict}, + {"kForceEvictIfOpen", TableEvictType::kForceEvictIfOpen}, + {"kAlwaysForceEvict", TableEvictType::kAlwaysForceEvict}}; + std::unordered_map OptionsHelper::access_hint_string_map = { {"NONE", DBOptions::AccessHint::NONE}, diff --git a/options/options_helper.h b/options/options_helper.h index 05c7571c91..aecea131a0 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -76,6 +76,7 @@ enum class OptionType { kChecksumType, kEncodingType, kWALRecoveryMode, + kTableEviceType, kAccessHint, kInfoLogLevel, kLRUCacheOptions, @@ -184,6 +185,8 @@ struct OptionsHelper { write_buffer_flush_pri_string_map; static std::unordered_map wal_recovery_mode_string_map; + static std::unordered_map + table_evict_type_string_map; static std::unordered_map access_hint_string_map; static std::unordered_map @@ -229,6 +232,8 @@ static auto& write_buffer_flush_pri_string_map = OptionsHelper::write_buffer_flush_pri_string_map; static auto& wal_recovery_mode_string_map = OptionsHelper::wal_recovery_mode_string_map; +static auto& table_evict_type_string_map = + OptionsHelper::table_evict_type_string_map; static auto& access_hint_string_map = OptionsHelper::access_hint_string_map; static auto& info_log_level_string_map = OptionsHelper::info_log_level_string_map; diff --git a/options/options_parser.cc b/options/options_parser.cc index 37faed85a7..132593e4be 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -559,6 +559,9 @@ bool AreEqualOptions( case OptionType::kWALRecoveryMode: return (*reinterpret_cast(offset1) == *reinterpret_cast(offset2)); + case OptionType::kTableEviceType: + return (*reinterpret_cast(offset1) == + *reinterpret_cast(offset2)); case OptionType::kAccessHint: return (*reinterpret_cast(offset1) == *reinterpret_cast(offset2)); diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index b524248857..cb4789564a 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -199,7 +199,6 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, listeners), sizeof(std::vector>)}, - {offsetof(struct DBOptions, row_cache), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, metrics_reporter_factory), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, wal_filter), sizeof(const WalFilter*)}, @@ -253,6 +252,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "zenfs_low_gc_ratio=0.3;" "zenfs_high_gc_ratio=0.33;" "zenfs_force_gc_ratio=0.333;" + "table_evict_type=kForceEvictIfOpen;" "new_table_reader_for_compaction_inputs=false;" "keep_log_file_num=4890;" "skip_stats_update_on_db_open=false;" @@ -311,7 +311,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "avoid_unnecessary_blocking_io=false;" "zenfs_low_gc_ratio=0.25;" "zenfs_high_gc_ratio=0.6;" - "zenfs_force_gc_ratio=0.9;", + "zenfs_force_gc_ratio=0.9;" + "table_evict_type=kAlwaysForceEvict;", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), @@ -415,6 +416,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { options->soft_rate_limit = 0; options->purge_redundant_kvs_while_flush = false; options->max_mem_compaction_level = 0; + options->optimize_range_deletion = false; char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)]; ColumnFamilyOptions* new_options = @@ -480,7 +482,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "max_dependence_blob_overlap=1024;" "maintainer_job_ratio=0.1;" "optimize_filters_for_hits=false;" - "optimize_range_deletion=false;" + "optimize_range_deletion=true;" "report_bg_io_stats=true;" "ttl_gc_ratio=3.000;" "ttl_max_scan_gap=1;", @@ -491,6 +493,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { kColumnFamilyOptionsBlacklist)); EXPECT_EQ(new_options->ttl_gc_ratio, 3.000); EXPECT_EQ(new_options->ttl_max_scan_gap, 1); + EXPECT_EQ(new_options->optimize_range_deletion, true); options->~ColumnFamilyOptions(); new_options->~ColumnFamilyOptions(); diff --git a/options/options_test.cc b/options/options_test.cc index b360220ef5..ec732404db 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -144,6 +144,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"zenfs_low_gc_ratio", "0.25"}, {"zenfs_high_gc_ratio", "0.5"}, {"zenfs_force_gc_ratio", "0.75"}, + {"table_evict_type", "kAlwaysForceEvict"}, {"random_access_max_buffer_size", "3145728"}, {"writable_file_max_buffer_size", "314159"}, {"bytes_per_sync", "47"}, @@ -286,6 +287,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.zenfs_low_gc_ratio, 0.25); ASSERT_EQ(new_db_opt.zenfs_high_gc_ratio, 0.5); ASSERT_EQ(new_db_opt.zenfs_force_gc_ratio, 0.75); + ASSERT_EQ(new_db_opt.table_evict_type, kAlwaysForceEvict); ASSERT_EQ(new_db_opt.random_access_max_buffer_size, 3145728); ASSERT_EQ(new_db_opt.writable_file_max_buffer_size, 314159); ASSERT_EQ(new_db_opt.bytes_per_sync, static_cast(47)); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 3e8083eb0b..8be710519a 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -346,6 +346,58 @@ class PartitionIndexReader : public IndexReader, public Cleanable { } } + virtual void ForceEvict() override { + auto rep = table_->rep_; + IndexBlockIter biter; + Statistics* kNullStats = nullptr; + index_block_->NewIterator( + icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true, + index_key_includes_seq_, index_value_is_full_); + + BlockHandle handle; + char + cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + char compressed_cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + + kMaxVarint64Length]; + + uint64_t block_cache_erase_count = 0; + uint64_t block_cache_erase_failures_count = 0; + + Cache* block_cache = rep->table_options.block_cache.get(); + Cache* block_cache_compressed = + rep->immortal_table ? nullptr + : rep->table_options.block_cache_compressed.get(); + + for (biter.SeekToFirst(); biter.Valid(); biter.Next()) { + handle = biter.value(); + + if (block_cache != nullptr) { + auto key = BlockBasedTable::GetCacheKey(rep->cache_key_prefix, + rep->cache_key_prefix_size, + handle, cache_key); + + bool erased = block_cache->Erase(key); + ++block_cache_erase_count; + block_cache_erase_failures_count += !erased; + } + if (block_cache_compressed != nullptr) { + auto key = + BlockBasedTable::GetCacheKey(rep->compressed_cache_key_prefix, + rep->compressed_cache_key_prefix_size, + handle, compressed_cache_key); + + bool erased = block_cache_compressed->Erase(key); + ++block_cache_erase_count; + block_cache_erase_failures_count += !erased; + } + } + + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ERASE, + block_cache_erase_count); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ERASE_FAILURES, + block_cache_erase_failures_count); + } + virtual size_t size() const override { return index_block_->size(); } virtual size_t usable_size() const override { return index_block_->usable_size(); @@ -2555,6 +2607,81 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, return s; } +Status BlockBasedTable::ForceEvict() { + if (!rep_->table_options.block_cache && + !rep_->table_options.block_cache_compressed) { + return Status::OK(); + } + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + + // Manual inline GetCacheKey + assert(rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + assert(rep_->compressed_cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + memcpy(compressed_cache_key, rep_->compressed_cache_key_prefix, + rep_->compressed_cache_key_prefix_size); + + auto block_cache = rep_->table_options.block_cache.get(); + auto block_cache_compressed = + rep_->table_options.block_cache_compressed.get(); + auto cache_key_prefix_size = rep_->cache_key_prefix_size; + auto compressed_cache_key_prefix_size = + rep_->compressed_cache_key_prefix_size; + + uint64_t block_cache_erase_count = 0; + uint64_t block_cache_erase_failures_count = 0; + + auto do_evict = [=, &cache_key, &compressed_cache_key, + &block_cache_erase_count, &block_cache_erase_failures_count]( + uint64_t offset, bool with_block_cache_compressed) { + if (block_cache != nullptr) { + char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); + bool erased = block_cache->Erase( + Slice(cache_key, static_cast(end - cache_key))); + ++block_cache_erase_count; + block_cache_erase_failures_count += !erased; + } + if (with_block_cache_compressed && block_cache_compressed != nullptr) { + char* end = EncodeVarint64( + compressed_cache_key + compressed_cache_key_prefix_size, offset); + bool erased = block_cache_compressed->Erase( + Slice(compressed_cache_key, + static_cast(end - compressed_cache_key))); + ++block_cache_erase_count; + block_cache_erase_failures_count += !erased; + } + }; + + ReadOptions read_options; + read_options.fill_cache = false; + std::unique_ptr> iiter( + NewIndexIterator(read_options)); + + // DataBlock + for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) { + do_evict(iiter->value().offset(), true); + } + + iiter.reset(); + + // FilterBlock is evicted in BlockBasedTable::Close() + + // IndexBlock + if (rep_->index_reader != nullptr) { + rep_->index_reader->ForceEvict(); + } + do_evict(rep_->dummy_index_reader_offset, false); + + RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ERASE, + block_cache_erase_count); + RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ERASE_FAILURES, + block_cache_erase_failures_count); + + return Status::OK(); +} + Status BlockBasedTable::Prefetch(const Slice* const begin, const Slice* const end) { auto& comparator = rep_->internal_comparator; @@ -3066,15 +3193,25 @@ void BlockBasedTable::Close() { // cleanup index and filter blocks to avoid accessing dangling pointer if (!rep_->table_options.no_block_cache) { char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + uint64_t erased_count = 0; + // Get the filter block key auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, rep_->filter_handle, cache_key); - rep_->table_options.block_cache.get()->Erase(key); + erased_count += rep_->table_options.block_cache.get()->Erase(key); // Get the index block key key = GetCacheKeyFromOffset(rep_->cache_key_prefix, rep_->cache_key_prefix_size, rep_->dummy_index_reader_offset, cache_key); - rep_->table_options.block_cache.get()->Erase(key); + erased_count += rep_->table_options.block_cache.get()->Erase(key); + + auto life_cycle = rep_->cf_life_cycle.lock(); + + if (life_cycle) { + RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ERASE, 2); + RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ERASE_FAILURES, + 2 - erased_count); + } } rep_->closed = true; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 843aa6f79a..e78fd040f7 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -124,6 +124,8 @@ class BlockBasedTable : public TableReader { GetContext* get_context, const SliceTransform* prefix_extractor, bool skip_filters = false) override; + Status ForceEvict() override; + // Pre-fetch the disk blocks that correspond to the key range specified by // (kbegin, kend). The call will return error status in the event of // IO or iteration error. @@ -200,6 +202,8 @@ class BlockBasedTable : public TableReader { virtual void CacheDependencies(bool /* unused */) {} + virtual void ForceEvict() {} + // Prefetch all the blocks referenced by this index to the buffer void PrefetchBlocks(FilePrefetchBuffer* buf); @@ -441,6 +445,7 @@ struct BlockBasedTable::Rep { const InternalKeyComparator& _internal_comparator, bool skip_filters, int _level, const bool _immortal_table) : ioptions(_ioptions), + cf_life_cycle(_ioptions.life_cycle), env_options(_env_options), table_options(_table_opt), filter_policy(skip_filters ? nullptr : _table_opt.filter_policy.get()), @@ -456,6 +461,7 @@ struct BlockBasedTable::Rep { immortal_table(_immortal_table) {} const ImmutableCFOptions& ioptions; + std::weak_ptr cf_life_cycle; const EnvOptions& env_options; const BlockBasedTableOptions table_options; const FilterPolicy* const filter_policy; diff --git a/table/partitioned_filter_block.cc b/table/partitioned_filter_block.cc index b539daf6f5..9378825775 100644 --- a/table/partitioned_filter_block.cc +++ b/table/partitioned_filter_block.cc @@ -148,6 +148,10 @@ PartitionedFilterBlockReader::~PartitionedFilterBlockReader() { IndexBlockIter biter; BlockHandle handle; Statistics* kNullStats = nullptr; + + uint64_t block_cache_erase_count = 0; + uint64_t block_cache_erase_failures_count = 0; + idx_on_fltr_blk_->NewIterator( &comparator_, comparator_.user_comparator(), &biter, kNullStats, true, index_key_includes_seq_, index_value_is_full_); @@ -157,8 +161,13 @@ PartitionedFilterBlockReader::~PartitionedFilterBlockReader() { auto key = BlockBasedTable::GetCacheKey(table_->rep_->cache_key_prefix, table_->rep_->cache_key_prefix_size, handle, cache_key); - block_cache->Erase(key); + bool erased = block_cache->Erase(key); + ++block_cache_erase_count; + block_cache_erase_failures_count += !erased; } + RecordTick(statistics(), BLOCK_CACHE_ERASE, block_cache_erase_count); + RecordTick(statistics(), BLOCK_CACHE_ERASE_FAILURES, + block_cache_erase_failures_count); } bool PartitionedFilterBlockReader::KeyMayMatch( diff --git a/table/table_reader.h b/table/table_reader.h index 93e33172cc..9ed6bb25f8 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -99,6 +99,10 @@ class TableReader { bool (*callback_func)(void* arg, const Slice& key, LazyBuffer&& value)); + // Evict all possible block keys no matter if it's in BlockCache + // NOTICE: this call allows doing ineffective evict + virtual Status ForceEvict() { return Status::NotSupported(); } + // Prefetch data corresponding to a give range of keys // Typically this functionality is required for table implementations that // persists the data on a non volatile storage medium like disk/SSD diff --git a/table/table_test.cc b/table/table_test.cc index 1b2fb345e5..3639dea72d 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -224,7 +224,7 @@ class BlockConstructor : public Constructor { block_ = nullptr; BlockBuilder builder(table_options.block_restart_interval); - for (const auto kv : kv_map) { + for (const auto& kv : kv_map) { builder.Add(kv.first, kv.second); } // Open the block @@ -282,7 +282,7 @@ class TableConstructor : public Constructor { TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer_.get())); - for (const auto kv : kv_map) { + for (const auto& kv : kv_map) { if (convert_to_internal_key_) { ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue); std::string encoded; @@ -409,7 +409,7 @@ class MemTableConstructor : public Constructor { kMaxSequenceNumber, 0 /* column_family_id */); memtable_->Ref(); int seq = 1; - for (const auto kv : kv_map) { + for (const auto& kv : kv_map) { memtable_->Add(seq, kTypeValue, kv.first, kv.second); seq++; } @@ -451,7 +451,7 @@ class DBConstructor : public Constructor { delete db_; db_ = nullptr; NewDB(); - for (const auto kv : kv_map) { + for (const auto& kv : kv_map) { WriteBatch batch; batch.Put(kv.first, kv.second); EXPECT_TRUE(db_->Write(WriteOptions(), &batch).ok()); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index ac7eca0544..8e1ee4be16 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -510,10 +510,6 @@ DEFINE_double(data_block_hash_table_util_ratio, 0.75, DEFINE_int64(compressed_cache_size, -1, "Number of bytes to use as a cache of compressed data."); -DEFINE_int64(row_cache_size, 0, - "Number of bytes to use as a cache of individual rows" - " (0 = disabled)."); - DEFINE_int32(open_files, TERARKDB_NAMESPACE::Options().max_open_files, "Maximum number of files to keep open at the same time" " (use default if == 0)"); @@ -528,6 +524,10 @@ DEFINE_bool(new_table_reader_for_compaction_inputs, true, DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size"); +DEFINE_int32(table_evict_type, 0, + "Table evict type" + "(0:SkipForceEvict, 1:ForceEvictIfOpen, 2:AlwaysForceEvict)"); + DEFINE_int32(random_access_max_buffer_size, 1024 * 1024, "Maximum windows randomaccess buffer size"); @@ -733,7 +733,6 @@ DEFINE_string( "that are related to RocksDB options will be ignored:\n" "\t--use_existing_db\n" "\t--statistics\n" - "\t--row_cache_size\n" "\t--row_cache_numshardbits\n" "\t--enable_io_prio\n" "\t--dump_malloc_stats\n" @@ -3299,6 +3298,8 @@ class Benchmark { options.new_table_reader_for_compaction_inputs = FLAGS_new_table_reader_for_compaction_inputs; options.compaction_readahead_size = FLAGS_compaction_readahead_size; + options.table_evict_type = + static_cast(FLAGS_table_evict_type); options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size; options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size; options.use_fsync = FLAGS_use_fsync; @@ -3663,14 +3664,6 @@ class Benchmark { FLAGS_bloom_bits, FLAGS_use_block_based_filter)); } } - if (FLAGS_row_cache_size) { - if (FLAGS_cache_numshardbits >= 1) { - options.row_cache = - NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits); - } else { - options.row_cache = NewLRUCache(FLAGS_row_cache_size); - } - } if (FLAGS_enable_io_prio) { FLAGS_env->LowerThreadPoolIOPriority(Env::LOW); FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH); diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 88c3a2686e..241f27ccfa 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -950,7 +950,7 @@ void DumpManifestFile(std::string file, bool verbose, bool hex, bool json) { WriteBufferManager wb(options.db_write_buffer_size); ImmutableDBOptions immutable_db_options(options); const bool seq_per_batch = false; - VersionSet versions(dbname, &immutable_db_options, sopt, seq_per_batch, + VersionSet versions(dbname, &immutable_db_options, &sopt, seq_per_batch, tc.get(), &wb, &wc); Status s = versions.DumpManifest(options, file, verbose, hex, json); if (!s.ok()) { @@ -1653,7 +1653,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int* levels) { WriteController wc(opt.delayed_write_rate); WriteBufferManager wb(opt.db_write_buffer_size); const bool seq_per_batch = false; - VersionSet versions(db_path_, &db_options, soptions, seq_per_batch, tc.get(), + VersionSet versions(db_path_, &db_options, &soptions, seq_per_batch, tc.get(), &wb, &wc); std::vector dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index 88a236ff27..d3ad695945 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -6,11 +6,12 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/ldb_cmd.h" + #include "util/testharness.h" +using std::map; using std::string; using std::vector; -using std::map; class LdbCmdTest : public testing::Test {}; @@ -26,7 +27,7 @@ TEST_F(LdbCmdTest, HexToString) { auto actual = TERARKDB_NAMESPACE::LDBCommand::HexToString(inPair.first); auto expected = inPair.second; for (unsigned int i = 0; i < actual.length(); i++) { - EXPECT_EQ(expected[i], static_cast((signed char) actual[i])); + EXPECT_EQ(expected[i], static_cast((signed char)actual[i])); } auto reverse = TERARKDB_NAMESPACE::LDBCommand::StringToHex(actual); EXPECT_STRCASEEQ(inPair.first.c_str(), reverse.c_str()); @@ -37,7 +38,7 @@ TEST_F(LdbCmdTest, HexToStringBadInputs) { const vector badInputs = { "0xZZ", "123", "0xx5", "0x111G", "0x123", "Ox12", "0xT", "0x1Q1", }; - for (const auto badInput : badInputs) { + for (const auto& badInput : badInputs) { try { TERARKDB_NAMESPACE::LDBCommand::HexToString(badInput); std::cerr << "Should fail on bad hex value: " << badInput << "\n"; diff --git a/utilities/memory/memory_test.cc b/utilities/memory/memory_test.cc index cf33f79ffb..c718a8238f 100644 --- a/utilities/memory/memory_test.cc +++ b/utilities/memory/memory_test.cc @@ -65,9 +65,6 @@ class MemoryTest : public testing::Test { cache_set->insert(db_impl->TEST_table_cache()); } - // Cache from DBOptions - cache_set->insert(db->GetDBOptions().row_cache.get()); - // Cache from table factories std::unordered_map iopts_map; if (db_impl != nullptr) { diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index d571ba4337..a757f27bb4 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -216,9 +216,10 @@ class SimCacheImpl : public SimCache { return cache_->Release(handle, force_erase); } - virtual void Erase(const Slice& key) override { - cache_->Erase(key); + virtual bool Erase(const Slice& key) override { + bool ret = cache_->Erase(key); key_only_cache_->Erase(key); + return ret; } virtual void* Value(Handle* handle) override { return cache_->Value(handle); }