diff --git a/include/storage/vacuumer.hh b/include/storage/vacuumer.hh
index 49caf9a3c..4cb18d306 100644
--- a/include/storage/vacuumer.hh
+++ b/include/storage/vacuumer.hh
@@ -92,6 +92,14 @@ public:
         _global_file_size_threshold = size;
     }
 
+    /**
+     * @brief Cleanup DB's entries from vacuum storage
+     *          - memory, global vacuum file and partials
+     *
+     * @param cleanup_db_id DB to be cleaned up
+     */
+    void cleanup_db(uint64_t cleanup_db_id);
+
 protected:
     /**
      * @brief Constructor, that inits the vacuumer thread
@@ -133,6 +141,11 @@ private:
         uint64_t size;
     };
 
+    /**
+     * Enum to indicate the type of cleanup on the global vacuum file
+     */
+    enum class CleanupOperation { DB_CLEANUP, RECOVERY };
+
     /**
      * Hole-punch block size
      */
@@ -337,5 +350,14 @@ private:
      * be written to the disk
      */
     void _commit_expired_extents(uint64_t db_id, uint64_t committed_xid);
+
+    /**
+     * @brief Cleans up global vacuum file - db_cleanup or recovery till last committed XID
+     *
+     * @tparam op cleanup mode to run: DB_CLEANUP or RECOVERY
+     * @param cleanup_db_id DB whose entries are dropped (read by DB_CLEANUP only, unused by RECOVERY)
+     */
+    template <CleanupOperation op>
+    void _cleanup_global_vacuum_file(uint64_t cleanup_db_id=-1);
 };
 }
diff --git a/src/pg_log_mgr/pg_log_coordinator.cc b/src/pg_log_mgr/pg_log_coordinator.cc
index c772ce667..b1671412b 100644
--- a/src/pg_log_mgr/pg_log_coordinator.cc
+++ b/src/pg_log_mgr/pg_log_coordinator.cc
@@ -10,6 +10,7 @@
 
 #include
 #include
+#include <storage/vacuumer.hh>
 
 namespace springtail::pg_log_mgr {
 
@@ -162,5 +163,8 @@ namespace springtail::pg_log_mgr {
 
         // Remove index reconciliation queue for the db
         _index_reconciliation_queue_mgr->remove_queue(db_id);
+
+        // Cleanup from vacuumer
+        Vacuumer::get_instance()->cleanup_db(db_id);
     }
 }
diff --git a/src/pg_log_mgr/pg_log_mgr.cc b/src/pg_log_mgr/pg_log_mgr.cc
index 77f1fc9cd..fda07abc0 100644
--- a/src/pg_log_mgr/pg_log_mgr.cc
+++ b/src/pg_log_mgr/pg_log_mgr.cc
@@ -20,6 +20,7 @@
 
 #include
 #include
+#include <storage/vacuumer.hh>
 
 namespace springtail::pg_log_mgr {
 
@@ -210,6 +211,9 @@ namespace springtail::pg_log_mgr {
         std::filesystem::remove_all(_repl_log_path);
         std::filesystem::remove_all(_xact_log_path);
 
+        // Cleanup from vacuumer
+        Vacuumer::get_instance()->cleanup_db(_db_id);
+
         // create directories if they don't exist
         std::filesystem::create_directories(_repl_log_path);
         std::filesystem::create_directories(_xact_log_path);
diff --git a/src/storage/vacuumer.cc b/src/storage/vacuumer.cc
index 9b1eb1d16..137627077 100644
--- a/src/storage/vacuumer.cc
+++ b/src/storage/vacuumer.cc
@@ -556,8 +556,9 @@ Vacuumer::_get_partials_from_file(const std::filesystem::path &path) {
     return partials;
 }
 
+template <Vacuumer::CleanupOperation op>
 void
-Vacuumer::_recover_global_vacuum_file()
+Vacuumer::_cleanup_global_vacuum_file(uint64_t cleanup_db_id)
 {
     // Map: db_id -> last_committed_xid
     std::map committed_xid_map;
@@ -573,41 +574,57 @@ Vacuumer::_recover_global_vacuum_file()
     int start_offset = 0;
     auto response = handle->read(start_offset);
     auto file_f = _global_vacuum_schema->get_field("file");
-
     while (!response->data.empty()) {
-        // Get extent by extent, get XID and check if its <= last_committed_xid
-        // if so, copy that to runfile
+        // Read through the extents and do the appropriate cleanup
         auto extent = std::make_shared(response->data);

         CHECK_GT(extent->row_count(), 0);
 
-        auto xid = extent->header().xid;
+        if constexpr (op == Vacuumer::CleanupOperation::RECOVERY) {
+            auto xid = extent->header().xid;
+            // Since different extent can have different DB records
+            // get the db_id from the file path in the extent record
+            auto i = extent->begin();
+            auto &&row = *i;
+            auto file = file_f->get_text(&row);
+            auto db_id = _get_db_id_from_path(file);
 
-        // Since different extent can have different DB records
-        // get the db_id from the file path in the extent record
-        auto i = extent->begin();
-        auto &&row = *i;
-        auto file = file_f->get_text(&row);
-        auto db_id = _get_db_id_from_path(file);
+            CHECK_GT(db_id, 0);
 
-        CHECK_GT(db_id, 0);
+            // Get the last_committed_xid from the cache
+            // else get from XID server and cache it
+            auto db_it = committed_xid_map.find(db_id);
+            uint64_t last_committed_xid;
+            if (db_it != committed_xid_map.end()) {
+                last_committed_xid = db_it->second;
+            } else {
+                last_committed_xid = XidMgrClient::get_instance()->get_committed_xid(db_id, 0);
+                committed_xid_map[db_id] = last_committed_xid;
+            }
 
-        // Get the last_committed_xid from the cache
-        // else get from XID server and cache it
-        auto db_it = committed_xid_map.find(db_id);
-        uint64_t last_committed_xid;
-        if (db_it != committed_xid_map.end()) {
-            last_committed_xid = db_it->second;
-        } else {
-            last_committed_xid = XidMgrClient::get_instance()->get_committed_xid(db_id, 0);
-            committed_xid_map[db_id] = last_committed_xid;
-        }
+            /* skip writing to runfile if xid > last_committed_xid */
+            if (xid <= last_committed_xid) {
+                // Flush to runfile
+                auto write_response = extent->async_flush(runhandle);
+                write_response.wait();
+            }
+        } else { /* op == Vacuumer::CleanupOperation::DB_CLEANUP */
+            uint64_t db_id = 0; // same width as cleanup_db_id: no int truncation, no signed/unsigned compare
+            for (auto &row : *extent) {
+                auto file = file_f->get_text(&row);
+                db_id = _get_db_id_from_path(file);
+                if (db_id == cleanup_db_id) {
+                    _cleanup_partial_files(file, std::filesystem::is_directory(file));
+                }
+            }
 
-        /* skip writing to runfile if xid > last_committed_xid */
-        if (xid <= last_committed_xid) {
-            // Flush to runfile
-            auto write_response = extent->async_flush(runhandle);
-            write_response.wait();
+            CHECK_GT(db_id, 0);
+
+            if (db_id != cleanup_db_id) {
+                // Flush to runfile
+                auto write_response = extent->async_flush(runhandle);
+                write_response.wait();
+            }
         }
 
         // Move to the next extent
@@ -618,9 +635,65 @@ Vacuumer::_recover_global_vacuum_file()
     // Move runfile on to the global file
     if (std::filesystem::exists(global_vacuum_runfile)) {
         std::filesystem::rename(global_vacuum_runfile, _global_vacuum_file);
+    } else {
+        // If runfile is empty, all the entries in the global vacuum file
+        // are not relevant, so lets drop the global file
+        std::filesystem::remove(_global_vacuum_file);
     }
 }
 
+void
+Vacuumer::cleanup_db(uint64_t cleanup_db_id)
+{
+    std::unique_lock lock(_mutex);
+
+    /* --------------------------- Global file and partials file cleanup ---------------------------- */
+
+    _cleanup_global_vacuum_file<CleanupOperation::DB_CLEANUP>(cleanup_db_id);
+
+    /* --------------------------- End of global and partial files cleanup -------------------------- */
+
+    /* --------------------------- Cleanup memory & partial files ----------------------------------- */
+
+    if (!_extent_map.empty()) {
+        for (auto file_it = _extent_map.begin(); file_it != _extent_map.end(); ) {
+            auto db_id = _get_db_id_from_path(file_it->first);
+            if (cleanup_db_id == db_id) {
+                // Cleanup partial file and remove from the map
+                _cleanup_partial_files(file_it->first, false);
+                file_it = _extent_map.erase(file_it);
+            } else {
+                ++file_it;
+            }
+        }
+    }
+
+    if (!_snapshot_map.empty()) {
+        auto snapshot_db_entry = _snapshot_map.find(cleanup_db_id);
+        if (snapshot_db_entry != _snapshot_map.end()) {
+            const auto& snapshot_xid_map = snapshot_db_entry->second;
+
+            // First clear the partial files related to the snapshot paths.
+            // A range-for is used so the walk over the XID entries always
+            // advances and finishes before the DB entry is erased below.
+            for (const auto& snapshot_xid_entry : snapshot_xid_map) {
+                for (const auto& snapshot_path : snapshot_xid_entry.second) {
+                    _cleanup_partial_files(snapshot_path, std::filesystem::is_directory(snapshot_path));
+                }
+            }
+            _snapshot_map.erase(snapshot_db_entry);
+        }
+    }
+    /* ------------------------- End of memory cleanup ------------------------------------------ */
+
+}
+
+void
+Vacuumer::_recover_global_vacuum_file()
+{
+    _cleanup_global_vacuum_file<CleanupOperation::RECOVERY>();
+}
+
 void
 Vacuumer::_run_recovery()
 {
@@ -719,7 +792,8 @@ Vacuumer::_do_vacuum_run()
 
     // Lets read from global vacuum file if no expired extents is memory (that got written to the global file)
 
-    if (fs::get_file_size(_global_vacuum_file) > _global_file_size_threshold) {
+    auto global_vacuum_filesize = fs::get_file_size(_global_vacuum_file);
+    if (global_vacuum_filesize != -1 && global_vacuum_filesize > _global_file_size_threshold) {
         LOG_INFO("Running vacuum as the global log crossed threshold: {}", _global_vacuum_file);
 
         /* --------------------------------- Populate maps from global vacuum log -------------------------- */