Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ Compaction* CompactionPicker::CompactFiles(
// This compaction output should not overlap with a running compaction as
// `SanitizeCompactionInputFiles` should've checked earlier and db mutex
// shouldn't have been released since.
assert(!FilesRangeOverlapWithCompaction(input_files, output_level));
assert(output_level == 0 || !FilesRangeOverlapWithCompaction(input_files, output_level));

CompressionType compression_type;
if (compact_options.compression == kDisableCompressionOption) {
Expand Down Expand Up @@ -1591,7 +1591,7 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
}
}
}
if (RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
if (output_level != 0 && RangeOverlapWithCompaction(smallestkey, largestkey, output_level)) {
return Status::Aborted(
"A running compaction is writing to the same output level in an "
"overlapping key range");
Expand Down
18 changes: 17 additions & 1 deletion db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, varint_log_number);
min_log_num_written = true;
}
if (true) {
if (rollback_) {
if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 ||
f.prop.inheritance.size() != 0) {
return false;
}
}
if (!rollback_) {
PutVarint32(dst, CustomTag::kPropertyCache);
std::string encode_property_cache;
encode_property_cache.push_back((char)f.prop.purpose);
Expand Down Expand Up @@ -666,6 +672,16 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(" .. ");
r.append(f.largest.DebugString(hex_key));
}
if(rollback_){
r.append("\n Rollback Info: ");
for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second;
if (!f.prop.is_essense_sst() || f.prop.dependence.size() != 0 ||
f.prop.inheritance.size() != 0){
r.append("%d .. ",f.fd.GetNumber());
}
}
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);
if (is_column_family_add_) {
Expand Down
7 changes: 6 additions & 1 deletion db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct TablePropertyCache {
uint64_t latest_time_end_compact = port::kMaxUint64;

bool is_map_sst() const { return purpose == kMapSst; }
bool is_essense_sst() const { return purpose == kEssenceSst; }
bool has_range_deletions() const { return (flags & kNoRangeDeletions) == 0; }
bool map_handle_range_deletions() const {
return (flags & kMapHandleRangeDeletions) != 0;
Expand Down Expand Up @@ -389,7 +390,9 @@ class VersionEdit {

std::string DebugString(bool hex_key = false) const;
std::string DebugJSON(int edit_num, bool hex_key = false) const;

void setRollback(const bool rollback = true){
rollback_ = rollback;
}
private:
friend class VersionSet;
friend class Version;
Expand Down Expand Up @@ -432,6 +435,8 @@ class VersionEdit {
bool is_open_db_;
bool is_in_atomic_group_;
uint32_t remaining_entries_;

bool rollback_ = false;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

调整一下变量定义的位置,尽量让所有的 bool 挨在一起

};

} // namespace TERARKDB_NAMESPACE
63 changes: 62 additions & 1 deletion db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2992,7 +2992,59 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
v->prev_->next_ = v;
v->next_->prev_ = v;
}
Status VersionSet::ManifestRollback(InstrumentedMutex* mu) {
Status s;
pending_manifest_file_number_ = NewFileNumber();
std::string descriptor_fname =
DescriptorFileName(dbname_, pending_manifest_file_number_);
std::unique_ptr<WritableFile> descriptor_file;
EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
opt_env_opts);
if (s.ok()) {
descriptor_file->SetPreallocationBlockSize(
db_options_->manifest_preallocation_size);

std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
db_options_->listeners));
descriptor_log_.reset(
new log::Writer(std::move(file_writer), 0, false));
s = WriteSnapshot(descriptor_log_.get(), true);
}
do {
if(s.ok()) {
VersionEdit e;
for(auto cfd : *column_family_set_) {
LogAndApplyHelper(cfd, nullptr, cfd->current(), &e, mu, false);
}
std::string record;
if (!e.EncodeTo(&record)) {
s = Status::Corruption("Unable to encode VersionEdit:" +
e.DebugString(true));
}
s = descriptor_log_->AddRecord(record);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

未检查 s

if (!s.ok()) {
break;
}
if (s.ok()) {
s = SyncManifest(env_, db_options_, descriptor_log_->file());
}
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
}
}
}while(false);

if (s.ok()) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,nullptr);
}



return s;
}
Status VersionSet::ProcessManifestWrites(std::deque<ManifestWriter>& writers,
InstrumentedMutex* mu,
Directory* db_directory,
Expand Down Expand Up @@ -4339,7 +4391,7 @@ void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
}
}

Status VersionSet::WriteSnapshot(log::Writer* log) {
Status VersionSet::WriteSnapshot(log::Writer* log, const bool rollback) {
// TODO: Break up into multiple records to reduce memory usage on recovery?

// WARNING: This method doesn't hold a mutex!!
Expand All @@ -4348,6 +4400,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate.
for (auto cfd : *column_family_set_) {
if(rollback && cfd->current()->storage_info()->LevelFiles(-1).size() != 0){
return Status::Corruption("cfd: %s's level -1 is not null", cfd->GetName());
}
if (cfd->IsDropped()) {
continue;
}
Expand Down Expand Up @@ -4377,6 +4432,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
{
// Save files
VersionEdit edit;
if(rollback) {
edit.setRollback();
}
edit.SetColumnFamily(cfd->GetID());

for (int level = -1; level < cfd->NumberLevels(); level++) {
Expand Down Expand Up @@ -4759,6 +4817,9 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
filemetadata.being_compacted = file->being_compacted;
filemetadata.num_entries = file->prop.num_entries;
filemetadata.num_deletions = file->prop.num_deletions;
if(file->prop.dependence.size() != 0 || file->prop.inheritance.size() !=0 || !file->prop.is_essense_sst()) {
filemetadata.non_origin_file = true;
}
metadata->push_back(filemetadata);
}
}
Expand Down
3 changes: 2 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,7 @@ class VersionSet {
// are not opened
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false);
Status ManifestRollback(InstrumentedMutex* mu);

// Reads a manifest file and returns a list of column families in
// column_families.
Expand Down Expand Up @@ -1128,7 +1129,7 @@ class VersionSet {
const Slice& key);

// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
Status WriteSnapshot(log::Writer* log, const bool rollback = false);

void AppendVersion(ColumnFamilyData* column_family_data, Version* v);

Expand Down
8 changes: 6 additions & 2 deletions include/rocksdb/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ struct SstFileMetaData {
num_reads_sampled(0),
being_compacted(false),
num_entries(0),
num_deletions(0) {}
num_deletions(0),
non_origin_file(false) {}

SstFileMetaData(const std::string& _file_name, const std::string& _path,
size_t _size, SequenceNumber _smallest_seqno,
Expand All @@ -83,7 +84,8 @@ struct SstFileMetaData {
num_reads_sampled(_num_reads_sampled),
being_compacted(_being_compacted),
num_entries(0),
num_deletions(0) {}
num_deletions(0),
non_origin_file(false) {}

// File size in bytes.
size_t size;
Expand All @@ -101,6 +103,8 @@ struct SstFileMetaData {

uint64_t num_entries;
uint64_t num_deletions;
// true Means a terarkdb sst file instead of rocksdb file
bool non_origin_file;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

移动一下 bool 定义的位置,尽可能让 bool 全都挨着

};

// The full set of metadata associated with each SST file.
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/utilities/ldb_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class LDBCommand {
static const std::string ARG_FILE_SIZE;
static const std::string ARG_CREATE_IF_MISSING;
static const std::string ARG_NO_VALUE;
static const std::string ARG_REBUILD;
static const std::string ARG_PARALLEL;
static const std::string ARG_RATE_LIMITER;

struct ParsedParams {
std::string cmd;
Expand Down
Loading