diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h index 5619a9fdf..5991feab9 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/pto_scheduler.h @@ -29,7 +29,9 @@ #pragma once +#include #include +#include #include "common/core_type.h" #include "utils/device_arena.h" @@ -55,16 +57,33 @@ extern "C" bool is_scope_stats_enabled(); #endif // ============================================================================= -// Ready Queue (Lock-free bounded MPMC — Vyukov design) +// Ready Queue (Lock-free bounded MPMC — Maxim Egorushkin's design) // ============================================================================= -/** - * Per-slot entry: sequence counter for ABA safety + task payload - */ -struct PTO2ReadyQueueSlot { - std::atomic sequence; - PTO2TaskSlotState *slot_state; -}; +#if defined(__aarch64__) +inline void pto2_cpu_relax() noexcept { asm volatile("isb" ::: "memory"); } +#elif defined(__x86_64__) || defined(_M_X64) +#include +inline void pto2_cpu_relax() noexcept { _mm_pause(); } +#else +inline void pto2_cpu_relax() noexcept { std::atomic_signal_fence(std::memory_order_acquire); } +#endif + +#ifndef unlikely +#if defined(__GNUC__) || defined(__clang__) +#define unlikely(x) (__builtin_expect(!!(x), 0)) +#else +#define unlikely(x) (x) +#endif +#endif + +#ifndef likely +#if defined(__GNUC__) || defined(__clang__) +#define likely(x) (__builtin_expect(!!(x), 1)) +#else +#define likely(x) (x) +#endif +#endif /** * Thread-local ready buffer for local-first dispatch optimization. @@ -103,324 +122,369 @@ struct PTO2LocalReadyBuffer { }; /** - * Lock-free bounded MPMC queue (Dmitry Vyukov design) + * Lock-free bounded MPMC queue * * Key properties: - * - enqueue_pos and dequeue_pos on separate cache lines (no false sharing) - * - Per-slot sequence counter prevents ABA problem - * - Empty queue pop returns immediately (single atomic load, no lock) - * - CAS contention is split: producers only touch enqueue_pos, - * consumers only touch dequeue_pos + * - tailCachedHead_ bundles the consumer position (MSD) with a stale producer + * snapshot (LSD) on a single exclusive cache line — no cross-side contention + * - headCachedTail_ bundles the producer position (MSD) with a stale consumer + * snapshot (LSD) on a second exclusive cache line + * - Each slot is atomic with nullptr as the empty sentinel; + * no per-slot sequence counter is required + * - Index scattering (multiplier 9, coprime to 2^k) distributes consecutive + * positions across cache lines to reduce slot contention + * - Empty pop returns nullptr immediately; full push returns false */ struct alignas(64) PTO2ReadyQueue { - PTO2ReadyQueueSlot *slots; - uint64_t capacity; - uint64_t mask; // capacity - 1 - char _pad0[64 - 24]; // Pad to own cache line + // Consumer cache line: {tail[63:32] | cached_head[31:0]} + alignas(64) std::atomic tailCachedHead_; + + // Producer cache line: {head[63:32] | cached_tail[31:0]} + alignas(64) std::atomic headCachedTail_; + + // Shared read-only after wire_arena_pointers + alignas(64) std::atomic *slots; + uint32_t capacity; + uint32_t mask; // capacity - 1 + char _pad2[64 - sizeof(std::atomic *) - (2 * sizeof(uint32_t))]; + + static_assert( + std::atomic::is_always_lock_free, "PTO2ReadyQueue requires lock-free pointer atomics." + ); + static_assert(std::atomic::is_always_lock_free, "PTO2ReadyQueue requires lock-free uint64_t."); - std::atomic enqueue_pos; - char _pad1[64 - sizeof(std::atomic)]; // Own cache line + constexpr uint32_t get_msd(uint64_t v) const noexcept { return static_cast(v >> 32u); } + constexpr uint32_t get_lsd(uint64_t v) const noexcept { return static_cast(v); } + constexpr uint64_t pack(uint32_t msd, uint32_t lsd) const noexcept { + return (static_cast(msd) << 32u) | static_cast(lsd); + } + + constexpr uint32_t get_tail(uint64_t tailCachedHead) const noexcept { return get_msd(tailCachedHead); } + constexpr uint32_t get_cached_head(uint64_t tailCachedHead) const noexcept { return get_lsd(tailCachedHead); } + constexpr uint32_t get_head(uint64_t headCachedTail) const noexcept { return get_msd(headCachedTail); } + constexpr uint32_t get_cached_tail(uint64_t headCachedTail) const noexcept { return get_lsd(headCachedTail); } - std::atomic dequeue_pos; - char _pad2[64 - sizeof(std::atomic)]; // Own cache line + uint32_t get_index(uint32_t position) const noexcept { + static_assert(sizeof(PTO2TaskSlotState *) == 8u, "index_multiplier assumes 8-byte pointer"); + constexpr uint32_t mul = 9u; // ceil(64/8)=8, even → 9; coprime to every power-of-2 N + return (position * mul) & mask; + } - uint64_t size() { - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - return (e >= d) ? (e - d) : 0; + bool do_push_(PTO2TaskSlotState *value, uint32_t index) noexcept { + auto &element = slots[index]; + PTO2TaskSlotState *empty; + while (unlikely( + !element.compare_exchange_weak(empty = nullptr, value, std::memory_order_release, std::memory_order_relaxed) + )) { + do { + pto2_cpu_relax(); + } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); + } + return true; } - bool push(PTO2TaskSlotState *slot_state) { - uint64_t pos; - PTO2ReadyQueueSlot *slot; + PTO2TaskSlotState *do_pop_(uint32_t index) noexcept { + auto &element = slots[index]; + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); + while (unlikely(val == nullptr)) { + do { + pto2_cpu_relax(); + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); + val = element.exchange(nullptr, std::memory_order_acquire); + } + return val; + } + + uint64_t size() noexcept { + uint32_t head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + uint32_t tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + return likely(head >= tail) ? static_cast(head) - static_cast(tail) : 0u; + } + + bool push(PTO2TaskSlotState *slot_state) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos); - if (diff == 0) { - if (enqueue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - break; - } - } else if (diff < 0) { - return false; // Queue full + uint32_t headTrail = head - capacity; + if (unlikely(headTrail == tail)) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + if (unlikely(headTrail == tail)) return false; // full } + if (likely(headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + 1u, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + break; + } + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - - slot->slot_state = slot_state; - slot->sequence.store(static_cast(pos + 1), std::memory_order_release); - return true; + return do_push_(slot_state, get_index(head)); } - // Batch push: reserve count slots with a single CAS after confirming - // every target slot is available under the usual Vyukov sequence check. void push_batch(PTO2TaskSlotState **items, int count) { - if (count == 0) return; + uint32_t left = static_cast(count); + while (left -= try_push_batch(items, left)) {} + } - uint64_t pos; + template + uint32_t try_push_batch(It &&items, uint32_t count) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); + uint32_t toPush; while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - bool ready = true; - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + i); - if (diff != 0) { - ready = false; - break; + uint32_t headTrail = head - capacity; + toPush = tail - headTrail; + if (unlikely(toPush < count)) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + toPush = tail - headTrail; + if (unlikely(toPush == 0u)) { + return toPush; } } - if (!ready) { - continue; - } - if (enqueue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { + toPush = std::min(toPush, count); + + if (likely(headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + toPush, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - slot->slot_state = items[i]; - slot->sequence.store(static_cast(pos + i + 1), std::memory_order_release); + const uint32_t end = head + toPush; + for (uint32_t i = head; i < end; ++i) { + do_push_(*items++, get_index(i)); } + + return toPush; } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - bool push(PTO2TaskSlotState *slot_state, uint64_t &atomic_count, uint64_t &wait_cycle) { - uint64_t pos; - PTO2ReadyQueueSlot *slot; + bool push(PTO2TaskSlotState *slot_state, uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { + uint64_t headCachedTail = headCachedTail_.load(std::memory_order_relaxed); + uint32_t head = get_head(headCachedTail); + uint32_t tail = get_cached_tail(headCachedTail); uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // headCachedTail_.load while (true) { - pos = enqueue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos); - atomic_ops += 2; // enqueue_pos.load + sequence.load - if (diff == 0) { - if (enqueue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - atomic_ops++; // successful CAS - break; + uint32_t headTrail = head - capacity; + if (unlikely(headTrail == tail)) { + tail = get_tail(tailCachedHead_.load(std::memory_order_relaxed)); + atomic_ops++; + if (unlikely(headTrail == tail)) { + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return false; } - contended = true; - atomic_ops++; // failed CAS - } else if (diff < 0) { - return false; // Queue full - } else { - contended = true; // diff > 0: slot not yet released, spin } - } - atomic_ops++; // final sequence.store - atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); + if (likely(headCachedTail_.compare_exchange_weak( + headCachedTail, pack(head + 1u, tail), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + atomic_ops++; // successful CAS + break; + } + contended = true; + atomic_ops++; // failed CAS + head = get_head(headCachedTail); + tail = std::max(tail, get_cached_tail(headCachedTail)); } - slot->slot_state = slot_state; - slot->sequence.store(static_cast(pos + 1), std::memory_order_release); + auto &element = slots[get_index(head)]; + PTO2TaskSlotState *empty; + while (unlikely(!element.compare_exchange_weak( + empty = nullptr, slot_state, std::memory_order_release, std::memory_order_relaxed + ))) { + atomic_ops++; // element CAS failed + do { + pto2_cpu_relax(); + atomic_ops++; // element load + } while (unlikely(element.load(std::memory_order_relaxed) != nullptr)); + } + atomic_ops++; // element CAS success + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); return true; } #endif - PTO2TaskSlotState *pop() { - // Fast-path: skip slot load when queue is clearly empty - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - if (d >= e) { - return nullptr; - } - - uint64_t pos; - PTO2ReadyQueueSlot *slot; + PTO2TaskSlotState *pop() noexcept { + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + 1); - if (diff == 0) { - if (dequeue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) - break; - } else if (diff < 0) { - return nullptr; // Queue empty + if (unlikely(tail == head)) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + if (unlikely(tail == head)) return nullptr; // empty } + if (likely(tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + 1u, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + break; + } + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - - PTO2TaskSlotState *result = slot->slot_state; - slot->sequence.store(static_cast(pos + mask + 1), std::memory_order_release); - return result; + return do_pop_(get_index(tail)); } #if PTO2_SCHED_PROFILING - PTO2TaskSlotState *pop(uint64_t &atomic_count, uint64_t &wait_cycle) { - // Fast-path: skip slot load when queue is clearly empty - uint64_t d = dequeue_pos.load(std::memory_order_relaxed); - uint64_t e = enqueue_pos.load(std::memory_order_relaxed); - atomic_count += 2; // dequeue_pos.load + enqueue_pos.load - if (d >= e) { - return nullptr; - } - - uint64_t pos; - PTO2ReadyQueueSlot *slot; + PTO2TaskSlotState *pop(uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // tailCachedHead_.load while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - slot = &slots[pos & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + 1); - atomic_ops += 2; // dequeue_pos.load + sequence.load - if (diff == 0) { - if (dequeue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed, std::memory_order_relaxed - )) { - atomic_ops++; // successful CAS - break; + if (unlikely(tail == head)) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + atomic_ops++; + if (unlikely(tail == head)) { + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return nullptr; // empty } - contended = true; - atomic_ops++; // failed CAS - } else if (diff < 0) { - atomic_count += atomic_ops; - return nullptr; // Queue empty - } else { - contended = true; } - } - atomic_ops++; // final sequence.store - atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); + if (likely(tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + 1u, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + atomic_ops++; // successful CAS + break; + } + contended = true; + atomic_ops++; // failed CAS + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - PTO2TaskSlotState *result = slot->slot_state; - slot->sequence.store(static_cast(pos + mask + 1), std::memory_order_release); - return result; + auto &element = slots[get_index(tail)]; + PTO2TaskSlotState *val = element.exchange(nullptr, std::memory_order_acquire); + atomic_ops++; // exchange + while (unlikely(val == nullptr)) { + do { + pto2_cpu_relax(); + atomic_ops++; // load + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); + val = element.exchange(nullptr, std::memory_order_acquire); + atomic_ops++; // exchange + } + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return val; } #endif - // Batch pop: reserve a contiguous run of ready slots with a single CAS. - // Returns actual number of items popped (may be less than max_count). - int pop_batch(PTO2TaskSlotState **out, int max_count) { - uint64_t pos; - int count; + int pop_batch(PTO2TaskSlotState **out, int max_count) noexcept { + uint32_t amount = static_cast(max_count); + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); + uint32_t toPop; while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - count = 0; - while (count < max_count) { - PTO2ReadyQueueSlot *slot = &slots[(pos + count) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + count + 1); - if (diff == 0) { - count++; - continue; - } - if (diff < 0) { - break; - } - count = -1; - break; + toPop = head - tail; + if (toPop < amount) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + toPop = head - tail; + if (toPop == 0u) return 0; } - if (count == 0) return 0; - if (count < 0) continue; - if (dequeue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { + toPop = std::min(toPop, amount); + + if (likely(tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + toPop, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { break; } + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - out[i] = slot->slot_state; - slot->sequence.store(static_cast(pos + i + mask + 1), std::memory_order_release); + const uint32_t end = tail + toPop; + for (uint32_t i = tail; i < end; ++i) { + *out++ = do_pop_(get_index(i)); } - return count; + + return static_cast(toPop); } #if PTO2_SCHED_PROFILING - int pop_batch(PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle) { - uint64_t pos; - int count; + int pop_batch(PTO2TaskSlotState **out, int max_count, uint64_t &atomic_count, uint64_t &wait_cycle) noexcept { + uint32_t amount = static_cast(max_count); + uint64_t tailCachedHead = tailCachedHead_.load(std::memory_order_relaxed); + uint32_t tail = get_tail(tailCachedHead); + uint32_t head = get_cached_head(tailCachedHead); + uint32_t toPop; uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; - uint32_t atomic_ops = 0; + uint32_t atomic_ops = 1; // load + while (true) { - pos = dequeue_pos.load(std::memory_order_relaxed); - atomic_ops++; // dequeue_pos.load - count = 0; - while (count < max_count) { - PTO2ReadyQueueSlot *slot = &slots[(pos + count) & mask]; - int64_t seq = slot->sequence.load(std::memory_order_acquire); - int64_t diff = seq - static_cast(pos + count + 1); - atomic_ops++; // sequence.load - if (diff == 0) { - count++; - continue; - } - if (diff < 0) { - break; + toPop = head - tail; + if (toPop < amount) { + head = get_head(headCachedTail_.load(std::memory_order_relaxed)); + toPop = head - tail; + atomic_ops++; // load + if (toPop == 0u) { + atomic_count += atomic_ops; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return 0; } - contended = true; - count = -1; - break; - } - if (count == 0) { - atomic_count += atomic_ops; - return 0; - } - if (count < 0) { - continue; } - if (dequeue_pos.compare_exchange_weak( - pos, pos + count, std::memory_order_relaxed, std::memory_order_relaxed - )) { - atomic_ops++; // successful CAS + toPop = std::min(toPop, amount); + + if (likely(tailCachedHead_.compare_exchange_weak( + tailCachedHead, pack(tail + toPop, head), std::memory_order_relaxed, std::memory_order_relaxed + ))) { + atomic_ops++; // CAS success break; } contended = true; - atomic_ops++; // failed CAS + atomic_ops++; // CAS fail + tail = get_tail(tailCachedHead); + head = std::max(head, get_cached_head(tailCachedHead)); } - for (int i = 0; i < count; i++) { - PTO2ReadyQueueSlot *slot = &slots[(pos + i) & mask]; - out[i] = slot->slot_state; - slot->sequence.store(static_cast(pos + i + mask + 1), std::memory_order_release); - atomic_ops++; // sequence.store + const uint32_t end = tail + toPop; + for (uint32_t i = tail; i < end; ++i) { + auto &element = slots[get_index(i)]; + *out = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; // exchange + while (unlikely(*out == nullptr)) { + do { + pto2_cpu_relax(); + atomic_ops++; // load + } while (unlikely(element.load(std::memory_order_relaxed) == nullptr)); + *out = element.exchange(nullptr, std::memory_order_acq_rel); + atomic_ops++; // exchange + } + out++; } + atomic_count += atomic_ops; - if (contended) { - wait_cycle += (get_sys_cnt_aicpu() - t0); - } - return count; + if (contended) wait_cycle += (get_sys_cnt_aicpu() - t0); + return static_cast(toPop); } #endif }; -// Cold-path ready queue operations (defined in pto_scheduler.cpp). Declared +// Cold-path ready queue operations (defined in pto_runtime2_init.cpp). Declared // as non-member so PTO2ReadyQueue stays a POD-like struct with cache-line // alignment. Storage is owned by the caller-supplied arena. -// reserve_layout: declare the slots[] region on the arena (must precede commit) -// init_from_layout: bind slots pointer from arena.region_ptr(off) and -// initialize sequence counters -// destroy: forget the slots pointer (arena owns the buffer) +// reserve_layout: declare the slots[] region on the arena (must precede commit) +// init_from_layout: initialize positions/mask and zero all slot atomics to nullptr; +// does NOT store the slots_ pointer (call wire_arena_pointers for that) +// destroy: forget the slots_ pointer (arena owns the buffer) size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity); -// Writes everything *except* the arena-internal `slots` pointer field -// (sequences/positions on the slot array, capacity, mask). Uses +// Writes everything except the arena-internal `slots_` pointer field +// (slot atomics to nullptr, mask_, tailCachedHead_, headCachedTail_). Uses // arena.region_ptr(slots_off) only to address the slot array for writes; -// does NOT store the pointer in `queue->slots`. Call +// does NOT store the pointer in `queue->slots_`. Call // `ready_queue_wire_arena_pointers` afterwards to set the field itself. bool ready_queue_init_data_from_layout(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off, uint64_t capacity); -// Stores queue->slots = arena.region_ptr(slots_off). Idempotent. +// Stores queue->slots_ = arena.region_ptr(slots_off). Idempotent. void ready_queue_wire_arena_pointers(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off); void ready_queue_destroy(PTO2ReadyQueue *queue); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp index 24585db85..6458f1d68 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/shared/pto_runtime2_init.cpp @@ -37,28 +37,27 @@ size_t ready_queue_reserve_layout(DeviceArena &arena, uint64_t capacity) { // Align the slots[] base to a full cache line so MPMC CAS traffic on the // first slot cannot false-share with whatever region sits in front of us // (e.g. orchestrator tensormap heads written by the orch thread). - return arena.reserve(capacity * sizeof(PTO2ReadyQueueSlot), PTO2_ALIGN_SIZE); + return arena.reserve(capacity * sizeof(std::atomic), PTO2_ALIGN_SIZE); } bool ready_queue_init_data_from_layout(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off, uint64_t capacity) { // Address the slots region for data writes without storing the pointer in // queue->slots — that field is set by ready_queue_wire_arena_pointers. - auto *slots_arena = static_cast(arena.region_ptr(slots_off)); + auto *slots_arena = static_cast *>(arena.region_ptr(slots_off)); queue->capacity = capacity; queue->mask = capacity - 1; - queue->enqueue_pos.store(0, std::memory_order_relaxed); - queue->dequeue_pos.store(0, std::memory_order_relaxed); + queue->tailCachedHead_.store(0u, std::memory_order_relaxed); + queue->headCachedTail_.store(0u, std::memory_order_relaxed); for (uint64_t i = 0; i < capacity; i++) { - slots_arena[i].sequence.store((int64_t)i, std::memory_order_relaxed); - slots_arena[i].slot_state = nullptr; + slots_arena[i].store(nullptr, std::memory_order_relaxed); } return true; } void ready_queue_wire_arena_pointers(PTO2ReadyQueue *queue, DeviceArena &arena, size_t slots_off) { - queue->slots = static_cast(arena.region_ptr(slots_off)); + queue->slots = static_cast *>(arena.region_ptr(slots_off)); } void ready_queue_destroy(PTO2ReadyQueue *queue) {