From 7be1b42f072b474afec46a99ee4f4e362a94906d Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 3 Feb 2026 17:51:38 +0800 Subject: [PATCH 1/6] fix: ttl error for hlen update cache --- src/pika_command.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pika_command.cc b/src/pika_command.cc index 93455644ef..1342efe2c9 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -417,7 +417,7 @@ void InitCmdTable(CmdTable* cmd_table) { cmd_table->insert(std::pair>(kCmdNameHKeys, std::move(hkeysptr))); ////HLenCmd std::unique_ptr hlenptr = - std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); + std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsDoThroughDB | kCmdFlagsFast | kCmdFlagsReadCache); cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = From 0b434a64b0d633488826d32869dd81de46a1f397 Mon Sep 17 00:00:00 2001 From: chenbt Date: Tue, 3 Feb 2026 17:52:41 +0800 Subject: [PATCH 2/6] fix: rate_limiter_bandwidth_ overflow --- src/pika_conf.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pika_conf.cc b/src/pika_conf.cc index aac11c3871..b7bf82e647 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -451,7 +451,7 @@ int PikaConf::Load() { rate_limiter_auto_tuned_ = at == "yes" || at.empty(); // if rate limiter autotune enable, `rate_limiter_bandwidth_` will still be respected as an upper-bound. if (rate_limiter_auto_tuned_) { - rate_limiter_bandwidth_ = 10 * 1024 * 1024 * 1024; // 10GB/s + rate_limiter_bandwidth_ = 10LL * 1024 * 1024 * 1024; // 10GB/s } // max_write_buffer_num From a0376e2f08802c70fe19e8362590651070f83aea Mon Sep 17 00:00:00 2001 From: chenbt Date: Thu, 5 Feb 2026 17:17:21 +0800 Subject: [PATCH 3/6] fix: \x00 set error --- src/storage/include/storage/storage_define.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/storage/include/storage/storage_define.h b/src/storage/include/storage/storage_define.h index 59fa44c495..f1cb47d40e 100644 --- a/src/storage/include/storage/storage_define.h +++ b/src/storage/include/storage/storage_define.h @@ -78,6 +78,7 @@ inline char* EncodeUserKey(const Slice& user_key, char* dst_ptr, size_t nzero) { } if (pos != user_key.size()) { memcpy(dst_ptr, user_data + pos, user_key.size() - pos); + dst_ptr += user_key.size() - pos; } memcpy(dst_ptr, kEncodedKeyDelim, 2); From 974276b9a9c509184fef10bee40caeec6ba327ff Mon Sep 17 00:00:00 2001 From: chenbt Date: Thu, 5 Feb 2026 17:46:01 +0800 Subject: [PATCH 4/6] =?UTF-8?q?change:=20=E7=A7=BB=E9=99=A4=E4=B8=8D?= =?UTF-8?q?=E9=9C=80=E8=A6=81=E7=9A=84=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 523638f04f..b0c54e0607 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -790,9 +790,7 @@ message("pika GIT_DATE = ${PIKA_GIT_DATE}") message("pika GIT_TAG = ${PIKA_GIT_TAG}") message("pika BUILD_DATE = ${PIKA_BUILD_DATE}") -set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc - src/pika_cache_load_thread.cc - ) +set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc) message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC}) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY) From bf41bf1d6e5d3a1f076c3c2442e3d90fa8a046b3 Mon Sep 17 00:00:00 2001 From: chenbt Date: Mon, 2 Mar 2026 10:28:41 +0800 Subject: [PATCH 5/6] =?UTF-8?q?feat:=20=E7=8B=AC=E7=AB=8Bdump=20+=20?= =?UTF-8?q?=E5=8D=B3=E6=97=B6=E6=B8=85=E7=90=86=E6=9C=BA=E5=88=B6,?= =?UTF-8?q?=E8=A7=A3=E5=86=B3Pika=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=AD=A4=E5=84=BF=E6=96=87=E4=BB=B6=E9=97=AE=E9=A2=98=EF=BC=9A?= =?UTF-8?q?=201.=20=E6=AF=8F=E4=B8=AASlave=E7=8B=AC=E5=8D=A0=E4=B8=80?= =?UTF-8?q?=E4=B8=AAdump=E7=9B=AE=E5=BD=95=EF=BC=88dump-YYYYMMDD-NN?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=EF=BC=89=202.=20=E4=BC=A0=E8=BE=93=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E5=90=8E=E7=AB=8B=E5=8D=B3=E6=B8=85=E7=90=86=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E9=87=8A=E6=94=BE=E7=A3=81=E7=9B=98=E7=A9=BA?= =?UTF-8?q?=E9=97=B4=203.=20=E9=99=90=E5=88=B6=E5=B9=B6=E5=8F=91dump?= =?UTF-8?q?=E6=95=B0=E9=87=8F=E4=B8=BA3=E4=B8=AA=204.=20=E5=AE=8C=E5=96=84?= =?UTF-8?q?dump=E5=AE=8C=E6=95=B4=E6=80=A7=E6=A3=80=E6=9F=A5=E5=92=8C?= =?UTF-8?q?=E5=8D=A0=E7=94=A8=E7=AE=A1=E7=90=86=20=E5=A4=87=E6=B3=A8?= =?UTF-8?q?=EF=BC=9A1.=E6=9C=AA=E5=85=BC=E5=AE=B9=E5=A4=9Adatabase?= =?UTF-8?q?=E5=9C=BA=E6=99=AF=202.=E5=A4=9Aslave=E5=90=8C=E6=97=B6?= =?UTF-8?q?=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5=E6=97=B6=EF=BC=8C=E6=B8=85?= =?UTF-8?q?=E7=90=86=E5=AD=98=E5=9C=A8=E5=BC=82=E5=B8=B8=203.=E5=BD=93?= =?UTF-8?q?=E5=A4=A9=E5=A4=9A=E6=AC=A1=E6=89=8B=E5=8A=A8=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=97=B6=E5=AD=A4=E5=84=BF=E6=96=87=E4=BB=B6=E6=B8=85=E7=90=86?= =?UTF-8?q?=E8=83=BD=E5=8A=9B=E4=B8=8B=E9=99=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 9 +- conf/pika.conf | 6 +- include/pika_server.h | 76 +++++ include/rsync_server.h | 22 +- src/pika_db.cc | 128 ++++++-- src/pika_rm.cc | 13 +- src/pika_server.cc | 388 +++++++++++++++++++++-- src/pstd/CMakeLists.txt | 4 +- src/pstd/src/env.cc | 15 +- src/rsync_client.cc | 16 +- src/rsync_server.cc | 277 +++++++++++++++- src/storage/CMakeLists.txt | 4 +- src/storage/include/storage/backupable.h | 4 + src/storage/src/backupable.cc | 38 +++ 14 files changed, 938 insertions(+), 62 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b0c54e0607..ab39e520e9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,13 @@ endif() set(CMAKE_CXX_STANDARD 17) project(pika) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) -enable_testing() + +# Option to control whether tests are built +option(BUILD_TESTS "Build tests" ON) + +if(BUILD_TESTS) + enable_testing() +endif() if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") # using Clang @@ -760,6 +766,7 @@ if (USE_PIKA_TOOLS) add_subdirectory(tools) endif() aux_source_directory(src DIR_SRCS) +list(REMOVE_ITEM DIR_SRCS "src/build_version.cc") # # generate version string(TIMESTAMP TS "%Y-%m-%d %H:%M:%S") diff --git a/conf/pika.conf b/conf/pika.conf index 4f51f9cdbd..a68ca78d56 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -496,7 +496,7 @@ default-slot-num : 1024 # rate limiter bandwidth, units in bytes, default 1024GB/s (No limit) # [Support Dynamically changeable] send 'rate-limiter-bandwidth' to a running pika can change it's value dynamically -#rate-limiter-bandwidth : 1099511627776 +rate-limiter-bandwidth : 109951162 #rate-limiter-refill-period-us : 100000 # @@ -505,7 +505,7 @@ default-slot-num : 1024 # if auto_tuned is true: Enables dynamic adjustment of rate limit within the range #`[rate-limiter-bandwidth / 20, rate-limiter-bandwidth]`, according to the recent demand for background I/O. # rate limiter auto tune https://rocksdb.org/blog/2017/12/18/17-auto-tuned-rate-limiter.html. the default value is true. -#rate-limiter-auto-tuned : yes +rate-limiter-auto-tuned : no ################################## RocksDB Blob Configure ##################### # rocksdb blob configure @@ -673,7 +673,7 @@ internal-used-unfinished-full-sync : # for wash data from 4.0.0 to 4.0.1 # https://github.com/OpenAtomFoundation/pika/issues/2886 # default value: true -wash-data: true +wash-data: false # Pika automatic compact compact strategy, a complement to rocksdb compact. # Trigger the compact background task periodically according to `compact-interval` diff --git a/include/pika_server.h b/include/pika_server.h index df75229188..7cfa23825b 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -212,6 +212,40 @@ class PikaServer : public pstd::noncopyable { pstd::Status GetDumpMeta(const std::string& db_name, std::vector* files, std::string* snapshot_uuid); void TryDBSync(const std::string& ip, int port, const std::string& db_name, int32_t top); + /* + * Rsync snapshot tracking (for orphan file cleanup protection) + */ + void RegisterRsyncSnapshot(const std::string& snapshot_uuid); + void UnregisterRsyncSnapshot(const std::string& snapshot_uuid); + bool IsRsyncSnapshotActive(const std::string& snapshot_uuid); + std::set GetActiveRsyncSnapshots(); + + /* + * Rsync file transfer tracking (for safe orphan file cleanup during sync) + */ + void RegisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename); + void UnregisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename); + bool IsRsyncFileTransferring(const std::string& snapshot_uuid, const std::string& filename); + std::set GetRsyncTransferringFiles(const std::string& snapshot_uuid); + + /* + * Dump ownership management (Scheme A: each slave has exclusive dump) + */ + bool MarkDumpInUse(const std::string& snapshot_uuid, const std::string& conn_id, const std::string& dump_path); + void ReleaseDump(const std::string& snapshot_uuid); + bool IsDumpInUse(const std::string& snapshot_uuid) const; + std::string GetDumpPathBySnapshot(const std::string& snapshot_uuid) const; + size_t GetActiveDumpCount() const; + static constexpr size_t kMaxConcurrentDumps = 3; // Max concurrent dumps allowed + + /* + * Delayed file cleanup for orphan SST files (Scheme A) + * Files are scheduled for cleanup 10 minutes after transfer completes + * to allow for retries and ensure data consistency + */ + void ScheduleFileForCleanup(const std::string& filepath, int delay_seconds); + void ProcessPendingCleanupFiles(); + /* * Keyscan used */ @@ -498,6 +532,13 @@ class PikaServer : public pstd::noncopyable { */ void DisableCompact(); + /* + * Utility function to ensure directory exists + * Returns true if directory exists or was created successfully + * Handles the special case where CreatePath returns 0 for both success and "already exists" + */ + static bool EnsureDirExists(const std::string& path, mode_t mode = 0755); + /* * lastsave used */ @@ -605,6 +646,41 @@ class PikaServer : public pstd::noncopyable { std::unique_ptr pika_rsync_service_; std::unique_ptr rsync_server_; + /* + * Rsync snapshot tracking used (for orphan file cleanup protection) + */ + std::set active_rsync_snapshots_; + std::mutex active_rsync_snapshots_mutex_; + + /* + * Rsync file transfer tracking used (for safe orphan file cleanup during sync) + * Tracks which files are currently being transferred for each snapshot + */ + std::map> rsync_transferring_files_; + std::mutex rsync_transferring_files_mutex_; + + /* + * Dump ownership tracking used (Scheme A: each slave has exclusive dump) + * snapshot_uuid -> {connection id, dump path} + */ + struct DumpOwnerInfo { + std::string conn_id; + std::string dump_path; + }; + std::map dump_owners_; + mutable std::mutex dump_owners_mutex_; + + /* + * Pending cleanup tracking for delayed file deletion (Scheme A) + * filepath -> {cleanup_time} + */ + struct PendingCleanupInfo { + std::string filepath; + time_t cleanup_time; + }; + std::map pending_cleanup_files_; + mutable std::mutex pending_cleanup_mutex_; + /* * Pubsub used */ diff --git a/include/rsync_server.h b/include/rsync_server.h index 560585f3c8..11466d0a8e 100644 --- a/include/rsync_server.h +++ b/include/rsync_server.h @@ -54,10 +54,30 @@ class RsyncServerConn : public net::PbConn { int DealMessage() override; static void HandleMetaRsyncRequest(void* arg); static void HandleFileRsyncRequest(void* arg); + + // Snapshot tracking for orphan file cleanup protection + void RegisterSnapshot(const std::string& snapshot_uuid); + void UnregisterSnapshot(); + std::string GetSnapshotUuid() const { return snapshot_uuid_; } + + // File transfer tracking for safe orphan file cleanup during sync + void AddTransferringFile(const std::string& filename); + // Remove file from transfer tracking, optionally cleanup if transfer is complete (is_eof=true) + void RemoveTransferringFile(const std::string& filename, bool is_eof = false); + bool IsFileTransferring(const std::string& filename) const; + std::set GetTransferringFiles() const; + // Global check if a file is being transferred by any connection + static bool IsFileTransferringGlobally(const std::string& snapshot_uuid, const std::string& filename); + + // Public member for dump ownership tracking (Scheme A) + std::string conn_id_; // Connection ID for dump ownership tracking + private: std::vector > readers_; - std::mutex mu_; + mutable std::mutex mu_; void* data_ = nullptr; + std::string snapshot_uuid_; // Current snapshot being synced + std::set transferring_files_; // Files currently being read }; class RsyncServerThread : public net::HolyThread { diff --git a/src/pika_db.cc b/src/pika_db.cc index f3d52fdec3..775582c323 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -3,6 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include #include @@ -300,8 +301,9 @@ bool DB::RunBgsaveEngine() { LOG(INFO) << db_name_ << " bgsave_info: path=" << info.path << ", filenum=" << info.offset.b_offset.filenum << ", offset=" << info.offset.b_offset.offset; - // Backup to tmp dir - rocksdb::Status s = bgsave_engine_->CreateNewBackup(info.path); + // Use SetBackupContentAndCreate to minimize time window between GetLiveFiles and CreateCheckpoint + // This reduces the chance of compaction occurring and creating orphan files + rocksdb::Status s = bgsave_engine_->SetBackupContentAndCreate(info.path); if (!s.ok()) { LOG(WARNING) << db_name_ << " create new backup failed :" << s.ToString(); @@ -324,6 +326,7 @@ void DB::FinishBgsave() { } // Prepare engine, need bgsave_protector protect +// Scheme A: Each slave has exclusive dump, so we need unique dump directories bool DB::InitBgsaveEnv() { std::lock_guard l(bgsave_protector_); // Prepare for bgsave dir @@ -331,22 +334,53 @@ bool DB::InitBgsaveEnv() { char s_time[32]; int len = static_cast(strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&bgsave_info_.start_time))); bgsave_info_.s_start_time.assign(s_time, len); - std::string time_sub_path = g_pika_conf->bgsave_prefix() + std::string(s_time, 8); - bgsave_info_.path = g_pika_conf->bgsave_path() + time_sub_path + "/" + bgsave_sub_path_; - if (!pstd::DeleteDirIfExist(bgsave_info_.path)) { - LOG(WARNING) << db_name_ << " remove exist bgsave dir failed"; + + // Scheme A: Use unique directory name with sequence number + // Format: dump-YYYYMMDD-NN/db_name where NN is sequence number + std::string base_path = g_pika_conf->bgsave_path(); + std::string date_str(s_time, 8); + std::string prefix = g_pika_conf->bgsave_prefix() + date_str; + + // Find first available sequence number + int seq = 0; + std::string time_sub_path; + std::string full_path; + do { + time_sub_path = prefix + "-" + std::to_string(seq); + full_path = base_path + time_sub_path + "/" + bgsave_sub_path_; + seq++; + } while (pstd::FileExists(full_path) && seq < 1000); // Max 1000 dumps per day + + if (seq >= 1000) { + LOG(ERROR) << db_name_ << " too many dump directories for today"; return false; } - pstd::CreatePath(bgsave_info_.path, 0755); - // Prepare for failed dir - if (!pstd::DeleteDirIfExist(bgsave_info_.path + "_FAILED")) { - LOG(WARNING) << db_name_ << " remove exist fail bgsave dir failed :"; + + bgsave_info_.path = full_path; + LOG(INFO) << db_name_ << " preparing bgsave dir: " << bgsave_info_.path; + + // Note: In Scheme A, we don't delete existing directories + // because other slaves may be using them + // Just create the new path + if (!PikaServer::EnsureDirExists(bgsave_info_.path, 0755)) { + LOG(WARNING) << db_name_ << " create bgsave dir failed: " << bgsave_info_.path + << ", errno=" << errno << ", error=" << strerror(errno); + // Clear the path on failure to avoid using invalid path in GetDumpMeta + bgsave_info_.path.clear(); return false; } + + // Prepare for failed dir + std::string failed_dir = bgsave_info_.path + "_FAILED"; + if (pstd::FileExists(failed_dir)) { + pstd::DeleteDirIfExist(failed_dir); + } return true; } // Prepare bgsave env, need bgsave_protector protect +// Note: SetBackupContent is now done in RunBgsaveEngine using SetBackupContentAndCreate +// to minimize time window between GetLiveFiles and CreateCheckpoint bool DB::InitBgsaveEngine() { bgsave_engine_.reset(); rocksdb::Status s = storage::BackupEngine::Open(storage().get(), bgsave_engine_, g_pika_conf->db_instance_num()); @@ -371,11 +405,7 @@ bool DB::InitBgsaveEngine() { std::lock_guard l(bgsave_protector_); bgsave_info_.offset = bgsave_offset; } - s = bgsave_engine_->SetBackupContent(); - if (!s.ok()) { - LOG(WARNING) << db_name_ << " set backup content failed " << s.ToString(); - return false; - } + // SetBackupContent is now done in RunBgsaveEngine to minimize time window } return true; } @@ -390,25 +420,73 @@ void DB::Init() { void DB::GetBgSaveMetaData(std::vector* fileNames, std::string* snapshot_uuid) { const std::string dbPath = bgsave_info().path; + size_t total_sst_files = 0; + size_t orphan_sst_files = 0; + + LOG(INFO) << "[GetBgSaveMetaData] Starting scan, dbPath=" << dbPath; + + // dbPath is already the specific DB path (e.g., .../dump/dump-9454-20260302/db0) + // We need to scan its subdirectories (0, 1, 2 for rocksdb instances) + std::vector subDirs; + int ret = pstd::GetChildren(dbPath, subDirs); + LOG(INFO) << "[GetBgSaveMetaData] GetChildren for dbPath returned " << ret + << ", subDirs count=" << subDirs.size(); + if (ret) { + LOG(WARNING) << "[GetBgSaveMetaData] Failed to read dbPath: " << dbPath; + return; + } - int db_instance_num = g_pika_conf->db_instance_num(); - for (int index = 0; index < db_instance_num; index++) { - std::string instPath = dbPath + ((dbPath.back() != '/') ? "/" : "") + std::to_string(index); - if (!pstd::FileExists(instPath)) { - continue ; + for (const std::string& subDir : subDirs) { + std::string instPath = dbPath + "/" + subDir; + // Skip if not exists or is a file (not directory) + // Note: IsDir returns 0 for directory, 1 for file, -1 for error + if (!pstd::FileExists(instPath) || pstd::IsDir(instPath) != 0) { + continue; } std::vector tmpFileNames; - int ret = pstd::GetChildren(instPath, tmpFileNames); + ret = pstd::GetChildren(instPath, tmpFileNames); if (ret) { - LOG(WARNING) << dbPath << " read dump meta files failed, path " << instPath; - return; + LOG(WARNING) << "[GetBgSaveMetaData] Failed to read instPath: " << instPath; + continue; } - for (const std::string fileName : tmpFileNames) { - fileNames -> push_back(std::to_string(index) + "/" + fileName); + for (const std::string& fileName : tmpFileNames) { + std::string fullPath = instPath + "/" + fileName; + struct stat st; + // Check if file exists and get its stat + if (stat(fullPath.c_str(), &st) != 0) { + // File doesn't exist, skip it + LOG(WARNING) << "[GetBgSaveMetaData] File does not exist: " << fullPath; + continue; + } + + // Check if it's an SST file and if it's an orphan (Links=1) + if (fileName.size() > 4 && fileName.substr(fileName.size() - 4) == ".sst") { + total_sst_files++; + if (st.st_nlink == 1) { + // This is an orphan file, but we need to include it in the meta + // to ensure data consistency. The file will be cleaned up after + // a delay to allow for retries. + orphan_sst_files++; + LOG(INFO) << "[GetBgSaveMetaData] Including orphan SST file: " << fullPath + << ", size=" << st.st_size; + // NOTE: We no longer skip orphan files here. They will be included + // in the file list and cleaned up with a delay after transfer. + } + } + // Construct relative path like "0/xxx.sst" or "1/xxx.sst" + fileNames->push_back(subDir + "/" + fileName); } } + + if (orphan_sst_files > 0) { + LOG(INFO) << "[GetBgSaveMetaData] Summary for " << dbPath + << ": total_sst=" << total_sst_files + << ", orphan_included=" << orphan_sst_files + << ", returned=" << fileNames->size(); + } + fileNames->push_back(kBgsaveInfoFile); pstd::Status s = GetBgSaveUUID(snapshot_uuid); if (!s.ok()) { diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 9df7b82101..8e26ba6a03 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -10,6 +10,8 @@ #include #include +#include +#include #include #include "net/include/net_cli.h" @@ -507,7 +509,16 @@ pstd::Status SyncSlaveDB::ActivateRsync() { if (!rsync_cli_->IsIdle()) { return s; } - LOG(WARNING) << "Slave DB: " << DBName() << " Activating Rsync ... (retry count:" << rsync_init_retry_count_ << ")"; + // Rate limiting for retry logs - only log once per 30 seconds to reduce noise + static std::map last_retry_log_time; + time_t now = time(nullptr); + std::string db_key = DBName(); + bool should_log = (last_retry_log_time.find(db_key) == last_retry_log_time.end() || + now - last_retry_log_time[db_key] >= 30); + if (should_log) { + LOG(WARNING) << "Slave DB: " << DBName() << " Activating Rsync ... (retry count:" << rsync_init_retry_count_ << ")"; + last_retry_log_time[db_key] = now; + } if (rsync_cli_->Init()) { rsync_init_retry_count_ = 0; rsync_cli_->Start(); diff --git a/src/pika_server.cc b/src/pika_server.cc index b205f3e34b..57b8b95ca5 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -6,11 +6,16 @@ #include #include #include +#include +#include #include +#include #include #include #include +#include #include +#include "pstd/include/pstd_hash.h" #include "net/include/net_cli.h" #include "net/include/net_interfaces.h" #include "net/include/net_stats.h" @@ -829,6 +834,219 @@ pstd::Status PikaServer::GetDumpMeta(const std::string& db_name, std::vector lock(active_rsync_snapshots_mutex_); + active_rsync_snapshots_.insert(snapshot_uuid); + LOG(INFO) << "[RsyncSnapshot] Registered snapshot: " << snapshot_uuid + << ", active count: " << active_rsync_snapshots_.size(); +} + +void PikaServer::UnregisterRsyncSnapshot(const std::string& snapshot_uuid) { + if (snapshot_uuid.empty()) { + return; + } + std::lock_guard lock(active_rsync_snapshots_mutex_); + auto it = active_rsync_snapshots_.find(snapshot_uuid); + if (it != active_rsync_snapshots_.end()) { + active_rsync_snapshots_.erase(it); + LOG(INFO) << "[RsyncSnapshot] Unregistered snapshot: " << snapshot_uuid + << ", active count: " << active_rsync_snapshots_.size(); + } +} + +bool PikaServer::IsRsyncSnapshotActive(const std::string& snapshot_uuid) { + if (snapshot_uuid.empty()) { + return false; + } + std::lock_guard lock(active_rsync_snapshots_mutex_); + return active_rsync_snapshots_.find(snapshot_uuid) != active_rsync_snapshots_.end(); +} + +std::set PikaServer::GetActiveRsyncSnapshots() { + std::lock_guard lock(active_rsync_snapshots_mutex_); + return active_rsync_snapshots_; +} + +/* + * Rsync file transfer tracking (for safe orphan file cleanup during sync) + * These functions track which files are currently being transferred for each snapshot + * This allows cleanup of orphan files that are NOT being transferred, even during sync + */ +void PikaServer::RegisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename) { + if (snapshot_uuid.empty() || filename.empty()) { + return; + } + std::lock_guard lock(rsync_transferring_files_mutex_); + rsync_transferring_files_[snapshot_uuid].insert(filename); + LOG(INFO) << "[RsyncTransfer] Registered file: " << filename << " for snapshot: " << snapshot_uuid; +} + +void PikaServer::UnregisterRsyncTransferringFile(const std::string& snapshot_uuid, const std::string& filename) { + if (snapshot_uuid.empty() || filename.empty()) { + return; + } + std::lock_guard lock(rsync_transferring_files_mutex_); + auto it = rsync_transferring_files_.find(snapshot_uuid); + if (it != rsync_transferring_files_.end()) { + it->second.erase(filename); + LOG(INFO) << "[RsyncTransfer] Unregistered file: " << filename << " for snapshot: " << snapshot_uuid; + // Clean up empty sets + if (it->second.empty()) { + rsync_transferring_files_.erase(it); + } + } +} + +bool PikaServer::IsRsyncFileTransferring(const std::string& snapshot_uuid, const std::string& filename) { + if (snapshot_uuid.empty() || filename.empty()) { + return false; + } + std::lock_guard lock(rsync_transferring_files_mutex_); + auto it = rsync_transferring_files_.find(snapshot_uuid); + if (it != rsync_transferring_files_.end()) { + return it->second.find(filename) != it->second.end(); + } + return false; +} + +std::set PikaServer::GetRsyncTransferringFiles(const std::string& snapshot_uuid) { + std::lock_guard lock(rsync_transferring_files_mutex_); + auto it = rsync_transferring_files_.find(snapshot_uuid); + if (it != rsync_transferring_files_.end()) { + return it->second; + } + return std::set(); +} + +/* + * Dump ownership management (Scheme A: each slave has exclusive dump) + */ +bool PikaServer::MarkDumpInUse(const std::string& snapshot_uuid, const std::string& conn_id, const std::string& dump_path) { + if (snapshot_uuid.empty() || conn_id.empty()) { + return false; + } + + std::lock_guard lock(dump_owners_mutex_); + + // Check if already in use by another connection + auto it = dump_owners_.find(snapshot_uuid); + if (it != dump_owners_.end()) { + if (it->second.conn_id != conn_id) { + LOG(WARNING) << "[DumpOwnership] Dump " << snapshot_uuid + << " is already in use by " << it->second.conn_id + << ", cannot mark for " << conn_id; + return false; + } + // Already owned by this connection + return true; + } + + // Check concurrent dump limit + if (dump_owners_.size() >= kMaxConcurrentDumps) { + LOG(WARNING) << "[DumpOwnership] Max concurrent dumps (" << kMaxConcurrentDumps + << ") reached, cannot mark dump " << snapshot_uuid + << " for " << conn_id; + return false; + } + + // Mark this dump as in use + dump_owners_[snapshot_uuid] = {conn_id, dump_path}; + LOG(INFO) << "[DumpOwnership] Dump " << snapshot_uuid + << " marked in use by " << conn_id + << " (path: " << dump_path << ")" + << ", active dumps: " << dump_owners_.size(); + return true; +} + +void PikaServer::ReleaseDump(const std::string& snapshot_uuid) { + if (snapshot_uuid.empty()) { + return; + } + + std::lock_guard lock(dump_owners_mutex_); + auto it = dump_owners_.find(snapshot_uuid); + if (it != dump_owners_.end()) { + LOG(INFO) << "[DumpOwnership] Dump " << snapshot_uuid + << " released by " << it->second.conn_id + << ", active dumps: " << (dump_owners_.size() - 1); + dump_owners_.erase(it); + } +} + +bool PikaServer::IsDumpInUse(const std::string& snapshot_uuid) const { + if (snapshot_uuid.empty()) { + return false; + } + + std::lock_guard lock(dump_owners_mutex_); + return dump_owners_.find(snapshot_uuid) != dump_owners_.end(); +} + +std::string PikaServer::GetDumpPathBySnapshot(const std::string& snapshot_uuid) const { + if (snapshot_uuid.empty()) { + return ""; + } + + std::lock_guard lock(dump_owners_mutex_); + auto it = dump_owners_.find(snapshot_uuid); + if (it != dump_owners_.end()) { + return it->second.dump_path; + } + return ""; +} + +size_t PikaServer::GetActiveDumpCount() const { + std::lock_guard lock(dump_owners_mutex_); + return dump_owners_.size(); +} + +void PikaServer::ScheduleFileForCleanup(const std::string& filepath, int delay_seconds) { + std::lock_guard lock(pending_cleanup_mutex_); + PendingCleanupInfo info; + info.filepath = filepath; + info.cleanup_time = time(nullptr) + delay_seconds; + pending_cleanup_files_[filepath] = info; + LOG(INFO) << "[Cleanup] Scheduled file for delayed cleanup: " << filepath + << " in " << delay_seconds << " seconds"; +} + +void PikaServer::ProcessPendingCleanupFiles() { + std::lock_guard lock(pending_cleanup_mutex_); + time_t now = time(nullptr); + int cleaned_count = 0; + + for (auto it = pending_cleanup_files_.begin(); + it != pending_cleanup_files_.end(); ) { + if (now >= it->second.cleanup_time) { + // Check if file still exists and is still an orphan (nlink=1) + if (pstd::FileExists(it->second.filepath)) { + struct stat st; + if (stat(it->second.filepath.c_str(), &st) == 0 && st.st_nlink == 1) { + pstd::DeleteFile(it->second.filepath); + cleaned_count++; + LOG(INFO) << "[Cleanup] Deleted delayed cleanup file: " << it->second.filepath; + } + } + it = pending_cleanup_files_.erase(it); + } else { + ++it; + } + } + + if (cleaned_count > 0) { + LOG(INFO) << "[Cleanup] Processed delayed cleanup, deleted " << cleaned_count << " files"; + } +} + void PikaServer::TryDBSync(const std::string& ip, int port, const std::string& db_name, int32_t top) { std::shared_ptr db = GetDB(db_name); @@ -1321,14 +1539,36 @@ void PikaServer::AutoServerlogPurge() { } void PikaServer::AutoDeleteExpiredDump() { + // Process pending delayed cleanup files with its own rate limiting (5 minutes) + // This is independent of the full directory cleanup rate limiting + static time_t last_pending_cleanup_time = 0; + time_t now = time(nullptr); + if (now - last_pending_cleanup_time >= 300) { // 300 seconds = 5 minutes + ProcessPendingCleanupFiles(); + last_pending_cleanup_time = now; + } + + // Rate limiting for full directory cleanup: once per 10 minutes during active syncs + static time_t last_full_cleanup_time = 0; + { + std::lock_guard lock(active_rsync_snapshots_mutex_); + if (!active_rsync_snapshots_.empty() && (now - last_full_cleanup_time < 600)) { + // Skip logging to reduce noise - only log status once per minute (see below) + return; + } + } + last_full_cleanup_time = now; + std::string db_sync_prefix = g_pika_conf->bgsave_prefix(); std::string db_sync_path = g_pika_conf->bgsave_path(); int expiry_days = g_pika_conf->expire_dump_days(); std::vector dump_dir; - // Never expire - if (expiry_days <= 0) { - return; + // Rate limiting for status logs - only log once per minute + static time_t last_status_log_time = 0; + bool should_log_status = (now - last_status_log_time >= 60); + if (should_log_status) { + last_status_log_time = now; } // Dump is not exist @@ -1338,21 +1578,107 @@ void PikaServer::AutoDeleteExpiredDump() { // Directory traversal if (pstd::GetChildren(db_sync_path, dump_dir) != 0) { + LOG(WARNING) << "[AutoDeleteExpiredDump] GetChildren failed for: " << db_sync_path; return; } + + int sync_slaves = CountSyncSlaves(); + if (should_log_status) { + LOG(INFO) << "[AutoDeleteExpiredDump] Scanning " << dump_dir.size() << " items, sync_slaves: " << sync_slaves; + } + + int dumps_cleaned = 0; + int dumps_checked = 0; + // Handle dump directory + // Scheme A: Support new naming format: dump-prefix-YYYYMMDD-NN (e.g., dump-20260304-0, dump-20260304-1) for (auto& i : dump_dir) { - if (i.substr(0, db_sync_prefix.size()) != db_sync_prefix || i.size() != (db_sync_prefix.size() + 8)) { + // Check prefix + if (i.substr(0, db_sync_prefix.size()) != db_sync_prefix) { continue; } - std::string str_date = i.substr(db_sync_prefix.size(), (i.size() - db_sync_prefix.size())); + // Extract date part (8 digits after prefix) + // New format: dump-prefix-YYYYMMDD-NN (min size: prefix + 8 date + 1 dash + 1 seq = prefix + 10) + // Old format: dump-prefix-YYYYMMDD (size: prefix + 8) + std::string remaining = i.substr(db_sync_prefix.size()); + if (remaining.size() < 8) { + continue; + } + + std::string str_date = remaining.substr(0, 8); char* end = nullptr; std::strtol(str_date.c_str(), &end, 10); if (*end != 0) { continue; } + std::string dump_file = db_sync_path + i; + + // Read snapshot_uuid from info file for protection check + // TODO: For multi-DB setups, should check ALL db subdirectories (db0, db1, db2...) + // If any db is in use, the entire dump should be protected. + // Current simple approach only checks db0 for backward compatibility. + std::string snapshot_uuid; + std::string info_path = dump_file + "/db0/info"; + if (!pstd::FileExists(info_path)) { + // Fallback to legacy path (directly under dump directory) + info_path = dump_file + "/info"; + } + if (pstd::FileExists(info_path)) { + std::ifstream info_file(info_path); + if (info_file) { + std::stringstream buffer; + buffer << info_file.rdbuf(); + std::string info_data = buffer.str(); + if (!info_data.empty()) { + pstd::MD5 md5 = pstd::MD5(info_data); + snapshot_uuid = md5.hexdigest(); + } + } + } else if (should_log_status) { + // Log missing info file at low frequency for debugging dump issues + LOG(WARNING) << "[AutoDeleteExpiredDump] Info file missing: " << info_path; + } + + // Check if this dump is actively being synced + bool is_syncing = !snapshot_uuid.empty() && IsRsyncSnapshotActive(snapshot_uuid); + + // Handle corrupted dump directory (info file missing) + // If info file is missing and the dump is not in use and has expired, delete it + if (snapshot_uuid.empty() && !is_syncing) { + // For corrupted dumps (missing info), we can't determine exact creation time + // Use directory mtime as fallback, or delete if it's clearly old (different date) + struct stat dump_stat; + if (stat(dump_file.c_str(), &dump_stat) == 0) { + time_t dump_mtime = dump_stat.st_mtime; + struct tm* dump_tm = localtime(&dump_mtime); + int dump_mtime_year = dump_tm->tm_year; + int dump_mtime_mon = dump_tm->tm_mon; + int dump_mtime_mday = dump_tm->tm_mday; + + time_t t_now = time(nullptr); + struct tm* now_tm = localtime(&t_now); + + // Delete corrupted dump if it's from a different day (not today) + // This gives some grace period and avoids deleting very recent dumps + if (dump_mtime_year != now_tm->tm_year || + dump_mtime_mon != now_tm->tm_mon || + dump_mtime_mday != now_tm->tm_mday) { + LOG(WARNING) << "[AutoDeleteExpiredDump] Deleting corrupted dump (missing info): " << i; + pstd::DeleteDirIfExist(dump_file); + dumps_cleaned++; + continue; + } + } + } + + // Check if we should delete the entire dump directory + // Skip if expiry_days <= 0 (never expire) + if (expiry_days <= 0) { + continue; + } + // Parse filename int dump_year = std::atoi(str_date.substr(0, 4).c_str()); int dump_month = std::atoi(str_date.substr(4, 2).c_str()); @@ -1360,23 +1686,22 @@ void PikaServer::AutoDeleteExpiredDump() { time_t t = time(nullptr); struct tm* now = localtime(&t); - int now_year = now->tm_year + 1900; - int now_month = now->tm_mon + 1; - int now_day = now->tm_mday; struct tm dump_time = {}; struct tm now_time = {}; - dump_time.tm_year = dump_year; - dump_time.tm_mon = dump_month; + // Fix: tm_year is years since 1900, tm_mon is 0-11 + dump_time.tm_year = dump_year - 1900; + dump_time.tm_mon = dump_month - 1; dump_time.tm_mday = dump_day; dump_time.tm_hour = 0; dump_time.tm_min = 0; dump_time.tm_sec = 0; - now_time.tm_year = now_year; - now_time.tm_mon = now_month; - now_time.tm_mday = now_day; + // Fix: use tm struct directly without adding offset + now_time.tm_year = now->tm_year; + now_time.tm_mon = now->tm_mon; + now_time.tm_mday = now->tm_mday; now_time.tm_hour = 0; now_time.tm_min = 0; now_time.tm_sec = 0; @@ -1387,15 +1712,21 @@ void PikaServer::AutoDeleteExpiredDump() { int64_t interval_days = (now_timestamp - dump_timestamp) / 86400; if (interval_days >= expiry_days) { - std::string dump_file = db_sync_path + i; - if (CountSyncSlaves() == 0) { - LOG(INFO) << "Not syncing, delete dump file: " << dump_file; + dumps_checked++; + // Scheme A: Check if dump is in use by any slave (exclusive dump ownership) + if (!IsDumpInUse(snapshot_uuid)) { + LOG(INFO) << "[AutoDeleteExpiredDump] Deleting expired dump: " << i + << " (age: " << interval_days << " days)"; pstd::DeleteDirIfExist(dump_file); - } else { - LOG(INFO) << "Syncing, can not delete " << dump_file << " dump file"; + dumps_cleaned++; } + // Note: If dump is in use, we silently skip without logging (reduces noise) } } + + if (should_log_status || dumps_cleaned > 0) { + LOG(INFO) << "[AutoDeleteExpiredDump] Checked " << dumps_checked << " dumps, cleaned " << dumps_cleaned; + } } void PikaServer::AutoUpdateNetworkMetric() { @@ -1730,6 +2061,27 @@ void PikaServer::DisableCompact() { } } +// Utility function to ensure directory exists +// Returns true if directory exists or was created successfully +// Handles the special case where CreatePath returns 0 for both success and "already exists" +bool PikaServer::EnsureDirExists(const std::string& path, mode_t mode) { + // First check if directory already exists + if (pstd::FileExists(path)) { + return true; + } + // Directory doesn't exist, try to create it + int ret = pstd::CreatePath(path, mode); + // CreatePath returns 0 on success, -1 on failure + // Note: CreatePath also returns -1 if directory already exists (due to the + // !filesystem::create_directories check), but we already checked FileExists above + if (ret != 0) { + LOG(WARNING) << "Failed to create directory: " << path << ", error code: " << ret; + return false; + } + // Verify directory was created successfully + return pstd::FileExists(path); +} + void DoBgslotscleanup(void* arg) { auto p = static_cast(arg); PikaServer::BGSlotsCleanup cleanup = p->bgslots_cleanup(); diff --git a/src/pstd/CMakeLists.txt b/src/pstd/CMakeLists.txt index 306e2cc518..38790dc471 100644 --- a/src/pstd/CMakeLists.txt +++ b/src/pstd/CMakeLists.txt @@ -23,7 +23,9 @@ if(NOT DISABLE_WARNING_AS_ERROR) endif() -add_subdirectory(tests) +if(BUILD_TESTS) + add_subdirectory(tests) +endif() add_subdirectory(examples) aux_source_directory(./src DIR_SRCS) diff --git a/src/pstd/src/env.cc b/src/pstd/src/env.cc index 1abfe35cf2..afd1bcae54 100644 --- a/src/pstd/src/env.cc +++ b/src/pstd/src/env.cc @@ -132,11 +132,16 @@ int CreatePath(const std::string& path, mode_t mode) { int GetChildren(const std::string& dir, std::vector& result) { result.clear(); - if (filesystem::is_empty(dir)) { - return -1; - } - for (auto& de : filesystem::directory_iterator(dir)) { - result.emplace_back(de.path().filename()); + try { + if (filesystem::is_empty(dir)) { + return -1; + } + for (auto& de : filesystem::directory_iterator(dir)) { + result.emplace_back(de.path().filename()); + } + } catch (const filesystem::filesystem_error& e) { + LOG(WARNING) << "GetChildren failed for " << dir << ": " << e.what(); + return -1; } return 0; } diff --git a/src/rsync_client.cc b/src/rsync_client.cc index 61fab0e0d1..523fe693db 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -239,6 +239,16 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { return s; } + // If Master returns empty response, it means the file was cleaned up during sync + // This is an error condition - the sync should fail and retry + if (ret_count == 0 && resp->file_resp().eof()) { + LOG(ERROR) << "File not available on Master at offset " << offset + << ", filename: " << filename + << ". Will retry and may trigger new bgsave."; + s = Status::IOError("File not available on Master"); + return s; + } + s = writer->Write((uint64_t)offset, ret_count, resp->file_resp().data().c_str()); if (!s.ok()) { LOG(WARNING) << "rsync client write file error"; @@ -250,7 +260,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { s = writer->Fsync(); if (!s.ok()) { return s; - } + } mu_.lock(); meta_table_[filename] = ""; mu_.unlock(); @@ -317,7 +327,9 @@ bool RsyncClient::ComparisonUpdate() { set_difference(local_file_set.begin(), local_file_set.end(), remote_file_set.begin(), remote_file_set.end(), inserter(expired_files, expired_files.begin())); - file_set_.insert(newly_files.begin(), newly_files.end()); + // Replace file_set_ with remote_file_set to ensure files deleted on Master + // are also removed from local tracking + file_set_ = remote_file_set; } s = CleanUpExpiredFiles(local_snapshot_uuid != remote_snapshot_uuid, expired_files); diff --git a/src/rsync_server.cc b/src/rsync_server.cc index 5696719980..a3cc7a811f 100644 --- a/src/rsync_server.cc +++ b/src/rsync_server.cc @@ -78,10 +78,97 @@ RsyncServerConn::RsyncServerConn(int connfd, const std::string& ip_port, Thread* } RsyncServerConn::~RsyncServerConn() { + { + std::lock_guard guard(mu_); + for (int i = 0; i < readers_.size(); i++) { + readers_[i].reset(); + } + } + // Release dump ownership when connection closes (Scheme A) + if (!snapshot_uuid_.empty()) { + LOG(INFO) << "[RsyncServerConn] Connection " << conn_id_ << " closing, releasing dump " << snapshot_uuid_; + g_pika_server->ReleaseDump(snapshot_uuid_); + } + // Unregister snapshot when connection closes (outside of mu_ lock) + UnregisterSnapshot(); +} + +void RsyncServerConn::RegisterSnapshot(const std::string& snapshot_uuid) { + if (!snapshot_uuid.empty() && snapshot_uuid_ != snapshot_uuid) { + // Unregister old snapshot if different + if (!snapshot_uuid_.empty()) { + UnregisterSnapshot(); + } + snapshot_uuid_ = snapshot_uuid; + g_pika_server->RegisterRsyncSnapshot(snapshot_uuid_); + } +} + +void RsyncServerConn::UnregisterSnapshot() { + if (!snapshot_uuid_.empty()) { + // Clear any remaining transferring files + std::set remaining_files; + { + std::lock_guard guard(mu_); + remaining_files = transferring_files_; + transferring_files_.clear(); + } + for (const auto& file : remaining_files) { + g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_, file); + } + g_pika_server->UnregisterRsyncSnapshot(snapshot_uuid_); + snapshot_uuid_.clear(); + } +} + +void RsyncServerConn::AddTransferringFile(const std::string& filename) { + if (!snapshot_uuid_.empty() && !filename.empty()) { + std::lock_guard guard(mu_); + transferring_files_.insert(filename); + g_pika_server->RegisterRsyncTransferringFile(snapshot_uuid_, filename); + } +} + +void RsyncServerConn::RemoveTransferringFile(const std::string& filename, bool is_eof) { + if (!snapshot_uuid_.empty() && !filename.empty()) { + std::lock_guard guard(mu_); + transferring_files_.erase(filename); + g_pika_server->UnregisterRsyncTransferringFile(snapshot_uuid_, filename); + + // Only process cleanup when file transfer is complete (is_eof=true) + if (is_eof) { + std::string dump_path = g_pika_server->GetDumpPathBySnapshot(snapshot_uuid_); + std::string filepath = dump_path + "/" + filename; + + // Check if file is orphan (nlink=1, only referenced by dump, not by db) + struct stat st; + if (stat(filepath.c_str(), &st) == 0 && st.st_nlink == 1) { + // Orphan file: schedule for delayed cleanup (10 minutes) + // This allows Slave to retry if needed before actual deletion + g_pika_server->ScheduleFileForCleanup(filepath, 600); + LOG(INFO) << "[RsyncTransfer] Scheduled orphan file for cleanup: " << filename + << " for snapshot: " << snapshot_uuid_; + } + // Non-orphan files (nlink=2) are still referenced by RocksDB, no cleanup needed + } + } +} + +bool RsyncServerConn::IsFileTransferring(const std::string& filename) const { std::lock_guard guard(mu_); - for (int i = 0; i < readers_.size(); i++) { - readers_[i].reset(); + return transferring_files_.find(filename) != transferring_files_.end(); +} + +std::set RsyncServerConn::GetTransferringFiles() const { + std::lock_guard guard(mu_); + return transferring_files_; +} + +bool RsyncServerConn::IsFileTransferringGlobally(const std::string& snapshot_uuid, const std::string& filename) { + if (g_pika_server) { + return g_pika_server->IsRsyncFileTransferring(snapshot_uuid, filename); } + return false; } int RsyncServerConn::DealMessage() { @@ -143,15 +230,177 @@ void RsyncServerConn::HandleMetaRsyncRequest(void* arg) { g_pika_server->GetDumpMeta(db_name, &filenames, &snapshot_uuid); response.set_snapshot_uuid(snapshot_uuid); + // Get db_ptr early for use in checks + std::shared_ptr db_ptr = g_pika_server->GetDB(db_name); + + // Check if dump directory exists and has files + if (filenames.empty()) { + LOG(ERROR) << "[Rsync Meta] No files found in dump directory for db: " << db_name + << ", path: " << (db_ptr ? db_ptr->bgsave_info().path : "unknown") + << ". Triggering new bgsave."; + db->BgSaveDB(); + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + // Check if current dump directory has all required files (integrity check) + // If files are missing, the dump is corrupted and needs to be regenerated + if (db_ptr) { + const std::string dump_path = db_ptr->bgsave_info().path; + std::vector missing_files; + for (const auto& filename : filenames) { + std::string full_path = dump_path + "/" + filename; + if (!pstd::FileExists(full_path)) { + missing_files.push_back(filename); + } + } + + if (!missing_files.empty()) { + LOG(ERROR) << "[Rsync Meta] Dump integrity check failed for snapshot: " << snapshot_uuid + << ", missing " << missing_files.size() << " files" + << ", first missing: " << (missing_files.empty() ? "none" : missing_files[0]) + << ". Deleting and triggering new bgsave."; + + // Delete the corrupted dump + pstd::DeleteDirIfExist(dump_path); + + // Trigger new bgsave + db->BgSaveDB(); + + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + } + + // Get connection and dump path + auto conn_ptr = std::dynamic_pointer_cast(conn); + std::string dump_path = db_ptr ? db_ptr->bgsave_info().path : ""; + + // Integrity Check: Re-scan dump directory to verify file consistency + // This detects files that were deleted between GetDumpMeta and now + if (!dump_path.empty()) { + std::vector actual_files; + // Scan each instance directory (0, 1, 2, etc.) + std::vector subDirs; + if (pstd::GetChildren(dump_path, subDirs) == 0) { + for (const auto& subDir : subDirs) { + std::string instPath = dump_path + "/" + subDir; + if (!pstd::FileExists(instPath) || pstd::IsDir(instPath) != 0) { + continue; + } + std::vector instFiles; + if (pstd::GetChildren(instPath, instFiles) == 0) { + for (const auto& file : instFiles) { + actual_files.push_back(subDir + "/" + file); + } + } + } + } + + // Compare filenames (from GetDumpMeta) with actual_files (re-scanned) + std::vector missing_files; + for (const auto& expected : filenames) { + bool found = false; + for (const auto& actual : actual_files) { + if (actual == expected) { + found = true; + break; + } + } + if (!found && expected != "info") { // info file is handled separately + missing_files.push_back(expected); + } + } + + if (!missing_files.empty()) { + LOG(ERROR) << "[Rsync Meta] Dump integrity check failed for db: " << db_name + << ", missing " << missing_files.size() << " files" + << ", first missing: " << missing_files[0] + << ", deleting dump and triggering new bgsave"; + LOG(ERROR) << "[Integrity Check] Missing files in " << dump_path << ": " + << pstd::StringConcat(missing_files, ','); + + pstd::DeleteDirIfExist(dump_path); + db->BgSaveDB(); + + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + } + + // Get connection ID for dump ownership tracking (use connection object address) + std::string conn_id = conn_ptr ? std::to_string(reinterpret_cast(conn_ptr.get())) : ""; + + // Check if this dump is already in use by another slave + // Scheme A: Each slave has exclusive dump ownership + if (g_pika_server->IsDumpInUse(snapshot_uuid)) { + LOG(INFO) << "[Rsync Meta] Dump " << snapshot_uuid << " is already in use by another slave." + << " Active dumps: " << g_pika_server->GetActiveDumpCount() + << "/" << PikaServer::kMaxConcurrentDumps + << ". Triggering new bgsave."; + + // Trigger new bgsave for this slave + db->BgSaveDB(); + + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + // Check concurrent dump limit + if (g_pika_server->GetActiveDumpCount() >= PikaServer::kMaxConcurrentDumps) { + LOG(WARNING) << "[Rsync Meta] Max concurrent dumps (" << PikaServer::kMaxConcurrentDumps + << ") reached. Rejecting new sync request."; + + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + // Mark this dump as in use by this connection + if (conn_ptr && !dump_path.empty()) { + if (!g_pika_server->MarkDumpInUse(snapshot_uuid, conn_id, dump_path)) { + LOG(WARNING) << "[Rsync Meta] Failed to mark dump " << snapshot_uuid << " in use." + << " Possibly concurrent access. Triggering new bgsave."; + + db->BgSaveDB(); + + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + // Store connection ID in the connection object + conn_ptr->conn_id_ = conn_id; + + // Register snapshot for tracking + // Note: RegisterSnapshot will set snapshot_uuid_ internally, don't set it here + conn_ptr->RegisterSnapshot(snapshot_uuid); + + // Pre-register all files for protection during transfer + for (const auto& filename : filenames) { + conn_ptr->AddTransferringFile(filename); + } + + LOG(INFO) << "[Rsync Meta] Dump " << snapshot_uuid << " reserved for connection " << conn_id + << ", files count: " << filenames.size(); + } + LOG(INFO) << "Rsync Meta request, snapshot_uuid: " << snapshot_uuid - << " files count: " << filenames.size() << " file list: "; + << " files count: " << filenames.size() + << " active dumps: " << g_pika_server->GetActiveDumpCount() + << "/" << PikaServer::kMaxConcurrentDumps; + std::for_each(filenames.begin(), filenames.end(), [](auto& file) { LOG(INFO) << "rsync snapshot file: " << file; }); RsyncService::MetaResponse* meta_resp = response.mutable_meta_resp(); for (const auto& filename : filenames) { - meta_resp->add_filenames(filename); + meta_resp->add_filenames(filename); } RsyncWriteResp(response, conn); } @@ -193,9 +442,24 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { LOG(WARNING) << "cannot find db for db_name: " << db_name; response.set_code(RsyncService::kErr); RsyncWriteResp(response, conn); + return; } const std::string filepath = db->bgsave_info().path + "/" + filename; + + // Check if file exists (may have been cleaned up during sync) + // If file doesn't exist, return error to let Slave retry and potentially trigger new bgsave + if (!pstd::FileExists(filepath)) { + LOG(WARNING) << "File no longer exists, returning error: " << filepath; + response.set_code(RsyncService::kErr); + RsyncWriteResp(response, conn); + return; + } + + // Register this file as being transferred + // This prevents the file from being cleaned up during orphan file cleanup + conn->AddTransferringFile(filename); + char* buffer = new char[req->file_req().count() + 1]; size_t bytes_read{0}; std::string checksum = ""; @@ -203,6 +467,11 @@ void RsyncServerConn::HandleFileRsyncRequest(void* arg) { std::shared_ptr reader = conn->readers_[req->reader_index()]; s = reader->Read(filepath, offset, count, buffer, &bytes_read, &checksum, &is_eof); + + // Unregister this file after transfer (whether successful or not) + // Only cleanup the file if transfer is complete (is_eof=true) + conn->RemoveTransferringFile(filename, is_eof); + if (!s.ok()) { response.set_code(RsyncService::kErr); RsyncWriteResp(response, conn); diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index e12cae9b7d..2b501cbf38 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -4,7 +4,9 @@ set (CMAKE_CXX_STANDARD 17) project (storage) # Other CMake modules -add_subdirectory(tests) +if(BUILD_TESTS) + add_subdirectory(tests) +endif() # add_subdirectory(examples) # add_subdirectory(benchmark) diff --git a/src/storage/include/storage/backupable.h b/src/storage/include/storage/backupable.h index e190993c29..65090193e5 100644 --- a/src/storage/include/storage/backupable.h +++ b/src/storage/include/storage/backupable.h @@ -47,6 +47,10 @@ class BackupEngine { Status SetBackupContent(); + // Create backup immediately after setting content to minimize time window + // This reduces the chance of compaction occurring between GetLiveFiles and CreateCheckpoint + Status SetBackupContentAndCreate(const std::string& dir); + Status CreateNewBackup(const std::string& dir); void StopBackup(); diff --git a/src/storage/src/backupable.cc b/src/storage/src/backupable.cc index 4acd8dee72..8acba658b8 100644 --- a/src/storage/src/backupable.cc +++ b/src/storage/src/backupable.cc @@ -6,6 +6,8 @@ #include #include +#include + #include "storage/backupable.h" #include "storage/storage.h" @@ -69,6 +71,42 @@ Status BackupEngine::SetBackupContent() { return s; } +Status BackupEngine::SetBackupContentAndCreate(const std::string& dir) { + Status s; + // Process each engine sequentially to minimize time window + for (const auto& engine : engines_) { + // 1. Get backup content (this calls DisableFileDeletions internally) + BackupContent bcontent; + s = engine.second->GetCheckpointFiles(bcontent.live_files, bcontent.live_wal_files, bcontent.manifest_file_size, + bcontent.sequence_number); + if (!s.ok()) { + // GetCheckpointFiles already calls EnableFileDeletions on failure + return s; + } + + // 2. Immediately create checkpoint with the files we just got + // This minimizes the time window between GetLiveFiles and CreateCheckpoint + std::string backup_dir = GetSaveDirByIndex(dir, engine.first); + delete_dir(backup_dir.c_str()); + + s = engine.second->CreateCheckpointWithFiles( + backup_dir, bcontent.live_files, bcontent.live_wal_files, + bcontent.manifest_file_size, bcontent.sequence_number); + + // 3. Re-enable file deletions regardless of success + // CreateCheckpointWithFiles already calls EnableFileDeletions in db_checkpoint.cc + if (!s.ok()) { + LOG(WARNING) << "CreateCheckpointWithFiles failed for index " << engine.first + << ": " << s.ToString(); + return s; + } + + // Save content for potential reuse + backup_content_[engine.first] = std::move(bcontent); + } + return Status::OK(); +} + Status BackupEngine::CreateNewBackupSpecify(const std::string& backup_dir, int index) { auto it_engine = engines_.find(index); auto it_content = backup_content_.find(index); From 36d8f170ed5eadcc6acb58252fab533326709d42 Mon Sep 17 00:00:00 2001 From: chenbt Date: Mon, 9 Mar 2026 14:07:42 +0800 Subject: [PATCH 6/6] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E9=9D=9ESST?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E8=A2=AB=E8=AF=AF=E8=AF=86=E5=88=AB=E4=B8=BA?= =?UTF-8?q?=E5=AD=A4=E5=84=BF=E6=96=87=E4=BB=B6=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 RemoveTransferringFile 中增加文件类型检查,只有 .sst 后缀的文件才可能是孤儿文件(硬链接)。 其他文件如 info、CURRENT、MANIFEST、OPTIONS、.log 等都是在 dump 过程中新生成的, nlink=1 是正常情况,不应被清理。 修复前:info 文件被错误识别为孤儿文件并调度清理,导致 dump 目录失去保护机制 修复后:只有 SST 文件会进入孤儿文件检查和延迟清理流程 --- src/rsync_server.cc | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rsync_server.cc b/src/rsync_server.cc index a3cc7a811f..da7ff0c4b6 100644 --- a/src/rsync_server.cc +++ b/src/rsync_server.cc @@ -137,6 +137,13 @@ void RsyncServerConn::RemoveTransferringFile(const std::string& filename, bool i // Only process cleanup when file transfer is complete (is_eof=true) if (is_eof) { + // Only SST files can be orphan files (hard links from DB) + // Other files (info, CURRENT, MANIFEST, OPTIONS, .log) are created + // during dump and should not be cleaned up + if (filename.size() < 4 || filename.substr(filename.size() - 4) != ".sst") { + return; + } + std::string dump_path = g_pika_server->GetDumpPathBySnapshot(snapshot_uuid_); std::string filepath = dump_path + "/" + filename; @@ -146,7 +153,7 @@ void RsyncServerConn::RemoveTransferringFile(const std::string& filename, bool i // Orphan file: schedule for delayed cleanup (10 minutes) // This allows Slave to retry if needed before actual deletion g_pika_server->ScheduleFileForCleanup(filepath, 600); - LOG(INFO) << "[RsyncTransfer] Scheduled orphan file for cleanup: " << filename + LOG(INFO) << "[RsyncTransfer] Scheduled orphan SST file for cleanup: " << filename << " for snapshot: " << snapshot_uuid_; } // Non-orphan files (nlink=2) are still referenced by RocksDB, no cleanup needed