Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/include/duckdb/storage/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BlockManager {
BlockManager() = delete;
BlockManager(BufferManager &buffer_manager, const optional_idx block_alloc_size_p,
const optional_idx block_header_size_p);
virtual ~BlockManager() = default;
virtual ~BlockManager();

//! The buffer manager
BufferManager &buffer_manager;
Expand Down Expand Up @@ -188,8 +188,8 @@ class BlockManager {
bool in_destruction = false;

private:
//! The lock for the set of blocks
mutex blocks_lock;
//! Read-write lock storage — initialized in constructor, platform-specific
alignas(8) char blocks_lock_storage[200];
//! A mapping of block id -> BlockHandle
unordered_map<block_id_t, weak_ptr<BlockHandle>> blocks;
//! The metadata manager
Expand Down
16 changes: 16 additions & 0 deletions src/include/duckdb/storage/buffer/block_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ class BlockMemory : public enable_shared_from_this<BlockMemory> {
void SetReaders(int32_t n) {
readers = n;
}
//! Try to increment readers from a positive value without holding the lock.
//! Returns true if successful (block was loaded and had active readers).
bool TryIncrementReadersIfPositive() {
auto current = readers.load(std::memory_order_relaxed);
while (current > 0) {
if (readers.compare_exchange_weak(current, current + 1, std::memory_order_acquire,
std::memory_order_relaxed)) {
return true;
}
}
return false;
}
//! Atomically decrement readers and return the new value.
int32_t DecrementReadersAtomic() {
return readers.fetch_sub(1, std::memory_order_release) - 1;
}
//! Returns the memory tag.
MemoryTag GetMemoryTag() const {
return tag;
Expand Down
114 changes: 99 additions & 15 deletions src/storage/buffer/block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,137 @@
#include "duckdb/storage/buffer_manager.hpp"
#include "duckdb/storage/metadata/metadata_manager.hpp"

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <windows.h>
#else
#include <pthread.h>
#endif

namespace duckdb {

#ifdef _WIN32
using RWLockType = SRWLOCK;
#else
using RWLockType = pthread_rwlock_t;
#endif

static_assert(sizeof(RWLockType) <= 200, "blocks_lock_storage too small");
static_assert(alignof(RWLockType) <= 8, "blocks_lock_storage alignment insufficient");

static RWLockType &GetBlocksLock(char *storage) {
return *reinterpret_cast<RWLockType *>(storage);
}

class ReadLockGuard {
public:
explicit ReadLockGuard(RWLockType &lock) : lock_(lock) {
#ifdef _WIN32
AcquireSRWLockShared(&lock_);
#else
pthread_rwlock_rdlock(&lock_);
#endif
}
~ReadLockGuard() {
#ifdef _WIN32
ReleaseSRWLockShared(&lock_);
#else
pthread_rwlock_unlock(&lock_);
#endif
}
ReadLockGuard(const ReadLockGuard &) = delete;
ReadLockGuard &operator=(const ReadLockGuard &) = delete;

private:
RWLockType &lock_;
};

class WriteLockGuard {
public:
explicit WriteLockGuard(RWLockType &lock) : lock_(lock) {
#ifdef _WIN32
AcquireSRWLockExclusive(&lock_);
#else
pthread_rwlock_wrlock(&lock_);
#endif
}
~WriteLockGuard() {
#ifdef _WIN32
ReleaseSRWLockExclusive(&lock_);
#else
pthread_rwlock_unlock(&lock_);
#endif
}
WriteLockGuard(const WriteLockGuard &) = delete;
WriteLockGuard &operator=(const WriteLockGuard &) = delete;

private:
RWLockType &lock_;
};

BlockManager::BlockManager(BufferManager &buffer_manager, const optional_idx block_alloc_size_p,
const optional_idx block_header_size_p)
: buffer_manager(buffer_manager), metadata_manager(make_uniq<MetadataManager>(*this, buffer_manager)),
block_alloc_size(block_alloc_size_p), block_header_size(block_header_size_p) {
auto &lock = GetBlocksLock(blocks_lock_storage);
#ifdef _WIN32
InitializeSRWLock(&lock);
#else
pthread_rwlock_init(&lock, nullptr);
#endif
}

BlockManager::~BlockManager() {
#ifndef _WIN32
pthread_rwlock_destroy(&GetBlocksLock(blocks_lock_storage));
#endif
}

bool BlockManager::BlockIsRegistered(block_id_t block_id) {
lock_guard<mutex> lock(blocks_lock);
// check if the block already exists
ReadLockGuard guard(GetBlocksLock(blocks_lock_storage));
auto entry = blocks.find(block_id);
if (entry == blocks.end()) {
return false;
}
// already exists: check if it hasn't expired yet
return !entry->second.expired();
}

shared_ptr<BlockHandle> BlockManager::TryGetBlock(block_id_t block_id) {
lock_guard<mutex> lock(blocks_lock);
// check if the block already exists
ReadLockGuard guard(GetBlocksLock(blocks_lock_storage));
auto entry = blocks.find(block_id);
if (entry == blocks.end()) {
// the block does not exist
return nullptr;
}
// the block exists - try to lock it
return entry->second.lock();
}

shared_ptr<BlockHandle> BlockManager::RegisterBlock(block_id_t block_id) {
lock_guard<mutex> lock(blocks_lock);
// check if the block already exists
// Fast path: read lock to check if already registered
{
ReadLockGuard guard(GetBlocksLock(blocks_lock_storage));
auto entry = blocks.find(block_id);
if (entry != blocks.end()) {
auto existing_ptr = entry->second.lock();
if (existing_ptr) {
return existing_ptr;
}
}
}
// Slow path: write lock to insert
WriteLockGuard guard(GetBlocksLock(blocks_lock_storage));
auto entry = blocks.find(block_id);
if (entry != blocks.end()) {
// already exists: check if it hasn't expired yet
auto existing_ptr = entry->second.lock();
if (existing_ptr) {
//! it hasn't! return it
return existing_ptr;
}
}
// create a new block pointer for this block
auto result = make_shared_ptr<BlockHandle>(*this, block_id, MemoryTag::BASE_TABLE);
// register the block pointer in the set of blocks as a weak pointer
blocks[block_id] = weak_ptr<BlockHandle>(result);
return result;
}
Expand Down Expand Up @@ -122,8 +207,7 @@ shared_ptr<BlockHandle> BlockManager::ConvertToPersistent(QueryContext context,

void BlockManager::UnregisterBlock(block_id_t id) {
D_ASSERT(id < MAXIMUM_BLOCK);
lock_guard<mutex> lock(blocks_lock);
// on-disk block: erase from list of blocks in manager
WriteLockGuard guard(GetBlocksLock(blocks_lock_storage));
blocks.erase(id);
}

Expand Down
27 changes: 24 additions & 3 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "duckdb/common/exception.hpp"
#include "duckdb/common/thread.hpp"
#include "duckdb/common/typedefs.hpp"
#include "duckdb/logging/logger.hpp"
#include <chrono>
#include "duckdb/main/settings.hpp"
#include "duckdb/parallel/concurrentqueue.hpp"
#include "duckdb/parallel/task_scheduler.hpp"
Expand Down Expand Up @@ -96,7 +98,7 @@ struct EvictionQueue {
//! Tries to dequeue an element from the eviction queue, but only after acquiring the purge queue lock.
bool TryDequeueWithLock(BufferEvictionNode &node);
//! Garbage collect dead nodes in the eviction queue.
void Purge();
void Purge(const DatabaseInstance &db);
template <typename FN>
void IterateUnloadableBlocks(FN fn);

Expand Down Expand Up @@ -173,13 +175,15 @@ bool EvictionQueue::TryDequeueWithLock(BufferEvictionNode &node) {
return q.try_dequeue(node);
}

void EvictionQueue::Purge() {
void EvictionQueue::Purge(const DatabaseInstance &db) {
// only one thread purges the queue, all other threads early-out
unique_lock<mutex> guard(purge_lock, std::try_to_lock);
if (!guard.owns_lock()) {
return;
}

auto purge_start = std::chrono::steady_clock::now();

// we purge INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER nodes
idx_t purge_size = INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER;

Expand Down Expand Up @@ -208,7 +212,10 @@ void EvictionQueue::Purge() {
// 2.3. We've purged the entire queue: max_purges is zero. This is a worst-case scenario,
// guaranteeing that we always exit the loop.

idx_t initial_q_size = approx_q_size;
idx_t initial_dead_nodes = total_dead_nodes;
idx_t max_purges = approx_q_size / purge_size;
idx_t initial_max_purges = max_purges;

while (max_purges != 0) {
PurgeIteration(purge_size);
Expand All @@ -232,6 +239,20 @@ void EvictionQueue::Purge() {

max_purges--;
}

idx_t iterations = initial_max_purges - max_purges;
idx_t total_dead_found = initial_dead_nodes - total_dead_nodes;
auto elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - purge_start).count();
if (iterations > 10 || elapsed_ms > 500) {
DUCKDB_LOG_WARNING(db,
"EvictionQueue::Purge took %lldms with %llu iterations, "
"queue_size_before=%llu, queue_size_after=%llu, "
"dead_nodes_before=%llu, dead_nodes_after=%llu, "
"total_dead_found=%llu",
elapsed_ms, iterations, initial_q_size, q.size_approx(), initial_dead_nodes,
(idx_t)total_dead_nodes, total_dead_found);
}
}

void EvictionQueue::PurgeIteration(const idx_t purge_size) {
Expand Down Expand Up @@ -524,7 +545,7 @@ void BufferPool::PurgeQueue(const BlockHandle &block) {
const auto queue_sleep_micros =
Settings::Get<DebugEvictionQueueSleepMicroSecondsSetting>(buffer_manager.GetDatabase());
eviction_queue.debug_eviction_queue_sleep = queue_sleep_micros;
eviction_queue.Purge();
eviction_queue.Purge(buffer_manager.GetDatabase());
}

void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
Expand Down
23 changes: 19 additions & 4 deletions src/storage/standard_buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ BufferHandle StandardBufferManager::Pin(const QueryContext &context, shared_ptr<

idx_t required_memory;
auto &block_memory = handle->GetMemory();

// Fast path: if the block is loaded and has active readers, we can pin without the mutex.
// Safety: readers > 0 guarantees no concurrent unload (CanUnload checks readers > 0).
if (block_memory.GetState() == BlockState::BLOCK_LOADED && block_memory.TryIncrementReadersIfPositive()) {
return BufferHandle(handle, block_memory.GetBuffer());
}

{
// lock the block
auto lock = block_memory.GetLock();
Expand Down Expand Up @@ -394,16 +401,24 @@ void StandardBufferManager::VerifyZeroReaders(BlockLock &lock, shared_ptr<BlockH
}

void StandardBufferManager::Unpin(shared_ptr<BlockHandle> &handle) {
bool purge = false;
auto &block_memory = handle->GetMemory();

// Fast path: if there are multiple readers, just decrement atomically.
// No state transition happens when readers goes from N>1 to N-1>0.
auto new_readers = block_memory.DecrementReadersAtomic();
if (new_readers > 0) {
return;
}

// Slow path: readers hit 0, need to handle eviction queue or unload.
// We already decremented, so re-acquire lock to handle the transition.
bool purge = false;
{
auto lock = block_memory.GetLock();
if (!block_memory.GetBuffer(lock) || block_memory.GetBufferType() == FileBufferType::TINY_BUFFER) {
return;
}
D_ASSERT(block_memory.GetReaders() > 0);
auto new_readers = block_memory.DecrementReaders();
if (new_readers == 0) {
if (block_memory.GetReaders() == 0) {
VerifyZeroReaders(lock, handle);
if (block_memory.MustAddToEvictionQueue()) {
purge = buffer_pool.AddToEvictionQueue(handle);
Expand Down
Loading