diff --git a/src/include/duckdb/storage/buffer/buffer_pool.hpp b/src/include/duckdb/storage/buffer/buffer_pool.hpp index f407f9b592bb..e97c0150e798 100644 --- a/src/include/duckdb/storage/buffer/buffer_pool.hpp +++ b/src/include/duckdb/storage/buffer/buffer_pool.hpp @@ -34,6 +34,7 @@ struct BufferEvictionNode { bool CanUnload(BlockMemory &memory); shared_ptr TryGetBlockMemory(); + bool IsDeadNode(); }; //! The BufferPool is in charge of handling memory management for one or more databases. It defines memory limits diff --git a/src/storage/buffer/block_handle.cpp b/src/storage/buffer/block_handle.cpp index a652940a65dc..6fe462ba3a3f 100644 --- a/src/storage/buffer/block_handle.cpp +++ b/src/storage/buffer/block_handle.cpp @@ -34,7 +34,7 @@ BlockMemory::~BlockMemory() { // NOLINT: allow internal exceptions // The block memory is being destroyed, meaning that any unswizzled pointers are now binary junk. SetSwizzling(nullptr); D_ASSERT(!GetBuffer() || GetBuffer()->GetBufferType() == GetBufferType()); - if (GetBuffer() && GetBufferType() != FileBufferType::TINY_BUFFER) { + if (GetBufferType() != FileBufferType::TINY_BUFFER) { // Kill the latest version in the eviction queue. GetBufferManager().GetBufferPool().IncrementDeadNodes(*this); } diff --git a/src/storage/buffer/buffer_pool.cpp b/src/storage/buffer/buffer_pool.cpp index d2a6fab47d70..ad44903ff026 100644 --- a/src/storage/buffer/buffer_pool.cpp +++ b/src/storage/buffer/buffer_pool.cpp @@ -3,6 +3,8 @@ #include "duckdb/common/exception.hpp" #include "duckdb/common/thread.hpp" #include "duckdb/common/typedefs.hpp" +#include "duckdb/logging/logger.hpp" +#include #include "duckdb/main/settings.hpp" #include "duckdb/parallel/concurrentqueue.hpp" #include "duckdb/parallel/task_scheduler.hpp" @@ -66,6 +68,18 @@ shared_ptr BufferEvictionNode::TryGetBlockMemory() { return shared_memory_p; } +bool BufferEvictionNode::IsDeadNode() { + auto shared_memory_p = memory_p.lock(); + if (!shared_memory_p) { + return true; + } + if (handle_sequence_number != shared_memory_p->GetEvictionSequenceNumber()) { + return true; + } + return false; +} + + typedef duckdb_moodycamel::ConcurrentQueue eviction_queue_t; struct EvictionQueue { @@ -82,7 +96,7 @@ struct EvictionQueue { //! Tries to dequeue an element from the eviction queue, but only after acquiring the purge queue lock. bool TryDequeueWithLock(BufferEvictionNode &node); //! Garbage collect dead nodes in the eviction queue. - void Purge(); + void Purge(const DatabaseInstance &db); template void IterateUnloadableBlocks(FN fn); @@ -108,8 +122,8 @@ struct EvictionQueue { } private: - //! Bulk purge dead nodes from the eviction queue. Then, enqueue those that are still alive. - void PurgeIteration(const idx_t purge_size); + //! Bulk purge dead nodes from the eviction queue. Alive nodes are appended to alive_nodes_out. + void PurgeIteration(const idx_t purge_size, vector &alive_nodes_out); public: //! The type of the buffers in this queue and helper function (both for verification only) @@ -155,13 +169,15 @@ bool EvictionQueue::TryDequeueWithLock(BufferEvictionNode &node) { return q.try_dequeue(node); } -void EvictionQueue::Purge() { +void EvictionQueue::Purge(const DatabaseInstance &db) { // only one thread purges the queue, all other threads early-out unique_lock guard(purge_lock, std::try_to_lock); if (!guard.owns_lock()) { return; } + auto purge_start = std::chrono::steady_clock::now(); + // we purge INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER nodes idx_t purge_size = INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER; @@ -190,12 +206,23 @@ void EvictionQueue::Purge() { // 2.3. We've purged the entire queue: max_purges is zero. This is a worst-case scenario, // guaranteeing that we always exit the loop. + idx_t initial_q_size = approx_q_size; + idx_t initial_dead_nodes = total_dead_nodes; idx_t max_purges = approx_q_size / purge_size; + idx_t initial_max_purges = max_purges; + + // Accumulate alive nodes across all iterations and re-enqueue once at the end. + // This prevents the purge thread from feeding alive nodes back into its own + // moodycamel sub-queue between iterations, which would cause subsequent + // try_dequeue_bulk calls to re-drain those same alive nodes instead of + // reaching dead entries in worker thread sub-queues. + vector alive_nodes_to_reenqueue; + while (max_purges != 0) { - PurgeIteration(purge_size); + PurgeIteration(purge_size, alive_nodes_to_reenqueue); - // update relevant sizes and potentially early-out - approx_q_size = q.size_approx(); + // The effective queue size includes alive nodes held in our buffer + approx_q_size = q.size_approx() + alive_nodes_to_reenqueue.size(); // early-out according to (2.1) if (approx_q_size < purge_size * EARLY_OUT_MULTIPLIER) { @@ -213,9 +240,30 @@ void EvictionQueue::Purge() { max_purges--; } + + // Re-enqueue all alive nodes after the purge loop completes + if (!alive_nodes_to_reenqueue.empty()) { + q.enqueue_bulk(alive_nodes_to_reenqueue.begin(), alive_nodes_to_reenqueue.size()); + } + + idx_t iterations = initial_max_purges - max_purges; + idx_t total_alive_found = alive_nodes_to_reenqueue.size(); + idx_t total_dead_found = initial_dead_nodes - total_dead_nodes; + auto elapsed_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - purge_start).count(); + if (iterations > 10 || elapsed_ms > 1000) { + DUCKDB_LOG_WARNING(db, + "EvictionQueue::Purge took %lldms with %llu iterations, " + "queue_size_before=%llu, queue_size_after=%llu, " + "dead_nodes_before=%llu, dead_nodes_after=%llu, " + "total_dequeued=%llu (alive=%llu, dead=%llu)", + elapsed_ms, iterations, initial_q_size, q.size_approx(), + initial_dead_nodes, (idx_t)total_dead_nodes, + total_alive_found + total_dead_found, total_alive_found, total_dead_found); + } } -void EvictionQueue::PurgeIteration(const idx_t purge_size) { +void EvictionQueue::PurgeIteration(const idx_t purge_size, vector &alive_nodes_out) { // if this purge is significantly smaller or bigger than the previous purge, then // we need to resize the purge_nodes vector. Note that this barely happens, as we // purge queue_insertions * PURGE_SIZE_MULTIPLIER nodes @@ -227,25 +275,21 @@ void EvictionQueue::PurgeIteration(const idx_t purge_size) { // bulk purge const idx_t actually_dequeued = q.try_dequeue_bulk(purge_nodes.begin(), purge_size); - // retrieve all alive nodes that have been wrongly dequeued - idx_t alive_nodes = 0; + idx_t dead_count = 0; auto debug_sleep_micros = debug_eviction_queue_sleep.load(std::memory_order_relaxed); for (idx_t i = 0; i < actually_dequeued; i++) { auto &node = purge_nodes[i]; - auto handle = node.TryGetBlockMemory(); if (debug_sleep_micros > 0) { - // Debug race conditions regarding the ownership of the BlockMemory. ThreadUtil::SleepMicroSeconds(debug_sleep_micros); } - if (handle) { - purge_nodes[alive_nodes++] = std::move(node); + if (node.IsDeadNode()) { + dead_count++; + } else { + alive_nodes_out.push_back(std::move(node)); } } - // bulk re-add (TODO order them by timestamp to better retain the LRU behavior) - q.enqueue_bulk(purge_nodes.begin(), alive_nodes); - - total_dead_nodes -= actually_dequeued - alive_nodes; + total_dead_nodes -= dead_count; } BufferPool::BufferPool(BlockAllocator &block_allocator, idx_t maximum_memory, bool track_eviction_timestamps, @@ -500,7 +544,7 @@ void BufferPool::PurgeQueue(const BlockHandle &block) { const auto queue_sleep_micros = Settings::Get(buffer_manager.GetDatabase()); eviction_queue.debug_eviction_queue_sleep = queue_sleep_micros; - eviction_queue.Purge(); + eviction_queue.Purge(buffer_manager.GetDatabase()); } void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {