feat: adjust the master-slave synchronization dump cleanup mechanism to optimize storage space usage (#3225)
Conversation
1. Each slave gets an exclusive dump directory (dump-YYYYMMDD-NN format). 2. Files are cleaned up immediately after transfer completes, freeing disk space. 3. The number of concurrent dumps is limited to 3. 4. Improved dump integrity checks and ownership management. Notes: 1. Multi-database scenarios are not yet supported. 2. Cleanup can misbehave when multiple slaves run a full sync at the same time. 3. Orphan-file cleanup degrades when manual syncs are triggered several times in one day.
|
feat: independent dump + instant cleanup mechanism to solve the orphan-file problem in Pika full synchronization:
|
Detailed explanation of Pika’s full synchronization plan

Document description

This document describes in detail Pika's new full synchronization mechanism (Scheme A), including the complete process, status changes, data flow, and known issues in each scenario.
1. Architecture Overview

1.1 Core design

Scheme A uses the following design principles:
1.2 Key components
1.3 Key data structures

```cpp
// Dump occupancy information
struct DumpOwnerInfo {
  std::string conn_id;    // ID of the owning connection
  std::string dump_path;  // dump directory path
};
std::map<std::string, DumpOwnerInfo> dump_owners_;  // snapshot_uuid -> ownership info

// Protection for files in transit
std::map<std::string, std::set<std::string>> rsync_transferring_files_;  // snapshot_uuid -> file set

// Active snapshots
std::set<std::string> active_rsync_snapshots_;  // used for orphan-file cleanup protection
```

2. Single Slave, single DB full synchronization process

2.1 Process sequence diagram

2.2 Master status changes
2.3 Slave status changes
2.4 Data changes

Master disk usage changes:
3. Multi-Slave synchronization process

3.1 Scenario description
3.2 Process sequence

3.3 Key limitations
4. Single Slave, multiple DB synchronization process

4.1 Scenario description
4.2 Directory structure

4.3 File naming rules
4.4 Synchronization process

Each DB is synchronized independently:
4.5 Potential problems

Problem: info file location is inconsistent
FIXED: first tries …

5. Multi-Slave, multi-DB synchronization process

This is the most complex scenario in Scheme A, combining the characteristics of multiple slaves and multiple DBs.

5.1 Scenario description
5.2 Dump ownership mechanism

Scheme A design: each Slave exclusively owns an entire dump directory (including all DBs).

5.3 Ownership check
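As an illustrative sketch of the ownership check described above (class and method names here are hypothetical, not Pika's actual members, though `DumpOwnerInfo` and the concurrent limit of 3 come from this document):

```cpp
#include <map>
#include <string>

// Hypothetical sketch of the dump-ownership table: one owner per snapshot,
// re-entrant for the same connection, capped at 3 concurrent dumps.
struct DumpOwnerInfo {
  std::string conn_id;
  std::string dump_path;
};

class DumpRegistry {
 public:
  static constexpr std::size_t kMaxConcurrentDumps = 3;

  // Returns true if `conn_id` now owns the dump for `snapshot_uuid`.
  bool MarkDumpInUse(const std::string& snapshot_uuid,
                     const std::string& conn_id,
                     const std::string& dump_path) {
    auto it = owners_.find(snapshot_uuid);
    if (it != owners_.end()) {
      // Already reserved: only the same slave connection may proceed.
      return it->second.conn_id == conn_id;
    }
    if (owners_.size() >= kMaxConcurrentDumps) {
      return false;  // concurrent-dump limit reached
    }
    owners_[snapshot_uuid] = DumpOwnerInfo{conn_id, dump_path};
    return true;
  }

  void ReleaseDump(const std::string& snapshot_uuid) {
    owners_.erase(snapshot_uuid);
  }

 private:
  std::map<std::string, DumpOwnerInfo> owners_;  // snapshot_uuid -> owner
};
```

In the real server these operations would of course run under a mutex; the sketch only shows the ownership rules.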
5.4 Potential issues

Problem 1: DB-level granularity vs dump-level granularity
Problem 2: orphan-file cleanup for multiple DBs
6. Orphan-file cleanup mechanism (unified delayed cleanup)

6.1 Trigger conditions

Orphan file: an SST file with nlink=1 (referenced only by the dump, not by RocksDB).

Causes:
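The trigger condition above can be sketched as a predicate plus a thin `stat` wrapper (helper names here are hypothetical, for illustration only):

```cpp
#include <sys/stat.h>

#include <string>

// Only an SST data file with a single hard link is an orphan candidate;
// dump metadata files (info, CURRENT, MANIFEST, ...) legitimately have
// nlink == 1 and must not be treated as orphans.
bool IsOrphanCandidate(const std::string& filename, unsigned long nlink) {
  const std::string ext = ".sst";
  bool is_sst = filename.size() > ext.size() &&
                filename.compare(filename.size() - ext.size(), ext.size(), ext) == 0;
  return is_sst && nlink == 1;
}

// Consults the real link count on disk for a file inside the dump directory.
bool IsOrphanOnDisk(const std::string& filepath) {
  struct stat st;
  return ::stat(filepath.c_str(), &st) == 0 &&
         IsOrphanCandidate(filepath, st.st_nlink);
}
```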
6.2 Unified cleanup strategy

Design change: remove the …

New cleanup process:

6.3 Protection mechanism
6.4 Timing description

6.5 Comparison: old scheme vs new scheme
7. Bug list

7.1 Bugs to be fixed
8. To-do list

8.1 High priority
8.2 Medium priority
8.3 Low priority
9. Configuration suggestions

9.1 Key configuration items

```
# pika.conf

# dump directory prefix
dump-prefix : dump-

# dump directory path
dump-path : ./dump/

# dump expiration time (days)
# 0 means never expire
dump-expire : 1

# number of RocksDB instances
db-instance-num : 3

# maximum number of concurrent dumps (compile-time constant)
# kMaxConcurrentDumps = 3
```

9.2 Deployment recommendations
10. Appendix

10.1 Key logs

```shell
# View Meta request processing
grep "Rsync Meta" log/pika.INFO
# View file transfers
grep "RsyncTransfer" log/pika.INFO
# View delayed orphan-file cleanup scheduling
grep "Scheduled orphan file" log/pika.INFO
# View delayed cleanup execution
grep "Deleted delayed cleanup file" log/pika.INFO
# View dump ownership
grep "DumpOwnership" log/pika.INFO
# View errors
grep "File no longer exists" log/pika.WARNING
```

10.2 Status code description
10.3 File path specification
|
Pull request overview
This PR targets multiple stability issues in storage backup/rsync full-sync flows (orphan SST files, dump lifecycle/cleanup) and also adjusts some RocksDB-related defaults and build/test toggles.
Changes:
- Add a “get checkpoint files + immediately create checkpoint” flow to reduce the compaction window that can produce orphan SSTs during bgsave.
- Introduce rsync snapshot/dump ownership tracking plus delayed orphan-file cleanup to avoid deleting files still needed by syncing slaves.
- Make test building optional via BUILD_TESTS, and adjust several config/default behaviors (rate limiter, wash-data, etc.).
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/storage/src/backupable.cc | Adds SetBackupContentAndCreate to minimize the gap between live-file listing and checkpoint creation. |
| src/storage/include/storage/backupable.h | Declares the new SetBackupContentAndCreate API. |
| src/storage/include/storage/storage_define.h | Fixes pointer advancement in EncodeUserKey when \x00 exists. |
| src/storage/CMakeLists.txt | Makes storage tests conditional on BUILD_TESTS. |
| src/rsync_server.cc | Adds dump reservation, integrity checks, snapshot/file transfer tracking, and delayed orphan cleanup scheduling. |
| include/rsync_server.h | Exposes snapshot/file tracking APIs and adds per-connection tracking state. |
| src/rsync_client.cc | Adds retry/error behavior for missing files and adjusts local tracking update logic. |
| src/pstd/src/env.cc | Wraps GetChildren with exception handling and logging for filesystem errors. |
| src/pstd/CMakeLists.txt | Makes pstd tests conditional on BUILD_TESTS. |
| src/pika_server.cc | Implements global rsync snapshot/file tracking, dump ownership, delayed cleanup processing, and dump cleanup policy updates. |
| include/pika_server.h | Declares new dump ownership / rsync tracking / delayed cleanup APIs and state. |
| src/pika_db.cc | Uses the new immediate-checkpoint backup path and introduces unique dump directory naming with sequence suffixes. |
| src/pika_rm.cc | Rate-limits rsync retry logs. |
| src/pika_conf.cc | Fixes integer literal width for rate limiter default bandwidth. |
| src/pika_command.cc | Adjusts HLEN command flags to avoid cache/TTL anomalies. |
| conf/pika.conf | Changes sample/default settings for rate limiter and wash-data. |
| CMakeLists.txt | Adds BUILD_TESTS option, gates enable_testing(), and adjusts build version source inclusion. |
```cpp
  if (!snapshot_uuid_.empty() && !filename.empty()) {
    std::lock_guard<std::mutex> guard(mu_);
    transferring_files_.insert(filename);
    g_pika_server->RegisterRsyncTransferringFile(snapshot_uuid_, filename);
  }
}

void RsyncServerConn::RemoveTransferringFile(const std::string& filename, bool is_eof) {
  if (!snapshot_uuid_.empty() && !filename.empty()) {
    std::lock_guard<std::mutex> guard(mu_);
    transferring_files_.erase(filename);
    g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_, filename);

    // Only process cleanup when file transfer is complete (is_eof=true)
    if (is_eof) {
      std::string dump_path = g_pika_server->GetDumpPathBySnapshot(snapshot_uuid_);
      std::string filepath = dump_path + "/" + filename;

      // Check if file is orphan (nlink=1, only referenced by dump, not by db)
      struct stat st;
      if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) {
        // Orphan file: schedule for delayed cleanup (10 minutes)
        // This allows Slave to retry if needed before actual deletion
        g_pika_server->ScheduleFileForCleanup(filepath, 600);
        LOG(INFO) << "[RsyncTransfer] Scheduled orphan file for cleanup: " << filename
                  << " for snapshot: " << snapshot_uuid_;
      }
      // Non-orphan files (nlink=2) are still referenced by RocksDB, no cleanup needed
    }
```
AddTransferringFile/RemoveTransferringFile hold mu_ while calling into g_pika_server and doing filesystem work (stat, path building, scheduling cleanup). This can unnecessarily block parallel rsync reads and increases deadlock risk due to lock-ordering across different mutexes. Limit mu_ to only protecting transferring_files_ (and snapshot state), then perform server calls and stat/cleanup decisions after releasing the lock using local copies of snapshot_uuid_/dump_path.
Suggested replacement:

```cpp
  if (filename.empty()) {
    return;
  }
  std::string snapshot_uuid_copy;
  {
    std::lock_guard<std::mutex> guard(mu_);
    snapshot_uuid_copy = snapshot_uuid_;
    if (!snapshot_uuid_copy.empty()) {
      transferring_files_.insert(filename);
    }
  }
  if (!snapshot_uuid_copy.empty()) {
    g_pika_server->RegisterRsyncTransferringFile(snapshot_uuid_copy, filename);
  }
}

void RsyncServerConn::RemoveTransferringFile(const std::string& filename, bool is_eof) {
  if (filename.empty()) {
    return;
  }
  std::string snapshot_uuid_copy;
  {
    std::lock_guard<std::mutex> guard(mu_);
    snapshot_uuid_copy = snapshot_uuid_;
    if (!snapshot_uuid_copy.empty()) {
      transferring_files_.erase(filename);
    }
  }
  if (snapshot_uuid_copy.empty()) {
    return;
  }
  g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_copy, filename);
  // Only process cleanup when file transfer is complete (is_eof=true)
  if (is_eof) {
    std::string dump_path = g_pika_server->GetDumpPathBySnapshot(snapshot_uuid_copy);
    std::string filepath = dump_path + "/" + filename;
    // Check if file is orphan (nlink=1, only referenced by dump, not by db)
    struct stat st;
    if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) {
      // Orphan file: schedule for delayed cleanup (10 minutes)
      // This allows Slave to retry if needed before actual deletion
      g_pika_server->ScheduleFileForCleanup(filepath, 600);
      LOG(INFO) << "[RsyncTransfer] Scheduled orphan file for cleanup: " << filename
                << " for snapshot: " << snapshot_uuid_copy;
    }
    // Non-orphan files (nlink=2) are still referenced by RocksDB, no cleanup needed
```
```cpp
// Compare filenames (from GetDumpMeta) with actual_files (re-scanned)
std::vector<std::string> missing_files;
for (const auto& expected : filenames) {
  bool found = false;
  for (const auto& actual : actual_files) {
    if (actual == expected) {
      found = true;
      break;
    }
  }
  if (!found && expected != "info") {  // info file is handled separately
    missing_files.push_back(expected);
  }
}
```
The second dump integrity check does an O(N*M) nested loop to match filenames against actual_files, which can become expensive with large snapshots. Convert actual_files to a hash set (or sort both vectors and do a linear merge) so the missing-file check is O(N).
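The hash-set variant suggested above could look like this (a sketch; the function name and free-standing form are illustrative, not the PR's actual code):

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// O(N + M) replacement for the nested-loop integrity check: put the
// re-scanned files in a hash set, then probe each expected name once.
std::vector<std::string> FindMissingFiles(
    const std::vector<std::string>& expected_files,
    const std::vector<std::string>& actual_files) {
  std::unordered_set<std::string> actual(actual_files.begin(), actual_files.end());
  std::vector<std::string> missing;
  for (const auto& expected : expected_files) {
    // The "info" file is handled separately, as in the original check.
    if (expected != "info" && actual.find(expected) == actual.end()) {
      missing.push_back(expected);
    }
  }
  return missing;
}
```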
```cpp
// If Master returns empty response, it means the file was cleaned up during sync
// This is an error condition - the sync should fail and retry
if (ret_count == 0 && resp->file_resp().eof()) {
  LOG(ERROR) << "File not available on Master at offset " << offset
             << ", filename: " << filename
             << ". Will retry and may trigger new bgsave.";
  s = Status::IOError("File not available on Master");
  return s;
}
```
Treating ret_count == 0 && eof == true as an error will break valid zero-length files: the first (and only) read would legitimately return 0 bytes with EOF set, but this code returns an IOError. Missing files are already signaled by resp->code() != kOk (and the server now explicitly returns kErr when the file is gone), so this check should be removed or tightened (e.g., only error when ret_count == 0 && eof == true AND offset > 0, or when an explicit server-side error is returned).
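The tightened condition could be factored as a small predicate (a sketch under the reviewer's suggestion; the helper name is hypothetical):

```cpp
#include <cstddef>
#include <cstdint>

// A zero-byte read with EOF at offset 0 is a legitimate empty file, so the
// client should only treat it as "file vanished on Master" when it had
// already read past offset 0 (hard errors are reported separately via the
// server-side response code).
bool IsSuspiciousEmptyResponse(std::size_t ret_count, bool eof, std::int64_t offset) {
  return ret_count == 0 && eof && offset > 0;
}
```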
```cpp
// Utility function to ensure directory exists
// Returns true if directory exists or was created successfully
// Handles the special case where CreatePath returns 0 for both success and "already exists"
bool PikaServer::EnsureDirExists(const std::string& path, mode_t mode) {
  // First check if directory already exists
  if (pstd::FileExists(path)) {
    return true;
  }
  // Directory doesn't exist, try to create it
  int ret = pstd::CreatePath(path, mode);
  // CreatePath returns 0 on success, -1 on failure
  // Note: CreatePath also returns -1 if directory already exists (due to the
  // !filesystem::create_directories check), but we already checked FileExists above
  if (ret != 0) {
    LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret;
    return false;
  }
  // Verify directory was created successfully
  return pstd::FileExists(path);
```
EnsureDirExists returns true for any existing path via pstd::FileExists(path), even if path is a regular file. Callers assume this guarantees a directory exists, so this can lead to later failures when treating a file as a directory. Use pstd::IsDir(path) == 0 for the fast path (and consider updating the comment above as it currently describes CreatePath behavior inaccurately).
Suggested replacement:

```cpp
// Utility function to ensure a directory exists at the given path.
// Returns true if the path already refers to a directory, or if a directory
// was created (or found to already exist) successfully.
// Note: pstd::CreatePath returns 0 both when the directory is created and
// when it already exists, and a non-zero value on failure.
bool PikaServer::EnsureDirExists(const std::string& path, mode_t mode) {
  // Fast path: check if a directory already exists at the given path.
  if (pstd::IsDir(path) == 0) {
    return true;
  }
  // Directory doesn't exist, try to create it.
  int ret = pstd::CreatePath(path, mode);
  if (ret != 0) {
    LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret;
    return false;
  }
  // Verify that the resulting path is indeed a directory.
  return pstd::IsDir(path) == 0;
```
```cpp
void PikaServer::ProcessPendingCleanupFiles() {
  std::lock_guard<std::mutex> lock(pending_cleanup_mutex_);
  time_t now = time(nullptr);
  int cleaned_count = 0;

  for (auto it = pending_cleanup_files_.begin();
       it != pending_cleanup_files_.end(); ) {
    if (now >= it->second.cleanup_time) {
      // Check if file still exists and is still an orphan (nlink=1)
      if (pstd::FileExists(it->second.filepath)) {
        struct stat st;
        if (stat(it->second.filepath.c_str(), &st) == 0 && st.st_nlink == 1) {
          pstd::DeleteFile(it->second.filepath);
          cleaned_count++;
          LOG(INFO) << "[Cleanup] Deleted delayed cleanup file: " << it->second.filepath;
```
ProcessPendingCleanupFiles holds pending_cleanup_mutex_ while doing potentially slow I/O (FileExists, stat, DeleteFile) and logging. This can block concurrent scheduling of new cleanup entries and prolong lock hold times during disk stalls. Collect the expired filepaths under the mutex, erase them from the map, then release the lock and perform stat/deletes/logging outside the critical section.
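The collect-then-delete pattern suggested above can be sketched like this (struct and function names are hypothetical, for illustration; the real code would then stat/delete each returned path outside the lock):

```cpp
#include <ctime>
#include <map>
#include <mutex>
#include <string>
#include <vector>

struct PendingCleanup {
  std::string filepath;
  time_t cleanup_time;
};

// Pick expired entries and erase them while holding the mutex; the slow
// stat/delete/logging work happens after the lock is released.
std::vector<std::string> TakeExpired(std::map<std::string, PendingCleanup>& pending,
                                     std::mutex& mu, time_t now) {
  std::vector<std::string> expired;
  std::lock_guard<std::mutex> lock(mu);
  for (auto it = pending.begin(); it != pending.end(); ) {
    if (now >= it->second.cleanup_time) {
      expired.push_back(it->second.filepath);
      it = pending.erase(it);
    } else {
      ++it;
    }
  }
  return expired;
}
```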
```diff
@@ -505,7 +505,7 @@ default-slot-num : 1024
 # if auto_tuned is true: Enables dynamic adjustment of rate limit within the range
 # `[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O.
 # rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true.
-#rate-limiter-auto-tuned : yes
+rate-limiter-auto-tuned : no
```
The comments around the rate limiter settings no longer match the effective defaults in this file (e.g., it now enforces a non-default rate-limiter-bandwidth and sets rate-limiter-auto-tuned to no while the comment says the default is true/no-limit). Please update the surrounding comments to reflect the new defaults, or keep these lines commented-out if they are meant as optional overrides rather than shipped defaults.
```diff
@@ -673,7 +673,7 @@ internal-used-unfinished-full-sync :
 # for wash data from 4.0.0 to 4.0.1
 # https://github.com/OpenAtomFoundation/pika/issues/2886
 # default value: true
```
The comment says wash-data default value is true, but the config now sets wash-data: false. Update the comment to match the new default (or keep the setting commented-out if this is only intended as an example override).
Suggested change:

```diff
-# default value: true
+# default value: false
```
```cpp
  {
    std::lock_guard<std::mutex> guard(mu_);
    for (int i = 0; i < readers_.size(); i++) {
      readers_[i].reset();
    }
  }
  // Release dump ownership when connection closes (Scheme A)
  if (!snapshot_uuid_.empty()) {
    LOG(INFO) << "[RsyncServerConn] Connection " << conn_id_ << " closing, releasing dump " << snapshot_uuid_;
    g_pika_server->ReleaseDump(snapshot_uuid_);
  }
  // Unregister snapshot when connection closes (outside of mu_ lock)
  UnregisterSnapshot();
}

void RsyncServerConn::RegisterSnapshot(const std::string& snapshot_uuid) {
  if (!snapshot_uuid.empty() && snapshot_uuid_ != snapshot_uuid) {
    // Unregister old snapshot if different
    if (!snapshot_uuid_.empty()) {
      UnregisterSnapshot();
    }
    snapshot_uuid_ = snapshot_uuid;
    g_pika_server->RegisterRsyncSnapshot(snapshot_uuid_);
  }
}

void RsyncServerConn::UnregisterSnapshot() {
  if (!snapshot_uuid_.empty()) {
    // Clear any remaining transferring files
    std::set<std::string> remaining_files;
    {
      std::lock_guard<std::mutex> guard(mu_);
      remaining_files = transferring_files_;
      transferring_files_.clear();
    }
    for (const auto& file : remaining_files) {
      g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_, file);
    }
    g_pika_server->UnregisterRsyncSnapshot(snapshot_uuid_);
    snapshot_uuid_.clear();
  }
```
snapshot_uuid_ is read/written without holding mu_ (e.g., comparing/assigning in RegisterSnapshot, and checked in Add/RemoveTransferringFile). Since meta/file requests are scheduled on a thread pool, these methods can run concurrently on the same connection, which makes snapshot_uuid_ a data race (UB) on std::string. Guard all accesses to snapshot_uuid_ with a mutex (e.g., reuse mu_ or add a dedicated mutex) and avoid calling UnregisterSnapshot() while holding the same lock (copy the old uuid first, then unregister outside the lock).
Suggested replacement:

```cpp
  std::string snapshot_uuid_copy;
  {
    std::lock_guard<std::mutex> guard(mu_);
    for (int i = 0; i < readers_.size(); i++) {
      readers_[i].reset();
    }
    snapshot_uuid_copy = snapshot_uuid_;
  }
  // Release dump ownership when connection closes (Scheme A)
  if (!snapshot_uuid_copy.empty()) {
    LOG(INFO) << "[RsyncServerConn] Connection " << conn_id_ << " closing, releasing dump " << snapshot_uuid_copy;
    g_pika_server->ReleaseDump(snapshot_uuid_copy);
  }
  // Unregister snapshot when connection closes (UnregisterSnapshot handles locking)
  UnregisterSnapshot();
}

void RsyncServerConn::RegisterSnapshot(const std::string& snapshot_uuid) {
  if (snapshot_uuid.empty()) {
    return;
  }
  // Take a snapshot of the current uuid under lock
  std::string current_snapshot_uuid;
  {
    std::lock_guard<std::mutex> guard(mu_);
    current_snapshot_uuid = snapshot_uuid_;
  }
  if (current_snapshot_uuid == snapshot_uuid) {
    return;
  }
  // Unregister old snapshot (if any) outside the lock
  if (!current_snapshot_uuid.empty()) {
    UnregisterSnapshot();
  }
  // Set the new snapshot uuid under lock
  {
    std::lock_guard<std::mutex> guard(mu_);
    snapshot_uuid_ = snapshot_uuid;
  }
  // Register new snapshot using the provided uuid
  g_pika_server->RegisterRsyncSnapshot(snapshot_uuid);
}

void RsyncServerConn::UnregisterSnapshot() {
  // Copy state under lock, then perform unregister operations outside the lock
  std::string snapshot_uuid_copy;
  std::set<std::string> remaining_files;
  {
    std::lock_guard<std::mutex> guard(mu_);
    if (snapshot_uuid_.empty()) {
      return;
    }
    snapshot_uuid_copy = snapshot_uuid_;
    remaining_files = transferring_files_;
    transferring_files_.clear();
    snapshot_uuid_.clear();
  }
  // Clear any remaining transferring files for this snapshot
  for (const auto& file : remaining_files) {
    g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_copy, file);
  }
  g_pika_server->UnregisterRsyncSnapshot(snapshot_uuid_copy);
```
Added a file-type check in RemoveTransferringFile: only files with the .sst suffix can be orphan files (hard links). Other files such as info, CURRENT, MANIFEST, OPTIONS, and .log are newly generated during the dump process, so nlink=1 is normal for them and they must not be cleaned up. Before the fix: the info file was wrongly identified as an orphan and scheduled for cleanup, leaving the dump directory without its protection mechanism. After the fix: only SST files enter the orphan-file check and delayed cleanup flow.
📝 Walkthrough

This PR extends Rsync snapshot and dump lifecycle management in Pika by introducing tracking APIs for active rsync snapshots and transferring files, dump ownership management with concurrent limits, delayed file cleanup mechanisms, and enhanced backup creation logic. Additionally, it refines the CMake build configuration, adjusts default settings, and improves error handling in file operations.
Sequence diagram:

```mermaid
sequenceDiagram
    participant Client as RsyncClient
    participant Conn as RsyncServerConn
    participant Server as PikaServer
    participant Storage as Storage Layer
    participant Cleanup as Cleanup Worker
    Client->>Conn: HandleMetaRsyncRequest(snapshot)
    Conn->>Server: MarkDumpInUse(snapshot_uuid, conn_id, path)
    Server->>Server: Verify dump ownership & concurrent limits
    Server-->>Conn: Ownership acquired
    Conn->>Conn: RegisterSnapshot(snapshot_uuid)
    Conn->>Conn: Pre-register all files as transferring
    Conn-->>Client: Meta info + file list
    Client->>Conn: HandleFileRsyncRequest(filename)
    Conn->>Conn: AddTransferringFile(filename)
    Conn->>Storage: Read file content
    Storage-->>Conn: File data
    Conn->>Conn: RemoveTransferringFile(filename, is_eof=true)
    Conn->>Server: ScheduleFileForCleanup(if orphan)
    Conn-->>Client: File data
    Client->>Conn: Transfer complete
    Conn->>Server: ReleaseDump(snapshot_uuid)
    Conn->>Conn: UnregisterSnapshot()
    Server-->>Conn: Dump released
    Cleanup->>Server: ProcessPendingCleanupFiles()
    Server->>Storage: Delete orphan SST files
    Storage-->>Server: Cleanup complete
    Server->>Server: Log cleanup summary
```
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/rsync_client.cc (1)
323-332: ⚠️ Potential issue | 🟠 Major

Keep file_set_ as the delta to preserve resume.

expired_files already handles master-side deletions. Replacing newly_files with remote_file_set forces every retry to re-copy files that are already complete locally, which defeats interrupted full-sync resume and amplifies IO.

🛠️ Suggested fix:
```diff
-    // Replace file_set_ with remote_file_set to ensure files deleted on Master
-    // are also removed from local tracking
-    file_set_ = remote_file_set;
+    file_set_ = newly_files;
```

src/rsync_server.cc (2)
437-456: ⚠️ Potential issue | 🔴 Critical

Serve file chunks from the connection-reserved snapshot.

The meta phase stores snapshot ownership on conn, but this handler ignores it and re-fetches the current GetDumpUUID(db_name) / db->bgsave_info().path on every file request. Once another slave triggers a newer bgsave, this connection starts reading the new dump mid-transfer and the client aborts on the snapshot UUID mismatch.
236-286: ⚠️ Potential issue | 🔴 Critical

Bind dump_path to the same snapshot returned by GetDumpMeta().

GetDumpMeta() gives this handler a specific snapshot_uuid and file list, but the code re-reads db_ptr->bgsave_info().path before the integrity checks and again when claiming ownership. In the concurrent full-sync flow this PR introduces, a newer bgsave can flip that path in between, so Line 273 can delete the wrong directory and Line 372 can associate that wrong path with the older snapshot_uuid. Please fetch {snapshot_uuid, dump_path, filenames} atomically and use that stable path throughout.

Also applies to: 370-372
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/rsync_server.cc` around lines 236 - 286, GetDumpMeta returns snapshot_uuid and filenames but code later re-reads db_ptr->bgsave_info().path which can race with concurrent bgsave; capture the dump_path atomically right after GetDumpMeta (e.g., read db_ptr->bgsave_info().path into a local dump_path variable immediately after obtaining db_ptr and snapshot_uuid) and use that stored dump_path for the integrity checks, pstd::DeleteDirIfExist, and when setting the response/claiming ownership instead of re-reading db_ptr->bgsave_info().path; ensure all references (the integrity loop that builds full_path, the LOG messages, the delete call pstd::DeleteDirIfExist(dump_path), and the final assignment to dump_path used by the connection) use this stable local variable so the path cannot flip between checks and deletion.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 74324e56-e82d-42af-8435-545104f4ca02
📒 Files selected for processing (17)
CMakeLists.txt, conf/pika.conf, include/pika_server.h, include/rsync_server.h, src/pika_command.cc, src/pika_conf.cc, src/pika_db.cc, src/pika_rm.cc, src/pika_server.cc, src/pstd/CMakeLists.txt, src/pstd/src/env.cc, src/rsync_client.cc, src/rsync_server.cc, src/storage/CMakeLists.txt, src/storage/include/storage/backupable.h, src/storage/include/storage/storage_define.h, src/storage/src/backupable.cc
  # for wash data from 4.0.0 to 4.0.1
  # https://github.com/OpenAtomFoundation/pika/issues/2886
  # default value: true
- wash-data: true
+ wash-data: false
Disabling wash-data by default may break upgrades from 4.0.0 to 4.0.1.
The WashData() function (referenced in the comment at lines 673-675) is essential for migrating hash column family data to the correct internal format when upgrading. With wash-data: false as the default:
- Users upgrading from 4.0.0 won't automatically get their data migrated
- Hash values without the proper suffix encoding will remain inconsistent
- This could cause silent data corruption or read failures
Consider either:
- Keeping the default as true and documenting that users should set it to false after the first successful startup post-upgrade
- Adding prominent upgrade documentation warning users to set wash-data: true before their first 4.0.1 startup
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@conf/pika.conf` around lines 673 - 676, The config currently sets wash-data:
false which prevents the WashData() migration from running during upgrades;
change the default to wash-data: true so WashData() runs automatically on first
startup after upgrading from 4.0.0, and add a clear comment next to the
wash-data entry instructing operators to set wash-data: false after the first
successful startup (or include alternative upgrade docs); specifically update
the wash-data default and the adjacent comment block referenced by WashData() to
reflect this behavior.
void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds);
void ProcessPendingCleanupFiles();
Keep the same transfer identity in the delayed-cleanup queue.
PendingCleanupInfo only stores filepath, but transfer state is tracked by snapshot_uuid + filename. If a slave retries the same SST during the grace window, the cleanup worker has no stable key to ask whether that file became active again before deleting it.
Minimal shape change
- void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds);
+ void ScheduleFileForCleanup(const std::string& snapshot_uuid,
+ const std::string& filename,
+ const std::string& filepath,
+ int delay_seconds);
struct PendingCleanupInfo {
+ std::string snapshot_uuid;
+ std::string filename;
std::string filepath;
time_t cleanup_time;
 };

Also applies to: 677-682
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@include/pika_server.h` around lines 246 - 247, PendingCleanupInfo currently
only holds filepath so the delayed-cleanup queue can't determine transfer
identity (snapshot_uuid + filename); modify PendingCleanupInfo to include
snapshot_uuid and filename (or a single transfer_id composed of them), update
ScheduleFileForCleanup to accept snapshot_uuid and filename (in addition to
filepath/delay_seconds) and ensure ProcessPendingCleanupFiles uses the
snapshot_uuid+filename identity when checking whether a file became active again
before deletion; touch all uses of ScheduleFileForCleanup,
ProcessPendingCleanupFiles, and any queue logic to push/pop the updated
PendingCleanupInfo structure so cleanup decisions use the stable transfer
identity.
// Find first available sequence number
int seq = 0;
std::string time_sub_path;
std::string full_path;
do {
  time_sub_path = prefix + "-" + std::to_string(seq);
  full_path = base_path + time_sub_path + "/" + bgsave_sub_path_;
  seq++;
} while (pstd::FileExists(full_path) && seq < 1000);  // Max 1000 dumps per day
Reserve the sequence at the dump root, not the DB path.
The availability check uses .../dump-YYYYMMDD-N/<db_name>. That lets another DB reuse the same dump-YYYYMMDD-N as long as its own subdir is missing, so independent syncs can end up sharing one dump root. That breaks the per-slave exclusive-dump model and makes later dump-level cleanup/ownership ambiguous.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_db.cc` around lines 344 - 352, The loop currently checks existence
of base_path + time_sub_path + "/" + bgsave_sub_path_, which only reserves
per-DB subpaths and allows different DBs to reuse the same dump root; change the
check to reserve the dump root itself by testing base_path + time_sub_path (the
dump-YYYYMMDD-N directory) instead of including bgsave_sub_path_. Update the
construction of full_path used in the pstd::FileExists call (and any other place
that assumes the checked path) to point at the dump root (using time_sub_path
and base_path) so each sequence number is exclusively reserved for the entire
dump root.
int seq = 0;
std::string time_sub_path;
std::string full_path;
do {
  time_sub_path = prefix + "-" + std::to_string(seq);
  full_path = base_path + time_sub_path + "/" + bgsave_sub_path_;
  seq++;
} while (pstd::FileExists(full_path) && seq < 1000);  // Max 1000 dumps per day

if (seq >= 1000) {
  LOG(ERROR) << db_name_ << " too many dump directories for today";
  return false;
The last daily slot is never used.
Because seq is incremented before the limit check, a free ...-999 directory still exits the loop with seq == 1000, and Line 354 reports "too many dump directories". This caps the code at 999 usable slots, not 1000.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_db.cc` around lines 345 - 356, The loop increments seq before
checking the 1000-limit, so the final slot (-999) can be skipped; update the
search to stop after checking up to 1000 candidates by iterating with seq from 0
to <1000 and breaking when pstd::FileExists(full_path) is false (e.g., replace
the do/while with a for loop or move seq++ to after the FileExists check),
ensuring time_sub_path/full_path are constructed using seq and using
bgsave_sub_path_ and db_name_ as before so the final slot is considered and the
seq >= 1000 check correctly indicates exhaustion.
  int ret = pstd::GetChildren(dbPath, subDirs);
  LOG(INFO) << "[GetBgSaveMetaData] GetChildren for dbPath returned " << ret
            << ", subDirs count=" << subDirs.size();
  if (ret) {
    LOG(WARNING) << "[GetBgSaveMetaData] Failed to read dbPath: " << dbPath;
    return;
  }

- int db_instance_num = g_pika_conf->db_instance_num();
- for (int index = 0; index < db_instance_num; index++) {
-   std::string instPath = dbPath + ((dbPath.back() != '/') ? "/" : "") + std::to_string(index);
-   if (!pstd::FileExists(instPath)) {
-     continue ;
+ for (const std::string& subDir : subDirs) {
+   std::string instPath = dbPath + "/" + subDir;
+   // Skip if not exists or is a file (not directory)
+   // Note: IsDir returns 0 for directory, 1 for file, -1 for error
+   if (!pstd::FileExists(instPath) || pstd::IsDir(instPath) != 0) {
+     continue;
    }

    std::vector<std::string> tmpFileNames;
-   int ret = pstd::GetChildren(instPath, tmpFileNames);
+   ret = pstd::GetChildren(instPath, tmpFileNames);
    if (ret) {
-     LOG(WARNING) << dbPath << " read dump meta files failed, path " << instPath;
-     return;
+     LOG(WARNING) << "[GetBgSaveMetaData] Failed to read instPath: " << instPath;
+     continue;
    }

-   for (const std::string fileName : tmpFileNames) {
-     fileNames -> push_back(std::to_string(index) + "/" + fileName);
+   for (const std::string& fileName : tmpFileNames) {
+     std::string fullPath = instPath + "/" + fileName;
+     struct stat st;
+     // Check if file exists and get its stat
+     if (stat(fullPath.c_str(), &st) != 0) {
+       // File doesn't exist, skip it
+       LOG(WARNING) << "[GetBgSaveMetaData] File does not exist: " << fullPath;
+       continue;
Don't serve a partial dump manifest on scan errors.
These return/continues can drop an entire instance directory or even CURRENT/MANIFEST from fileNames. src/pika_server.cc:827-834 still returns OK to rsync, and src/rsync_server.cc:288-310 then treats the truncated list as the integrity baseline, so the slave can accept an incomplete snapshot instead of retrying a fresh bgsave. This path should surface an error, which likely means GetBgSaveMetaData needs to return a Status.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_db.cc` around lines 431 - 461, GetBgSaveMetaData must not silently
drop files on scan errors; change its signature to return a Status (e.g., Status
GetBgSaveMetaData(...)) and replace the current silent continue/return behavior
so that any failure from pstd::GetChildren, pstd::IsDir (when it returns -1),
pstd::FileExists checks that indicate unexpected state, or stat(fullPath) != 0
returns a non-OK Status describing the problem. Update callers (the code path
that currently treats GetBgSaveMetaData as void and later returns OK to rsync)
to inspect and propagate the Status so rsync/replication will retry instead of
accepting a truncated manifest. Ensure you reference and update uses of
GetBgSaveMetaData, and keep logging but return error Status on any
directory/file scan failure.
// Read snapshot_uuid from info file for protection check
// TODO: For multi-DB setups, should check ALL db subdirectories (db0, db1, db2...)
// If any db is in use, the entire dump should be protected.
// Current simple approach only checks db0 for backward compatibility.
std::string snapshot_uuid;
std::string info_path = dump_file + "/db0/info";
if (!pstd::FileExists(info_path)) {
  // Fallback to legacy path (directly under dump directory)
  info_path = dump_file + "/info";
}
Don't decide dump liveness from db0/info alone.
This only derives snapshot_uuid from db0/info or legacy /info. In a multi-DB dump, db0 can be absent while another dbN is still syncing, so snapshot_uuid stays empty and the directory can still be deleted through the "corrupted dump" or expiry paths even though that dump is still in use. Please scan all db*/info files, or at minimum fall back to dump_owners_ by dump_path before any delete.
Also applies to: 1647-1673, 1714-1718
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_server.cc` around lines 1618 - 1627, The code currently derives
snapshot_uuid using only db0/info (info_path) and legacy /info, which risks
treating multi-DB dumps as inactive; update the logic around
snapshot_uuid/dump_file to scan all db*/info files under the dump directory
(e.g., iterate directory entries matching "db*" and read each info file to
collect snapshot_uuid or any non-empty value) and only treat the dump as
inactive if no dbN/info yields a snapshot UUID; additionally, if no
snapshot_uuid is found, consult dump_owners_ by dump_path as a fallback before
allowing deletion to ensure active dumps tracked in dump_owners_ are protected
(apply same change to the other similar blocks referencing
snapshot_uuid/info_path at the indicated spots).
bool PikaServer::EnsureDirExists(const std::string& path, mode_t mode) {
  // First check if directory already exists
  if (pstd::FileExists(path)) {
    return true;
  }
  // Directory doesn't exist, try to create it
  int ret = pstd::CreatePath(path, mode);
  // CreatePath returns 0 on success, -1 on failure
  // Note: CreatePath also returns -1 if directory already exists (due to the
  // !filesystem::create_directories check), but we already checked FileExists above
  if (ret != 0) {
    LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret;
    return false;
  }
  // Verify directory was created successfully
  return pstd::FileExists(path);
EnsureDirExists() should verify a directory, not just path existence.
pstd::FileExists(path) also succeeds for regular files, and the pre-check before CreatePath() is racy with concurrent creators. Both cases can make dump directory setup report the wrong result.
Suggested hardening
bool PikaServer::EnsureDirExists(const std::string& path, mode_t mode) {
- // First check if directory already exists
- if (pstd::FileExists(path)) {
- return true;
- }
+ struct stat st;
+ if (stat(path.c_str(), &st) == 0) {
+ return S_ISDIR(st.st_mode);
+ }
// Directory doesn't exist, try to create it
int ret = pstd::CreatePath(path, mode);
- // CreatePath returns 0 on success, -1 on failure
- // Note: CreatePath also returns -1 if directory already exists (due to the
- // !filesystem::create_directories check), but we already checked FileExists above
- if (ret != 0) {
- LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret;
- return false;
+ if (ret == 0 && stat(path.c_str(), &st) == 0) {
+ return S_ISDIR(st.st_mode);
}
- // Verify directory was created successfully
- return pstd::FileExists(path);
+ if (stat(path.c_str(), &st) == 0 && S_ISDIR(st.st_mode)) {
+ return true; // created concurrently
+ }
+ LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret;
+ return false;
 }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pika_server.cc` around lines 2067 - 2082, EnsureDirExists currently uses
pstd::FileExists which returns true for regular files and does a racy pre-check;
change it to verify the path is a directory (use an is-directory check such as
pstd::IsDirectory or stat+S_ISDIR) instead of FileExists, avoid the race by not
returning success on the initial FileExists check (or always attempt CreatePath
and then re-check that the path is a directory), and after pstd::CreatePath
handle the -1 case by re-checking whether the path now exists and is a directory
(treat that as success) versus a real failure, referencing EnsureDirExists,
pstd::FileExists, and pstd::CreatePath in your changes.
if (filesystem::is_empty(dir)) {
  return -1;
}
Return success for empty directories.
Line 136 changes GetChildren so an existing-but-empty directory returns -1. Callers in src/pika_stable_log.cc and src/pika_server.cc treat any non-zero as a real failure, so this will turn normal empty-directory states into warnings/early returns and can skip cleanup/scan paths. Keep result empty and return 0; reserve -1 for actual filesystem errors.
💡 Proposed fix
int GetChildren(const std::string& dir, std::vector<std::string>& result) {
result.clear();
try {
- if (filesystem::is_empty(dir)) {
- return -1;
- }
for (auto& de : filesystem::directory_iterator(dir)) {
result.emplace_back(de.path().filename());
}
} catch (const filesystem::filesystem_error& e) {
    LOG(WARNING) << "GetChildren failed for " << dir << ": " << e.what();

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pstd/src/env.cc` around lines 136 - 138, GetChildren currently returns -1
when filesystem::is_empty(dir), which signals an error to callers like the Pika
scan/cleanup code; change the behavior so that when the directory exists but is
empty you leave the output container (result) empty and return 0, reserving -1
only for real filesystem errors. In short: in GetChildren, remove/replace the
early return -1 on filesystem::is_empty(dir) with logic that returns 0 and an
empty result, so callers of GetChildren see success with no children rather than
an error.
// If Master returns empty response, it means the file was cleaned up during sync
// This is an error condition - the sync should fail and retry
if (ret_count == 0 && resp->file_resp().eof()) {
  LOG(ERROR) << "File not available on Master at offset " << offset
             << ", filename: " << filename
             << ". Will retry and may trigger new bgsave.";
  s = Status::IOError("File not available on Master");
  return s;
}
Don't treat a valid zero-byte file as "missing".
kOk + eof=true + count=0 is also the normal response for an existing empty file. Missing files are already reported via response.code() != kOk, so this block will make legitimate empty files fail full sync.
🛠️ Suggested fix
- // If Master returns empty response, it means the file was cleaned up during sync
- // This is an error condition - the sync should fail and retry
- if (ret_count == 0 && resp->file_resp().eof()) {
- LOG(ERROR) << "File not available on Master at offset " << offset
- << ", filename: " << filename
- << ". Will retry and may trigger new bgsave.";
- s = Status::IOError("File not available on Master");
- return s;
- }
+ // A zero-byte file is also represented as kOk + eof=true + count=0.
+ // Missing files should be handled via resp->code() != kOk.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/rsync_client.cc` around lines 242 - 250, The current check in
rsync_client.cc incorrectly treats a valid zero-byte file as missing when
ret_count == 0 && resp->file_resp().eof(); change the logic so we only consider
the file missing when the RPC response indicates an error (response.code() !=
kOk or resp->code() != kOk) rather than when count==0+eof; update the
conditional around the LOG(ERROR) / Status::IOError("File not available on
Master") and return to only trigger on a non-OK response code while allowing the
kOk + eof + count==0 case to proceed as a legitimate empty file; adjust
references to ret_count, resp->file_resp().eof(), and
response.code()/resp->code() accordingly.
std::string backup_dir = GetSaveDirByIndex(dir, engine.first);
delete_dir(backup_dir.c_str());

s = engine.second->CreateCheckpointWithFiles(
    backup_dir, bcontent.live_files, bcontent.live_wal_files,
    bcontent.manifest_file_size, bcontent.sequence_number);

// 3. Re-enable file deletions regardless of success
// CreateCheckpointWithFiles already calls EnableFileDeletions in db_checkpoint.cc
if (!s.ok()) {
  LOG(WARNING) << "CreateCheckpointWithFiles failed for index " << engine.first
               << ": " << s.ToString();
  return s;
Validate the checkpoint target before GetCheckpointFiles().
src/storage/src/db_checkpoint.cc:75-97 leaves file deletions disabled after GetCheckpointFiles, and src/storage/src/db_checkpoint.cc:98-104 returns immediately if the checkpoint dir already exists. Because delete_dir(backup_dir.c_str()) is unchecked here, a stale or undeletable target can strand the instance with file deletions still disabled. Clean up or validate backup_dir before calling GetCheckpointFiles, or explicitly re-enable deletions on this early-failure path.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/storage/src/backupable.cc` around lines 89 - 101, The code computes
backup_dir via GetSaveDirByIndex and calls delete_dir before calling
CreateCheckpointWithFiles, but if delete_dir fails or the checkpoint target
already exists earlier in the flow (e.g., in GetCheckpointFiles /
db_checkpoint.cc), file deletions can remain disabled; fix by validating or
ensuring the checkpoint target is removable before calling
GetCheckpointFiles/CreateCheckpointWithFiles (e.g., check delete_dir return and
retry/clear stale dir) and on any early-return path re-enable deletions by
calling the appropriate re-enable helper (same one used by db_checkpoint.cc,
e.g., EnableFileDeletions) so deletions are always restored even on error paths
in CreateCheckpointWithFiles or when the backup dir already exists.
1. hlen command causes abnormal TTL
2. rate_limit anomaly
3. \x00 parsing anomaly
4. [Optimization] Adjust the dump directory cleanup mechanism during master-slave synchronization