Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@
"#if defined(DUCKDB_EXTENSION_AUTOLOAD_DEFAULT) && DUCKDB_EXTENSION_AUTOLOAD_DEFAULT": "true"
}
},
{
"name": "background_queue_purge",
"description": "Run eviction queue purge in a background thread instead of inline.",
"type": "BOOLEAN",
"scope": "global",
"default_value": "false"
},
{
"name": "block_allocator_memory",
"description": "Physical memory that the block allocator is allowed to use (this memory is never freed and cannot be reduced).",
Expand Down
180 changes: 95 additions & 85 deletions src/include/duckdb/main/settings.hpp

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/include/duckdb/storage/buffer/buffer_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct BufferEvictionNode {

bool CanUnload(BlockMemory &memory);
shared_ptr<BlockMemory> TryGetBlockMemory();
bool IsDeadNode(idx_t debug_sleep_micros = 0);
};

//! The BufferPool is in charge of handling memory management for one or more databases. It defines memory limits
Expand Down
1 change: 1 addition & 0 deletions src/include/duckdb/storage/table/update_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class UpdateSegment {
void InitializeUpdateInfo(idx_t vector_idx);
void InitializeUpdateInfo(UpdateInfo &info, row_t *ids, const SelectionVector &sel, idx_t count, idx_t vector_index,
idx_t vector_offset);
void ReallocateRootInfoIfNeeded(UpdateInfo &current_info, idx_t update_count, idx_t vector_index);
};

struct UpdateNode {
Expand Down
9 changes: 8 additions & 1 deletion src/include/duckdb/transaction/update_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,18 @@ struct UpdateInfo {
bool HasPrev() const;
bool HasNext() const;
static UpdateInfo &Get(UndoBufferReference &entry);
//! Returns the total allocation size for an UpdateInfo entry, together with space for the tuple data
//! Returns the total allocation size for an UpdateInfo entry with max capacity (STANDARD_VECTOR_SIZE)
static idx_t GetAllocSize(idx_t type_size);
//! Returns the total allocation size for an UpdateInfo entry with a specific capacity
static idx_t GetAllocSize(idx_t type_size, idx_t capacity);
//! Computes a compact capacity for a given count (rounds up with growth headroom)
static idx_t GetCompactCapacity(idx_t count);
//! Initialize an UpdateInfo struct that has been allocated using GetAllocSize (i.e. has extra space after it)
static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id,
idx_t row_group_start);
//! Initialize with a specific capacity (for compact allocations)
static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id,
idx_t row_group_start, idx_t capacity);
};

} // namespace duckdb
13 changes: 7 additions & 6 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_SETTING(AutoinstallExtensionRepositorySetting),
DUCKDB_SETTING(AutoinstallKnownExtensionsSetting),
DUCKDB_SETTING(AutoloadKnownExtensionsSetting),
DUCKDB_SETTING(BackgroundQueuePurgeSetting),
DUCKDB_GLOBAL(BlockAllocatorMemorySetting),
DUCKDB_SETTING(CatalogErrorMaxSchemasSetting),
DUCKDB_SETTING_CALLBACK(CheckpointOnDetachSetting),
Expand Down Expand Up @@ -211,12 +212,12 @@ static const ConfigurationOption internal_options[] = {
DUCKDB_SETTING(ZstdMinStringLengthSetting),
FINAL_SETTING};

static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 102),
DUCKDB_SETTING_ALIAS("null_order", 44),
DUCKDB_SETTING_ALIAS("profiling_output", 121),
DUCKDB_SETTING_ALIAS("user", 136),
DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 26),
DUCKDB_SETTING_ALIAS("worker_threads", 135),
static const ConfigurationAlias setting_aliases[] = {DUCKDB_SETTING_ALIAS("memory_limit", 103),
DUCKDB_SETTING_ALIAS("null_order", 45),
DUCKDB_SETTING_ALIAS("profiling_output", 122),
DUCKDB_SETTING_ALIAS("user", 137),
DUCKDB_SETTING_ALIAS("wal_autocheckpoint", 27),
DUCKDB_SETTING_ALIAS("worker_threads", 136),
FINAL_ALIAS};

vector<ConfigurationOption> DBConfig::GetOptions() {
Expand Down
6 changes: 4 additions & 2 deletions src/storage/buffer/block_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ BlockMemory::~BlockMemory() { // NOLINT: allow internal exceptions
// The block memory is being destroyed, meaning that any unswizzled pointers are now binary junk.
SetSwizzling(nullptr);
D_ASSERT(!GetBuffer() || GetBuffer()->GetBufferType() == GetBufferType());
if (GetBuffer() && GetBufferType() != FileBufferType::TINY_BUFFER) {
// Kill the latest version in the eviction queue.
if (GetEvictionSequenceNumber() > 0 && GetBufferType() != FileBufferType::TINY_BUFFER) {
// The eviction_seq_num is >0 only when there is an active entry in the queue for this
// block (it's reset to 0 on Unload/eviction). When this BlockMemory is destroyed with
// seq_num >0, that queue entry's weak_ptr will expire — mark it dead.
GetBufferManager().GetBufferPool().IncrementDeadNodes(*this);
}

Expand Down
136 changes: 83 additions & 53 deletions src/storage/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,27 @@ shared_ptr<BlockMemory> BufferEvictionNode::TryGetBlockMemory() {
return shared_memory_p;
}

//! Returns true if this eviction-queue node is dead, false if it is still alive.
//! A node is reported dead when either
//!   (1) locking memory_p yields no BlockMemory, or
//!   (2) the node's handle_sequence_number differs from the BlockMemory's current eviction sequence number.
//! \param debug_sleep_micros When greater than zero, the call sleeps for this many microseconds after
//! locking memory_p and before evaluating the result. Callers in the purge path pass the value of the
//! debug eviction-queue sleep setting here; a value of 0 skips the sleep.
bool BufferEvictionNode::IsDeadNode(idx_t debug_sleep_micros) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this debugging logic in the code?

// Lock first; the returned pointer (when non-null) is held in a local until the function returns.
// NOTE(review): memory_p is presumably a weak_ptr<BlockMemory> - confirm against the struct declaration.
auto shared_memory_p = memory_p.lock();
if (debug_sleep_micros > 0) {
// Optional delay between taking the reference and using it.
// NOTE(review): presumably meant to widen the window for BlockMemory ownership races in tests - confirm.
ThreadUtil::SleepMicroSeconds(debug_sleep_micros);
}
if (!shared_memory_p) {
// Nothing could be locked: report the node as dead.
return true;
}
if (handle_sequence_number != shared_memory_p->GetEvictionSequenceNumber()) {
// The sequence number recorded in this node no longer matches the block's current one: report dead.
return true;
}
// The block could be locked and the sequence numbers match: the node is alive.
return false;
}

typedef duckdb_moodycamel::ConcurrentQueue<BufferEvictionNode> eviction_queue_t;

struct EvictionQueue {
public:
explicit EvictionQueue(const vector<FileBufferType> &file_buffer_types_p)
: file_buffer_types(file_buffer_types_p), debug_eviction_queue_sleep(0), evict_queue_insertions(0),
total_dead_nodes(0) {
total_dead_nodes(0), purge_consumer_token(q), purge_producer_token(q) {
}

public:
Expand All @@ -82,7 +96,8 @@ struct EvictionQueue {
//! Tries to dequeue an element from the eviction queue, but only after acquiring the purge queue lock.
bool TryDequeueWithLock(BufferEvictionNode &node);
//! Garbage collect dead nodes in the eviction queue.
void Purge();
//! When full_purge is true, skip early-out conditions and process the entire queue.
void Purge(bool full_purge = false);
template <typename FN>
void IterateUnloadableBlocks(FN fn);

Expand All @@ -108,8 +123,9 @@ struct EvictionQueue {
}

private:
//! Bulk purge dead nodes from the eviction queue. Then, enqueue those that are still alive.
void PurgeIteration(const idx_t purge_size);
//! Dequeue a batch via consumer token, drop dead nodes, re-enqueue alive nodes
//! via producer token.
void PurgeIterationWithTokens(const idx_t purge_size);

public:
//! The type of the buffers in this queue and helper function (both for verification only)
Expand Down Expand Up @@ -143,6 +159,15 @@ struct EvictionQueue {
mutex purge_lock;
//! A pre-allocated vector of eviction nodes. We reuse this to keep the allocation overhead of purges small.
vector<BufferEvictionNode> purge_nodes;
//! Consumer token for full_purge: ensures forward progress through sub-queues.
eviction_queue_t::consumer_token_t purge_consumer_token;
//! Producer token for full_purge: alive nodes go into a dedicated sub-queue
//! that the consumer has already passed, avoiding re-processing.
eviction_queue_t::producer_token_t purge_producer_token;

public:
//! Whether a background purge thread is currently running for this queue.
atomic<bool> purge_in_flight {false};
};

bool EvictionQueue::AddToEvictionQueue(BufferEvictionNode &&node) {
Expand All @@ -155,7 +180,7 @@ bool EvictionQueue::TryDequeueWithLock(BufferEvictionNode &node) {
return q.try_dequeue(node);
}

void EvictionQueue::Purge() {
void EvictionQueue::Purge(bool full_purge) {
// only one thread purges the queue, all other threads early-out
unique_lock<mutex> guard(purge_lock, std::try_to_lock);
if (!guard.owns_lock()) {
Expand All @@ -174,78 +199,72 @@ void EvictionQueue::Purge() {
return;
}

// There are two types of situations.

// For most scenarios, purging INSERT_INTERVAL * PURGE_SIZE_MULTIPLIER nodes is enough.
// Purging more nodes than we insert also counters oscillation for scenarios where most nodes are dead.
// If we always purge slightly more, we trigger a purge less often, as we purge below the trigger.

// However, if the pressure on the queue becomes too contested, we need to purge more aggressively,
// i.e., we actively seek a specific number of dead nodes to purge. We use the total number of existing dead nodes.
// We detect this situation by observing the queue's ratio between alive vs. dead nodes. If the ratio of alive vs.
// dead nodes grows faster than we can purge, we keep purging until we hit one of the following conditions.

// 2.1. We're back at an approximate queue size less than purge_size * EARLY_OUT_MULTIPLIER.
// 2.2. We're back at a ratio of 1*alive_node:ALIVE_NODE_MULTIPLIER*dead_nodes.
// 2.3. We've purged the entire queue: max_purges is zero. This is a worst-case scenario,
// guaranteeing that we always exit the loop.

idx_t max_purges = approx_q_size / purge_size;

// Use consumer/producer tokens to avoid re-processing alive nodes.
// The consumer token progresses through sub-queues sequentially; alive nodes
// are re-enqueued via a dedicated producer token whose sub-queue is "behind"
// the consumer position, so they won't be revisited until a full wrap-around.
while (max_purges != 0) {
PurgeIteration(purge_size);
PurgeIterationWithTokens(purge_size);

// update relevant sizes and potentially early-out
approx_q_size = q.size_approx();
if (!full_purge) {
approx_q_size = q.size_approx();

// early-out according to (2.1)
if (approx_q_size < purge_size * EARLY_OUT_MULTIPLIER) {
break;
}
// early-out according to (2.1)
if (approx_q_size < purge_size * EARLY_OUT_MULTIPLIER) {
break;
}

idx_t approx_dead_nodes = total_dead_nodes;
approx_dead_nodes = approx_dead_nodes > approx_q_size ? approx_q_size : approx_dead_nodes;
idx_t approx_alive_nodes = approx_q_size - approx_dead_nodes;
idx_t approx_dead_nodes = total_dead_nodes;
approx_dead_nodes = approx_dead_nodes > approx_q_size ? approx_q_size : approx_dead_nodes;
idx_t approx_alive_nodes = approx_q_size - approx_dead_nodes;

// early-out according to (2.2)
if (approx_alive_nodes * (ALIVE_NODE_MULTIPLIER - 1) > approx_dead_nodes) {
break;
// early-out according to (2.2)
if (approx_alive_nodes * (ALIVE_NODE_MULTIPLIER - 1) > approx_dead_nodes) {
break;
}
}

max_purges--;
}
}

void EvictionQueue::PurgeIteration(const idx_t purge_size) {
// if this purge is significantly smaller or bigger than the previous purge, then
// we need to resize the purge_nodes vector. Note that this barely happens, as we
// purge queue_insertions * PURGE_SIZE_MULTIPLIER nodes
void EvictionQueue::PurgeIterationWithTokens(const idx_t purge_size) {
idx_t previous_purge_size = purge_nodes.size();
if (purge_size < previous_purge_size / 2 || purge_size > previous_purge_size) {
purge_nodes.resize(purge_size);
}

// bulk purge
const idx_t actually_dequeued = q.try_dequeue_bulk(purge_nodes.begin(), purge_size);
// Dequeue using consumer token — progresses through sub-queues sequentially
const idx_t actually_dequeued = q.try_dequeue_bulk(purge_consumer_token, purge_nodes.begin(), purge_size);
if (actually_dequeued == 0) {
return;
}

// retrieve all alive nodes that have been wrongly dequeued
idx_t alive_nodes = 0;
idx_t dead_count = 0;
idx_t alive_count = 0;
auto debug_sleep_micros = debug_eviction_queue_sleep.load(std::memory_order_relaxed);
for (idx_t i = 0; i < actually_dequeued; i++) {
auto &node = purge_nodes[i];
auto handle = node.TryGetBlockMemory();
if (debug_sleep_micros > 0) {
// Debug race conditions regarding the ownership of the BlockMemory.
ThreadUtil::SleepMicroSeconds(debug_sleep_micros);
}
if (handle) {
purge_nodes[alive_nodes++] = std::move(node);
if (node.IsDeadNode(debug_sleep_micros)) {
dead_count++;
} else {
// Move alive nodes to the front for bulk re-enqueue
if (alive_count != i) {
purge_nodes[alive_count] = std::move(node);
}
alive_count++;
}
}

// bulk re-add (TODO order them by timestamp to better retain the LRU behavior)
q.enqueue_bulk(purge_nodes.begin(), alive_nodes);
// Re-enqueue alive nodes via producer token — goes into a dedicated sub-queue
// that the consumer token has already passed
if (alive_count > 0) {
q.enqueue_bulk(purge_producer_token, purge_nodes.begin(), alive_count);
}

total_dead_nodes -= actually_dequeued - alive_nodes;
total_dead_nodes -= dead_count;
}

BufferPool::BufferPool(BlockAllocator &block_allocator, idx_t maximum_memory, bool track_eviction_timestamps,
Expand Down Expand Up @@ -500,7 +519,18 @@ void BufferPool::PurgeQueue(const BlockHandle &block) {
const auto queue_sleep_micros =
Settings::Get<DebugEvictionQueueSleepMicroSecondsSetting>(buffer_manager.GetDatabase());
eviction_queue.debug_eviction_queue_sleep = queue_sleep_micros;
eviction_queue.Purge();
if (Settings::Get<BackgroundQueuePurgeSetting>(buffer_manager.GetDatabase())) {
bool expected = false;
if (eviction_queue.purge_in_flight.compare_exchange_strong(expected, true)) {
thread purge_thread([&eviction_queue]() {
eviction_queue.Purge(true);
eviction_queue.purge_in_flight.store(false);
});
purge_thread.detach();
}
} else {
eviction_queue.Purge();
}
}

void BufferPool::SetLimit(idx_t limit, const char *exception_postscript) {
Expand Down
1 change: 1 addition & 0 deletions src/storage/buffer/buffer_pool_reservation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ BufferPoolReservation::BufferPoolReservation(BufferPoolReservation &&src) noexce
}

BufferPoolReservation &BufferPoolReservation::operator=(BufferPoolReservation &&src) noexcept {
pool.UpdateUsedMemory(tag, -UnsafeNumericCast<int64_t>(size));
tag = src.tag;
size = src.size;
src.size = 0;
Expand Down
6 changes: 5 additions & 1 deletion src/storage/table/column_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,11 @@ void ColumnData::InitializeAppend(ColumnAppendState &state) {
!last_segment.GetCompressionFunction().init_append) {
// we cannot append to this segment - append a new segment
auto total_rows = segment->GetRowStart() + last_segment.count;
AppendTransientSegment(l, total_rows, last_segment);

// Persistent segments are resized to block_size during checkpoint, so we pass nullptr to start fresh
const bool is_persistent = last_segment.segment_type == ColumnSegmentType::PERSISTENT;
AppendTransientSegment(l, total_rows, is_persistent ? nullptr : &last_segment);

state.current = data.GetLastSegment(l);
} else {
state.current = segment;
Expand Down
Loading
Loading