From 258a8e004f4ac6d7948181226acb1f0d61a93a1b Mon Sep 17 00:00:00 2001 From: wangyi Date: Wed, 7 Jul 2021 11:23:06 +0800 Subject: [PATCH 01/11] 1. add manifestrollback command, don't need open db. 2. add kv_combine flags to compactor command 3. in class version_edit, we add rollback_ member as a switch to control encode process, and we check the sst' status, if we open the rollback_ switch, we will check the file's prop, like purpose, dependence, inheritance. 4. in class version_set, we add a public method manifestRollback, and we add a rollback param to the WriteSnapshot method. --- db/version_edit.cc | 18 ++++++++- db/version_edit.h | 7 +++- db/version_set.cc | 35 ++++++++++++++++- db/version_set.h | 3 +- include/rocksdb/utilities/ldb_cmd.h | 1 + tools/ldb_cmd.cc | 59 ++++++++++++++++++++++++++++- tools/ldb_cmd_impl.h | 15 ++++++++ 7 files changed, 133 insertions(+), 5 deletions(-) diff --git a/db/version_edit.cc b/db/version_edit.cc index 1b8bfb402d..96f2071c73 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -184,7 +184,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutLengthPrefixedSlice(dst, varint_log_number); min_log_num_written = true; } - if (true) { + if (rollback_) { + if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 || + f.prop.inheritance.size() != 0) { + return false; + } + } + if (!rollback_) { PutVarint32(dst, CustomTag::kPropertyCache); std::string encode_property_cache; encode_property_cache.push_back((char)f.prop.purpose); @@ -666,6 +672,16 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append(" .. "); r.append(f.largest.DebugString(hex_key)); } + if(rollback_){ + r.append("\n Rollback Info: "); + for (size_t i = 0; i < new_files_.size(); i++) { + const FileMetaData& f = new_files_[i].second; + if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 || + f.prop.inheritance.size() != 0){ + r.append("%d .. ",f.fd.GetNumber()); + } + } + } r.append("\n ColumnFamily: "); AppendNumberTo(&r, column_family_); if (is_column_family_add_) { diff --git a/db/version_edit.h b/db/version_edit.h index 0078e5fe34..890df42167 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -100,6 +100,7 @@ struct TablePropertyCache { uint64_t latest_time_end_compact = port::kMaxUint64; bool is_map_sst() const { return purpose == kMapSst; } + bool is_essense_sst() const { return purpose == kEssenceSst; } bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; } bool map_handle_range_deletions() const { return (flags & kMapHandleRangeDeletions) != 0; @@ -389,7 +390,9 @@ class VersionEdit { std::string DebugString(bool hex_key = false) const; std::string DebugJSON(int edit_num, bool hex_key = false) const; - + void setRollback(const bool rollback = true){ + rollback_ = rollback; + } private: friend class VersionSet; friend class Version; @@ -432,6 +435,8 @@ class VersionEdit { bool is_open_db_; bool is_in_atomic_group_; uint32_t remaining_entries_; + + bool rollback_ = false; }; } // namespace TERARKDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index 73dcb419b5..50291544f4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2992,7 +2992,34 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, v->prev_->next_ = v; v->next_->prev_ = v; } +Status VersionSet::ManifestRollback() { + Status s; + pending_manifest_file_number_ = NewFileNumber(); + std::string descriptor_fname = + DescriptorFileName(dbname_, pending_manifest_file_number_); + std::unique_ptr descriptor_file; + EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_); + s = NewWritableFile(env_, descriptor_fname, &descriptor_file, + opt_env_opts); + if (s.ok()) { + descriptor_file->SetPreallocationBlockSize( + db_options_->manifest_preallocation_size); + + std::unique_ptr file_writer(new WritableFileWriter( + std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr, + db_options_->listeners)); + descriptor_log_.reset( + new log::Writer(std::move(file_writer), 0, false)); + s = WriteSnapshot(descriptor_log_.get(), true); + } + if (s.ok()) { + s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,nullptr); + } + + + return s; +} Status VersionSet::ProcessManifestWrites(std::deque& writers, InstrumentedMutex* mu, Directory* db_directory, @@ -4339,7 +4366,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) { } } -Status VersionSet::WriteSnapshot(log::Writer* log) { +Status VersionSet::WriteSnapshot(log::Writer* log, const bool rollback) { // TODO: Break up into multiple records to reduce memory usage on recovery? // WARNING: This method doesn't hold a mutex!! @@ -4348,6 +4375,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe to iterate. for (auto cfd : *column_family_set_) { + if(rollback && cfd->current()->storage_info()->LevelFiles(-1).size() != 0){ + return Status::Corruption("cfd: %s's level -1 is not null", cfd->GetName()); + } if (cfd->IsDropped()) { continue; } @@ -4377,6 +4407,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { { // Save files VersionEdit edit; + if(rollback) { + edit.setRollback(); + } edit.SetColumnFamily(cfd->GetID()); for (int level = -1; level < cfd->NumberLevels(); level++) { diff --git a/db/version_set.h b/db/version_set.h index 46e2914f9e..d968c37f91 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -927,6 +927,7 @@ class VersionSet { // are not opened Status Recover(const std::vector& column_families, bool read_only = false); + Status ManifestRollback(); // Reads a manifest file and returns a list of column families in // column_families. @@ -1128,7 +1129,7 @@ class VersionSet { const Slice& key); // Save current contents to *log - Status WriteSnapshot(log::Writer* log); + Status WriteSnapshot(log::Writer* log, const bool rollback = false); void AppendVersion(ColumnFamilyData* column_family_data, Version* v); diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index 4edfd73241..f452249086 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -57,6 +57,7 @@ class LDBCommand { static const std::string ARG_FILE_SIZE; static const std::string ARG_CREATE_IF_MISSING; static const std::string ARG_NO_VALUE; + static const std::string ARG_REBUILD; struct ParsedParams { std::string cmd; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index a41fb18fdc..06010c0d4c 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -81,6 +81,7 @@ const std::string LDBCommand::ARG_WRITE_BUFFER_SIZE = "write_buffer_size"; const std::string LDBCommand::ARG_FILE_SIZE = "file_size"; const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing"; const std::string LDBCommand::ARG_NO_VALUE = "no_value"; +const std::string LDBCommand::ARG_REBUILD = "kv_combine"; const char* LDBCommand::DELIM = " ==> "; @@ -193,6 +194,9 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == CompactorCommand::Name()) { return new CompactorCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); + } else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { + return new ManifestRollbackCommand(parsed_params.cmd_params, + parsed_params.option_map, parsed_params.flags); } else if (parsed_params.cmd == WALDumperCommand::Name()) { return new WALDumperCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); @@ -792,7 +796,7 @@ CompactorCommand::CompactorCommand( const std::vector& flags) : LDBCommand(options, flags, false, BuildCmdLineOptions({ARG_FROM, ARG_TO, ARG_HEX, ARG_KEY_HEX, - ARG_VALUE_HEX, ARG_TTL})), + ARG_VALUE_HEX, ARG_TTL, ARG_REBUILD})), null_from_(true), null_to_(true) { std::map::const_iterator itr = @@ -816,6 +820,11 @@ CompactorCommand::CompactorCommand( to_ = HexToString(to_); } } + if(IsFlagPresent(flags, ARG_REBUILD)){ + printf("compact with kv_combine \n"); + separation_type = kCompactionCombineValue; + } + } void CompactorCommand::Help(std::string& ret) { @@ -841,6 +850,9 @@ void CompactorCommand::DoCommand() { } CompactRangeOptions cro; + if(separation_type == kCompactionCombineValue){ + cro.separation_type = kCompactionCombineValue; + } cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; db_->CompactRange(cro, GetCfHandle(), begin, end); @@ -849,6 +861,51 @@ void CompactorCommand::DoCommand() { delete begin; delete end; } +ManifestRollbackCommand::ManifestRollbackCommand( + const std::vector& /*params*/, + const std::map& options, + const std::vector& flags) + : LDBCommand(options, flags, true, BuildCmdLineOptions({})) {} + +void ManifestRollbackCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(ManifestRollbackCommand::Name()); + ret.append("\n"); +} +void ManifestRollbackCommand::DoCommand() { + Options options = PrepareOptionsForOpenDB(); + + if (options_.db_paths.empty()) { + // `VersionSet` expects options that have been through `SanitizeOptions()`, + // which would sanitize an empty `db_paths`. + options_.db_paths.emplace_back(db_path_, 0 /* target_size */); + } + + WriteController wc(options_.delayed_write_rate); + WriteBufferManager wb(options_.db_write_buffer_size); + + std::shared_ptr tc( + NewLRUCache(1 << 20 /* capacity */, options_.table_cache_numshardbits)); + EnvOptions sopt; + const bool seq_per_batch = false; + ImmutableDBOptions immutable_db_options(options); + VersionSet versions(db_path_, &immutable_db_options, sopt, seq_per_batch, tc.get(), &wb, &wc); + Status s = LoadLatestOptions(db_path_, Env::Default(), &options_, + &column_families_, ignore_unknown_options_); + if (!s.ok()) { + printf("LoadLatestOptions Error %s\n",s.ToString().c_str()); + } + versions.Recover(column_families_); + if (!s.ok()) { + printf("Error in Recover DB %s\n",s.ToString().c_str()); + } + s = versions.ManifestRollback(); + if (!s.ok()) { + printf("Error in Manifest Rollback %s\n",s.ToString().c_str()); + } + + +} // ---------------------------------------------------------------------------- diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index 20d6d00134..ea9a8776ed 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -32,6 +32,21 @@ class CompactorCommand : public LDBCommand { std::string from_; bool null_to_; std::string to_; + SeparationType separation_type; +}; +// Command that removes the SST file forcibly from the manifest. +class ManifestRollbackCommand : public LDBCommand { + public: + static std::string Name() { return "manifest_rollback"; } + + ManifestRollbackCommand(const std::vector& params, + const std::map& options, + const std::vector& flags); + + static void Help(std::string& ret); + + virtual void DoCommand() override; + virtual bool NoDBOpen() override { return true; } }; class DBFileDumperCommand : public LDBCommand { From 085fa2bd77bd6227387d6cd8fca3da5ff68f8c0c Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 8 Jul 2021 10:38:08 +0800 Subject: [PATCH 02/11] 1. add Next File Number edit record --- db/version_set.cc | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/db/version_set.cc b/db/version_set.cc index 50291544f4..e36103ca7d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3012,6 +3012,29 @@ Status VersionSet::ManifestRollback() { new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get(), true); } + do { + if(s.ok()) { + VersionEdit e; + e.SetNextFile(next_file_number_.load()); + std::string record; + if (!e.EncodeTo(&record)) { + s = Status::Corruption("Unable to encode VersionEdit:" + + e.DebugString(true)); + } + s = descriptor_log_->AddRecord(record); + if (!s.ok()) { + break; + } + if (s.ok()) { + s = SyncManifest(env_, db_options_, descriptor_log_->file()); + } + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", + s.ToString().c_str()); + } + } + }while(false); + if (s.ok()) { s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,nullptr); } From abf5d2d3890ed6036b9d9d97a53ca683f521d02c Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 8 Jul 2021 14:55:53 +0800 Subject: [PATCH 03/11] 1. fix recover failed capture --- tools/ldb_cmd.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 06010c0d4c..fda985d993 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -895,9 +895,10 @@ void ManifestRollbackCommand::DoCommand() { if (!s.ok()) { printf("LoadLatestOptions Error %s\n",s.ToString().c_str()); } - versions.Recover(column_families_); + s = versions.Recover(column_families_); if (!s.ok()) { printf("Error in Recover DB %s\n",s.ToString().c_str()); + return; } s = versions.ManifestRollback(); if (!s.ok()) { From a3720807024f5eb2e87c77a4e15ce9e37d4bed61 Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 8 Jul 2021 14:58:32 +0800 Subject: [PATCH 04/11] 1. fix descriptor no largest-sequence-number entry --- db/version_set.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/db/version_set.cc b/db/version_set.cc index e36103ca7d..a510054fc7 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3016,6 +3016,7 @@ Status VersionSet::ManifestRollback() { if(s.ok()) { VersionEdit e; e.SetNextFile(next_file_number_.load()); + LogAndApplyCFHelper(&e); std::string record; if (!e.EncodeTo(&record)) { s = Status::Corruption("Unable to encode VersionEdit:" + From 51bcc46bdd4e5b2ce26a067770c4e856d8295c75 Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 8 Jul 2021 15:27:20 +0800 Subject: [PATCH 05/11] 1. fix descriptor no largest-sequence-number entry v2 --- db/version_set.cc | 6 ++++-- db/version_set.h | 2 +- tools/ldb_cmd.cc | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index a510054fc7..55a9df9b00 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2992,7 +2992,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, v->prev_->next_ = v; v->next_->prev_ = v; } -Status VersionSet::ManifestRollback() { +Status VersionSet::ManifestRollback(InstrumentedMutex* mu) { Status s; pending_manifest_file_number_ = NewFileNumber(); std::string descriptor_fname = @@ -3015,7 +3015,9 @@ Status VersionSet::ManifestRollback() { do { if(s.ok()) { VersionEdit e; - e.SetNextFile(next_file_number_.load()); + for(auto cfd : *column_family_set_) { + LogAndApplyHelper(cfd, nullptr, cfd->current(), &e, mu); + } LogAndApplyCFHelper(&e); std::string record; if (!e.EncodeTo(&record)) { diff --git a/db/version_set.h b/db/version_set.h index d968c37f91..f862e5adef 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -927,7 +927,7 @@ class VersionSet { // are not opened Status Recover(const std::vector& column_families, bool read_only = false); - Status ManifestRollback(); + Status ManifestRollback(InstrumentedMutex* mu); // Reads a manifest file and returns a list of column families in // column_families. diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index fda985d993..08f4db1129 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -900,7 +900,8 @@ void ManifestRollbackCommand::DoCommand() { printf("Error in Recover DB %s\n",s.ToString().c_str()); return; } - s = versions.ManifestRollback(); + InstrumentedMutex mutex; + s = versions.ManifestRollback(&mutex); if (!s.ok()) { printf("Error in Manifest Rollback %s\n",s.ToString().c_str()); } From fa1116d4645e4b38e922b4554c05f85e875a16de Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 8 Jul 2021 15:33:49 +0800 Subject: [PATCH 06/11] 1. fix descriptor no largest-sequence-number entry v3 --- db/version_set.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index 55a9df9b00..97dbbab3bb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3016,9 +3016,8 @@ Status VersionSet::ManifestRollback(InstrumentedMutex* mu) { if(s.ok()) { VersionEdit e; for(auto cfd : *column_family_set_) { - LogAndApplyHelper(cfd, nullptr, cfd->current(), &e, mu); + LogAndApplyHelper(cfd, nullptr, cfd->current(), &e, mu, false); } - LogAndApplyCFHelper(&e); std::string record; if (!e.EncodeTo(&record)) { s = Status::Corruption("Unable to encode VersionEdit:" + From 64a7b777d76ffc90b19958e93b237068d60e6259 Mon Sep 17 00:00:00 2001 From: wangyi Date: Tue, 13 Jul 2021 16:05:19 +0800 Subject: [PATCH 07/11] 1. add KvCombineCommand, we use compactFile to 2. struct SstFileMetaData add terarkdb_file member, the value is true Means the sst is a terarkdb file instead of rocksdb file --- db/version_set.cc | 3 +++ include/rocksdb/metadata.h | 2 ++ tools/ldb_cmd.cc | 35 ++++++++++++++++++++++++++++++++++- tools/ldb_cmd_impl.h | 15 +++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/db/version_set.cc b/db/version_set.cc index 97dbbab3bb..ec3ce8e2fa 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4817,6 +4817,9 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { filemetadata.being_compacted = file->being_compacted; filemetadata.num_entries = file->prop.num_entries; filemetadata.num_deletions = file->prop.num_deletions; + if(file->prop.dependence.size() != 0 || file->prop.inheritance.size() !=0 || !file->prop.is_essense_sst()) { + filemetadata.terarkdb_file = true; + } metadata->push_back(filemetadata); } } diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h index 3977b07a28..7a8a08c75f 100644 --- a/include/rocksdb/metadata.h +++ b/include/rocksdb/metadata.h @@ -101,6 +101,8 @@ struct SstFileMetaData { uint64_t num_entries; uint64_t num_deletions; + // true Means a terarkdb sst file instead of rocksdb file + bool terarkdb_file; }; // The full set of metadata associated with each SST file. diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 08f4db1129..1e7fb1526a 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -194,7 +194,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == CompactorCommand::Name()) { return new CompactorCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); - } else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { + } else if (parsed_params.cmd == KvCombineCommand::Name()) { + return new KvCombineCommand(parsed_params.cmd_params, + parsed_params.option_map, parsed_params.flags); + }else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { return new ManifestRollbackCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); } else if (parsed_params.cmd == WALDumperCommand::Name()) { @@ -861,6 +864,36 @@ void CompactorCommand::DoCommand() { delete begin; delete end; } +KvCombineCommand::KvCombineCommand( + const std::vector& /*params*/, + const std::map& options, + const std::vector& flags) + : LDBCommand(options, flags, false,BuildCmdLineOptions({})){ +} +void KvCombineCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(KvCombineCommand::Name()); + ret.append("\n"); +} +void KvCombineCommand::DoCommand() { + // rate_limiter_bytes_per_sec + CompactionOptions cfo; + + cfo.separation_type = kCompactionCombineValue; + // we should skip the last level file that kv already combine + ColumnFamilyHandle* cfh = GetCfHandle(); + std::vector LiveFiles; + db_->GetLiveFilesMetaData(&LiveFiles); + for(auto& file : LiveFiles){ + if(file.terarkdb_file){ + std::vector inputs = {file.name}; + std::cout <<"cf:" << file.column_family_name << " level " << file.level << " compact file " <CompactFiles(cfo,inputs,file.level); + } + } + std::cout << "kv_combine finish!" << std::endl; + +} ManifestRollbackCommand::ManifestRollbackCommand( const std::vector& /*params*/, const std::map& options, diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index ea9a8776ed..1ea9849f96 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -34,6 +34,21 @@ class CompactorCommand : public LDBCommand { std::string to_; SeparationType separation_type; }; + +class KvCombineCommand : public LDBCommand { + public: + static std::string Name() { return "combine"; } + + KvCombineCommand(const std::vector& params, + const std::map& options, + const std::vector& flags); + + static void Help(std::string& ret); + + virtual void DoCommand() override; + +}; + // Command that removes the SST file forcibly from the manifest. class ManifestRollbackCommand : public LDBCommand { public: From ebe8bc343573588466e285b4a7e2f54ad8a12523 Mon Sep 17 00:00:00 2001 From: wangyi Date: Wed, 14 Jul 2021 21:07:59 +0800 Subject: [PATCH 08/11] 1. KvCombineCommand add parallel param and support threadpool to parallel compactFile 2. fix level0 FilesRangeOverlapWithCompaction issue --- db/compaction_picker.cc | 4 +- db/version_set.cc | 2 +- include/rocksdb/metadata.h | 8 ++- include/rocksdb/utilities/ldb_cmd.h | 1 + tools/ldb_cmd.cc | 94 +++++++++++++++++++++-------- tools/ldb_cmd_impl.h | 2 + 6 files changed, 80 insertions(+), 31 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index e01c7669b0..8d1db0721a 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -556,7 +556,7 @@ Compaction* CompactionPicker::CompactFiles( // This compaction output should not overlap with a running compaction as // `SanitizeCompactionInputFiles` should've checked earlier and db mutex // shouldn't have been released since. - assert(!FilesRangeOverlapWithCompaction(input_files, output_level)); + assert(output_level == 0 || !FilesRangeOverlapWithCompaction(input_files, output_level)); CompressionType compression_type; if (compact_options.compression == kDisableCompressionOption) { @@ -1591,7 +1591,7 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( } } } - if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) { + if (output_level != 0 && RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) { return Status::Aborted( "A running compaction is writing to the same output level in an " "overlapping key range"); diff --git a/db/version_set.cc b/db/version_set.cc index ec3ce8e2fa..256ceb92fb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4818,7 +4818,7 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { filemetadata.num_entries = file->prop.num_entries; filemetadata.num_deletions = file->prop.num_deletions; if(file->prop.dependence.size() != 0 || file->prop.inheritance.size() !=0 || !file->prop.is_essense_sst()) { - filemetadata.terarkdb_file = true; + filemetadata.non_origin_file = true; } metadata->push_back(filemetadata); } diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h index 7a8a08c75f..70fb990e3b 100644 --- a/include/rocksdb/metadata.h +++ b/include/rocksdb/metadata.h @@ -65,7 +65,8 @@ struct SstFileMetaData { num_reads_sampled(0), being_compacted(false), num_entries(0), - num_deletions(0) {} + num_deletions(0), + non_origin_file(false) {} SstFileMetaData(const std::string& _file_name, const std::string& _path, size_t _size, SequenceNumber _smallest_seqno, @@ -83,7 +84,8 @@ struct SstFileMetaData { num_reads_sampled(_num_reads_sampled), being_compacted(_being_compacted), num_entries(0), - num_deletions(0) {} + num_deletions(0), + non_origin_file(false) {} // File size in bytes. size_t size; @@ -102,7 +104,7 @@ struct SstFileMetaData { uint64_t num_entries; uint64_t num_deletions; // true Means a terarkdb sst file instead of rocksdb file - bool terarkdb_file; + bool non_origin_file; }; // The full set of metadata associated with each SST file. diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index f452249086..8c8c35ff34 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -58,6 +58,7 @@ class LDBCommand { static const std::string ARG_CREATE_IF_MISSING; static const std::string ARG_NO_VALUE; static const std::string ARG_REBUILD; + static const std::string ARG_PARALLEL; struct ParsedParams { std::string cmd; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 1e7fb1526a..d85b13d19b 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -82,6 +82,7 @@ const std::string LDBCommand::ARG_FILE_SIZE = "file_size"; const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing"; const std::string LDBCommand::ARG_NO_VALUE = "no_value"; const std::string LDBCommand::ARG_REBUILD = "kv_combine"; +const std::string LDBCommand::ARG_PARALLEL = "parallel"; const char* LDBCommand::DELIM = " ==> "; @@ -197,9 +198,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == KvCombineCommand::Name()) { return new KvCombineCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); - }else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { + } else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { return new ManifestRollbackCommand(parsed_params.cmd_params, - parsed_params.option_map, parsed_params.flags); + parsed_params.option_map, + parsed_params.flags); } else if (parsed_params.cmd == WALDumperCommand::Name()) { return new WALDumperCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); @@ -823,11 +825,10 @@ CompactorCommand::CompactorCommand( to_ = HexToString(to_); } } - if(IsFlagPresent(flags, ARG_REBUILD)){ + if (IsFlagPresent(flags, ARG_REBUILD)) { printf("compact with kv_combine \n"); separation_type = kCompactionCombineValue; } - } void CompactorCommand::Help(std::string& ret) { @@ -853,7 +854,7 @@ void CompactorCommand::DoCommand() { } CompactRangeOptions cro; - if(separation_type == kCompactionCombineValue){ + if (separation_type == kCompactionCombineValue) { cro.separation_type = kCompactionCombineValue; } cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; @@ -868,31 +869,75 @@ KvCombineCommand::KvCombineCommand( const std::vector& /*params*/, const std::map& options, const std::vector& flags) - : LDBCommand(options, flags, false,BuildCmdLineOptions({})){ -} + : LDBCommand(options, flags, false, BuildCmdLineOptions({ARG_PARALLEL})), + parallel_(8) {} void KvCombineCommand::Help(std::string& ret) { - ret.append(" "); - ret.append(KvCombineCommand::Name()); - ret.append("\n"); + ret.append(" "); + ret.append(KvCombineCommand::Name()); + ret.append("\n"); } void KvCombineCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + int parallel; + if (ParseIntOption(option_map_, ARG_PARALLEL, parallel, exec_state_)) { + if (parallel > 0) { + parallel_ = parallel; + } else { + exec_state_ = + LDBCommandExecuteResult::Failed(ARG_PARALLEL + " must be > 0."); + return; + } + } // rate_limiter_bytes_per_sec CompactionOptions cfo; cfo.separation_type = kCompactionCombineValue; // we should skip the last level file that kv already combine - ColumnFamilyHandle* cfh = GetCfHandle(); - std::vector LiveFiles; - db_->GetLiveFilesMetaData(&LiveFiles); - for(auto& file : LiveFiles){ - if(file.terarkdb_file){ - std::vector inputs = {file.name}; - std::cout <<"cf:" << file.column_family_name << " level " << file.level << " compact file " <CompactFiles(cfo,inputs,file.level); + + while (true) { + std::atomic failed_cnt(0); + std::vector LiveFiles; + db_->GetLiveFilesMetaData(&LiveFiles); + std::atomic next_file_index(0); + std::function worker_func([&]() { + while (true) { + size_t index = next_file_index.fetch_add(1); + if (index >= LiveFiles.size()) { + break; + } + std::ostringstream output; + auto file = LiveFiles[index]; + if (file.non_origin_file) { + std::vector inputs = {file.name}; + output << "file_index: " << index << "cf:" << file.column_family_name + << " level " << file.level << " compact file " << file.name + << std::endl; + + ColumnFamilyHandle* cfh = cf_handles_[file.column_family_name]; + Status s = db_->CompactFiles(cfo, cfh, inputs, file.level); + output << "file_index: " << index << " " << s.ToString() << std::endl; + if (!s.ok()) { + failed_cnt.fetch_add(1); + std::cerr << output.str(); + } else + std::cout << index << "/" << LiveFiles.size() << std::endl; + } + } + }); + std::vector threads; + for (int i = 1; i < parallel_; i++) { + threads.emplace_back(worker_func); + } + worker_func(); + for (auto& t : threads) { + t.join(); } + if (failed_cnt.load() == 0) break; } std::cout << "kv_combine finish!" << std::endl; - } ManifestRollbackCommand::ManifestRollbackCommand( const std::vector& /*params*/, @@ -922,24 +967,23 @@ void ManifestRollbackCommand::DoCommand() { EnvOptions sopt; const bool seq_per_batch = false; ImmutableDBOptions immutable_db_options(options); - VersionSet versions(db_path_, &immutable_db_options, sopt, seq_per_batch, tc.get(), &wb, &wc); + VersionSet versions(db_path_, &immutable_db_options, sopt, seq_per_batch, + tc.get(), &wb, &wc); Status s = LoadLatestOptions(db_path_, Env::Default(), &options_, &column_families_, ignore_unknown_options_); if (!s.ok()) { - printf("LoadLatestOptions Error %s\n",s.ToString().c_str()); + printf("LoadLatestOptions Error %s\n", s.ToString().c_str()); } s = versions.Recover(column_families_); if (!s.ok()) { - printf("Error in Recover DB %s\n",s.ToString().c_str()); + printf("Error in Recover DB %s\n", s.ToString().c_str()); return; } InstrumentedMutex mutex; s = versions.ManifestRollback(&mutex); if (!s.ok()) { - printf("Error in Manifest Rollback %s\n",s.ToString().c_str()); + printf("Error in Manifest Rollback %s\n", s.ToString().c_str()); } - - } // ---------------------------------------------------------------------------- diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index 1ea9849f96..57e2aaa167 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -46,6 +46,8 @@ class KvCombineCommand : public LDBCommand { static void Help(std::string& ret); virtual void DoCommand() override; + private: + int parallel_; }; From 588ca7400bdbf3874c85f115211f2b0f12fd2a9b Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 15 Jul 2021 15:08:38 +0800 Subject: [PATCH 09/11] 1. KvCombineCommand add rate_limiter_bytes_per_sec support write rate control --- include/rocksdb/utilities/ldb_cmd.h | 1 + tools/ldb_cmd.cc | 30 +++++++++++++++++++++++------ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index 8c8c35ff34..a8bf5e4c7f 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -59,6 +59,7 @@ class LDBCommand { static const std::string ARG_NO_VALUE; static const std::string ARG_REBUILD; static const std::string ARG_PARALLEL; + static const std::string ARG_RATE_LIMITER; struct ParsedParams { std::string cmd; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index d85b13d19b..04ac043c10 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -83,6 +83,7 @@ const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing"; const std::string LDBCommand::ARG_NO_VALUE = "no_value"; const std::string LDBCommand::ARG_REBUILD = "kv_combine"; const std::string LDBCommand::ARG_PARALLEL = "parallel"; +const std::string LDBCommand::ARG_RATE_LIMITER = "rate_limiter_bytes_per_sec"; const char* LDBCommand::DELIM = " ==> "; @@ -650,6 +651,18 @@ Options LDBCommand::PrepareOptionsForOpenDB() { LDBCommandExecuteResult::Failed(ARG_FIX_PREFIX_LEN + " must be > 0."); } } + int rate_limiter_bytes_per_sec; + if (ParseIntOption(option_map_, ARG_RATE_LIMITER, rate_limiter_bytes_per_sec, exec_state_)) { + if (rate_limiter_bytes_per_sec > 0) { + options_.rate_limiter.reset(NewGenericRateLimiter( + rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kWritesOnly, false)); + } else { + exec_state_ = + LDBCommandExecuteResult::Failed(ARG_RATE_LIMITER + " must be > 0."); + } + } + // TODO(ajkr): this return value doesn't reflect the CF options changed, so // subcommands that rely on this won't see the effect of CF-related CLI args. // Such subcommands need to be changed to properly support CFs. @@ -869,8 +882,10 @@ KvCombineCommand::KvCombineCommand( const std::vector& /*params*/, const std::map& options, const std::vector& flags) - : LDBCommand(options, flags, false, BuildCmdLineOptions({ARG_PARALLEL})), - parallel_(8) {} + : LDBCommand(options, flags, false, + BuildCmdLineOptions({ARG_PARALLEL, ARG_RATE_LIMITER})), + parallel_(8) { +} void KvCombineCommand::Help(std::string& ret) { ret.append(" "); ret.append(KvCombineCommand::Name()); @@ -881,6 +896,7 @@ void KvCombineCommand::DoCommand() { assert(GetExecuteState().IsFailed()); return; } + int parallel; if (ParseIntOption(option_map_, ARG_PARALLEL, parallel, exec_state_)) { if (parallel > 0) { @@ -891,14 +907,13 @@ void KvCombineCommand::DoCommand() { return; } } - // rate_limiter_bytes_per_sec CompactionOptions cfo; cfo.separation_type = kCompactionCombineValue; // we should skip the last level file that kv already combine while (true) { - std::atomic failed_cnt(0); + bool has_non_origin_file = false; std::vector LiveFiles; db_->GetLiveFilesMetaData(&LiveFiles); std::atomic next_file_index(0); @@ -911,16 +926,19 @@ void KvCombineCommand::DoCommand() { std::ostringstream output; auto file = LiveFiles[index]; if (file.non_origin_file) { + has_non_origin_file = true; std::vector inputs = {file.name}; output << "file_index: " << index << "cf:" << file.column_family_name << " level " << file.level << " compact file " << file.name << std::endl; ColumnFamilyHandle* cfh = cf_handles_[file.column_family_name]; + MutableCFOptions immutable_db_options(options_); + cfo.output_file_size_limit = MaxFileSizeForLevel( + immutable_db_options, file.level, options_.compaction_style); Status s = db_->CompactFiles(cfo, cfh, inputs, file.level); output << "file_index: " << index << " " << s.ToString() << std::endl; if (!s.ok()) { - failed_cnt.fetch_add(1); std::cerr << output.str(); } else std::cout << index << "/" << LiveFiles.size() << std::endl; @@ -935,7 +953,7 @@ void KvCombineCommand::DoCommand() { for (auto& t : threads) { t.join(); } - if (failed_cnt.load() == 0) break; + if (!has_non_origin_file) break; } std::cout << "kv_combine finish!" << std::endl; } From 47f0a669aa6cfa7d59c8344a6d1e38aeba87edb0 Mon Sep 17 00:00:00 2001 From: wangyi Date: Thu, 15 Jul 2021 17:07:17 +0800 Subject: [PATCH 10/11] 1. KvCombineCommand add rate_limiter_bytes_per_sec support write rate control --- tools/ldb_cmd.cc | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 04ac043c10..aa427c98b4 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -911,38 +911,39 @@ void KvCombineCommand::DoCommand() { cfo.separation_type = kCompactionCombineValue; // we should skip the last level file that kv already combine - + MutableCFOptions mutable_db_options(options_); while (true) { - bool has_non_origin_file = false; std::vector LiveFiles; db_->GetLiveFilesMetaData(&LiveFiles); std::atomic next_file_index(0); + + std::vector NonOriginFiles; + for(auto& file:LiveFiles){ + if(file.non_origin_file){ + NonOriginFiles.push_back(file); + } + } std::function worker_func([&]() { while (true) { size_t index = next_file_index.fetch_add(1); - if (index >= LiveFiles.size()) { + if (index >= NonOriginFiles.size()) { break; } std::ostringstream output; - auto file = LiveFiles[index]; - if (file.non_origin_file) { - has_non_origin_file = true; - std::vector inputs = {file.name}; - output << "file_index: " << index << "cf:" << file.column_family_name - << " level " << file.level << " compact file " << file.name - << std::endl; - - ColumnFamilyHandle* cfh = cf_handles_[file.column_family_name]; - MutableCFOptions immutable_db_options(options_); - cfo.output_file_size_limit = MaxFileSizeForLevel( - immutable_db_options, file.level, options_.compaction_style); - Status s = db_->CompactFiles(cfo, cfh, inputs, file.level); - output << "file_index: " << index << " " << s.ToString() << std::endl; - if (!s.ok()) { - std::cerr << output.str(); - } else - std::cout << index << "/" << LiveFiles.size() << std::endl; - } + auto file = NonOriginFiles[index]; + std::vector inputs = {file.name}; + output << "file_index: " << index << "cf:" << file.column_family_name + << " level " << file.level << " compact file " << file.name + << std::endl; + ColumnFamilyHandle* cfh = cf_handles_[file.column_family_name]; + cfo.output_file_size_limit = MaxFileSizeForLevel( + mutable_db_options, file.level, options_.compaction_style); + Status s = db_->CompactFiles(cfo, cfh, inputs, file.level); + output << "file_index: " << index << " " << s.ToString() << std::endl; + if (!s.ok()) { + std::cerr << output.str(); + } else + std::cout << index << "/" << NonOriginFiles.size() << std::endl; } }); std::vector threads; @@ -953,7 +954,7 @@ void KvCombineCommand::DoCommand() { for (auto& t : threads) { t.join(); } - if (!has_non_origin_file) break; + if (NonOriginFiles.size() == 0) break; } std::cout << "kv_combine finish!" << std::endl; } From dd4e794b1a1db141469db1e34373b9ca0bd674c3 Mon Sep 17 00:00:00 2001 From: wangyi Date: Fri, 16 Jul 2021 15:33:51 +0800 Subject: [PATCH 11/11] 1. initial combine command, close auto compaction and load options 2. not chose level-1's file to compact --- tools/ldb_cmd.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index aa427c98b4..775d2d1db7 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -197,8 +197,11 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { return new CompactorCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); } else if (parsed_params.cmd == KvCombineCommand::Name()) { - return new KvCombineCommand(parsed_params.cmd_params, - parsed_params.option_map, parsed_params.flags); + ParsedParams combineParam = parsed_params; + combineParam.option_map["auto_compaction"] = "false"; + combineParam.flags.push_back("try_load_options"); + return new KvCombineCommand(combineParam.cmd_params, + combineParam.option_map, combineParam.flags); } else if (parsed_params.cmd == ManifestRollbackCommand::Name()) { return new ManifestRollbackCommand(parsed_params.cmd_params, parsed_params.option_map, @@ -919,7 +922,7 @@ void KvCombineCommand::DoCommand() { std::vector NonOriginFiles; for(auto& file:LiveFiles){ - if(file.non_origin_file){ + if(file.non_origin_file && file.level >= 0){ NonOriginFiles.push_back(file); } }