diff --git a/src/include/duckdb/storage/buffer/block_handle.hpp b/src/include/duckdb/storage/buffer/block_handle.hpp index e6eefe774385..cd51841ac6b9 100644 --- a/src/include/duckdb/storage/buffer/block_handle.hpp +++ b/src/include/duckdb/storage/buffer/block_handle.hpp @@ -86,6 +86,22 @@ class BlockMemory : public enable_shared_from_this { void SetReaders(int32_t n) { readers = n; } + //! Try to increment readers from a positive value without holding the lock. + //! Returns true if successful (block was loaded and had active readers). + bool TryIncrementReadersIfPositive() { + auto current = readers.load(std::memory_order_relaxed); + while (current > 0) { + if (readers.compare_exchange_weak(current, current + 1, std::memory_order_acquire, + std::memory_order_relaxed)) { + return true; + } + } + return false; + } + //! Atomically decrement readers and return the new value. + int32_t DecrementReadersAtomic() { + return readers.fetch_sub(1, std::memory_order_release) - 1; + } //! Returns the memory tag. MemoryTag GetMemoryTag() const { return tag; diff --git a/src/include/duckdb/transaction/update_info.hpp b/src/include/duckdb/transaction/update_info.hpp index 5d2eaad9011a..bdd29826915f 100644 --- a/src/include/duckdb/transaction/update_info.hpp +++ b/src/include/duckdb/transaction/update_info.hpp @@ -103,8 +103,8 @@ struct UpdateInfo { static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id, idx_t row_group_start); //! Initialize with a specific capacity (for compact allocations) - static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id, - idx_t row_group_start, idx_t capacity); + static void Initialize(UpdateInfo &info, DataTable &data_table, transaction_t transaction_id, idx_t row_group_start, + idx_t capacity); }; } // namespace duckdb diff --git a/src/parallel/pipeline_executor.cpp b/src/parallel/pipeline_executor.cpp index 590417220b16..dff139ba2aad 100644 --- a/src/parallel/pipeline_executor.cpp +++ b/src/parallel/pipeline_executor.cpp @@ -1,6 +1,7 @@ #include "duckdb/parallel/pipeline_executor.hpp" #include "duckdb/common/limits.hpp" +#include "duckdb/logging/logger.hpp" #include "duckdb/main/client_context.hpp" #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE @@ -189,6 +190,8 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) { D_ASSERT(pipeline.sink); auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0]; ExecutionBudget chunk_budget(max_chunks); + auto execute_start = std::chrono::steady_clock::now(); + idx_t chunks_processed = 0; do { if (context.client.interrupted) { throw InterruptException(); @@ -228,6 +231,7 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) { // "Regular" path: fetch a chunk from the source and push it through the pipeline source_chunk.Reset(); source_result = FetchFromSource(source_chunk); + chunks_processed++; if (source_result == SourceResultType::BLOCKED) { return PipelineExecuteResult::INTERRUPTED; } @@ -270,6 +274,14 @@ PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) { return PipelineExecuteResult::NOT_FINISHED; } + auto execute_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - execute_start).count(); + if (execute_ms > 200) { + DUCKDB_LOG_WARNING(context.client, + "PipelineExecutor::Execute took %lldms for %llu chunks, pipeline has %llu operators", + execute_ms, chunks_processed, pipeline.operators.size()); + } + return PushFinalize(); } @@ -355,6 +367,8 @@ PipelineExecuteResult PipelineExecutor::PushFinalize() { throw InternalException("Calling PushFinalize on a pipeline that has been finalized already"); } + auto finalize_start = std::chrono::steady_clock::now(); + D_ASSERT(local_sink_state); // Run the combine for the sink @@ -390,6 +404,15 @@ PipelineExecuteResult PipelineExecutor::PushFinalize() { intermediate_states[i]->Finalize(pipeline.operators[i].get(), context); } pipeline.executor.Flush(thread); + + auto finalize_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - finalize_start) + .count(); + if (finalize_ms > 200) { + DUCKDB_LOG_WARNING(context.client, "PipelineExecutor::PushFinalize took %lldms (combine + profiler flush)", + finalize_ms); + } + local_sink_state.reset(); return PipelineExecuteResult::FINISHED; diff --git a/src/storage/standard_buffer_manager.cpp b/src/storage/standard_buffer_manager.cpp index 45ff2f8d3f64..d491227e962c 100644 --- a/src/storage/standard_buffer_manager.cpp +++ b/src/storage/standard_buffer_manager.cpp @@ -312,6 +312,13 @@ BufferHandle StandardBufferManager::Pin(const QueryContext &context, shared_ptr< idx_t required_memory; auto &block_memory = handle->GetMemory(); + + // Fast path: if the block is loaded and has active readers, we can pin without the mutex. + // Safety: readers > 0 guarantees no concurrent unload (CanUnload checks readers > 0). + if (block_memory.GetState() == BlockState::BLOCK_LOADED && block_memory.TryIncrementReadersIfPositive()) { + return BufferHandle(handle, block_memory.GetBuffer()); + } + { // lock the block auto lock = block_memory.GetLock(); @@ -394,16 +401,24 @@ void StandardBufferManager::VerifyZeroReaders(BlockLock &lock, shared_ptr &handle) { - bool purge = false; auto &block_memory = handle->GetMemory(); + + // Fast path: if there are multiple readers, just decrement atomically. + // No state transition happens when readers goes from N>1 to N-1>0. + auto new_readers = block_memory.DecrementReadersAtomic(); + if (new_readers > 0) { + return; + } + + // Slow path: readers hit 0, need to handle eviction queue or unload. + // We already decremented, so re-acquire lock to handle the transition. + bool purge = false; { auto lock = block_memory.GetLock(); if (!block_memory.GetBuffer(lock) || block_memory.GetBufferType() == FileBufferType::TINY_BUFFER) { return; } - D_ASSERT(block_memory.GetReaders() > 0); - auto new_readers = block_memory.DecrementReaders(); - if (new_readers == 0) { + if (block_memory.GetReaders() == 0) { VerifyZeroReaders(lock, handle); if (block_memory.MustAddToEvictionQueue()) { purge = buffer_pool.AddToEvictionQueue(handle);