diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index e01c7669b0..8d1db0721a 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -556,7 +556,7 @@ Compaction* CompactionPicker::CompactFiles(
   // This compaction output should not overlap with a running compaction as
   // `SanitizeCompactionInputFiles` should've checked earlier and db mutex
   // shouldn't have been released since.
-  assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
+  assert(output_level == 0 || !FilesRangeOverlapWithCompaction(input_files, output_level));
 
   CompressionType compression_type;
   if (compact_options.compression == kDisableCompressionOption) {
@@ -1591,7 +1591,7 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
       }
     }
   }
-  if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
+  if (output_level != 0 && RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
     return Status::Aborted(
         "A running compaction is writing to the same output level in an "
         "overlapping key range");
diff --git a/db/version_edit.cc b/db/version_edit.cc
index 1b8bfb402d..96f2071c73 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -184,7 +184,15 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
         PutLengthPrefixedSlice(dst, varint_log_number);
         min_log_num_written = true;
       }
-      if (true) {
+      if (rollback_) {
+        // Rollback mode skips the kPropertyCache tag written below, so any
+        // file that is not a plain essence SST, or that has dependence or
+        // inheritance, makes the whole encode fail.
+        if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 ||
+            f.prop.inheritance.size() != 0) {
+          return false;
+        }
+      } else {
         PutVarint32(dst, CustomTag::kPropertyCache);
         std::string encode_property_cache;
         encode_property_cache.push_back((char)f.prop.purpose);
@@ -666,6 +674,20 @@ std::string VersionEdit::DebugString(bool hex_key) const {
     r.append(" .. ");
     r.append(f.largest.DebugString(hex_key));
   }
+  if (rollback_) {
+    // List the new files that EncodeTo() refuses to write in rollback mode.
+    r.append("\n Rollback Info: ");
+    for (const auto& new_file : new_files_) {
+      const FileMetaData& f = new_file.second;
+      if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 ||
+          f.prop.inheritance.size() != 0) {
+        // std::string::append has no printf-style overload; append the
+        // number explicitly.
+        AppendNumberTo(&r, f.fd.GetNumber());
+        r.append(" .. ");
+      }
+    }
+  }
   r.append("\n ColumnFamily: ");
   AppendNumberTo(&r, column_family_);
   if (is_column_family_add_) {
diff --git a/db/version_edit.h b/db/version_edit.h
index 0078e5fe34..890df42167 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -100,6 +100,7 @@ struct TablePropertyCache {
   uint64_t latest_time_end_compact = port::kMaxUint64;
 
   bool is_map_sst() const { return purpose == kMapSst; }
+  bool is_essense_sst() const { return purpose == kEssenceSst; }
   bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; }
   bool map_handle_range_deletions() const {
     return (flags & kMapHandleRangeDeletions) != 0;
@@ -389,7 +390,11 @@ class VersionEdit {
 
   std::string DebugString(bool hex_key = false) const;
   std::string DebugJSON(int edit_num, bool hex_key = false) const;
 
+  // Switches rollback mode on or off. In rollback mode EncodeTo() skips the
+  // property cache and fails for files that are not plain essence SSTs.
+  void setRollback(const bool rollback = true) { rollback_ = rollback; }
+
  private:
   friend class VersionSet;
   friend class Version;
@@ -432,6 +437,9 @@ class VersionEdit {
   bool is_open_db_;
   bool is_in_atomic_group_;
   uint32_t remaining_entries_;
+
+  // True when this edit is encoded in rollback mode; see setRollback().
+  bool rollback_ = false;
 };
 
 }  // namespace TERARKDB_NAMESPACE
diff --git a/db/version_set.cc b/db/version_set.cc
index 73dcb419b5..256ceb92fb 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2992,7 +2992,68 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
   v->prev_->next_ = v;
   v->next_->prev_ = v;
 }
+
+// Writes a fresh MANIFEST that snapshots every live column family in
+// rollback mode (see WriteSnapshot) and, on success, points CURRENT at it.
+// `mu` is only forwarded to LogAndApplyHelper.
+// NOTE(review): LogAndApplyHelper presumably asserts that `mu` is held, so
+// callers should lock it first -- confirm.
+Status VersionSet::ManifestRollback(InstrumentedMutex* mu) {
+  pending_manifest_file_number_ = NewFileNumber();
+  const std::string descriptor_fname =
+      DescriptorFileName(dbname_, pending_manifest_file_number_);
+  EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
+
+  std::unique_ptr<WritableFile> descriptor_file;
+  Status s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
+                             opt_env_opts);
+  if (s.ok()) {
+    descriptor_file->SetPreallocationBlockSize(
+        db_options_->manifest_preallocation_size);
+
+    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+        std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
+        db_options_->listeners));
+    descriptor_log_.reset(
+        new log::Writer(std::move(file_writer), 0, false));
+    s = WriteSnapshot(descriptor_log_.get(), true /* rollback */);
+  }
+
+  if (s.ok()) {
+    VersionEdit e;
+    for (auto cfd : *column_family_set_) {
+      LogAndApplyHelper(cfd, nullptr, cfd->current(), &e, mu, false);
+    }
+    std::string record;
+    if (!e.EncodeTo(&record)) {
+      s = Status::Corruption("Unable to encode VersionEdit:" +
+                             e.DebugString(true));
+    }
+    // Never append a record that failed to encode: doing so would both write
+    // a truncated edit and overwrite the Corruption status above.
+    if (s.ok()) {
+      s = descriptor_log_->AddRecord(record);
+    }
+    if (s.ok()) {
+      s = SyncManifest(env_, db_options_, descriptor_log_->file());
+    }
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
+                      s.ToString().c_str());
+    }
+  }
+
+  if (s.ok()) {
+    s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, nullptr);
+  }
+  if (!s.ok()) {
+    // The rollback failed: drop the partial MANIFEST and its writer.
+    descriptor_log_.reset();
+    env_->DeleteFile(descriptor_fname);
+  }
+  return s;
+}
 
 Status VersionSet::ProcessManifestWrites(std::deque<ManifestWriter>& writers,
                                          InstrumentedMutex* mu,
                                          Directory* db_directory,
@@ -4339,7 +4400,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
   }
 }
 
-Status VersionSet::WriteSnapshot(log::Writer* log) {
+Status VersionSet::WriteSnapshot(log::Writer* log, const bool rollback) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
 
   // WARNING: This method doesn't hold a mutex!!
@@ -4348,6 +4409,12 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
   // LogAndApply. Column family manipulations can only happen within LogAndApply
   // (the same single thread), so we're safe to iterate.
   for (auto cfd : *column_family_set_) {
+    if (rollback &&
+        cfd->current()->storage_info()->LevelFiles(-1).size() != 0) {
+      // Status takes message pieces, not a printf format; build the text.
+      return Status::Corruption("cfd: " + cfd->GetName() +
+                                "'s level -1 is not null");
+    }
     if (cfd->IsDropped()) {
       continue;
     }
@@ -4377,6 +4444,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
     {
       // Save files
       VersionEdit edit;
+      edit.setRollback(rollback);
       edit.SetColumnFamily(cfd->GetID());
 
       for (int level = -1; level < cfd->NumberLevels(); level++) {
@@ -4759,6 +4827,11 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
         filemetadata.being_compacted = file->being_compacted;
         filemetadata.num_entries = file->prop.num_entries;
         filemetadata.num_deletions = file->prop.num_deletions;
+        // Flag TerarkDB-specific files: anything that is not a plain essence
+        // SST, or that carries dependence/inheritance.
+        filemetadata.non_origin_file = !file->prop.is_essense_sst() ||
+                                       file->prop.dependence.size() != 0 ||
+                                       file->prop.inheritance.size() != 0;
         metadata->push_back(filemetadata);
       }
     }
diff --git a/db/version_set.h b/db/version_set.h
index 46e2914f9e..f862e5adef 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -927,6 +927,7 @@ class VersionSet {
   // are not opened
   Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
                  bool read_only = false);
+  Status ManifestRollback(InstrumentedMutex* mu);
 
   // Reads a manifest file and returns a list of column families in
   // column_families.
@@ -1128,7 +1129,7 @@ class VersionSet {
                                       const Slice& key);
 
   // Save current contents to *log
-  Status WriteSnapshot(log::Writer* log);
+  Status WriteSnapshot(log::Writer* log, const bool rollback = false);
 
   void AppendVersion(ColumnFamilyData* column_family_data, Version* v);
 
diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h
index 3977b07a28..70fb990e3b 100644
--- a/include/rocksdb/metadata.h
+++ b/include/rocksdb/metadata.h
@@ -65,7 +65,8 @@ struct SstFileMetaData {
         num_reads_sampled(0),
         being_compacted(false),
         num_entries(0),
-        num_deletions(0) {}
+        num_deletions(0),
+        non_origin_file(false) {}
 
   SstFileMetaData(const std::string& _file_name, const std::string& _path,
                   size_t _size, SequenceNumber _smallest_seqno,
@@ -83,7 +84,8 @@ struct SstFileMetaData {
         num_reads_sampled(_num_reads_sampled),
         being_compacted(_being_compacted),
         num_entries(0),
-        num_deletions(0) {}
+        num_deletions(0),
+        non_origin_file(false) {}
 
   // File size in bytes.
   size_t size;
@@ -101,6 +103,8 @@ struct SstFileMetaData {
 
   uint64_t num_entries;
   uint64_t num_deletions;
+  // True for TerarkDB-specific SSTs (non-essence, or with dependence/inheritance).
+  bool non_origin_file;
 };
 
 // The full set of metadata associated with each SST file.
diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h
index 4edfd73241..a8bf5e4c7f 100644
--- a/include/rocksdb/utilities/ldb_cmd.h
+++ b/include/rocksdb/utilities/ldb_cmd.h
@@ -57,6 +57,9 @@ class LDBCommand {
   static const std::string ARG_FILE_SIZE;
   static const std::string ARG_CREATE_IF_MISSING;
   static const std::string ARG_NO_VALUE;
+  static const std::string ARG_REBUILD;
+  static const std::string ARG_PARALLEL;
+  static const std::string ARG_RATE_LIMITER;
 
   struct ParsedParams {
     std::string cmd;
diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index a41fb18fdc..775d2d1db7 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -81,6 +81,9 @@ const std::string LDBCommand::ARG_WRITE_BUFFER_SIZE = "write_buffer_size";
 const std::string LDBCommand::ARG_FILE_SIZE = "file_size";
 const std::string LDBCommand::ARG_CREATE_IF_MISSING = "create_if_missing";
 const std::string LDBCommand::ARG_NO_VALUE = "no_value";
+const std::string LDBCommand::ARG_REBUILD = "kv_combine";
+const std::string LDBCommand::ARG_PARALLEL = "parallel";
+const std::string LDBCommand::ARG_RATE_LIMITER = "rate_limiter_bytes_per_sec";
 
 const char* LDBCommand::DELIM = " ==> ";
 
@@ -193,6 +196,18 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
   } else if (parsed_params.cmd == CompactorCommand::Name()) {
     return new CompactorCommand(parsed_params.cmd_params,
                                 parsed_params.option_map, parsed_params.flags);
+  } else if (parsed_params.cmd == KvCombineCommand::Name()) {
+    // `combine` always runs with auto_compaction forced to "false" and with
+    // the try_load_options flag set, regardless of the command line.
+    ParsedParams combineParam = parsed_params;
+    combineParam.option_map["auto_compaction"] = "false";
+    combineParam.flags.push_back("try_load_options");
+    return new KvCombineCommand(combineParam.cmd_params,
+                                combineParam.option_map, combineParam.flags);
+  } else if (parsed_params.cmd == ManifestRollbackCommand::Name()) {
+    return new ManifestRollbackCommand(parsed_params.cmd_params,
+                                       parsed_params.option_map,
+                                       parsed_params.flags);
   } else if (parsed_params.cmd == WALDumperCommand::Name()) {
     return new WALDumperCommand(parsed_params.cmd_params,
                                 parsed_params.option_map, parsed_params.flags);
@@ -641,6 +656,21 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
           LDBCommandExecuteResult::Failed(ARG_FIX_PREFIX_LEN + " must be > 0.");
     }
   }
+  // Optional write rate limit, in bytes per second.
+  int rate_limiter_bytes_per_sec = 0;
+  if (ParseIntOption(option_map_, ARG_RATE_LIMITER, rate_limiter_bytes_per_sec,
+                     exec_state_)) {
+    if (rate_limiter_bytes_per_sec > 0) {
+      options_.rate_limiter.reset(NewGenericRateLimiter(
+          rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
+          10 /* fairness */, RateLimiter::Mode::kWritesOnly,
+          false /* auto_tuned */));
+    } else {
+      exec_state_ =
+          LDBCommandExecuteResult::Failed(ARG_RATE_LIMITER + " must be > 0.");
+    }
+  }
+
   // TODO(ajkr): this return value doesn't reflect the CF options changed, so
   // subcommands that rely on this won't see the effect of CF-related CLI args.
   // Such subcommands need to be changed to properly support CFs.
@@ -792,7 +822,7 @@ CompactorCommand::CompactorCommand(
     const std::vector<std::string>& flags)
     : LDBCommand(options, flags, false,
                  BuildCmdLineOptions({ARG_FROM, ARG_TO, ARG_HEX, ARG_KEY_HEX,
-                                      ARG_VALUE_HEX, ARG_TTL})),
+                                      ARG_VALUE_HEX, ARG_TTL, ARG_REBUILD})),
       null_from_(true),
       null_to_(true) {
   std::map<std::string, std::string>::const_iterator itr =
@@ -816,6 +846,13 @@ CompactorCommand::CompactorCommand(
       to_ = HexToString(to_);
     }
   }
+  // Start from the library default so DoCommand() never reads an
+  // uninitialized member when the flag is absent.
+  separation_type = CompactRangeOptions().separation_type;
+  if (IsFlagPresent(flags, ARG_REBUILD)) {
+    printf("compact with kv_combine \n");
+    separation_type = kCompactionCombineValue;
+  }
 }
 
 void CompactorCommand::Help(std::string& ret) {
@@ -841,6 +878,7 @@ void CompactorCommand::DoCommand() {
   }
 
   CompactRangeOptions cro;
+  cro.separation_type = separation_type;
   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
 
   db_->CompactRange(cro, GetCfHandle(), begin, end);
@@ -849,6 +887,179 @@ void CompactorCommand::DoCommand() {
   delete begin;
   delete end;
 }
+
+KvCombineCommand::KvCombineCommand(
+    const std::vector<std::string>& /*params*/,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(options, flags, false,
+                 BuildCmdLineOptions({ARG_PARALLEL, ARG_RATE_LIMITER})),
+      parallel_(8) {}
+
+void KvCombineCommand::Help(std::string& ret) {
+  ret.append(" ");
+  ret.append(KvCombineCommand::Name());
+  ret.append("\n");
+}
+
+// Compacts every live SST flagged non_origin_file (level >= 0) onto its own
+// level with kCompactionCombineValue, using parallel_ workers, and repeats
+// until a pass finds no such file.
+void KvCombineCommand::DoCommand() {
+  if (!db_) {
+    assert(GetExecuteState().IsFailed());
+    return;
+  }
+
+  int parallel = 0;
+  if (ParseIntOption(option_map_, ARG_PARALLEL, parallel, exec_state_)) {
+    if (parallel > 0) {
+      parallel_ = parallel;
+    } else {
+      exec_state_ =
+          LDBCommandExecuteResult::Failed(ARG_PARALLEL + " must be > 0.");
+      return;
+    }
+  }
+
+  CompactionOptions cfo;
+  cfo.separation_type = kCompactionCombineValue;
+  MutableCFOptions mutable_db_options(options_);
+  while (true) {
+    std::vector<LiveFileMetaData> LiveFiles;
+    db_->GetLiveFilesMetaData(&LiveFiles);
+
+    // Files whose values are already combined must be skipped; presumably
+    // the non_origin_file flag is what tells them apart.
+    std::vector<LiveFileMetaData> NonOriginFiles;
+    for (const auto& file : LiveFiles) {
+      if (file.non_origin_file && file.level >= 0) {
+        NonOriginFiles.push_back(file);
+      }
+    }
+    if (NonOriginFiles.empty()) {
+      break;
+    }
+
+    std::atomic<size_t> next_file_index(0);
+    std::atomic<size_t> succeeded(0);
+    std::function<void()> worker_func([&]() {
+      while (true) {
+        const size_t index = next_file_index.fetch_add(1);
+        if (index >= NonOriginFiles.size()) {
+          break;
+        }
+        const auto& file = NonOriginFiles[index];
+        std::vector<std::string> inputs = {file.name};
+        std::ostringstream output;
+        output << "file_index: " << index << "cf:" << file.column_family_name
+               << " level " << file.level << " compact file " << file.name
+               << std::endl;
+        Status s;
+        // Workers share cf_handles_: look up with find() because operator[]
+        // would insert (and hand out a null handle) for an unknown name.
+        auto cfh_iter = cf_handles_.find(file.column_family_name);
+        if (cfh_iter == cf_handles_.end()) {
+          s = Status::NotFound("Unknown column family",
+                               file.column_family_name);
+        } else {
+          // Work on a private copy; writing the size limit into the shared
+          // `cfo` from several threads would be a data race.
+          CompactionOptions file_options = cfo;
+          file_options.output_file_size_limit = MaxFileSizeForLevel(
+              mutable_db_options, file.level, options_.compaction_style);
+          s = db_->CompactFiles(file_options, cfh_iter->second, inputs,
+                                file.level);
+        }
+        output << "file_index: " << index << " " << s.ToString() << std::endl;
+        if (!s.ok()) {
+          std::cerr << output.str();
+        } else {
+          succeeded.fetch_add(1);
+          std::cout << index << "/" << NonOriginFiles.size() << std::endl;
+        }
+      }
+    });
+    std::vector<std::thread> threads;
+    for (int i = 1; i < parallel_; i++) {
+      threads.emplace_back(worker_func);
+    }
+    worker_func();
+    for (auto& t : threads) {
+      t.join();
+    }
+    if (succeeded.load() == 0) {
+      // Every compaction in this pass failed, so another pass would only
+      // repeat the same failures forever.
+      exec_state_ = LDBCommandExecuteResult::Failed(
+          "kv_combine made no progress; see the errors above.");
+      return;
+    }
+  }
+  std::cout << "kv_combine finish!" << std::endl;
+}
+
+ManifestRollbackCommand::ManifestRollbackCommand(
+    const std::vector<std::string>& /*params*/,
+    const std::map<std::string, std::string>& options,
+    const std::vector<std::string>& flags)
+    : LDBCommand(options, flags, true, BuildCmdLineOptions({})) {}
+
+void ManifestRollbackCommand::Help(std::string& ret) {
+  ret.append(" ");
+  ret.append(ManifestRollbackCommand::Name());
+  ret.append("\n");
+}
+
+// Recovers the VersionSet of the (unopened) DB at db_path_ and asks it to
+// write a rollback MANIFEST; failures are reported through exec_state_.
+void ManifestRollbackCommand::DoCommand() {
+  // Load the persisted options first so the overrides applied by
+  // PrepareOptionsForOpenDB() are not overwritten afterwards.
+  Status s = LoadLatestOptions(db_path_, Env::Default(), &options_,
+                               &column_families_, ignore_unknown_options_);
+  if (!s.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed("LoadLatestOptions Error " +
+                                                  s.ToString());
+    return;
+  }
+  options_ = PrepareOptionsForOpenDB();
+
+  if (options_.db_paths.empty()) {
+    // `VersionSet` expects options that have been through `SanitizeOptions()`,
+    // which would sanitize an empty `db_paths`.
+    options_.db_paths.emplace_back(db_path_, 0 /* target_size */);
+  }
+
+  WriteController wc(options_.delayed_write_rate);
+  WriteBufferManager wb(options_.db_write_buffer_size);
+
+  std::shared_ptr<Cache> tc(
+      NewLRUCache(1 << 20 /* capacity */, options_.table_cache_numshardbits));
+  EnvOptions sopt;
+  const bool seq_per_batch = false;
+  // Build from options_ (not an earlier copy) so the db_paths fix-up above is
+  // what VersionSet actually sees.
+  ImmutableDBOptions immutable_db_options(options_);
+  VersionSet versions(db_path_, &immutable_db_options, sopt, seq_per_batch,
+                      tc.get(), &wb, &wc);
+  s = versions.Recover(column_families_);
+  if (!s.ok()) {
+    exec_state_ =
+        LDBCommandExecuteResult::Failed("Error in Recover DB " + s.ToString());
+    return;
+  }
+
+  InstrumentedMutex mutex;
+  // NOTE(review): the mutex is forwarded to LogAndApplyHelper, which
+  // presumably asserts that it is held -- confirm.
+  InstrumentedMutexLock guard(&mutex);
+  s = versions.ManifestRollback(&mutex);
+  if (!s.ok()) {
+    exec_state_ = LDBCommandExecuteResult::Failed(
+        "Error in Manifest Rollback " + s.ToString());
+  }
+}
 
 // ----------------------------------------------------------------------------
 
diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h
index 20d6d00134..57e2aaa167 100644
--- a/tools/ldb_cmd_impl.h
+++ b/tools/ldb_cmd_impl.h
@@ -32,6 +32,43 @@ class CompactorCommand : public LDBCommand {
   std::string from_;
   bool null_to_;
   std::string to_;
+  // Separation mode handed to CompactRange; set in the constructor.
+  SeparationType separation_type;
+};
+
+// Command that rewrites files flagged as non-origin via CompactFiles with
+// kCompactionCombineValue (see KvCombineCommand::DoCommand).
+class KvCombineCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "combine"; }
+
+  KvCombineCommand(const std::vector<std::string>& params,
+                   const std::map<std::string, std::string>& options,
+                   const std::vector<std::string>& flags);
+
+  static void Help(std::string& ret);
+
+  virtual void DoCommand() override;
+
+ private:
+  // Number of worker threads (including the calling thread).
+  int parallel_;
+};
+
+// Command that writes a new rollback MANIFEST for a closed DB and switches
+// CURRENT to it (see VersionSet::ManifestRollback).
+class ManifestRollbackCommand : public LDBCommand {
+ public:
+  static std::string Name() { return "manifest_rollback"; }
+
+  ManifestRollbackCommand(const std::vector<std::string>& params,
+                          const std::map<std::string, std::string>& options,
+                          const std::vector<std::string>& flags);
+
+  static void Help(std::string& ret);
+
+  virtual void DoCommand() override;
+  virtual bool NoDBOpen() override { return true; }
 };
 
 class DBFileDumperCommand : public LDBCommand {