feat: Adjustment of the master-slave synchronization dump cleanup mechanism, optimizing storage space usage #3225
base: unstable
Changes from all commits: 7be1b42, 0b434a6, a0376e2, 974276b, bf41bf1, 36d8f17
```diff
@@ -496,7 +496,7 @@ default-slot-num : 1024
 # rate limiter bandwidth, units in bytes, default 1024GB/s (No limit)
 # [Support Dynamically changeable] send 'rate-limiter-bandwidth' to a running pika can change it's value dynamically
-#rate-limiter-bandwidth : 1099511627776
+rate-limiter-bandwidth : 109951162

 #rate-limiter-refill-period-us : 100000
 #
@@ -505,7 +505,7 @@ default-slot-num : 1024
 # if auto_tuned is true: Enables dynamic adjustment of rate limit within the range
 # `[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O.
 # rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true.
-#rate-limiter-auto-tuned : yes
+rate-limiter-auto-tuned : no

 ################################## RocksDB Blob Configure #####################
 # rocksdb blob configure
@@ -673,7 +673,7 @@ internal-used-unfinished-full-sync :
 # for wash data from 4.0.0 to 4.0.1
 # https://github.com/OpenAtomFoundation/pika/issues/2886
-# default value: true
+# default value: false
```
Disabling wash-data by default may break upgrades from 4.0.0 to 4.0.1.

The WashData() function (referenced in the comment at lines 673-675) is essential for migrating hash column family data to the correct internal format when upgrading. With wash-data: false as the default:

- Users upgrading from 4.0.0 won't automatically get their data migrated
- Hash values without the proper suffix encoding will remain inconsistent
- This could cause silent data corruption or read failures

Consider either:

- Keeping the default as `true` and documenting that users should set it to `false` after the first successful startup post-upgrade
- Adding prominent upgrade documentation warning users to set `wash-data: true` before their first 4.0.1 startup
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@conf/pika.conf` around lines 673 - 676, The config currently sets wash-data:
false which prevents the WashData() migration from running during upgrades;
change the default to wash-data: true so WashData() runs automatically on first
startup after upgrading from 4.0.0, and add a clear comment next to the
wash-data entry instructing operators to set wash-data: false after the first
successful startup (or include alternative upgrade docs); specifically update
the wash-data default and the adjacent comment block referenced by WashData() to
reflect this behavior.
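As a sketch, the first option the reviewer suggests could look like this in pika.conf (the comment wording below is hypothetical, not the shipped file):

```conf
# for wash data from 4.0.0 to 4.0.1
# https://github.com/OpenAtomFoundation/pika/issues/2886
# Keep true for the first startup after upgrading from 4.0.0 so WashData()
# migrates hash column family data; operators may set this to false after
# the first successful post-upgrade startup.
wash-data : true
```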
```diff
@@ -212,6 +212,40 @@ class PikaServer : public pstd::noncopyable {
   pstd::Status GetDumpMeta(const std::string& db_name, std::vector<std::string>* files, std::string* snapshot_uuid);
   void TryDBSync(const std::string& ip, int port, const std::string& db_name, int32_t top);
 
+  /*
+   * Rsync snapshot tracking (for orphan file cleanup protection)
+   */
+  void RegisterRsyncSnapshot(const std::string& snapshot_uuid);
+  void UnregisterRsyncSnapshot(const std::string& snapshot_uuid);
+  bool IsRsyncSnapshotActive(const std::string& snapshot_uuid);
+  std::set<std::string> GetActiveRsyncSnapshots();
+
+  /*
+   * Rsync file transfer tracking (for safe orphan file cleanup during sync)
+   */
+  void RegisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename);
+  void UnregisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename);
+  bool IsRsyncFileTransferring(const std::string& snapshot_uuid, const std::string& filename);
+  std::set<std::string> GetRsyncTransferringFiles(const std::string& snapshot_uuid);
+
+  /*
+   * Dump ownership management (Scheme A: each slave has exclusive dump)
+   */
+  bool MarkDumpInUse(const std::string& snapshot_uuid, const std::string& conn_id, const std::string& dump_path);
+  void ReleaseDump(const std::string& snapshot_uuid);
+  bool IsDumpInUse(const std::string& snapshot_uuid) const;
+  std::string GetDumpPathBySnapshot(const std::string& snapshot_uuid) const;
+  size_t GetActiveDumpCount() const;
+  static constexpr size_t kMaxConcurrentDumps = 3;  // Max concurrent dumps allowed
+
+  /*
+   * Delayed file cleanup for orphan SST files (Scheme A)
+   * Files are scheduled for cleanup 10 minutes after transfer completes
+   * to allow for retries and ensure data consistency
+   */
+  void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds);
+  void ProcessPendingCleanupFiles();
```
Comment on lines +246 to +247:

Keep the same transfer identity in the delayed-cleanup queue.

Minimal shape change:

```diff
-  void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds);
+  void ScheduleFileForCleanup(const std::string& snapshot_uuid,
+                              const std::string& filename,
+                              const std::string& filepath,
+                              int delay_seconds);
   struct PendingCleanupInfo {
+    std::string snapshot_uuid;
+    std::string filename;
     std::string filepath;
     time_t cleanup_time;
   };
```

Also applies to: 677-682
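The reviewer's suggested shape can be sketched as a standalone queue. The `CleanupQueue` wrapper and method names below are hypothetical; in the PR the state would live on `PikaServer` behind `pending_cleanup_mutex_`. The point is that carrying `(snapshot_uuid, filename)` lets a retrying transfer cancel exactly its own pending deletion:

```cpp
#include <ctime>
#include <map>
#include <string>
#include <vector>

// Hypothetical sketch: delayed-cleanup queue keyed by filepath, but carrying
// the same (snapshot_uuid, filename) identity the transfer tracker uses.
struct PendingCleanupInfo {
  std::string snapshot_uuid;
  std::string filename;
  std::string filepath;
  time_t cleanup_time;  // absolute deadline; deletable once now >= deadline
};

class CleanupQueue {
 public:
  void Schedule(const std::string& uuid, const std::string& name,
                const std::string& path, int delay_seconds, time_t now) {
    pending_[path] = {uuid, name, path, now + delay_seconds};
  }

  // Cancel the pending deletion when the same (uuid, filename) is retried.
  void CancelFor(const std::string& uuid, const std::string& name) {
    for (auto it = pending_.begin(); it != pending_.end();) {
      if (it->second.snapshot_uuid == uuid && it->second.filename == name) {
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
  }

  // Return paths whose deadline has passed; the caller would unlink them.
  std::vector<std::string> Due(time_t now) {
    std::vector<std::string> due;
    for (auto it = pending_.begin(); it != pending_.end();) {
      if (now >= it->second.cleanup_time) {
        due.push_back(it->second.filepath);
        it = pending_.erase(it);
      } else {
        ++it;
      }
    }
    return due;
  }

 private:
  std::map<std::string, PendingCleanupInfo> pending_;  // filepath -> info
};
```

With only a filepath key, a retry for the same file would have no reliable way to find and cancel its own scheduled deletion; the identity fields make `CancelFor` precise.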
```diff
   /*
    * Keyscan used
    */
@@ -498,6 +532,13 @@ class PikaServer : public pstd::noncopyable {
    */
   void DisableCompact();
 
+  /*
+   * Utility function to ensure directory exists
+   * Returns true if directory exists or was created successfully
+   * Handles the special case where CreatePath returns 0 for both success and "already exists"
+   */
+  static bool EnsureDirExists(const std::string& path, mode_t mode = 0755);
+
   /*
    * lastsave used
    */
@@ -605,6 +646,41 @@ class PikaServer : public pstd::noncopyable {
   std::unique_ptr<PikaRsyncService> pika_rsync_service_;
   std::unique_ptr<rsync::RsyncServer> rsync_server_;
 
+  /*
+   * Rsync snapshot tracking used (for orphan file cleanup protection)
+   */
+  std::set<std::string> active_rsync_snapshots_;
+  std::mutex active_rsync_snapshots_mutex_;
+
+  /*
+   * Rsync file transfer tracking used (for safe orphan file cleanup during sync)
+   * Tracks which files are currently being transferred for each snapshot
+   */
+  std::map<std::string, std::set<std::string>> rsync_transferring_files_;
+  std::mutex rsync_transferring_files_mutex_;
+
+  /*
+   * Dump ownership tracking used (Scheme A: each slave has exclusive dump)
+   * snapshot_uuid -> {connection id, dump path}
+   */
+  struct DumpOwnerInfo {
+    std::string conn_id;
+    std::string dump_path;
+  };
+  std::map<std::string, DumpOwnerInfo> dump_owners_;
+  mutable std::mutex dump_owners_mutex_;
+
+  /*
+   * Pending cleanup tracking for delayed file deletion (Scheme A)
+   * filepath -> {cleanup_time}
+   */
+  struct PendingCleanupInfo {
+    std::string filepath;
+    time_t cleanup_time;
+  };
+  std::map<std::string, PendingCleanupInfo> pending_cleanup_files_;
+  mutable std::mutex pending_cleanup_mutex_;
+
   /*
    * Pubsub used
    */
```
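A minimal standalone sketch of how these members could back the `MarkDumpInUse`/`ReleaseDump` API, assuming first-claim-wins semantics and the `kMaxConcurrentDumps` cap. The `DumpRegistry` wrapper is hypothetical; in the PR the map and mutex are members of `PikaServer`:

```cpp
#include <map>
#include <mutex>
#include <string>

// Hypothetical sketch of exclusive dump ownership (Scheme A): each slave
// connection claims one snapshot, bounded by a global concurrency cap.
class DumpRegistry {
 public:
  static constexpr size_t kMaxConcurrentDumps = 3;

  bool MarkDumpInUse(const std::string& uuid, const std::string& conn_id,
                     const std::string& dump_path) {
    std::lock_guard<std::mutex> l(mu_);
    if (owners_.count(uuid) != 0) return false;              // already claimed
    if (owners_.size() >= kMaxConcurrentDumps) return false;  // at capacity
    owners_[uuid] = {conn_id, dump_path};
    return true;
  }

  void ReleaseDump(const std::string& uuid) {
    std::lock_guard<std::mutex> l(mu_);
    owners_.erase(uuid);
  }

  bool IsDumpInUse(const std::string& uuid) const {
    std::lock_guard<std::mutex> l(mu_);
    return owners_.count(uuid) != 0;
  }

 private:
  struct DumpOwnerInfo {
    std::string conn_id;
    std::string dump_path;
  };
  std::map<std::string, DumpOwnerInfo> owners_;  // snapshot_uuid -> owner
  mutable std::mutex mu_;
};
```

The cap bounds disk usage: with per-slave exclusive dumps, an unbounded number of simultaneous full syncs could otherwise multiply the on-disk snapshot footprint.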
```diff
@@ -3,6 +3,7 @@
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
 
+#include <sys/stat.h>
 #include <fstream>
 #include <utility>
 
@@ -300,8 +301,9 @@ bool DB::RunBgsaveEngine() {
   LOG(INFO) << db_name_ << " bgsave_info: path=" << info.path << ", filenum=" << info.offset.b_offset.filenum
             << ", offset=" << info.offset.b_offset.offset;
 
-  // Backup to tmp dir
-  rocksdb::Status s = bgsave_engine_->CreateNewBackup(info.path);
+  // Use SetBackupContentAndCreate to minimize time window between GetLiveFiles and CreateCheckpoint
+  // This reduces the chance of compaction occurring and creating orphan files
+  rocksdb::Status s = bgsave_engine_->SetBackupContentAndCreate(info.path);
 
   if (!s.ok()) {
     LOG(WARNING) << db_name_ << " create new backup failed :" << s.ToString();
```
```diff
@@ -324,29 +326,61 @@ void DB::FinishBgsave() {
 }
 
 // Prepare engine, need bgsave_protector protect
+// Scheme A: Each slave has exclusive dump, so we need unique dump directories
 bool DB::InitBgsaveEnv() {
   std::lock_guard l(bgsave_protector_);
   // Prepare for bgsave dir
   bgsave_info_.start_time = time(nullptr);
   char s_time[32];
   int len = static_cast<int32_t>(strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&bgsave_info_.start_time)));
   bgsave_info_.s_start_time.assign(s_time, len);
-  std::string time_sub_path = g_pika_conf->bgsave_prefix() + std::string(s_time, 8);
-  bgsave_info_.path = g_pika_conf->bgsave_path() + time_sub_path + "/" + bgsave_sub_path_;
-  if (!pstd::DeleteDirIfExist(bgsave_info_.path)) {
-    LOG(WARNING) << db_name_ << " remove exist bgsave dir failed";
+
+  // Scheme A: Use unique directory name with sequence number
+  // Format: dump-YYYYMMDD-NN/db_name where NN is sequence number
+  std::string base_path = g_pika_conf->bgsave_path();
+  std::string date_str(s_time, 8);
+  std::string prefix = g_pika_conf->bgsave_prefix() + date_str;
+
+  // Find first available sequence number
+  int seq = 0;
+  std::string time_sub_path;
+  std::string full_path;
+  do {
+    time_sub_path = prefix + "-" + std::to_string(seq);
+    full_path = base_path + time_sub_path + "/" + bgsave_sub_path_;
+    seq++;
+  } while (pstd::FileExists(full_path) && seq < 1000);  // Max 1000 dumps per day
```
Comment on lines +344 to +352:

Reserve the sequence at the dump root, not the DB path. The availability check uses …
```diff
+  if (seq >= 1000) {
+    LOG(ERROR) << db_name_ << " too many dump directories for today";
+    return false;
+  }
```

Comment on lines +345 to 356:

The last daily slot is never used. Because …
```diff
-  pstd::CreatePath(bgsave_info_.path, 0755);
-  // Prepare for failed dir
-  if (!pstd::DeleteDirIfExist(bgsave_info_.path + "_FAILED")) {
-    LOG(WARNING) << db_name_ << " remove exist fail bgsave dir failed :";
+  bgsave_info_.path = full_path;
+  LOG(INFO) << db_name_ << " preparing bgsave dir: " << bgsave_info_.path;
+
+  // Note: In Scheme A, we don't delete existing directories
+  // because other slaves may be using them
+  // Just create the new path
+  if (!PikaServer::EnsureDirExists(bgsave_info_.path, 0755)) {
+    LOG(WARNING) << db_name_ << " create bgsave dir failed: " << bgsave_info_.path
+                 << ", errno=" << errno << ", error=" << strerror(errno);
+    // Clear the path on failure to avoid using invalid path in GetDumpMeta
+    bgsave_info_.path.clear();
+    return false;
   }
+
+  // Prepare for failed dir
+  std::string failed_dir = bgsave_info_.path + "_FAILED";
+  if (pstd::FileExists(failed_dir)) {
+    pstd::DeleteDirIfExist(failed_dir);
+  }
   return true;
 }
```
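The off-by-one the reviewer flags comes from inferring exhaustion from the post-incremented counter: if only slot 999 is free, the loop exits with `seq == 1000` and the function reports failure anyway. A hedged sketch of a fix, tracking whether a slot was actually found (`FindFreeDumpSlot` is a hypothetical helper; the `exists` predicate stands in for `pstd::FileExists`):

```cpp
#include <functional>
#include <string>

// Hypothetical fixed search: return success iff a free slot was found,
// rather than deducing failure from the loop counter's final value.
bool FindFreeDumpSlot(const std::string& prefix, int max_per_day,
                      const std::function<bool(const std::string&)>& exists,
                      std::string* out_path) {
  for (int seq = 0; seq < max_per_day; ++seq) {
    std::string candidate = prefix + "-" + std::to_string(seq);
    if (!exists(candidate)) {
      *out_path = candidate;  // first free slot, including the last one
      return true;
    }
  }
  return false;  // all max_per_day slots genuinely taken
}
```

This shape also makes the error path unambiguous: `false` means every slot was probed and found occupied, never that the counter happened to hit the cap.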
```diff
 // Prepare bgsave env, need bgsave_protector protect
+// Note: SetBackupContent is now done in RunBgsaveEngine using SetBackupContentAndCreate
+// to minimize time window between GetLiveFiles and CreateCheckpoint
 bool DB::InitBgsaveEngine() {
   bgsave_engine_.reset();
   rocksdb::Status s = storage::BackupEngine::Open(storage().get(), bgsave_engine_, g_pika_conf->db_instance_num());
 
@@ -371,11 +405,7 @@ bool DB::InitBgsaveEngine() {
     std::lock_guard l(bgsave_protector_);
     bgsave_info_.offset = bgsave_offset;
   }
-  s = bgsave_engine_->SetBackupContent();
-  if (!s.ok()) {
-    LOG(WARNING) << db_name_ << " set backup content failed " << s.ToString();
-    return false;
-  }
+  // SetBackupContent is now done in RunBgsaveEngine to minimize time window
 }
 return true;
 }
```
```diff
@@ -390,25 +420,73 @@ void DB::Init() {
 
 void DB::GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid) {
   const std::string dbPath = bgsave_info().path;
+  size_t total_sst_files = 0;
+  size_t orphan_sst_files = 0;
+
+  LOG(INFO) << "[GetBgSaveMetaData] Starting scan, dbPath=" << dbPath;
+
+  // dbPath is already the specific DB path (e.g., .../dump/dump-9454-20260302/db0)
+  // We need to scan its subdirectories (0, 1, 2 for rocksdb instances)
+  std::vector<std::string> subDirs;
+  int ret = pstd::GetChildren(dbPath, subDirs);
+  LOG(INFO) << "[GetBgSaveMetaData] GetChildren for dbPath returned " << ret
+            << ", subDirs count=" << subDirs.size();
+  if (ret) {
+    LOG(WARNING) << "[GetBgSaveMetaData] Failed to read dbPath: " << dbPath;
+    return;
+  }
+
-  int db_instance_num = g_pika_conf->db_instance_num();
-  for (int index = 0; index < db_instance_num; index++) {
-    std::string instPath = dbPath + ((dbPath.back() != '/') ? "/" : "") + std::to_string(index);
-    if (!pstd::FileExists(instPath)) {
-      continue ;
+  for (const std::string& subDir : subDirs) {
+    std::string instPath = dbPath + "/" + subDir;
+    // Skip if not exists or is a file (not directory)
+    // Note: IsDir returns 0 for directory, 1 for file, -1 for error
+    if (!pstd::FileExists(instPath) || pstd::IsDir(instPath) != 0) {
+      continue;
     }
 
     std::vector<std::string> tmpFileNames;
-    int ret = pstd::GetChildren(instPath, tmpFileNames);
+    ret = pstd::GetChildren(instPath, tmpFileNames);
     if (ret) {
-      LOG(WARNING) << dbPath << " read dump meta files failed, path " << instPath;
-      return;
+      LOG(WARNING) << "[GetBgSaveMetaData] Failed to read instPath: " << instPath;
+      continue;
     }
 
-    for (const std::string fileName : tmpFileNames) {
-      fileNames -> push_back(std::to_string(index) + "/" + fileName);
+    for (const std::string& fileName : tmpFileNames) {
+      std::string fullPath = instPath + "/" + fileName;
+      struct stat st;
+      // Check if file exists and get its stat
+      if (stat(fullPath.c_str(), &st) != 0) {
+        // File doesn't exist, skip it
+        LOG(WARNING) << "[GetBgSaveMetaData] File does not exist: " << fullPath;
+        continue;
+      }
```
Comment on lines +431 to +461:

Don't serve a partial dump manifest on scan errors. These …
```diff
+
+      // Check if it's an SST file and if it's an orphan (Links=1)
+      if (fileName.size() > 4 && fileName.substr(fileName.size() - 4) == ".sst") {
+        total_sst_files++;
+        if (st.st_nlink == 1) {
+          // This is an orphan file, but we need to include it in the meta
+          // to ensure data consistency. The file will be cleaned up after
+          // a delay to allow for retries.
+          orphan_sst_files++;
+          LOG(INFO) << "[GetBgSaveMetaData] Including orphan SST file: " << fullPath
+                    << ", size=" << st.st_size;
+          // NOTE: We no longer skip orphan files here. They will be included
+          // in the file list and cleaned up with a delay after transfer.
+        }
+      }
+      // Construct relative path like "0/xxx.sst" or "1/xxx.sst"
+      fileNames->push_back(subDir + "/" + fileName);
     }
   }
 
+  if (orphan_sst_files > 0) {
+    LOG(INFO) << "[GetBgSaveMetaData] Summary for " << dbPath
+              << ": total_sst=" << total_sst_files
+              << ", orphan_included=" << orphan_sst_files
+              << ", returned=" << fileNames->size();
+  }
+
   fileNames->push_back(kBgsaveInfoFile);
   pstd::Status s = GetBgSaveUUID(snapshot_uuid);
   if (!s.ok()) {
```
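The Links=1 heuristic above relies on checkpoint SST files being hard links into the live RocksDB directory: once compaction deletes the live copy, the dump's link count drops to 1 and it is the only remaining copy. A self-contained POSIX demonstration (`IsOrphanSst` is a hypothetical helper, not the PR's code):

```cpp
#include <sys/stat.h>
#include <unistd.h>
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical helper mirroring the check in GetBgSaveMetaData: st_nlink == 1
// means no other directory entry (i.e., no live DB file) shares this inode.
bool IsOrphanSst(const std::string& path) {
  struct stat st;
  if (stat(path.c_str(), &st) != 0) {
    return false;  // missing file: nothing to classify
  }
  return st.st_nlink == 1;
}
```

Note the heuristic assumes the checkpoint was created with hard links rather than copies; a copied SST would always report a link count of 1 and look like an orphan.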
The comments around the rate limiter settings no longer match the effective defaults in this file (e.g., it now enforces a non-default `rate-limiter-bandwidth` and sets `rate-limiter-auto-tuned` to `no`, while the comment says the default is true/no-limit). Please update the surrounding comments to reflect the new defaults, or keep these lines commented out if they are meant as optional overrides rather than shipped defaults.