Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions include/storage/vacuumer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ public:
_global_file_size_threshold = size;
}

/**
 * @brief Remove every entry belonging to one database from vacuum storage.
 *
 * Covers the global vacuum file, the in-memory extent and snapshot maps,
 * and the partial files associated with those entries.
 *
 * @param cleanup_db_id ID of the database whose entries are removed
 */
void cleanup_db(uint64_t cleanup_db_id);

protected:
/**
* @brief Constructor, that inits the vacuumer thread
Expand Down Expand Up @@ -133,6 +141,11 @@ private:
uint64_t size;
};

/**
 * Enum to indicate the type of cleanup pass made over the global vacuum file.
 *  - DB_CLEANUP: extents belonging to the target database are left out of
 *    the rewritten file
 *  - RECOVERY: only extents whose XID is <= the last committed XID of their
 *    database are kept
 */
enum class CleanupOperation { DB_CLEANUP, RECOVERY };

/**
* Hole-punch block size
*/
Expand Down Expand Up @@ -337,5 +350,13 @@ private:
* be written to the disk
*/
void _commit_expired_extents(uint64_t db_id, uint64_t committed_xid);

/**
 * @brief Rewrites the global vacuum file, keeping only the extents that are
 *        still relevant for the requested operation.
 *
 * @tparam op DB_CLEANUP drops the extents of cleanup_db_id; RECOVERY drops
 *            extents whose XID is beyond the last committed XID of their DB
 * @param cleanup_db_id DB whose entries are dropped; only read for DB_CLEANUP.
 *                      The default of -1 converts to the largest uint64_t value.
 */
template <CleanupOperation op>
void _cleanup_global_vacuum_file(uint64_t cleanup_db_id=-1);
};
}
4 changes: 4 additions & 0 deletions src/pg_log_mgr/pg_log_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <pg_log_mgr/xid_ready.hh>

#include <write_cache/write_cache_server.hh>
#include <storage/vacuumer.hh>

namespace springtail::pg_log_mgr {

Expand Down Expand Up @@ -162,5 +163,8 @@ namespace springtail::pg_log_mgr {

// Remove index reconciliation queue for the db
_index_reconciliation_queue_mgr->remove_queue(db_id);

// Cleanup from vacuumer
Vacuumer::get_instance()->cleanup_db(db_id);
}
}
4 changes: 4 additions & 0 deletions src/pg_log_mgr/pg_log_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pg_log_mgr/sync_tracker.hh>

#include <xid_mgr/xid_mgr_server.hh>
#include <storage/vacuumer.hh>

namespace springtail::pg_log_mgr {

Expand Down Expand Up @@ -210,6 +211,9 @@ namespace springtail::pg_log_mgr {
std::filesystem::remove_all(_repl_log_path);
std::filesystem::remove_all(_xact_log_path);

// Cleanup from vacuumer
Vacuumer::get_instance()->cleanup_db(_db_id);

// create directories if they don't exist
std::filesystem::create_directories(_repl_log_path);
std::filesystem::create_directories(_xact_log_path);
Expand Down
130 changes: 102 additions & 28 deletions src/storage/vacuumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,9 @@ Vacuumer::_get_partials_from_file(const std::filesystem::path &path) {
return partials;
}

template <Vacuumer::CleanupOperation op>
void
Vacuumer::_recover_global_vacuum_file()
Vacuumer::_cleanup_global_vacuum_file(uint64_t cleanup_db_id)
{
// Map: db_id -> last_committed_xid
std::map<uint64_t, uint64_t> committed_xid_map;
Expand All @@ -573,41 +574,57 @@ Vacuumer::_recover_global_vacuum_file()
int start_offset = 0;
auto response = handle->read(start_offset);
auto file_f = _global_vacuum_schema->get_field("file");

while (!response->data.empty()) {

// Get extent by extent, get XID and check if its <= last_committed_xid
// if so, copy that to runfile
// Read through the extents and do the appropriate cleanup
auto extent = std::make_shared<Extent>(response->data);
CHECK_GT(extent->row_count(), 0);

auto xid = extent->header().xid;
if (op == Vacuumer::CleanupOperation::RECOVERY) {
auto xid = extent->header().xid;
// Since different extent can have different DB records
// get the db_id from the file path in the extent record
auto i = extent->begin();
auto &&row = *i;
auto file = file_f->get_text(&row);
auto db_id = _get_db_id_from_path(file);

// Since different extent can have different DB records
// get the db_id from the file path in the extent record
auto i = extent->begin();
auto &&row = *i;
auto file = file_f->get_text(&row);
auto db_id = _get_db_id_from_path(file);
CHECK_GT(db_id, 0);

CHECK_GT(db_id, 0);
// Get the last_committed_xid from the cache
// else get from XID server and cache it
auto db_it = committed_xid_map.find(db_id);
uint64_t last_committed_xid;
if (db_it != committed_xid_map.end()) {
last_committed_xid = db_it->second;
} else {
last_committed_xid = XidMgrClient::get_instance()->get_committed_xid(db_id, 0);
committed_xid_map[db_id] = last_committed_xid;
}

// Get the last_committed_xid from the cache
// else get from XID server and cache it
auto db_it = committed_xid_map.find(db_id);
uint64_t last_committed_xid;
if (db_it != committed_xid_map.end()) {
last_committed_xid = db_it->second;
} else {
last_committed_xid = XidMgrClient::get_instance()->get_committed_xid(db_id, 0);
committed_xid_map[db_id] = last_committed_xid;
}
/* skip writing to runfile if xid > last_committed_xid */
if (xid <= last_committed_xid) {
// Flush to runfile
auto write_response = extent->async_flush(runhandle);
write_response.wait();
}
} else { /* op == Vacuumer::CleanupOperation::DB_CLEANUP */
auto db_id = 0;
for (auto &row : *extent) {
auto file = file_f->get_text(&row);
db_id = _get_db_id_from_path(file);
if (db_id == cleanup_db_id) {
_cleanup_partial_files(file, std::filesystem::is_directory(file));
}
}

/* skip writing to runfile if xid > last_committed_xid */
if (xid <= last_committed_xid) {
// Flush to runfile
auto write_response = extent->async_flush(runhandle);
write_response.wait();
CHECK_GT(db_id, 0);

if (db_id != cleanup_db_id) {
// Flush to runfile
auto write_response = extent->async_flush(runhandle);
write_response.wait();
}
}

// Move to the next extent
Expand All @@ -618,9 +635,65 @@ Vacuumer::_recover_global_vacuum_file()
// Move runfile on to the global file
if (std::filesystem::exists(global_vacuum_runfile)) {
std::filesystem::rename(global_vacuum_runfile, _global_vacuum_file);
} else {
// If runfile is empty, all the entries in the global vacuum file
// are not relevant, so lets drop the global file
std::filesystem::remove(_global_vacuum_file);
}
}

/**
 * @brief Remove everything the vacuumer tracks for one database.
 *
 * Holds _mutex for the whole call. Cleans, in order:
 *  1. the global vacuum file (and the partial files its rows reference),
 *  2. the in-memory extent map,
 *  3. the in-memory snapshot map.
 *
 * @param cleanup_db_id ID of the database being dropped / reset
 */
void
Vacuumer::cleanup_db(uint64_t cleanup_db_id)
{
    std::unique_lock lock(_mutex);

    /* ------------------- Global file and its partial files ------------------- */

    _cleanup_global_vacuum_file<CleanupOperation::DB_CLEANUP>(cleanup_db_id);

    /* ------------------- In-memory extent map --------------------------------- */

    // Erase while iterating: erase() hands back the next valid iterator, so the
    // iterator is only advanced manually when the entry is kept.
    for (auto file_it = _extent_map.begin(); file_it != _extent_map.end(); ) {
        auto db_id = _get_db_id_from_path(file_it->first);
        if (cleanup_db_id == db_id) {
            // Cleanup partial file and remove from the map
            _cleanup_partial_files(file_it->first, false);
            file_it = _extent_map.erase(file_it);
        } else {
            ++file_it;
        }
    }

    /* ------------------- In-memory snapshot map ------------------------------- */

    auto snapshot_db_entry = _snapshot_map.find(cleanup_db_id);
    if (snapshot_db_entry != _snapshot_map.end()) {
        // First clear the partial files related to the snapshot paths.
        // Range-for guarantees forward progress; the previous hand-written
        // while loop never advanced its iterator and could not terminate.
        for (const auto& snapshot_xid_entry : snapshot_db_entry->second) {
            for (const auto& snapshot_path : snapshot_xid_entry.second) {
                _cleanup_partial_files(snapshot_path, std::filesystem::is_directory(snapshot_path));
            }
        }
        _snapshot_map.erase(snapshot_db_entry);
    }
}

void
Vacuumer::_recover_global_vacuum_file()
{
_cleanup_global_vacuum_file<CleanupOperation::RECOVERY>();
}

void
Vacuumer::_run_recovery()
{
Expand Down Expand Up @@ -719,7 +792,8 @@ Vacuumer::_do_vacuum_run()

// Let's read from the global vacuum file if no expired extents are in memory (they got written to the global file)

if (fs::get_file_size(_global_vacuum_file) > _global_file_size_threshold) {
auto global_vacuum_filesize = fs::get_file_size(_global_vacuum_file);
if (global_vacuum_filesize != -1 && global_vacuum_filesize > _global_file_size_threshold) {
LOG_INFO("Running vacuum as the global log crossed threshold: {}", _global_vacuum_file);

/* --------------------------------- Populate maps from global vacuum log -------------------------- */
Expand Down
Loading