Add: L2 input window support to L3-L2 message queue #1236
- Define the staged base queue transport design and PR1/PR2 split.
- Add the base implementation plan for the queue stack.

- Implement the PR1 L3 queue wrapper and L2 endpoint ABI on top of the primitive L3-L2 orchestration region transport.
- Wire Orchestrator.create_l3_l2_queue and cover descriptor layout, zero-byte messages, abort flags, capacity, and fast-path buffers in Python and C++ unit tests.

- Drop the base implementation guide from tracked PR1 files while keeping it available locally for PR2 planning.
- Keep the L3-L2 queue Python tests compatible with the pyright target and ruff formatting used by CI.

- Fail closed on queue layout uint64 overflow in C++ and Python mirror calculations
- Validate cached L2 input handle metadata before release and use cached descriptor state
- Gate C++ spin-loop timer reads and clean up Python regions on partial construction failure

- Add the user-facing L3-L2 message queue documentation.
- Link the primitive L3-L2 orchestration communication doc to the queue wrapper doc.
- Remove the design-stage document from the branch while leaving the local copy available for follow-up work.

- Add strict payload and counter size checks to the L3-L2 queue task args.
- Validate L2 input payload offsets before exposing payload views.
- Document timeout, layout, and queue free semantics, and expand no-hardware tests.

- Allow ordinary host buffers through lazy queue staging while keeping registered tensors on the no-staging fast path.
- Add a tensormap_and_ringbuffer example that streams DATA and STOP messages through the L3-L2 queue on sim and onboard platforms.

- Keep dynamic tensor-like host buffer access explicit for pyright.
- Build the example child callable across both positional and arg_index binding variants.

- Add the L2 input-window helper behavior to the queue endpoint
- Add the new input-window example with PTO-ISA AIV tile compute
- Extend C++ coverage for window capacity, STOP drain, and ownership
Code Review
This pull request introduces the L3-L2 Message Queue, enabling ordered message exchange between an L3 Host Orchestrator and a persistent L2 AICPU Orchestrator task, complete with documentation, Python/C++ implementations, examples, and unit tests. The review feedback focuses on enhancing C++ memory safety and resource management within the l3_l2_message_queue.h header. Specifically, the reviewer recommends explicitly deleting copy and move operations for InputQueue and L3L2QueueEndpoint to prevent double-free vulnerabilities, and safeguarding InputQueue::initialize against potential memory leaks from multiple invocations.
explicit InputQueue(L3L2QueueEndpoint *parent) :
    parent_(parent) {}

~InputQueue() { delete[] active_entries_; }
To prevent accidental copying or moving of InputQueue which manages the raw pointer active_entries_ (allocated via new[]), we should explicitly delete its copy and move constructors and assignment operators. This avoids potential double-free vulnerabilities or memory leaks.
explicit InputQueue(L3L2QueueEndpoint *parent) :
parent_(parent) {}
InputQueue(const InputQueue &) = delete;
InputQueue &operator=(const InputQueue &) = delete;
InputQueue(InputQueue &&) = delete;
InputQueue &operator=(InputQueue &&) = delete;
~InputQueue() { delete[] active_entries_; }

L3L2QueueEndpoint(
    const L3L2OrchRegionDesc &desc, const L3L2QueueArgs &args, const L3L2QueueEndpointConfig &config
) :
    endpoint_(desc),
    config_(config),
    input_queue_(this),
    output_queue_(this) {
    if (endpoint_.error().kind != L3L2EndpointErrorKind::NONE ||
        !l3_l2_queue_validate_region(desc, args, &layout_)) {
        set_error(L3L2QueueErrorKind::BAD_DESCRIPTOR, "init", desc.region_id, "invalid queue descriptor");
        return;
    }
    if (config_.max_l2_input_inflight == 0 || config_.max_l2_input_inflight > layout_.depth) {
        set_error(L3L2QueueErrorKind::BAD_ARGUMENT, "init", desc.region_id, "invalid endpoint config");
        return;
    }
    input_queue_.initialize(config_.max_l2_input_inflight);
}
To prevent accidental copying or moving of L3L2QueueEndpoint (which owns input_queue_ and manages its lifetime), we should explicitly delete its copy and move constructors and assignment operators. This ensures proper RAII and resource safety.
L3L2QueueEndpoint(
const L3L2OrchRegionDesc &desc, const L3L2QueueArgs &args, const L3L2QueueEndpointConfig &config
) :
endpoint_(desc),
config_(config),
input_queue_(this),
output_queue_(this) {
if (endpoint_.error().kind != L3L2EndpointErrorKind::NONE ||
!l3_l2_queue_validate_region(desc, args, &layout_)) {
set_error(L3L2QueueErrorKind::BAD_DESCRIPTOR, "init", desc.region_id, "invalid queue descriptor");
return;
}
if (config_.max_l2_input_inflight == 0 || config_.max_l2_input_inflight > layout_.depth) {
set_error(L3L2QueueErrorKind::BAD_ARGUMENT, "init", desc.region_id, "invalid endpoint config");
return;
}
input_queue_.initialize(config_.max_l2_input_inflight);
}
L3L2QueueEndpoint(const L3L2QueueEndpoint &) = delete;
L3L2QueueEndpoint &operator=(const L3L2QueueEndpoint &) = delete;
L3L2QueueEndpoint(L3L2QueueEndpoint &&) = delete;
L3L2QueueEndpoint &operator=(L3L2QueueEndpoint &&) = delete;

bool initialize(uint64_t max_l2_input_inflight) {
    entry_capacity_ = max_l2_input_inflight + 1;
    active_entries_ = new (std::nothrow) ActiveInputEntry[entry_capacity_];
If initialize is called multiple times on the same InputQueue instance, the previously allocated active_entries_ array will be leaked. To prevent memory leaks, we should explicitly delete any existing allocation before allocating a new one.
bool initialize(uint64_t max_l2_input_inflight) {
if (active_entries_ != nullptr) {
delete[] active_entries_;
}
entry_capacity_ = max_l2_input_inflight + 1;
active_entries_ = new (std::nothrow) ActiveInputEntry[entry_capacity_];
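The three suggestions above (non-copyable owner, non-movable owner, leak-free re-initialization) can be combined in one small sketch. This is not the header's code: `EntryTable`, `Entry`, and the `kMaxInflight` cap are hypothetical names, and `std::unique_ptr<T[]>` is swapped in for the raw `new[]`/`delete[]` pair so that a repeated `initialize` call frees the previous array automatically.

```cpp
#include <cstdint>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for the real ActiveInputEntry.
struct Entry {
    uint64_t seq = 0;
};

class EntryTable {
public:
    // Hypothetical upper bound so that max_inflight + 1 cannot overflow.
    static constexpr uint64_t kMaxInflight = 1u << 20;

    EntryTable() = default;
    // The owner is neither copyable nor movable, as the review recommends.
    EntryTable(const EntryTable &) = delete;
    EntryTable &operator=(const EntryTable &) = delete;
    EntryTable(EntryTable &&) = delete;
    EntryTable &operator=(EntryTable &&) = delete;

    // Returns false on a bad argument or allocation failure and leaves the
    // previous state untouched. On success the old array (if any) is freed
    // by the unique_ptr move assignment, so repeated calls do not leak.
    bool initialize(uint64_t max_inflight) {
        if (max_inflight == 0 || max_inflight > kMaxInflight) {
            return false;
        }
        const uint64_t capacity = max_inflight + 1;
        std::unique_ptr<Entry[]> fresh(new (std::nothrow) Entry[capacity]);
        if (!fresh) {
            return false;
        }
        entries_ = std::move(fresh);
        capacity_ = capacity;
        return true;
    }

    uint64_t capacity() const { return capacity_; }

private:
    std::unique_ptr<Entry[]> entries_;
    uint64_t capacity_ = 0;
};
```

The explicit deletes still matter with `unique_ptr`: without them the class would remain implicitly movable, which is unsafe for a type that also stores a back-pointer to its parent.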
📝 Walkthrough
Adds an ordered L3-L2 message queue: C++ ABI/endpoint header, Python queue wrapper (…
Changes
L3-L2 Message Queue
Estimated code review effort: 5 (Critical) | ~120 minutes
Sequence Diagram(s)
sequenceDiagram
participant L3Script as l3_l2_message_queue.py
participant L3Queue as L3L2Queue (Python)
participant Region as L3L2OrchRegion
participant L2Endpoint as L3L2QueueEndpoint (C++)
participant AIVKernel as kernel_queue_transform.cpp
L3Script->>L3Queue: create_l3_l2_queue(depth, arena bytes)
L3Queue->>Region: allocate descriptor rings + payload arenas
L3Script->>L3Queue: enqueue(input payload, mode)
L3Queue->>Region: write descriptor + payload, notify tail counter
L2Endpoint->>Region: peek(timeout) input queue
Region-->>L2Endpoint: DATA descriptor + payload
L2Endpoint->>AIVKernel: rt_submit_aiv_task(tile/scalar op)
AIVKernel-->>L2Endpoint: computed output tile
L2Endpoint->>Region: reserve + publish output descriptor
L3Script->>L3Queue: peek/read_into output
L3Queue->>Region: read payload, release, advance head
L3Script->>L3Queue: request_stop()
L3Queue->>Region: publish STOP descriptor
L2Endpoint->>Region: peek observes STOP, drains
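The descriptor flow in the diagram (publish, peek, release, then STOP and drain) can be mimicked with a single-threaded toy ring. This is only an illustration under stated assumptions: `DescRing`, `DescSlot`, the opcode values, and `drain_until_stop` are hypothetical, there is no payload arena, and no cross-device memory ordering is modeled.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical opcode values; the real ABI values are not shown in this PR page.
enum class Opcode : uint64_t { DATA = 1, STOP = 2 };

struct DescSlot {
    uint64_t seq;
    Opcode opcode;
    uint64_t payload_nbytes;
};

class DescRing {
public:
    explicit DescRing(uint64_t depth) : slots_(depth) {}

    // Producer side: refuse to publish when the ring is full or after STOP.
    bool publish(Opcode opcode, uint64_t payload_nbytes) {
        if (stopped_ || tail_ - head_ == slots_.size()) {
            return false;
        }
        slots_[tail_ % slots_.size()] = DescSlot{tail_, opcode, payload_nbytes};
        stopped_ = (opcode == Opcode::STOP);
        tail_ += 1;
        return true;
    }

    // Consumer side: non-blocking look at the oldest unreleased descriptor.
    bool try_peek(DescSlot *out) const {
        if (head_ == tail_) {
            return false;
        }
        *out = slots_[head_ % slots_.size()];
        return true;
    }

    // Consumer side: hand the oldest descriptor back to the producer.
    void release() {
        if (head_ != tail_) {
            head_ += 1;
        }
    }

private:
    std::vector<DescSlot> slots_;
    uint64_t head_ = 0;
    uint64_t tail_ = 0;
    bool stopped_ = false;
};

// Consumes descriptors until STOP (or until the ring is empty) and returns
// how many DATA descriptors were seen.
inline uint64_t drain_until_stop(DescRing &ring) {
    uint64_t data_seen = 0;
    DescSlot slot{};
    while (ring.try_peek(&slot)) {
        ring.release();
        if (slot.opcode == Opcode::STOP) {
            break;
        }
        data_seen += 1;
    }
    return data_seen;
}
```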
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/common/platform/include/aicpu/l3_l2_message_queue.h (1)
38-43: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Pin the descriptor ABI in the header.
The size/offset guarantees only live in the unit test right now. Adding static_asserts beside L3L2QueueDescSlot will make any ABI drift fail in every build, not just the test target.
Suggested hardening
struct L3L2QueueDescSlot {
    uint64_t seq;
    uint64_t opcode;
    uint64_t payload_offset;
    uint64_t payload_nbytes;
};
+
+static_assert(sizeof(L3L2QueueDescSlot) == 32, "L3L2QueueDescSlot ABI size changed");
+static_assert(offsetof(L3L2QueueDescSlot, seq) == 0, "L3L2QueueDescSlot::seq offset changed");
+static_assert(offsetof(L3L2QueueDescSlot, opcode) == 8, "L3L2QueueDescSlot::opcode offset changed");
+static_assert(offsetof(L3L2QueueDescSlot, payload_offset) == 16, "L3L2QueueDescSlot::payload_offset changed");
+static_assert(offsetof(L3L2QueueDescSlot, payload_nbytes) == 24, "L3L2QueueDescSlot::payload_nbytes changed");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/common/platform/include/aicpu/l3_l2_message_queue.h` around lines 38 - 43, The descriptor ABI for L3L2QueueDescSlot is only enforced in tests right now, so add compile-time static_assert checks next to the struct definition to pin its size and member offsets. Use the L3L2QueueDescSlot type in the header to assert the expected layout so any ABI drift fails in every build, not just the unit test target.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/l3-l2-message-queue.md`:
- Around line 169-172: Update the message-queue docs around
queue.input().try_peek() and queue.output().try_reserve() to remove timeout from
the false-return contract, since these methods are non-blocking and false should
only cover no progress, validation failure, or poison; keep the guidance to
check queue.error().kind for terminal errors and ensure the wording is
consistent with the try_* API semantics.
In
`@examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/orchestration/l3_l2_message_queue_orch.cpp`:
- Around line 123-145: process_first_pair() leaves consumed ActiveRequest slots
populated after release_input(), which lets the next batch overwrite or reuse
stale handles. Clear the released entries in the active array after each
successful release and make the mode == 1 path verify that active[0] is free
before assigning a new input handle. Apply the same cleanup to the related batch
handling code in the other matching block so both paths reset
active[0]/active[1] consistently before continuing the receive/process/publish
loop.
In `@src/common/platform/include/aicpu/l3_l2_message_queue.h`:
- Around line 360-419: The input peek path in the queue handling logic accepts
STOP descriptors with payload bytes, which should be rejected by the STOP
contract. Update the `try_peek` flow in the `L3L2Queue` input handling to
validate `opcode == L3L2QueueOpcode::STOP` before any payload read, and poison
with `L3L2QueueErrorKind::INVALID_DESCRIPTOR` if `slot.payload_nbytes` is
nonzero. Ensure `payload_matches_head`, `endpoint_.payload_read`, and the
`active_entries_` / `input_payload_head_` bookkeeping are only reached for valid
zero-byte STOP descriptors.
📒 Files selected for processing (12)
docs/l3-l2-message-queue.md
docs/l3-l2-orch-comm.md
examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/aiv/kernel_queue_transform.cpp
examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/orchestration/l3_l2_message_queue_orch.cpp
examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/l3_l2_message_queue.py
examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/test_l3_l2_message_queue.py
python/simpler/l3_l2_message_queue.py
python/simpler/orchestrator.py
src/common/platform/include/aicpu/l3_l2_message_queue.h
tests/ut/cpp/CMakeLists.txt
tests/ut/cpp/common/test_l3_l2_message_queue.cpp
tests/ut/py/test_worker/test_l3_l2_message_queue.py
`queue.input().try_peek(&input)` and
`queue.output().try_reserve(nbytes, &reservation)` are non-blocking. A `false`
return can mean no progress, timeout, validation failure, or poison; check
`queue.error().kind` to distinguish ordinary no-progress from terminal error.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Remove timeout from the try_* contract.
try_peek() and try_reserve() are non-blocking, so false here cannot mean timeout. Keeping timeout in this list makes the API contract internally inconsistent for callers.
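A caller-side sketch of the contract the comment argues for: a non-blocking `try_*` call that returns `false` means either "nothing to do yet" or "terminal error", never "timeout". The types below (`MockInput`, `ErrorKind`, `PollResult`, `poll_once`) are hypothetical mocks written for this illustration, not the endpoint API.

```cpp
#include <cstdint>

// Hypothetical subset of error kinds; NONE means the queue is healthy.
enum class ErrorKind { NONE, INVALID_DESCRIPTOR };

enum class PollResult { READY, NO_PROGRESS, TERMINAL };

// Hypothetical mock of a non-blocking input side.
struct MockInput {
    bool has_message = false;
    ErrorKind error = ErrorKind::NONE;

    // Never waits: returns false immediately when poisoned or empty.
    bool try_peek(uint64_t *seq) {
        if (error != ErrorKind::NONE || !has_message) {
            return false;
        }
        *seq = 7;  // arbitrary sequence number for the mock
        return true;
    }
};

// A false return is classified by looking at the error kind, mirroring the
// documented guidance to check queue.error().kind.
inline PollResult poll_once(MockInput &input, uint64_t *seq) {
    if (input.try_peek(seq)) {
        return PollResult::READY;
    }
    return input.error == ErrorKind::NONE ? PollResult::NO_PROGRESS : PollResult::TERMINAL;
}
```

Any timeout policy then lives in the caller's retry loop around `poll_once`, which keeps the `try_*` return value down to two meanings.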
bool process_first_pair(L3L2QueueEndpoint &queue, ActiveRequest *active, const L3L2QueueInputHandle &input) {
    active[1].handle = input;
    if (!parse_input_header(input, &active[1].header) || active[0].header.request_id == 0) {
        rt_report_fatal(PTO2_ERROR_EXPLICIT_ORCH_FATAL, "invalid L3-L2 queue example request");
        return false;
    }
    if (!publish_aiv_output(
            queue, active[1].handle, active[1].handle, active[1].header.request_id, 20, 0, ADD_SCALAR, 20.0F
        )) {
        return false;
    }
    if (!release_input(queue, active[1].handle)) {
        return false;
    }
    if (!publish_aiv_output(
            queue, active[0].handle, active[0].handle, active[0].header.request_id, 10, 0, ADD_SCALAR, 10.0F
        ) ||
        !publish_aiv_output(
            queue, active[0].handle, active[0].handle, active[0].header.request_id, 11, 0, ADD_SCALAR, 11.0F
        )) {
        return false;
    }
    return release_input(queue, active[0].handle);
🎯 Functional Correctness | 🟡 Minor | ⚡ Quick win
Clear consumed active slots before the next batch.
process_first_pair() releases active[0]/active[1] but leaves both slots populated, and the mode == 1 path overwrites active[0] without checking whether a prior request is still pending. A second batch can therefore leak the overwritten handle or reuse a handle that was already released, which breaks the example’s persistent receive/process/publish loop.
Proposed fix
bool process_first_pair(L3L2QueueEndpoint &queue, ActiveRequest *active, const L3L2QueueInputHandle &input) {
active[1].handle = input;
if (!parse_input_header(input, &active[1].header) || active[0].header.request_id == 0) {
rt_report_fatal(PTO2_ERROR_EXPLICIT_ORCH_FATAL, "invalid L3-L2 queue example request");
return false;
@@
if (!release_input(queue, active[1].handle)) {
return false;
}
+ active[1] = {};
if (!publish_aiv_output(
queue, active[0].handle, active[0].handle, active[0].header.request_id, 10, 0, ADD_SCALAR, 10.0F
) ||
!publish_aiv_output(
queue, active[0].handle, active[0].handle, active[0].header.request_id, 11, 0, ADD_SCALAR, 11.0F
)) {
return false;
}
- return release_input(queue, active[0].handle);
+ if (!release_input(queue, active[0].handle)) {
+ return false;
+ }
+ active[0] = {};
+ return true;
}
@@
if (header.mode == 1) {
+ if (active[0].header.request_id != 0) {
+ rt_report_fatal(PTO2_ERROR_EXPLICIT_ORCH_FATAL, "L3-L2 queue example received mode=1 while a request is still pending");
+ return false;
+ }
active[0].handle = input;
active[0].header = header;
return true;
}
Also applies to: 169-178
L3L2QueueOpcode opcode = static_cast<L3L2QueueOpcode>(slot.opcode);
if (!l3_l2_queue_valid_opcode(opcode)) {
    parent_->poison(L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek", "invalid input opcode");
    return false;
}
bool counts_against_window = opcode == L3L2QueueOpcode::DATA || opcode == L3L2QueueOpcode::ERROR;
if (counts_against_window && active_non_stop_count_ >= parent_->config_.max_l2_input_inflight) {
    return false;
}
if (active_count_ >= entry_capacity_) {
    parent_->poison(L3L2QueueErrorKind::OWNERSHIP, "input.try_peek", "input window state full");
    return false;
}

L3L2OrchPayloadView view{0, 0};
if (slot.payload_nbytes == 0) {
    if (slot.payload_offset != 0) {
        parent_->poison(
            L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek",
            "zero-byte descriptor uses nonzero payload offset"
        );
        return false;
    }
} else if (!parent_->payload_in_arena(
               slot.payload_offset, slot.payload_nbytes, parent_->layout_.input_arena_offset,
               parent_->layout_.input_arena_bytes
           )) {
    parent_->poison(L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek", "input payload out of arena");
    return false;
} else if (!parent_->payload_matches_head(
               parent_->input_payload_head_, slot.payload_offset, slot.payload_nbytes,
               parent_->layout_.input_arena_offset, parent_->layout_.input_arena_bytes, "input.try_peek"
           )) {
    return false;
} else if (!parent_->endpoint_.payload_read(slot.payload_offset, slot.payload_nbytes, &view)) {
    parent_->poison(
        L3L2QueueErrorKind::ENDPOINT_ERROR, "input.try_peek", parent_->endpoint_.error().message
    );
    return false;
}

*out = L3L2QueueInputHandle{slot.seq, opcode, slot.payload_offset, slot.payload_nbytes, view};
active_entries_[active_count_] =
    ActiveInputEntry{slot.seq, opcode, slot.payload_offset, slot.payload_nbytes, view, false};
active_count_ += 1;
if (counts_against_window) {
    active_non_stop_count_ += 1;
}
input_acquire_ += 1;
if (opcode == L3L2QueueOpcode::STOP) {
    stop_observed_ = true;
    if (parent_->input_tail_ != input_acquire_) {
        parent_->poison(
            L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek",
            "input descriptor published after STOP"
        );
        return false;
    }
}
return true;
🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win
Reject STOP descriptors that carry payload bytes.
The STOP contract is zero-byte, but this path currently accepts opcode == STOP with nonzero payload_nbytes, exposes a payload view, and then advances input_payload_head_ on release. That lets malformed shared state bypass INVALID_DESCRIPTOR poison and breaks the documented STOP ABI.
Suggested fix
L3L2QueueOpcode opcode = static_cast<L3L2QueueOpcode>(slot.opcode);
if (!l3_l2_queue_valid_opcode(opcode)) {
parent_->poison(L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek", "invalid input opcode");
return false;
}
+ if (opcode == L3L2QueueOpcode::STOP && (slot.payload_offset != 0 || slot.payload_nbytes != 0)) {
+ parent_->poison(
+ L3L2QueueErrorKind::INVALID_DESCRIPTOR, "input.try_peek", "STOP descriptor must be zero-byte"
+ );
+ return false;
+ }
bool counts_against_window = opcode == L3L2QueueOpcode::DATA || opcode == L3L2QueueOpcode::ERROR;
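The STOP rule from the review comment above can be exercised in isolation with a small classifier. This is a sketch under stated assumptions: the four-field `DescSlot` layout follows the struct quoted earlier in this review, while the numeric opcode values, the `Check` enum, and `check_descriptor` are hypothetical and stand in for the header's poison path.

```cpp
#include <cstdint>

// Hypothetical numeric values; only the names DATA, ERROR, and STOP come from the PR.
enum class Opcode : uint64_t { DATA = 1, ERROR = 2, STOP = 3 };

// Same field order as the L3L2QueueDescSlot struct quoted in the review.
struct DescSlot {
    uint64_t seq;
    uint64_t opcode;
    uint64_t payload_offset;
    uint64_t payload_nbytes;
};

enum class Check { OK, BAD_OPCODE, STOP_NOT_EMPTY, ZERO_BYTE_WITH_OFFSET };

// Validates the descriptor before any payload view would be created.
inline Check check_descriptor(const DescSlot &slot) {
    if (slot.opcode < 1 || slot.opcode > 3) {
        return Check::BAD_OPCODE;
    }
    const Opcode opcode = static_cast<Opcode>(slot.opcode);
    // STOP is checked first so a STOP with payload never reaches the arena checks.
    if (opcode == Opcode::STOP && (slot.payload_offset != 0 || slot.payload_nbytes != 0)) {
        return Check::STOP_NOT_EMPTY;
    }
    if (slot.payload_nbytes == 0 && slot.payload_offset != 0) {
        return Check::ZERO_BYTE_WITH_OFFSET;
    }
    return Check::OK;
}
```

In this sketch, arena-bounds and head-matching checks would only run on descriptors that come back as `Check::OK`.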
- Let L2 peek spin on input timeout while L3 reads outputs between bursts
- Track input_payload_acquire_head separately from release head so multi-inflight non-zero payload peeks no longer fail offset validation
- Add C++ UT for non-zero payload offsets before release
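Why a separate acquire head matters can be seen in a reduced model. If each peeked payload offset is validated against the release head, a second peek before the first release is rejected, because the release head has not moved yet. The sketch below is a hypothetical simplification (`PayloadCursor` is not a name from the PR, and payloads are assumed to be laid out back to back with no wrap-around or padding).

```cpp
#include <cstdint>

// Hypothetical simplified payload arena cursor with two heads.
class PayloadCursor {
public:
    // A peeked payload must start where the previously *acquired* payload
    // ended, regardless of how many earlier payloads are still unreleased.
    bool acquire(uint64_t offset, uint64_t nbytes) {
        if (offset != acquire_head_) {
            return false;
        }
        acquire_head_ += nbytes;
        return true;
    }

    // Releases happen in order and can never pass the acquire head.
    bool release(uint64_t offset, uint64_t nbytes) {
        if (offset != release_head_ || release_head_ + nbytes > acquire_head_) {
            return false;
        }
        release_head_ += nbytes;
        return true;
    }

    // Bytes that have been peeked but not yet released.
    uint64_t inflight_bytes() const { return acquire_head_ - release_head_; }

private:
    uint64_t acquire_head_ = 0;
    uint64_t release_head_ = 0;
};
```

With two 64-byte and 32-byte payloads in flight, the second `acquire(64, 32)` succeeds here, whereas a single-head design comparing against offset 0 would reject it.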
5da6cc4 to 008ba6f
Summary
- `L3L2QueueEndpointConfig{.max_l2_input_inflight = N}`.
- `queue.input().drained()`.
- `tensormap_and_ringbuffer/l3_l2_message_queue` example, using PTO-ISA AIV tile compute while L2 keeps a persistent receive/process/publish loop.

Base / CI Notes
The PR base is `main` because #1187 has not merged into `main` yet. With this setup, GitHub CI will test the combined stack: #1187 plus this PR.
After #1187 is merged, this PR should be rebased onto the updated `main` so the review diff only shows the input-window changes.