Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/thread.hpp"
#include "duckdb/common/typedefs.hpp"
#include "duckdb/logging/logger.hpp"
#include <chrono>
#include "duckdb/main/settings.hpp"
#include "duckdb/parallel/concurrentqueue.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
Expand Down Expand Up @@ -82,7 +84,7 @@ struct EvictionQueue {
//! Tries to dequeue an element from the eviction queue, but only after acquiring the purge queue lock.
bool TryDequeueWithLock(BufferEvictionNode &node);
//! Garbage collect dead nodes in the eviction queue.
void Purge();
void Purge(const DatabaseInstance &db);
template <typename FN>
void IterateUnloadableBlocks(FN fn);

Expand Down Expand Up @@ -155,13 +157,15 @@ bool EvictionQueue::TryDequeueWithLock(BufferEvictionNode &node) {
return q.try_dequeue(node);
}

void EvictionQueue::Purge() {
void EvictionQueue::Purge(const DatabaseInstance &db) {
// only one thread purges the queue, all other threads early-out
unique_lock<mutex> guard(purge_lock, std::try_to_lock);
if (!guard.owns_lock()) {
return;
}

auto purge_start = std::chrono::steady_clock::now();

// we purge INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER nodes
idx_t purge_size = INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER;

Expand Down Expand Up @@ -191,6 +195,7 @@ void EvictionQueue::Purge() {
// guaranteeing that we always exit the loop.

idx_t max_purges = approx_q_size / purge_size;
idx_t initial_max_purges = max_purges;
while (max_purges != 0) {
PurgeIteration(purge_size);

Expand All @@ -213,6 +218,15 @@ void EvictionQueue::Purge() {

max_purges--;
}

idx_t iterations = initial_max_purges - max_purges;
auto elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - purge_start).count();
if (iterations > 10 || elapsed_ms > 1000) {
DUCKDB_LOG_WARNING(db,
"EvictionQueue::Purge took %lldms with %llu iterations, queue_size=%llu, dead_nodes=%llu",
elapsed_ms, iterations, q.size_approx(), (idx_t)total_dead_nodes);
}
}

void EvictionQueue::PurgeIteration(const idx_t purge_size) {
Expand Down Expand Up @@ -500,7 +514,7 @@ void BufferPool::PurgeQueue(const BlockHandle &block) {
const auto queue_sleep_micros =
Settings::Get<DebugEvictionQueueSleepMicroSecondsSetting>(buffer_manager.GetDatabase());
eviction_queue.debug_eviction_queue_sleep = queue_sleep_micros;
eviction_queue.Purge();
eviction_queue.Purge(buffer_manager.GetDatabase());
}

void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
Expand Down
Loading