Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/duckdb/storage/buffer/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct BufferEvictionNode {

bool CanUnload(BlockMemory &memory);
shared_ptr<BlockMemory> TryGetBlockMemory();
bool IsDeadNode();
};

//! The BufferPool is in charge of handling memory management for one or more databases. It defines memory limits
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/block_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ BlockMemory::~BlockMemory() { // NOLINT: allow internal exceptions
// The block memory is being destroyed, meaning that any unswizzled pointers are now binary junk.
SetSwizzling(nullptr);
D_ASSERT(!GetBuffer() || GetBuffer()->GetBufferType() == GetBufferType());
if (GetBuffer() && GetBufferType() != FileBufferType::TINY_BUFFER) {
if (GetBufferType() != FileBufferType::TINY_BUFFER) {
// Kill the latest version in the eviction queue.
GetBufferManager().GetBufferPool().IncrementDeadNodes(*this);
}
Expand Down
82 changes: 63 additions & 19 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/thread.hpp"
#include "duckdb/common/typedefs.hpp"
#include "duckdb/logging/logger.hpp"
#include <chrono>
#include "duckdb/main/settings.hpp"
#include "duckdb/parallel/concurrentqueue.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
Expand Down Expand Up @@ -66,6 +68,18 @@ shared_ptr<BlockMemory> BufferEvictionNode::TryGetBlockMemory() {
return shared_memory_p;
}

bool BufferEvictionNode::IsDeadNode() {
auto shared_memory_p = memory_p.lock();
if (!shared_memory_p) {
return true;
}
if (handle_sequence_number != shared_memory_p->GetEvictionSequenceNumber()) {
return true;
}
return false;
}


typedef duckdb_moodycamel::ConcurrentQueue<BufferEvictionNode> eviction_queue_t;

struct EvictionQueue {
Expand All @@ -82,7 +96,7 @@ struct EvictionQueue {
//! Tries to dequeue an element from the eviction queue, but only after acquiring the purge queue lock.
bool TryDequeueWithLock(BufferEvictionNode &node);
//! Garbage collect dead nodes in the eviction queue.
void Purge();
void Purge(const DatabaseInstance &db);
template <typename FN>
void IterateUnloadableBlocks(FN fn);

Expand All @@ -108,8 +122,8 @@ struct EvictionQueue {
}

private:
//! Bulk purge dead nodes from the eviction queue. Then, enqueue those that are still alive.
void PurgeIteration(const idx_t purge_size);
//! Bulk purge dead nodes from the eviction queue. Alive nodes are appended to alive_nodes_out.
void PurgeIteration(const idx_t purge_size, vector<BufferEvictionNode> &alive_nodes_out);

public:
//! The type of the buffers in this queue and helper function (both for verification only)
Expand Down Expand Up @@ -155,13 +169,15 @@ bool EvictionQueue::TryDequeueWithLock(BufferEvictionNode &node) {
return q.try_dequeue(node);
}

void EvictionQueue::Purge() {
void EvictionQueue::Purge(const DatabaseInstance &db) {
// only one thread purges the queue, all other threads early-out
unique_lock<mutex> guard(purge_lock, std::try_to_lock);
if (!guard.owns_lock()) {
return;
}

auto purge_start = std::chrono::steady_clock::now();

// we purge INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER nodes
idx_t purge_size = INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER;

Expand Down Expand Up @@ -190,12 +206,23 @@ void EvictionQueue::Purge() {
// 2.3. We've purged the entire queue: max_purges is zero. This is a worst-case scenario,
// guaranteeing that we always exit the loop.

idx_t initial_q_size = approx_q_size;
idx_t initial_dead_nodes = total_dead_nodes;
idx_t max_purges = approx_q_size / purge_size;
idx_t initial_max_purges = max_purges;

// Accumulate alive nodes across all iterations and re-enqueue once at the end.
// This prevents the purge thread from feeding alive nodes back into its own
// moodycamel sub-queue between iterations, which would cause subsequent
// try_dequeue_bulk calls to re-drain those same alive nodes instead of
// reaching dead entries in worker thread sub-queues.
vector<BufferEvictionNode> alive_nodes_to_reenqueue;

while (max_purges != 0) {
PurgeIteration(purge_size);
PurgeIteration(purge_size, alive_nodes_to_reenqueue);

// update relevant sizes and potentially early-out
approx_q_size = q.size_approx();
// The effective queue size includes alive nodes held in our buffer
approx_q_size = q.size_approx() + alive_nodes_to_reenqueue.size();

// early-out according to (2.1)
if (approx_q_size < purge_size * EARLY_OUT_MULTIPLIER) {
Expand All @@ -213,9 +240,30 @@ void EvictionQueue::Purge() {

max_purges--;
}

// Re-enqueue all alive nodes after the purge loop completes
if (!alive_nodes_to_reenqueue.empty()) {
q.enqueue_bulk(alive_nodes_to_reenqueue.begin(), alive_nodes_to_reenqueue.size());
}

idx_t iterations = initial_max_purges - max_purges;
idx_t total_alive_found = alive_nodes_to_reenqueue.size();
idx_t total_dead_found = initial_dead_nodes - total_dead_nodes;
auto elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - purge_start).count();
if (iterations > 10 || elapsed_ms > 1000) {
DUCKDB_LOG_WARNING(db,
"EvictionQueue::Purge took %lldms with %llu iterations, "
"queue_size_before=%llu, queue_size_after=%llu, "
"dead_nodes_before=%llu, dead_nodes_after=%llu, "
"total_dequeued=%llu (alive=%llu, dead=%llu)",
elapsed_ms, iterations, initial_q_size, q.size_approx(),
initial_dead_nodes, (idx_t)total_dead_nodes,
total_alive_found + total_dead_found, total_alive_found, total_dead_found);
}
}

void EvictionQueue::PurgeIteration(const idx_t purge_size) {
void EvictionQueue::PurgeIteration(const idx_t purge_size, vector<BufferEvictionNode> &alive_nodes_out) {
// if this purge is significantly smaller or bigger than the previous purge, then
// we need to resize the purge_nodes vector. Note that this barely happens, as we
// purge queue_insertions * PURGE_SIZE_MULTIPLIER nodes
Expand All @@ -227,25 +275,21 @@ void EvictionQueue::PurgeIteration(const idx_t purge_size) {
// bulk purge
const idx_t actually_dequeued = q.try_dequeue_bulk(purge_nodes.begin(), purge_size);

// retrieve all alive nodes that have been wrongly dequeued
idx_t alive_nodes = 0;
idx_t dead_count = 0;
auto debug_sleep_micros = debug_eviction_queue_sleep.load(std::memory_order_relaxed);
for (idx_t i = 0; i < actually_dequeued; i++) {
auto &node = purge_nodes[i];
auto handle = node.TryGetBlockMemory();
if (debug_sleep_micros > 0) {
// Debug race conditions regarding the ownership of the BlockMemory.
ThreadUtil::SleepMicroSeconds(debug_sleep_micros);
}
if (handle) {
purge_nodes[alive_nodes++] = std::move(node);
if (node.IsDeadNode()) {
dead_count++;
} else {
alive_nodes_out.push_back(std::move(node));
}
}

// bulk re-add (TODO order them by timestamp to better retain the LRU behavior)
q.enqueue_bulk(purge_nodes.begin(), alive_nodes);

total_dead_nodes -= actually_dequeued - alive_nodes;
total_dead_nodes -= dead_count;
}

BufferPool::BufferPool(BlockAllocator &block_allocator, idx_t maximum_memory, bool track_eviction_timestamps,
Expand Down Expand Up @@ -500,7 +544,7 @@ void BufferPool::PurgeQueue(const BlockHandle &block) {
const auto queue_sleep_micros =
Settings::Get<DebugEvictionQueueSleepMicroSecondsSetting>(buffer_manager.GetDatabase());
eviction_queue.debug_eviction_queue_sleep = queue_sleep_micros;
eviction_queue.Purge();
eviction_queue.Purge(buffer_manager.GetDatabase());
}

void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
Expand Down
Loading