diff --git a/include/pika_server.h b/include/pika_server.h index df75229188..925126ad7a 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -520,6 +520,7 @@ class PikaServer : public pstd::noncopyable { */ void DoTimingTask(); void AutoCompactRange(); + void AutoProgressiveCompact(); void AutoBinlogPurge(); void AutoServerlogPurge(); void AutoDeleteExpiredDump(); @@ -550,6 +551,7 @@ class PikaServer : public pstd::noncopyable { */ bool have_scheduled_crontask_ = false; struct timeval last_check_compact_time_; + struct timeval last_progressive_compact_time_; /* * ResumeDB used diff --git a/src/pika_server.cc b/src/pika_server.cc index b205f3e34b..fabe58e12a 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -46,6 +46,7 @@ PikaServer::PikaServer() : exit_(false), slow_cmd_thread_pool_flag_(g_pika_conf->slow_cmd_pool()), last_check_compact_time_({0, 0}), + last_progressive_compact_time_({0, 0}), last_check_resume_time_({0, 0}), repl_state_(PIKA_REPL_NO_CONNECT), role_(PIKA_ROLE_SINGLE) { @@ -1108,6 +1109,8 @@ int PikaServer::ClientPubSubChannelPatternSize(const std::shared_ptr& c void PikaServer::DoTimingTask() { // Maybe schedule compactrange AutoCompactRange(); + // Progressive compact + AutoProgressiveCompact(); // Purge serverlog AutoServerlogPurge(); // Purge binlog @@ -1140,6 +1143,33 @@ void PikaServer::StatDiskUsage() { disk_statistic_.log_size_.store(pstd::Du(g_pika_conf->log_path())); } +void PikaServer::AutoProgressiveCompact() { + struct timeval now; + gettimeofday(&now, nullptr); + + // Execute progressive compact every 60 seconds + if (last_progressive_compact_time_.tv_sec == 0 || + now.tv_sec - last_progressive_compact_time_.tv_sec >= 60) { + gettimeofday(&last_progressive_compact_time_, nullptr); + + std::shared_lock db_rwl(dbs_rw_); + for (const auto& db_item : dbs_) { + db_item.second->DBLockShared(); + auto storage = db_item.second->storage(); + if (storage) { + storage::Status s = storage->LongestNotCompactionSstCompact(storage::DataType::kAll); + if (!s.ok()) { + LOG(WARNING) << "Progressive compact for DB: " << db_item.first + << " failed: " << s.ToString(); + } else { + LOG(INFO) << "Progressive compact for DB: " << db_item.first << " completed"; + } + } + db_item.second->DBUnlockShared(); + } + } +} + void PikaServer::AutoCompactRange() { struct statfs disk_info; int ret = statfs(g_pika_conf->db_path().c_str(), &disk_info); @@ -1297,25 +1327,30 @@ void PikaServer::AutoServerlogPurge() { // Process files for each log level for (auto& [level, files] : log_files_by_level) { - // Sort by time in descending order + // Sort by time in descending order (newest first) std::sort(files.begin(), files.end(), [](const auto& a, const auto& b) { return a.second > b.second; }); - bool has_recent_file = false; + // Keep the most recent file for each level, delete others that exceed retention_time + bool is_first = true; for (const auto& [file, log_timestamp] : files) { - double diff_seconds = difftime(now_timestamp, log_timestamp); - int64_t interval_days = static_cast(diff_seconds / 86400); - if (interval_days <= retention_time) { - has_recent_file = true; + // Always keep the most recent file for each log level + if (is_first) { + is_first = false; continue; } - if (!has_recent_file) { - has_recent_file = true; - continue; + + double diff_seconds = difftime(now_timestamp, log_timestamp); + int64_t interval_days = static_cast(diff_seconds / 86400); + + // Delete files that exceed the retention time + if (interval_days > retention_time) { + std::string log_file = log_path + "/" + file; + LOG(INFO) << "Deleting out of date log file: " << log_file; + if(!pstd::DeleteFile(log_file)) { + LOG(ERROR) << "Failed to delete log file: " << log_file; + } } - std::string log_file = log_path + "/" + file; - LOG(INFO) << "Deleting out of date log file: " << log_file; - if(!pstd::DeleteFile(log_file)) LOG(ERROR) << "Failed to delete log file: " << log_file; } } }