From 5c87c20b4cf9ef8e3c42f5a8020c3495306a86e0 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Thu, 25 Jun 2026 15:26:46 +0200 Subject: [PATCH 01/10] atomic mpmc queue Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 524 ++++++++++-------- .../runtime/shared/pto_runtime2_init.cpp | 17 +- 2 files changed, 303 insertions(+), 238 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 5619a9fdf..0d4d55e3b 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -29,7 +29,9 @@ #pragma once +#include #include +#include #include "common/core_type.h" #include "utils/device_arena.h" @@ -55,16 +57,26 @@ extern "C" bool is_scope_stats_enabled(); #endif // ============================================================================= -// Ready Queue (Lock-free bounded MPMC — Vyukov design) +// Ready Queue (Lock-free bounded MPMC — Maxim Egorushkin's design) // ============================================================================= -/** - * Per-slot entry: sequence counter for ABA safety + task payload - */ -struct PTO2ReadyQueueSlot { - std::atomic sequence; - PTO2TaskSlotState *slot_state; -}; +#if defined(__aarch64__) +inline void pto2_cpu_relax() noexcept { asm volatile("isb" ::: "memory"); } +#elif defined(__x86_64__) || defined(_M_X64) +# include +inline void pto2_cpu_relax() noexcept { _mm_pause(); } +#else +inline void pto2_cpu_relax() noexcept { std::atomic_signal_fence(std::memory_order_acquire); } +#endif + +#if defined(__GNUC__) || defined(__clang__) +# ifndef unlikely +# define unlikely(x) (__builtin_expect(!!(x), 0)) +# endif +# ifndef likely +# define likely(x) (__builtin_expect(!!(x), 1)) +# endif +#endif /** * Thread-local ready buffer for local-first dispatch optimization. @@ -103,324 +115,378 @@ struct PTO2LocalReadyBuffer { }; /** - * Lock-free bounded MPMC queue (Dmitry Vyukov design) + * Lock-free bounded MPMC queue * * Key properties: - * - enqueue_pos and dequeue_pos on separate cache lines (no false sharing) - * - Per-slot sequence counter prevents ABA problem - * - Empty queue pop returns immediately (single atomic load, no lock) - * - CAS contention is split: producers only touch enqueue_pos, - * consumers only touch dequeue_pos + * - tailCachedHead_ bundles the consumer position (MSD) with a stale producer + * snapshot (LSD) on a single exclusive cache line — no cross-side contention + * - headCachedTail_ bundles the producer position (MSD) with a stale consumer + * snapshot (LSD) on a second exclusive cache line + * - Each slot is atomic with nullptr as the empty sentinel; + * no per-slot sequence counter is required + * - Index scattering (multiplier 9, coprime to 2^k) distributes consecutive + * positions across cache lines to reduce slot contention + * - Empty pop returns nullptr immediately; full push returns false */ struct alignas(64) PTO2ReadyQueue { - PTO2ReadyQueueSlot *slots; - uint64_t capacity; - uint64_t mask; // capacity - 1 - char _pad0[64 - 24]; // Pad to own cache line + // Consumer cache line: {tail[63:32] | cached_head[31:0]} + alignas(64) std::atomic tailCachedHead_; - std::atomic enqueue_pos; - char _pad1[64 - sizeof(std::atomic)]; // Own cache line + // Producer cache line: {head[63:32] | cached_tail[31:0]} + alignas(64) std::atomic headCachedTail_; - std::atomic dequeue_pos; - char _pad2[64 - sizeof(std::atomic)]; // Own cache line + // Shared read-only after wire_arena_pointers + alignas(64) std::atomic *slots; + uint32_t capacity; + uint32_t mask; // capacity - 1 + char _pad2[64 - sizeof(std::atomic *) - (2 * sizeof(uint32_t))]; - uint64_t size() { - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - return (e >= d) ? (e - d) : 0; + static_assert( + std::atomic::is_always_lock_free, + "PTO2ReadyQueue requires lock-free pointer atomics." + ); + static_assert( + std::atomic::is_always_lock_free, + "PTO2ReadyQueue requires lock-free uint64_t." + ); + + constexpr uint32_t get_msd(uint64_t v) const noexcept { return static_cast(v >> 32u); } + constexpr uint32_t get_lsd(uint64_t v) const noexcept { return static_cast(v); } + constexpr uint64_t pack(uint32_t msd, uint32_t lsd) const noexcept { + return (static_cast(msd) << 32u) | static_cast(lsd); } - bool push(PTO2TaskSlotState *slot_state) { - uint64_t pos; - PTO2ReadyQueueSlot *slot; + constexpr uint32_t get_tail(uint64_t tailCachedHead) const noexcept { return get_msd(tailCachedHead); } + constexpr uint32_t get_cached_head(uint64_t tailCachedHead) const noexcept { + return get_lsd(tailCachedHead); + } + constexpr uint32_t get_head(uint64_t headCachedTail) const noexcept { return get_msd(headCachedTail); } + constexpr uint32_t get_cached_tail(uint64_t headCachedTail) const noexcept { + return get_lsd(headCachedTail); + } + + uint32_t get_index(uint32_t position) const noexcept { + static_assert(sizeof(PTO2TaskSlotState *) == 8u, "index_multiplier assumes 8-byte pointer"); + constexpr uint32_t mul = 9u; // ceil(64/8)=8, even → 9; coprime to every power-of-2 N + return (position * mul) & mask; + } + + bool do_push_(PTO2TaskSlotState *value, uint32_t index) noexcept { + auto &element = slots[index]; + PTO2TaskSlotState *empty; + while (!element.compare_exchange_weak( + empty = nullptr, value, std::memory_order_acq_rel, std::memory_order_relaxed)) { + do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) != nullptr); + } + return true; + } + + PTO2TaskSlotState *do_pop_(uint32_t index) noexcept { + auto &element = slots[index]; + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + while (val == nullptr) { + do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); + val = element.exchange(nullptr, std::memory_order_acq_rel); + } + return val; + } + + uint64_t size() noexcept { + uint32_t head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + uint32_t tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + return head >= tail? static_cast(head) - static_cast(tail) : 0u; + } + + bool push(PTO2TaskSlotState *slot_state) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos); - if (diff == 0) { - if (enqueue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - break; - } - } else if (diff < 0) { - return false; // Queue full + uint32_t headTrail = head - capacity; + if (headTrail == tail) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + if (headTrail == tail) return false; // full } + if (headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + 1u, tail), + std::memory_order_relaxed, std::memory_order_relaxed)) + break; + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - - slot->slot_state = slot_state; - slot->sequence.store(static_cast(pos + 1), std::memory_order_release); - return true; + return do_push_(slot_state, get_index(head)); } - // Batch push: reserve count slots with a single CAS after confirming - // every target slot is available under the usual Vyukov sequence check. void push_batch(PTO2TaskSlotState **items, int count) { - if (count == 0) return; + uint32_t left = static_cast(count); + while (left -= try_push_batch(items, left)) {} + } - uint64_t pos; + template + uint32_t try_push_batch(It &&items, uint32_t count) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); + uint32_t toPush; while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - bool ready = true; - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + i); - if (diff != 0) { - ready = false; - break; - } + uint32_t headTrail = head - capacity; + toPush = tail - headTrail; + if (toPush < count) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + toPush = tail - headTrail; } - if (!ready) { - continue; + toPush = std::min(toPush, count); + if (toPush == 0u) { + return toPush; } - if (enqueue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { + + if (headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + toPush, tail), + std::memory_order_relaxed, std::memory_order_relaxed)) { break; } + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - slot->slot_state = items[i]; - slot->sequence.store(static_cast(pos + i + 1), std::memory_order_release); + const uint32_t end = head + toPush; + for (uint32_t i = head; i < end; ++i) { + do_push_(*items++, get_index(head)); } + + return toPush; } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - bool push(PTO2TaskSlotState *slot_state, uint64_t &atomic_count, uint64_t &wait_cycle) { - uint64_t pos; - PTO2ReadyQueueSlot *slot; + bool push(PTO2TaskSlotState *slot_state, uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // headCachedTail_.load while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos); - atomic_ops += 2; // enqueue_pos.load + sequence.load - if (diff == 0) { - if (enqueue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - atomic_ops++; // successful CAS - break; + uint32_t headTrail = head - static_cast(mask_ + 1u); + if (headTrail == tail) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + atomic_ops++; + if (headTrail == tail) { + atomic_count += atomic_ops; + return false; } - contended = true; - atomic_ops++; // failed CAS - } else if (diff < 0) { - return false; // Queue full - } else { - contended = true; // diff > 0: slot not yet released, spin } + if (headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + 1u, tail), + std::memory_order_relaxed, std::memory_order_relaxed)) { + atomic_ops++; // successful CAS + break; + } + contended = true; + atomic_ops++; // failed CAS + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - atomic_ops++; // final sequence.store - atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); + auto &element = slots_[get_index(head)]; + PTO2TaskSlotState *empty; + if (!element.compare_exchange_weak( + empty = nullptr, slot_state, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + contended = true; + do { + do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) != nullptr); + } while (!element.compare_exchange_weak( + empty = nullptr, slot_state, std::memory_order_acq_rel, + std::memory_order_relaxed)); } - - slot->slot_state = slot_state; - slot->sequence.store(static_cast(pos + 1), std::memory_order_release); + atomic_ops++; // element CAS + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return true; } #endif - PTO2TaskSlotState *pop() { - // Fast-path: skip slot load when queue is clearly empty - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - if (d >= e) { - return nullptr; - } - - uint64_t pos; - PTO2ReadyQueueSlot *slot; + PTO2TaskSlotState *pop() noexcept { + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + 1); - if (diff == 0) { - if (dequeue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) - break; - } else if (diff < 0) { - return nullptr; // Queue empty + if (tail == head) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + if (tail == head) return nullptr; // empty } + if (tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + 1u, head), + std::memory_order_relaxed, std::memory_order_relaxed)) + break; + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - - PTO2TaskSlotState *result = slot->slot_state; - slot->sequence.store(static_cast(pos + mask + 1), std::memory_order_release); - return result; + return do_pop_(get_index(tail)); } #if PTO2_SCHED_PROFILING - PTO2TaskSlotState *pop(uint64_t &atomic_count, uint64_t &wait_cycle) { - // Fast-path: skip slot load when queue is clearly empty - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - atomic_count += 2; // dequeue_pos.load + enqueue_pos.load - if (d >= e) { - return nullptr; - } - - uint64_t pos; - PTO2ReadyQueueSlot *slot; + PTO2TaskSlotState *pop(uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // tailCachedHead_.load while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + 1); - atomic_ops += 2; // dequeue_pos.load + sequence.load - if (diff == 0) { - if (dequeue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - atomic_ops++; // successful CAS - break; + if (tail == head) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + atomic_ops++; + if (tail == head) { + atomic_count += atomic_ops; + return nullptr; // empty } - contended = true; - atomic_ops++; // failed CAS - } else if (diff < 0) { - atomic_count += atomic_ops; - return nullptr; // Queue empty - } else { - contended = true; } + if (tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + 1u, head), + std::memory_order_relaxed, std::memory_order_relaxed)) { + atomic_ops++; // successful CAS + break; + } + contended = true; + atomic_ops++; // failed CAS + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - atomic_ops++; // final sequence.store - atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); + auto &element = slots_[get_index(tail)]; + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; // exchange + while (val == nullptr) { + contended = true; + do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); + val = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; } - - PTO2TaskSlotState *result = slot->slot_state; - slot->sequence.store(static_cast(pos + mask + 1), std::memory_order_release); - return result; + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return val; } #endif - // Batch pop: reserve a contiguous run of ready slots with a single CAS. - // Returns actual number of items popped (may be less than max_count). - int pop_batch(PTO2TaskSlotState **out, int max_count) { - uint64_t pos; - int count; + int pop_batch(PTO2TaskSlotState **out, int max_count) noexcept { + uint32_t amount = static_cast(max_count); + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); + uint32_t toPop; while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - count = 0; - while (count < max_count) { - PTO2ReadyQueueSlot *slot = &slots[(pos + count) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + count + 1); - if (diff == 0) { - count++; - continue; - } - if (diff < 0) { - break; - } - count = -1; - break; + toPop = head - tail; + if (toPop < amount) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + toPop = head - tail; } - if (count == 0) return 0; - if (count < 0) continue; - if (dequeue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { + + toPop = std::min(toPop, amount); + if (toPop == 0u) return 0; + + if (tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + toPop, head), + std::memory_order_relaxed, std::memory_order_relaxed)) { break; } + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - out[i] = slot->slot_state; - slot->sequence.store(static_cast(pos + i + mask + 1), std::memory_order_release); + const uint32_t end = tail + toPop; + for (uint32_t i = tail; i < end; ++i) { + *out++ = do_pop_(get_index(i)); } - return count; + + return static_cast(toPop); } #if PTO2_SCHED_PROFILING - int pop_batch(PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle) { - uint64_t pos; + int pop_batch( + PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle + ) noexcept { + uint64_t tailCachedHead; + uint32_t tail, head; int count; uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; uint32_t atomic_ops = 0; while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - atomic_ops++; // dequeue_pos.load - count = 0; - while (count < max_count) { - PTO2ReadyQueueSlot *slot = &slots[(pos + count) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + count + 1); - atomic_ops++; // sequence.load - if (diff == 0) { - count++; - continue; + tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + tail = get_tail(tailCachedHead); + head = get_cached_head(tailCachedHead); + atomic_ops++; // tailCachedHead_.load + if (tail == head) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + atomic_ops++; + if (tail == head) { + atomic_count += atomic_ops; + return 0; // empty } - if (diff < 0) { + } + uint32_t available = head - tail; + uint32_t limit = static_cast(max_count) < available + ? static_cast(max_count) : available; + count = 0; + while (static_cast(count) < limit) { + if (slots_[get_index(tail + static_cast(count))].load( + std::memory_order_acquire) != nullptr) { + ++count; + atomic_ops++; // element.load + } else { + contended = true; + atomic_ops++; // element.load + count = -1; break; } - contended = true; - count = -1; - break; } if (count == 0) { atomic_count += atomic_ops; return 0; } - if (count < 0) { - continue; - } - if (dequeue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { + if (count < 0) continue; + if (tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + static_cast(count), head), + std::memory_order_relaxed, std::memory_order_relaxed)) { atomic_ops++; // successful CAS break; } contended = true; atomic_ops++; // failed CAS } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - out[i] = slot->slot_state; - slot->sequence.store(static_cast(pos + i + mask + 1), std::memory_order_release); - atomic_ops++; // sequence.store + auto &element = slots_[get_index(tail + static_cast(i))]; + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; // exchange + while (val == nullptr) { + contended = true; + do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); + val = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; + } + out[i] = val; } atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); - } + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return count; } #endif }; -// Cold-path ready queue operations (defined in pto_scheduler.cpp). Declared +// Cold-path ready queue operations (defined in pto_runtime2_init.cpp). Declared // as non-member so PTO2ReadyQueue stays a POD-like struct with cache-line // alignment. Storage is owned by the caller-supplied arena. -// reserve_layout: declare the slots[] region on the arena (must precede commit) -// init_from_layout: bind slots pointer from arena.region_ptr(off) and -// initialize sequence counters -// destroy: forget the slots pointer (arena owns the buffer) +// reserve_layout: declare the slots[] region on the arena (must precede commit) +// init_from_layout: initialize positions/mask and zero all slot atomics to nullptr; +// does NOT store the slots_ pointer (call wire_arena_pointers for that) +// destroy: forget the slots_ pointer (arena owns the buffer) size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity); -// Writes everything *except* the arena-internal `slots` pointer field -// (sequences/positions on the slot array, capacity, mask). Uses +// Writes everything except the arena-internal `slots_` pointer field +// (slot atomics to nullptr, mask_, tailCachedHead_, headCachedTail_). Uses // arena.region_ptr(slots_off) only to address the slot array for writes; -// does NOT store the pointer in `queue->slots`. Call +// does NOT store the pointer in `queue->slots_`. Call // `ready_queue_wire_arena_pointers` afterwards to set the field itself. bool ready_queue_init_data_from_layout(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off, uint64_t capacity); -// Stores queue->slots = arena.region_ptr(slots_off). Idempotent. +// Stores queue->slots_ = arena.region_ptr(slots_off). Idempotent. void ready_queue_wire_arena_pointers(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off); void ready_queue_destroy(PTO2ReadyQueue *queue); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp index 24585db85..340ca06af 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp @@ -34,31 +34,30 @@ // ============================================================================= size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity) { - // Align the slots[] base to a full cache line so MPMC CAS traffic on the + // Align the slots_ base to a full cache line so MPMC CAS traffic on the // first slot cannot false-share with whatever region sits in front of us // (e.g. orchestrator tensormap heads written by the orch thread). - return arena.reserve(capacity * sizeof(PTO2ReadyQueueSlot), PTO2_ALIGN_SIZE); + return arena.reserve(capacity * sizeof(std::atomic), PTO2_ALIGN_SIZE); } bool ready_queue_init_data_from_layout(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off, uint64_t capacity) { // Address the slots region for data writes without storing the pointer in - // queue->slots — that field is set by ready_queue_wire_arena_pointers. - auto *slots_arena = static_cast(arena.region_ptr(slots_off)); + // queue->slots_ — that field is set by ready_queue_wire_arena_pointers. + auto *slots_arena = static_cast *>(arena.region_ptr(slots_off)); queue->capacity = capacity; queue->mask = capacity - 1; - queue->enqueue_pos.store(0, std::memory_order_relaxed); - queue->dequeue_pos.store(0, std::memory_order_relaxed); + queue->tailCachedHead_.store(0u, std::memory_order_relaxed); + queue->headCachedTail_.store(0u, std::memory_order_relaxed); for (uint64_t i = 0; i < capacity; i++) { - slots_arena[i].sequence.store((int64_t)i, std::memory_order_relaxed); - slots_arena[i].slot_state = nullptr; + slots_arena[i].store(nullptr, std::memory_order_relaxed); } return true; } void ready_queue_wire_arena_pointers(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off) { - queue->slots = static_cast(arena.region_ptr(slots_off)); + queue->slots = static_cast *>(arena.region_ptr(slots_off)); } void ready_queue_destroy(PTO2ReadyQueue *queue) { From 0e099137967b7063782d69ba5a7e7cfc51233e23 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Thu, 25 Jun 2026 16:00:25 +0200 Subject: [PATCH 02/10] progress Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 0d4d55e3b..9879a4b1d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -266,7 +266,7 @@ struct alignas(64) PTO2ReadyQueue { bool contended = false; uint32_t atomic_ops = 1; // headCachedTail_.load while (true) { - uint32_t headTrail = head - static_cast(mask_ + 1u); + uint32_t headTrail = head - capacity; if (headTrail == tail) { tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); atomic_ops++; @@ -286,19 +286,20 @@ struct alignas(64) PTO2ReadyQueue { head = get_head(headCachedTail); tail = std::max(tail, get_cached_tail(headCachedTail)); } + auto &element = slots_[get_index(head)]; PTO2TaskSlotState *empty; - if (!element.compare_exchange_weak( + while (!element.compare_exchange_weak( empty = nullptr, slot_state, std::memory_order_acq_rel, std::memory_order_relaxed)) { contended = true; + atomic_ops++; // element CAS failed do { - do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) != nullptr); - } while (!element.compare_exchange_weak( - empty = nullptr, slot_state, std::memory_order_acq_rel, - std::memory_order_relaxed)); + pto2_cpu_relax(); + atomic_ops++; // element load + } while (element.load(std::memory_order_relaxed) != nullptr); } - atomic_ops++; // element CAS + atomic_ops++; // element CAS success atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return true; @@ -352,14 +353,18 @@ struct alignas(64) PTO2ReadyQueue { tail = get_tail(tailCachedHead); head = std::max(head, get_cached_head(tailCachedHead)); } + auto &element = slots_[get_index(tail)]; PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange while (val == nullptr) { contended = true; - do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); + do { + pto2_cpu_relax(); + atomic_ops++; // load + } while (element.load(std::memory_order_relaxed) == nullptr); val = element.exchange(nullptr, std::memory_order_acq_rel); - atomic_ops++; + atomic_ops++; // exchange } atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); From 6729e3a8603c871c6955778919a04353f82437ec Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Thu, 25 Jun 2026 16:10:04 +0200 Subject: [PATCH 03/10] contention comments Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 9879a4b1d..42de81725 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -292,7 +292,7 @@ struct alignas(64) PTO2ReadyQueue { while (!element.compare_exchange_weak( empty = nullptr, slot_state, std::memory_order_acq_rel, std::memory_order_relaxed)) { - contended = true; + contended = true; // contended only if very unlikely loop happened, more likely waiting for reader atomic_ops++; // element CAS failed do { pto2_cpu_relax(); @@ -358,7 +358,7 @@ struct alignas(64) PTO2ReadyQueue { PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange while (val == nullptr) { - contended = true; + contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load From fb1a584ce955509727f453623d16cadad00801eb Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Thu, 25 Jun 2026 17:14:34 +0200 Subject: [PATCH 04/10] profiling correctly Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 89 +++++++++---------- 1 file changed, 41 insertions(+), 48 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 42de81725..0391819a9 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -272,6 +272,7 @@ struct alignas(64) PTO2ReadyQueue { atomic_ops++; if (headTrail == tail) { atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return false; } } @@ -287,7 +288,7 @@ struct alignas(64) PTO2ReadyQueue { tail = std::max(tail, get_cached_tail(headCachedTail)); } - auto &element = slots_[get_index(head)]; + auto &element = slots[get_index(head)]; PTO2TaskSlotState *empty; while (!element.compare_exchange_weak( empty = nullptr, slot_state, std::memory_order_acq_rel, @@ -339,6 +340,7 @@ struct alignas(64) PTO2ReadyQueue { atomic_ops++; if (tail == head) { atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return nullptr; // empty } } @@ -354,7 +356,7 @@ struct alignas(64) PTO2ReadyQueue { head = std::max(head, get_cached_head(tailCachedHead)); } - auto &element = slots_[get_index(tail)]; + auto &element = slots[get_index(tail)]; PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange while (val == nullptr) { @@ -409,70 +411,61 @@ struct alignas(64) PTO2ReadyQueue { int pop_batch( PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle ) noexcept { - uint64_t tailCachedHead; - uint32_t tail, head; - int count; + uint32_t amount = static_cast(max_count); + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); + uint32_t toPop; uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // load + while (true) { - tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); - tail = get_tail(tailCachedHead); - head = get_cached_head(tailCachedHead); - atomic_ops++; // tailCachedHead_.load - if (tail == head) { + toPop = head - tail; + if (toPop < amount) { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); - atomic_ops++; - if (tail == head) { - atomic_count += atomic_ops; - return 0; // empty - } - } - uint32_t available = head - tail; - uint32_t limit = static_cast(max_count) < available - ? static_cast(max_count) : available; - count = 0; - while (static_cast(count) < limit) { - if (slots_[get_index(tail + static_cast(count))].load( - std::memory_order_acquire) != nullptr) { - ++count; - atomic_ops++; // element.load - } else { - contended = true; - atomic_ops++; // element.load - count = -1; - break; - } + toPop = head - tail; + atomic_ops++; // load } - if (count == 0) { + + toPop = std::min(toPop, amount); + if (toPop == 0u) { atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return 0; } - if (count < 0) continue; + if (tailCachedHead_.compare_exchange_weak( - tailCachedHead, pack(tail + static_cast(count), head), + tailCachedHead, pack(tail + toPop, head), std::memory_order_relaxed, std::memory_order_relaxed)) { - atomic_ops++; // successful CAS + atomic_ops++; // CAS success break; } - contended = true; - atomic_ops++; // failed CAS + atomic_ops++; // CAS fail + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - for (int i = 0; i < count; i++) { - auto &element = slots_[get_index(tail + static_cast(i))]; - PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + + const uint32_t end = tail + toPop; + for (uint32_t i = tail; i < end; ++i) { + auto &element = slots[get_index(i)]; + *out = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange - while (val == nullptr) { - contended = true; - do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); - val = element.exchange(nullptr, std::memory_order_acq_rel); - atomic_ops++; + while (*out == nullptr) { + contended = true; // contended only if very unlikely loop happened, more likely waiting for writer + do { + pto2_cpu_relax(); + atomic_ops++; // load + } while (element.load(std::memory_order_relaxed) == nullptr); + *out = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; // exchange } - out[i] = val; + out++; } + atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); - return count; + return static_cast(toPop); } #endif }; From 489647c26711d0876fe3788abf9bbb93b0e9c720 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Thu, 25 Jun 2026 17:45:29 +0200 Subject: [PATCH 05/10] likely unlikely progress Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 69 +++++++++++-------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 0391819a9..defeb1829 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -69,12 +69,19 @@ inline void pto2_cpu_relax() noexcept { _mm_pause(); } inline void pto2_cpu_relax() noexcept { std::atomic_signal_fence(std::memory_order_acquire); } #endif -#if defined(__GNUC__) || defined(__clang__) -# ifndef unlikely +#ifndef unlikely +# if defined(__GNUC__) || defined(__clang__) # define unlikely(x) (__builtin_expect(!!(x), 0)) +# else +# define unlikely(x) (x) # endif -# ifndef likely +#endif + +#ifndef likely +# if defined(__GNUC__) || defined(__clang__) # define likely(x) (__builtin_expect(!!(x), 1)) +# else +# define likely(x) (x) # endif #endif @@ -174,9 +181,9 @@ struct alignas(64) PTO2ReadyQueue { bool do_push_(PTO2TaskSlotState *value, uint32_t index) noexcept { auto &element = slots[index]; PTO2TaskSlotState *empty; - while (!element.compare_exchange_weak( - empty = nullptr, value, std::memory_order_acq_rel, std::memory_order_relaxed)) { - do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) != nullptr); + while (unlikely(!element.compare_exchange_weak( + empty = nullptr, value, std::memory_order_acq_rel, std::memory_order_relaxed))) { + do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); } return true; } @@ -184,8 +191,8 @@ struct alignas(64) PTO2ReadyQueue { PTO2TaskSlotState *do_pop_(uint32_t index) noexcept { auto &element = slots[index]; PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); - while (val == nullptr) { - do { pto2_cpu_relax(); } while (element.load(std::memory_order_relaxed) == nullptr); + while (unlikely(val == nullptr)) { + do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); val = element.exchange(nullptr, std::memory_order_acq_rel); } return val; @@ -194,7 +201,7 @@ struct alignas(64) PTO2ReadyQueue { uint64_t size() noexcept { uint32_t head = get_head(headCachedTail_.load(std::memory_order_relaxed)); uint32_t tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); - return head >= tail? static_cast(head) - static_cast(tail) : 0u; + return likely(head >= tail) ? static_cast(head) - static_cast(tail) : 0u; } bool push(PTO2TaskSlotState *slot_state) noexcept { @@ -203,14 +210,15 @@ struct alignas(64) PTO2ReadyQueue { uint32_t tail = get_cached_tail(headCachedTail); while (true) { uint32_t headTrail = head - capacity; - if (headTrail == tail) { + if (unlikely(headTrail == tail)) { tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); - if (headTrail == tail) return false; // full + if (unlikely(headTrail == tail)) return false; // full } - if (headCachedTail_.compare_exchange_weak( + if (likely(headCachedTail_.compare_exchange_weak( headCachedTail, pack(head + 1u, tail), - std::memory_order_relaxed, std::memory_order_relaxed)) + std::memory_order_relaxed, std::memory_order_relaxed))) { break; + } head = get_head(headCachedTail); tail = std::max(tail, get_cached_tail(headCachedTail)); } @@ -231,18 +239,18 @@ struct alignas(64) PTO2ReadyQueue { while (true) { uint32_t headTrail = head - capacity; toPush = tail - headTrail; - if (toPush < count) { + if (unlikely(toPush < count)) { tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); toPush = tail - headTrail; + if (unlikely(toPush == 0u)) { + return toPush; + } } toPush = std::min(toPush, count); - if (toPush == 0u) { - return toPush; - } - if (headCachedTail_.compare_exchange_weak( + if (likely(headCachedTail_.compare_exchange_weak( headCachedTail, pack(head + toPush, tail), - std::memory_order_relaxed, std::memory_order_relaxed)) { + std::memory_order_relaxed, std::memory_order_relaxed))) { break; } head = get_head(headCachedTail); @@ -267,18 +275,18 @@ struct alignas(64) PTO2ReadyQueue { uint32_t atomic_ops = 1; // headCachedTail_.load while (true) { uint32_t headTrail = head - capacity; - if (headTrail == tail) { + if (unlikely(headTrail == tail)) { tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); atomic_ops++; - if (headTrail == tail) { + if (unlikely(headTrail == tail)) { atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return false; } } - if (headCachedTail_.compare_exchange_weak( + if (likely(headCachedTail_.compare_exchange_weak( headCachedTail, pack(head + 1u, tail), - std::memory_order_relaxed, std::memory_order_relaxed)) { + std::memory_order_relaxed, std::memory_order_relaxed))) { atomic_ops++; // successful CAS break; } @@ -290,15 +298,15 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[get_index(head)]; PTO2TaskSlotState *empty; - while (!element.compare_exchange_weak( + while (unlikely(!element.compare_exchange_weak( empty = nullptr, slot_state, std::memory_order_acq_rel, - std::memory_order_relaxed)) { + std::memory_order_relaxed))) { contended = true; // contended only if very unlikely loop happened, more likely waiting for reader atomic_ops++; // element CAS failed do { pto2_cpu_relax(); atomic_ops++; // element load - } while (element.load(std::memory_order_relaxed) != nullptr); + } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); } atomic_ops++; // element CAS success atomic_count += atomic_ops; @@ -312,14 +320,15 @@ struct alignas(64) PTO2ReadyQueue { uint32_t tail = get_tail(tailCachedHead); uint32_t head = get_cached_head(tailCachedHead); while (true) { - if (tail == head) { + if (unlikely(tail == head)) { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); - if (tail == head) return nullptr; // empty + if (unlikely(tail == head)) return nullptr; // empty } - if (tailCachedHead_.compare_exchange_weak( + if (likely(tailCachedHead_.compare_exchange_weak( tailCachedHead, pack(tail + 1u, head), - std::memory_order_relaxed, std::memory_order_relaxed)) + std::memory_order_relaxed, std::memory_order_relaxed))) { break; + } tail = get_tail(tailCachedHead); head = std::max(head, get_cached_head(tailCachedHead)); } From bdc18b5b84ee5925e8b2e2bfe24d16c14cfb429f Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Fri, 26 Jun 2026 09:49:14 +0200 Subject: [PATCH 06/10] likely unlikely and improved memory order Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index defeb1829..e780145a9 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -182,7 +182,7 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[index]; PTO2TaskSlotState *empty; while (unlikely(!element.compare_exchange_weak( - empty = nullptr, value, std::memory_order_acq_rel, std::memory_order_relaxed))) { + empty = nullptr, value, std::memory_order_release, std::memory_order_relaxed))) { do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); } return true; @@ -190,10 +190,10 @@ struct alignas(64) PTO2ReadyQueue { PTO2TaskSlotState *do_pop_(uint32_t index) noexcept { auto &element = slots[index]; - PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); while (unlikely(val == nullptr)) { do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); - val = element.exchange(nullptr, std::memory_order_acq_rel); + val = element.exchange(nullptr, std::memory_order_acquire); } return val; } @@ -299,7 +299,7 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[get_index(head)]; PTO2TaskSlotState *empty; while (unlikely(!element.compare_exchange_weak( - empty = nullptr, slot_state, std::memory_order_acq_rel, + empty = nullptr, slot_state, std::memory_order_release, std::memory_order_relaxed))) { contended = true; // contended only if very unlikely loop happened, more likely waiting for reader atomic_ops++; // element CAS failed @@ -344,18 +344,18 @@ struct alignas(64) PTO2ReadyQueue { bool contended = false; uint32_t atomic_ops = 1; // tailCachedHead_.load while (true) { - if (tail == head) { + if (unlikely(tail == head)) { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); atomic_ops++; - if (tail == head) { + if (unlikely(tail == head)) { atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return nullptr; // empty } } - if (tailCachedHead_.compare_exchange_weak( + if (likely(tailCachedHead_.compare_exchange_weak( tailCachedHead, pack(tail + 1u, head), - std::memory_order_relaxed, std::memory_order_relaxed)) { + std::memory_order_relaxed, std::memory_order_relaxed))) { atomic_ops++; // successful CAS break; } @@ -366,15 +366,15 @@ struct alignas(64) PTO2ReadyQueue { } auto &element = slots[get_index(tail)]; - PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acq_rel); + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); atomic_ops++; // exchange - while (val == nullptr) { + while (unlikely(val == nullptr)) { contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load - } while (element.load(std::memory_order_relaxed) == nullptr); - val = element.exchange(nullptr, std::memory_order_acq_rel); + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); + val = element.exchange(nullptr, std::memory_order_acquire); atomic_ops++; // exchange } atomic_count += atomic_ops; @@ -394,14 +394,13 @@ struct alignas(64) PTO2ReadyQueue { if (toPop < amount) { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); toPop = head - tail; + if (toPop == 0u) return 0; } - toPop = std::min(toPop, amount); - if (toPop == 0u) return 0; - if (tailCachedHead_.compare_exchange_weak( + if (likely(tailCachedHead_.compare_exchange_weak( tailCachedHead, pack(tail + toPop, head), - std::memory_order_relaxed, std::memory_order_relaxed)) { + std::memory_order_relaxed, std::memory_order_relaxed))) { break; } tail = get_tail(tailCachedHead); @@ -435,18 +434,17 @@ struct alignas(64) PTO2ReadyQueue { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); toPop = head - tail; atomic_ops++; // load + if (toPop == 0u) { + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return 0; + } } - toPop = std::min(toPop, amount); - if (toPop == 0u) { - atomic_count += atomic_ops; - if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); - return 0; - } - if (tailCachedHead_.compare_exchange_weak( + if (likely(tailCachedHead_.compare_exchange_weak( tailCachedHead, pack(tail + toPop, head), - std::memory_order_relaxed, std::memory_order_relaxed)) { + std::memory_order_relaxed, std::memory_order_relaxed))) { atomic_ops++; // CAS success break; } @@ -460,12 +458,12 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[get_index(i)]; *out = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange - while (*out == nullptr) { + while (unlikely(*out == nullptr)) { contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load - } while (element.load(std::memory_order_relaxed) == nullptr); + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); *out = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange } From 0d254259a5a1ada9c458ae6228c59934f047ba01 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Fri, 26 Jun 2026 11:24:43 +0200 Subject: [PATCH 07/10] typo Signed-off-by: raphaelsteiner --- .../runtime/shared/pto_runtime2_init.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp index 340ca06af..6458f1d68 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp @@ -34,7 +34,7 @@ // ============================================================================= size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity) { - // Align the slots_ base to a full cache line so MPMC CAS traffic on the + // Align the slots[] base to a full cache line so MPMC CAS traffic on the // first slot cannot false-share with whatever region sits in front of us // (e.g. orchestrator tensormap heads written by the orch thread). return arena.reserve(capacity * sizeof(std::atomic), PTO2_ALIGN_SIZE); @@ -42,7 +42,7 @@ size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity) { bool ready_queue_init_data_from_layout(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off, uint64_t capacity) { // Address the slots region for data writes without storing the pointer in - // queue->slots_ — that field is set by ready_queue_wire_arena_pointers. + // queue->slots — that field is set by ready_queue_wire_arena_pointers. auto *slots_arena = static_cast *>(arena.region_ptr(slots_off)); queue->capacity = capacity; queue->mask = capacity - 1; From 17759c0018e7813a4c3cae9bee089fc5af858cae Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Fri, 26 Jun 2026 11:41:48 +0200 Subject: [PATCH 08/10] format Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 109 +++++++++--------- 1 file changed, 52 insertions(+), 57 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index e780145a9..89190e8b6 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -63,26 +63,26 @@ extern "C" bool is_scope_stats_enabled(); #if defined(__aarch64__) inline void pto2_cpu_relax() noexcept { asm volatile("isb" ::: "memory"); } #elif defined(__x86_64__) || defined(_M_X64) -# include +#include inline void pto2_cpu_relax() noexcept { _mm_pause(); } #else inline void pto2_cpu_relax() noexcept { std::atomic_signal_fence(std::memory_order_acquire); } #endif #ifndef unlikely -# if defined(__GNUC__) || defined(__clang__) -# define unlikely(x) (__builtin_expect(!!(x), 0)) -# else -# define unlikely(x) (x) -# endif +#if defined(__GNUC__) || defined(__clang__) +#define unlikely(x) (__builtin_expect(!!(x), 0)) +#else +#define unlikely(x) (x) +#endif #endif #ifndef likely -# if defined(__GNUC__) || defined(__clang__) -# define likely(x) (__builtin_expect(!!(x), 1)) -# else -# define likely(x) (x) -# endif +#if defined(__GNUC__) || defined(__clang__) +#define likely(x) (__builtin_expect(!!(x), 1)) +#else +#define likely(x) (x) +#endif #endif /** @@ -149,13 +149,9 @@ struct alignas(64) PTO2ReadyQueue { char _pad2[64 - sizeof(std::atomic *) - (2 * sizeof(uint32_t))]; static_assert( - std::atomic::is_always_lock_free, - "PTO2ReadyQueue requires lock-free pointer atomics." - ); - static_assert( - std::atomic::is_always_lock_free, - "PTO2ReadyQueue requires lock-free uint64_t." + std::atomic::is_always_lock_free, "PTO2ReadyQueue requires lock-free pointer atomics." ); + static_assert(std::atomic::is_always_lock_free, "PTO2ReadyQueue requires lock-free uint64_t."); constexpr uint32_t get_msd(uint64_t v) const noexcept { return static_cast(v >> 32u); } constexpr uint32_t get_lsd(uint64_t v) const noexcept { return static_cast(v); } @@ -164,13 +160,9 @@ struct alignas(64) PTO2ReadyQueue { } constexpr uint32_t get_tail(uint64_t tailCachedHead) const noexcept { return get_msd(tailCachedHead); } - constexpr uint32_t get_cached_head(uint64_t tailCachedHead) const noexcept { - return get_lsd(tailCachedHead); - } + constexpr uint32_t get_cached_head(uint64_t tailCachedHead) const noexcept { return get_lsd(tailCachedHead); } constexpr uint32_t get_head(uint64_t headCachedTail) const noexcept { return get_msd(headCachedTail); } - constexpr uint32_t get_cached_tail(uint64_t headCachedTail) const noexcept { - return get_lsd(headCachedTail); - } + constexpr uint32_t get_cached_tail(uint64_t headCachedTail) const noexcept { return get_lsd(headCachedTail); } uint32_t get_index(uint32_t position) const noexcept { static_assert(sizeof(PTO2TaskSlotState *) == 8u, "index_multiplier assumes 8-byte pointer"); @@ -181,9 +173,12 @@ struct alignas(64) PTO2ReadyQueue { bool do_push_(PTO2TaskSlotState *value, uint32_t index) noexcept { auto &element = slots[index]; PTO2TaskSlotState *empty; - while (unlikely(!element.compare_exchange_weak( - empty = nullptr, value, std::memory_order_release, std::memory_order_relaxed))) { - do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); + while (unlikely( + !element.compare_exchange_weak(empty = nullptr, value, std::memory_order_release, std::memory_order_relaxed) + )) { + do { + pto2_cpu_relax(); + } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); } return true; } @@ -192,7 +187,9 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[index]; PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); while (unlikely(val == nullptr)) { - do { pto2_cpu_relax(); } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); + do { + pto2_cpu_relax(); + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); val = element.exchange(nullptr, std::memory_order_acquire); } return val; @@ -215,8 +212,8 @@ struct alignas(64) PTO2ReadyQueue { if (unlikely(headTrail == tail)) return false; // full } if (likely(headCachedTail_.compare_exchange_weak( - headCachedTail, pack(head + 1u, tail), - std::memory_order_relaxed, std::memory_order_relaxed))) { + headCachedTail, pack(head + 1u, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } head = get_head(headCachedTail); @@ -249,8 +246,8 @@ struct alignas(64) PTO2ReadyQueue { toPush = std::min(toPush, count); if (likely(headCachedTail_.compare_exchange_weak( - headCachedTail, pack(head + toPush, tail), - std::memory_order_relaxed, std::memory_order_relaxed))) { + headCachedTail, pack(head + toPush, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } head = get_head(headCachedTail); @@ -285,8 +282,8 @@ struct alignas(64) PTO2ReadyQueue { } } if (likely(headCachedTail_.compare_exchange_weak( - headCachedTail, pack(head + 1u, tail), - std::memory_order_relaxed, std::memory_order_relaxed))) { + headCachedTail, pack(head + 1u, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { atomic_ops++; // successful CAS break; } @@ -299,13 +296,13 @@ struct alignas(64) PTO2ReadyQueue { auto &element = slots[get_index(head)]; PTO2TaskSlotState *empty; while (unlikely(!element.compare_exchange_weak( - empty = nullptr, slot_state, std::memory_order_release, - std::memory_order_relaxed))) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for reader - atomic_ops++; // element CAS failed + empty = nullptr, slot_state, std::memory_order_release, std::memory_order_relaxed + ))) { + contended = true; // contended only if very unlikely loop happened, more likely waiting for reader + atomic_ops++; // element CAS failed do { pto2_cpu_relax(); - atomic_ops++; // element load + atomic_ops++; // element load } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); } atomic_ops++; // element CAS success @@ -325,8 +322,8 @@ struct alignas(64) PTO2ReadyQueue { if (unlikely(tail == head)) return nullptr; // empty } if (likely(tailCachedHead_.compare_exchange_weak( - tailCachedHead, pack(tail + 1u, head), - std::memory_order_relaxed, std::memory_order_relaxed))) { + tailCachedHead, pack(tail + 1u, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } tail = get_tail(tailCachedHead); @@ -354,8 +351,8 @@ struct alignas(64) PTO2ReadyQueue { } } if (likely(tailCachedHead_.compare_exchange_weak( - tailCachedHead, pack(tail + 1u, head), - std::memory_order_relaxed, std::memory_order_relaxed))) { + tailCachedHead, pack(tail + 1u, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { atomic_ops++; // successful CAS break; } @@ -369,13 +366,13 @@ struct alignas(64) PTO2ReadyQueue { PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); atomic_ops++; // exchange while (unlikely(val == nullptr)) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for writer + contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); val = element.exchange(nullptr, std::memory_order_acquire); - atomic_ops++; // exchange + atomic_ops++; // exchange } atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); @@ -399,8 +396,8 @@ struct alignas(64) PTO2ReadyQueue { toPop = std::min(toPop, amount); if (likely(tailCachedHead_.compare_exchange_weak( - tailCachedHead, pack(tail + toPop, head), - std::memory_order_relaxed, std::memory_order_relaxed))) { + tailCachedHead, pack(tail + toPop, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } tail = get_tail(tailCachedHead); @@ -416,9 +413,7 @@ struct alignas(64) PTO2ReadyQueue { } #if PTO2_SCHED_PROFILING - int pop_batch( - PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle - ) noexcept { + int pop_batch(PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { uint32_t amount = static_cast(max_count); uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); uint32_t tail = get_tail(tailCachedHead); @@ -426,14 +421,14 @@ struct alignas(64) PTO2ReadyQueue { uint32_t toPop; uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 1; // load + uint32_t atomic_ops = 1; // load while (true) { toPop = head - tail; if (toPop < amount) { head = get_head(headCachedTail_.load(std::memory_order_relaxed)); toPop = head - tail; - atomic_ops++; // load + atomic_ops++; // load if (toPop == 0u) { atomic_count += atomic_ops; if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); @@ -443,12 +438,12 @@ struct alignas(64) PTO2ReadyQueue { toPop = std::min(toPop, amount); if (likely(tailCachedHead_.compare_exchange_weak( - tailCachedHead, pack(tail + toPop, head), - std::memory_order_relaxed, std::memory_order_relaxed))) { - atomic_ops++; // CAS success + tailCachedHead, pack(tail + toPop, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + atomic_ops++; // CAS success break; } - atomic_ops++; // CAS fail + atomic_ops++; // CAS fail tail = get_tail(tailCachedHead); head = std::max(head, get_cached_head(tailCachedHead)); } @@ -459,13 +454,13 @@ struct alignas(64) PTO2ReadyQueue { *out = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange while (unlikely(*out == nullptr)) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for writer + contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); *out = element.exchange(nullptr, std::memory_order_acq_rel); - atomic_ops++; // exchange + atomic_ops++; // exchange } out++; } From 661b90c4ad76cc72dfda32d7fa1a17851e4d3792 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Fri, 26 Jun 2026 13:20:51 +0200 Subject: [PATCH 09/10] typo Signed-off-by: raphaelsteiner --- .../tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 89190e8b6..8d8366a1a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -256,7 +256,7 @@ struct alignas(64) PTO2ReadyQueue { const uint32_t end = head + toPush; for (uint32_t i = head; i < end; ++i) { - do_push_(*items++, get_index(head)); + do_push_(*items++, get_index(i)); } return toPush; From ba955ee32f4e0e4f12b8fc294d7a2b83f2e4b2b6 Mon Sep 17 00:00:00 2001 From: raphaelsteiner Date: Fri, 26 Jun 2026 14:23:13 +0200 Subject: [PATCH 10/10] more correct counting Signed-off-by: raphaelsteiner --- .../runtime/scheduler/pto_scheduler.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 8d8366a1a..5991feab9 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -298,7 +298,6 @@ struct alignas(64) PTO2ReadyQueue { while (unlikely(!element.compare_exchange_weak( empty = nullptr, slot_state, std::memory_order_release, std::memory_order_relaxed ))) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for reader atomic_ops++; // element CAS failed do { pto2_cpu_relax(); @@ -366,7 +365,6 @@ struct alignas(64) PTO2ReadyQueue { PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); atomic_ops++; // exchange while (unlikely(val == nullptr)) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load @@ -443,6 +441,7 @@ struct alignas(64) PTO2ReadyQueue { atomic_ops++; // CAS success break; } + contended = true; atomic_ops++; // CAS fail tail = get_tail(tailCachedHead); head = std::max(head, get_cached_head(tailCachedHead)); @@ -454,7 +453,6 @@ struct alignas(64) PTO2ReadyQueue { *out = element.exchange(nullptr, std::memory_order_acq_rel); atomic_ops++; // exchange while (unlikely(*out == nullptr)) { - contended = true; // contended only if very unlikely loop happened, more likely waiting for writer do { pto2_cpu_relax(); atomic_ops++; // load