Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/include/duckdb/storage/buffer/block_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ class BlockMemory : public enable_shared_from_this<BlockMemory> {
void SetReaders(int32_t n) {
readers = n;
}
//! Try to increment readers from a positive value without holding the lock.
//! Returns true if successful (block was loaded and had active readers).
bool TryIncrementReadersIfPositive() {
auto current = readers.load(std::memory_order_relaxed);
while (current > 0) {
if (readers.compare_exchange_weak(current, current + 1, std::memory_order_acquire,
std::memory_order_relaxed)) {
return true;
}
}
return false;
}
//! Atomically decrement readers and return the new value.
int32_t DecrementReadersAtomic() {
return readers.fetch_sub(1, std::memory_order_release) - 1;
}
//! Returns the memory tag.
MemoryTag GetMemoryTag() const {
return tag;
Expand Down
4 changes: 2 additions & 2 deletions src/include/duckdb/transaction/update_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ struct UpdateInfo {
static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id,
idx_t row_group_start);
//! Initialize with a specific capacity (for compact allocations)
static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id,
idx_t row_group_start, idx_t capacity);
static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id, idx_t row_group_start,
idx_t capacity);
};

} // namespace duckdb
23 changes: 23 additions & 0 deletions src/parallel/pipeline_executor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "duckdb/parallel/pipeline_executor.hpp"

#include "duckdb/common/limits.hpp"
#include "duckdb/logging/logger.hpp"
#include "duckdb/main/client_context.hpp"

#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
Expand Down Expand Up @@ -189,6 +190,8 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
D_ASSERT(pipeline.sink);
auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
ExecutionBudget chunk_budget(max_chunks);
auto execute_start = std::chrono::steady_clock::now();
idx_t chunks_processed = 0;
do {
if (context.client.interrupted) {
throw InterruptException();
Expand Down Expand Up @@ -228,6 +231,7 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
// "Regular" path: fetch a chunk from the source and push it through the pipeline
source_chunk.Reset();
source_result = FetchFromSource(source_chunk);
chunks_processed++;
if (source_result == SourceResultType::BLOCKED) {
return PipelineExecuteResult::INTERRUPTED;
}
Expand Down Expand Up @@ -270,6 +274,14 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
return PipelineExecuteResult::NOT_FINISHED;
}

auto execute_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - execute_start).count();
if (execute_ms > 200) {
DUCKDB_LOG_WARNING(context.client,
"PipelineExecutor::Execute took %lldms for %llu chunks, pipeline has %llu operators",
execute_ms, chunks_processed, pipeline.operators.size());
}

return PushFinalize();
}

Expand Down Expand Up @@ -355,6 +367,8 @@ PipelineExecuteResult PipelineExecutor::PushFinalize() {
throw InternalException("Calling PushFinalize on a pipeline that has been finalized already");
}

auto finalize_start = std::chrono::steady_clock::now();

D_ASSERT(local_sink_state);

// Run the combine for the sink
Expand Down Expand Up @@ -390,6 +404,15 @@ PipelineExecuteResult PipelineExecutor::PushFinalize() {
intermediate_states[i]->Finalize(pipeline.operators[i].get(), context);
}
pipeline.executor.Flush(thread);

auto finalize_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - finalize_start)
.count();
if (finalize_ms > 200) {
DUCKDB_LOG_WARNING(context.client, "PipelineExecutor::PushFinalize took %lldms (combine + profiler flush)",
finalize_ms);
}

local_sink_state.reset();

return PipelineExecuteResult::FINISHED;
Expand Down
23 changes: 19 additions & 4 deletions src/storage/standard_buffer_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,13 @@ BufferHandle StandardBufferManager::Pin(const QueryContext &context, shared_ptr<

idx_t required_memory;
auto &block_memory = handle->GetMemory();

// Fast path: if the block is loaded and has active readers, we can pin without the mutex.
// Safety: readers > 0 guarantees no concurrent unload (CanUnload checks readers > 0).
if (block_memory.GetState() == BlockState::BLOCK_LOADED && block_memory.TryIncrementReadersIfPositive()) {
return BufferHandle(handle, block_memory.GetBuffer());
}

{
// lock the block
auto lock = block_memory.GetLock();
Expand Down Expand Up @@ -394,16 +401,24 @@ void StandardBufferManager::VerifyZeroReaders(BlockLock &lock, shared_ptr<BlockH
}

void StandardBufferManager::Unpin(shared_ptr<BlockHandle> &handle) {
bool purge = false;
auto &block_memory = handle->GetMemory();

// Fast path: if there are multiple readers, just decrement atomically.
// No state transition happens when readers goes from N>1 to N-1>0.
auto new_readers = block_memory.DecrementReadersAtomic();
if (new_readers > 0) {
return;
}

// Slow path: readers hit 0, need to handle eviction queue or unload.
// We already decremented, so re-acquire lock to handle the transition.
bool purge = false;
{
auto lock = block_memory.GetLock();
if (!block_memory.GetBuffer(lock) || block_memory.GetBufferType() == FileBufferType::TINY_BUFFER) {
return;
}
D_ASSERT(block_memory.GetReaders() > 0);
auto new_readers = block_memory.DecrementReaders();
if (new_readers == 0) {
if (block_memory.GetReaders() == 0) {
VerifyZeroReaders(lock, handle);
if (block_memory.MustAddToEvictionQueue()) {
purge = buffer_pool.AddToEvictionQueue(handle);
Expand Down
Loading