Skip to content

Add: L3-L2 message queue example#1187

Open
ccyywwen wants to merge 11 commits into
hw-native-sys:mainfrom
ccyywwen:l3-l2-message-queue-example
Open

Add: L3-L2 message queue example#1187
ccyywwen wants to merge 11 commits into
hw-native-sys:mainfrom
ccyywwen:l3-l2-message-queue-example

Conversation

@ccyywwen

Copy link
Copy Markdown
Contributor

Summary

  • This is a stacked PR based on Add: L3-L2 message queue design #1130.
  • Add support for lazy host-buffer staging in the L3-L2 message queue: ordinary readable host buffers can be enqueued, ordinary writable host buffers can receive outputs, and registered orch.alloc(...) tensors still use the no-staging fast path.
  • Add a tensormap_and_ringbuffer example that streams DATA and STOP messages through the L3-L2 queue and runs on a2a3sim, a5sim, and a2a3.

Base / CI Notes

This PR intentionally contains the commits from #1130 plus one additional PR2 commit.

The PR base is main because #1130's branch lives in the fork as ccyywwen:l3-l2-orch-message-queue, not as a branch in hw-native-sys/simpler. With this setup, GitHub CI will test the combined stack: #1130 plus this PR.

After #1130 is merged, this PR should be rebased onto the updated main so the review diff only shows the PR2 changes.

ccyywwen added 3 commits June 26, 2026 15:49
- Define the staged base queue transport design and PR1/PR2 split.
- Add the base implementation plan for the queue stack.
- Implement the PR1 L3 queue wrapper and L2 endpoint ABI on top of
  the primitive L3-L2 orchestration region transport.
- Wire Orchestrator.create_l3_l2_queue and cover descriptor layout,
  zero-byte messages, abort flags, capacity, and fast-path buffers in
  Python and C++ unit tests.
- Drop the base implementation guide from tracked PR1 files while keeping
  it available locally for PR2 planning.
- Keep the L3-L2 queue Python tests compatible with the pyright target and
  ruff formatting used by CI.
@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Copy link
Copy Markdown

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 91c551e6-5142-491c-bdb4-3d12db4e6024

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Introduces a complete L3↔L2 SPSC message queue abstraction: a design specification document, a C++ single-header endpoint implementation (L3L2QueueEndpoint with InputQueue/OutputQueue), a Python L3-side wrapper module with staging and abort-flag logic, an Orchestrator.create_l3_l2_queue API, an end-to-end example with an AIV elementwise kernel and an L2 orchestration kernel, plus C++ GoogleTest and Python pytest unit test suites.

Changes

L3↔L2 SPSC Message Queue

Layer / File(s) Summary
Design specification
docs/l3-l2-message-queue-design.md
Full 922-line spec covering API contract, region/descriptor ABI, opcode semantics, counter signaling model, payload arena rules, operation sequences, STOP/error/poison model, L2 input-window extension, lifetime/cleanup, and staged implementation plan.
C++ header: constants, types, layout, and endpoint
src/common/platform/include/aicpu/l3_l2_message_queue.h
Single-header implementation defining ABI constants, descriptor/layout/handle structs, opcode and error enums, inline layout computation and validation helpers, counter reconstruction, and the full L3L2QueueEndpoint class with InputQueue (peek/try_peek/release) and OutputQueue (reserve/try_reserve/publish) implementations.
Python L3-side queue module
python/simpler/l3_l2_message_queue.py, python/simpler/orchestrator.py
New module with ABI constants, dataclasses, make_l3_l2_queue_layout, create_l3_l2_queue factory, L3L2Queue core lifecycle (state tracking, staging allocation, counter reconstruction, remote-abort sampling), _L3InputQueue enqueue path, _L3OutputQueue dequeue path; plus Orchestrator.create_l3_l2_queue wrapper with Worker binding check.
End-to-end example
examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/aiv/kernel_queue_transform.cpp, examples/.../kernels/orchestration/l3_l2_message_queue_orch.cpp, examples/.../l3_l2_message_queue.py, examples/.../test_l3_l2_message_queue.py
AIV kernel performing elementwise float-add; L2 orchestration kernel implementing the peek/DATA/STOP dispatch loop with AI task submission; Python driver that enqueues payloads, signals stop, and validates transformed outputs; pytest wrapper for platform execution.
C++ unit tests
tests/ut/cpp/CMakeLists.txt, tests/ut/cpp/common/test_l3_l2_message_queue.cpp
GoogleTest suite (CMake-registered, no_hardware label) covering layout computation, descriptor slot ABI stability, counter reconstruction wraparound, input/output peek/reserve/publish flows, abort-flag semantics, capacity fullness, error opcodes, ownership protocol violations, null-handle rejection, and STOP ordering.
Python unit tests
tests/ut/py/test_worker/test_l3_l2_message_queue.py
Pytest suite with fake worker/orchestrator/client infrastructure covering layout validation, queue creation wiring, enqueue paths (zero-byte, fast-path, staging, descriptor replay, alloc failure injection), output dequeue paths (fast-path, staging, read-only rejection, STOP/offset poison), negative full/oversized paths, timeout/peer-abort coordination, and expired-queue rejection.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

🐇 Hoppity-hop through the queue we go,
L3 sends messages, L2 says "whoa!"
Descriptors and arenas, counters in stride,
Abort flags and staging all tucked inside.
STOP means stop, DATA means go—
The rabbit reviewed it, tail to and fro! 🐾

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 1.27% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title is concise and accurately points to the new L3-L2 message queue example added in this changeset.
Description check ✅ Passed The description is clearly related and matches the queue staging support and example work in the pull request.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an L3-L2 SPSC message queue wrapper, including design documentation, Python wrappers, C++ headers, orchestration examples, and comprehensive unit tests. The review feedback highlights several critical improvement opportunities: gating system timer reads inside tight spin loops in peek and reserve to reduce CPU overhead, validating user-supplied parameters to prevent a potential integer overflow when calculating output_arena_offset, and defensively guarding against null pointers in report_queue_error to avoid crashes.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +224 to +243
bool peek(uint64_t timeout_ns, L3L2QueueInputHandle *out) {
if (out == nullptr) {
return false;
}
uint64_t start = l3_l2_orch_endpoint_now();
uint64_t frequency_hz = l3_l2_orch_endpoint_timer_frequency_hz();
while (true) {
if (try_peek(out)) {
return true;
}
if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
return false;
}
uint64_t now = l3_l2_orch_endpoint_now();
if (timeout_ns == 0 || l3_l2_orch_endpoint_elapsed_ns(start, now, frequency_hz) >= timeout_ns) {
parent_->disambiguate_timeout();
return false;
}
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The peek function implements a tight spin loop that calls try_peek and l3_l2_orch_endpoint_now() on every single iteration. Reading system counters (such as l3_l2_orch_endpoint_now()) frequently can be an expensive operation (e.g., system register or MMIO reads) that introduces significant CPU overhead and latency.

To minimize CPU overhead and latency impact, gate the system timer checks to run periodically (e.g., every 1024 spins) as per the repository's general rules.

        bool peek(uint64_t timeout_ns, L3L2QueueInputHandle *out) {
            if (out == nullptr) {
                return false;
            }
            uint64_t start = l3_l2_orch_endpoint_now();
            uint64_t frequency_hz = l3_l2_orch_endpoint_timer_frequency_hz();
            uint32_t spins = 0;
            while (true) {
                if (try_peek(out)) {
                    return true;
                }
                if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
                    return false;
                }
                if (timeout_ns == 0) {
                    parent_->disambiguate_timeout();
                    return false;
                }
                if (++spins >= 1024) {
                    spins = 0;
                    uint64_t now = l3_l2_orch_endpoint_now();
                    if (l3_l2_orch_endpoint_elapsed_ns(start, now, frequency_hz) >= timeout_ns) {
                        parent_->disambiguate_timeout();
                        return false;
                    }
                }
            }
        }
References
  1. Avoid reading system counters (which can be expensive MMIO reads) or performing complex structural checks on every iteration of a tight spin loop. Instead, gate these checks to run periodically (e.g., every 1024 spins) to minimize CPU overhead and latency impact.

Comment on lines +368 to +387
bool reserve(uint64_t nbytes, uint64_t timeout_ns, L3L2QueueOutputReservation *out) {
if (out == nullptr) {
return false;
}
uint64_t start = l3_l2_orch_endpoint_now();
uint64_t frequency_hz = l3_l2_orch_endpoint_timer_frequency_hz();
while (true) {
if (try_reserve(nbytes, out)) {
return true;
}
if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
return false;
}
uint64_t now = l3_l2_orch_endpoint_now();
if (timeout_ns == 0 || l3_l2_orch_endpoint_elapsed_ns(start, now, frequency_hz) >= timeout_ns) {
parent_->disambiguate_timeout();
return false;
}
}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similarly to peek, the reserve function implements a tight spin loop that calls try_reserve and l3_l2_orch_endpoint_now() on every single iteration. This causes high CPU overhead due to frequent system counter reads.

Gate the system timer checks to run periodically (e.g., every 1024 spins) to minimize CPU overhead and latency impact.

        bool reserve(uint64_t nbytes, uint64_t timeout_ns, L3L2QueueOutputReservation *out) {
            if (out == nullptr) {
                return false;
            }
            uint64_t start = l3_l2_orch_endpoint_now();
            uint64_t frequency_hz = l3_l2_orch_endpoint_timer_frequency_hz();
            uint32_t spins = 0;
            while (true) {
                if (try_reserve(nbytes, out)) {
                    return true;
                }
                if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
                    return false;
                }
                if (timeout_ns == 0) {
                    parent_->disambiguate_timeout();
                    return false;
                }
                if (++spins >= 1024) {
                    spins = 0;
                    uint64_t now = l3_l2_orch_endpoint_now();
                    if (l3_l2_orch_endpoint_elapsed_ns(start, now, frequency_hz) >= timeout_ns) {
                        parent_->disambiguate_timeout();
                        return false;
                    }
                }
            }
        }
References
  1. Avoid reading system counters (which can be expensive MMIO reads) or performing complex structural checks on every iteration of a tight spin loop. Instead, gate these checks to run periodically (e.g., every 1024 spins) to minimize CPU overhead and latency impact.

Comment on lines +147 to +151
uint64_t output_arena_offset =
l3_l2_queue_align_up(input_arena_offset + input_arena_bytes, L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT);
if (l3_l2_orch_comm_add_overflows(output_arena_offset, output_arena_bytes)) {
return false;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

The calculation of output_arena_offset uses input_arena_offset + input_arena_bytes. Since input_arena_bytes is a user-supplied parameter, this addition can overflow uint64_t. If it overflows, output_arena_offset will wrap around to a small value, and the subsequent overflow check l3_l2_orch_comm_add_overflows(output_arena_offset, output_arena_bytes) may not detect the overflow if the final sum does not overflow. This can lead to out-of-bounds memory access or layout corruption.

Validate that input_arena_offset + input_arena_bytes does not overflow before aligning and calculating output_arena_offset.

    if (l3_l2_orch_comm_add_overflows(input_arena_offset, input_arena_bytes)) {
        return false;
    }
    uint64_t output_arena_offset =
        l3_l2_queue_align_up(input_arena_offset + input_arena_bytes, L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT);
    if (l3_l2_orch_comm_add_overflows(output_arena_offset, output_arena_bytes)) {
        return false;
    }

Comment on lines +23 to +29
void report_queue_error(const L3L2QueueEndpoint &queue) {
const L3L2QueueError &err = queue.error();
rt_report_fatal(
PTO2_ERROR_EXPLICIT_ORCH_FATAL, "L3-L2 queue error op=%s kind=%u region=%llu msg=%s", err.op,
static_cast<unsigned>(err.kind), static_cast<unsigned long long>(err.region_id), err.message
);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In report_queue_error, err.op and err.message are printed using %s in rt_report_fatal. If either err.op or err.message is nullptr (for example, if an underlying endpoint error has a null message), this will cause a null pointer dereference and crash the AICPU process.

Defensively guard against null pointers by providing fallback string literals.

Suggested change
void report_queue_error(const L3L2QueueEndpoint &queue) {
const L3L2QueueError &err = queue.error();
rt_report_fatal(
PTO2_ERROR_EXPLICIT_ORCH_FATAL, "L3-L2 queue error op=%s kind=%u region=%llu msg=%s", err.op,
static_cast<unsigned>(err.kind), static_cast<unsigned long long>(err.region_id), err.message
);
}
void report_queue_error(const L3L2QueueEndpoint &queue) {
const L3L2QueueError &err = queue.error();
rt_report_fatal(
PTO2_ERROR_EXPLICIT_ORCH_FATAL, "L3-L2 queue error op=%s kind=%u region=%llu msg=%s",
err.op ? err.op : "unknown",
static_cast<unsigned>(err.kind),
static_cast<unsigned long long>(err.region_id),
err.message ? err.message : "unknown"
);
}

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (1)
python/simpler/l3_l2_message_queue.py (1)

497-527: 🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick win

Move ordinary-buffer staging after the no-progress checks.

This path allocates/copies the staging tensor before you know whether the ring or payload arena can accept the message. On a full queue, try_enqueue() still does host copies and may even allocate, which defeats the PR’s “lazy staging” goal and turns retry loops into allocator hot paths.

Suggested fix
-        if staged_span is not None:
-            payload_tensor = queue._ensure_staging_capacity(nbytes)
-            queue._copy_host_span_to_tensor(staged_span, payload_tensor)
-
         old_head = queue._input_head
         queue._input_head = queue._refresh_counter(
             queue._layout.input_desc_head_offset, queue._input_head, queue._layout.depth
         )
@@
         if nbytes != 0:
             arena_pos = queue._input_payload_tail % queue._layout.input_arena_bytes
             if arena_pos + nbytes > queue._layout.input_arena_bytes:
                 queue._input_payload_tail += queue._layout.input_arena_bytes - arena_pos
                 arena_pos = 0
             if queue._input_payload_tail + nbytes - queue._input_payload_head > queue._layout.input_arena_bytes:
                 return False
+            if staged_span is not None:
+                payload_tensor = queue._ensure_staging_capacity(nbytes)
+                queue._copy_host_span_to_tensor(staged_span, payload_tensor)
             payload_offset = queue._layout.input_arena_offset + arena_pos
             queue._run_primitive(queue._region.payload_write, payload_offset, payload_tensor, nbytes=nbytes)
             queue._input_payload_tail += nbytes
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/simpler/l3_l2_message_queue.py` around lines 497 - 527, In the enqueue
path around the queue._ensure_staging_capacity and
queue._copy_host_span_to_tensor logic, delay ordinary-buffer staging until after
all no-progress checks have passed. First validate queue fullness, payload arena
space, and STOP/size constraints using try_enqueue and the existing
_input_head/_input_tail and _layout checks, then allocate/copy the staged tensor
only when the message is guaranteed to proceed. This keeps lazy staging behavior
and avoids host copies or allocations on rejected enqueue attempts.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@python/simpler/l3_l2_message_queue.py`:
- Around line 207-225: The L3L2 queue factory leaks the remote region if
construction fails after `orch.create_l3_l2_region(...)` succeeds, because later
`orch.alloc(...)` calls or counter initialization can exit without a freeable
handle. Update the queue निर्माण path in `l3_l2_message_queue.py` around
`make_l3_l2_queue_layout`, `create_l3_l2_region`, and the
`orch.alloc`/`region.counter(...).notify(...)` sequence to use cleanup on
exceptions: keep track of the created region and ensure it is released if any
subsequent step fails before returning `L3L2Queue`.

In `@src/common/platform/include/aicpu/l3_l2_message_queue.h`:
- Around line 322-350: The input.release path in L3L2QueueInputQueue is trusting
mutable fields from L3L2QueueInputHandle, which lets callers alter payload
offsets, sizes, or opcode after try_peek() and before release(). Cache the
active descriptor state inside InputQueue when the handle is acquired, then have
release() validate and advance using the cached values instead of the
caller-provided handle fields; keep active_seq_ as the ownership check, but use
the cached payload and opcode data to update input_payload_head_ and stopped_
safely.
- Around line 121-127: The layout builder in l3_l2_queue_make_layout() can
overflow on intermediate offset/size calculations before the final bounds check.
Add fail-closed overflow validation for each add/align step when computing
output_desc_offset, input_desc_offset, input_arena_offset, and the aligned sizes
before writing into L3L2QueueLayout. Reuse l3_l2_queue_align_up() and ensure
l3_l2_queue_validate_region() only sees fully verified, non-wrapping offsets so
OutputQueue::try_reserve() cannot expose metadata as writable payload.

---

Nitpick comments:
In `@python/simpler/l3_l2_message_queue.py`:
- Around line 497-527: In the enqueue path around the
queue._ensure_staging_capacity and queue._copy_host_span_to_tensor logic, delay
ordinary-buffer staging until after all no-progress checks have passed. First
validate queue fullness, payload arena space, and STOP/size constraints using
try_enqueue and the existing _input_head/_input_tail and _layout checks, then
allocate/copy the staged tensor only when the message is guaranteed to proceed.
This keeps lazy staging behavior and avoids host copies or allocations on
rejected enqueue attempts.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2bfe4962-4412-4c31-8e01-fee327e47dc5

📥 Commits

Reviewing files that changed from the base of the PR and between 8ac5ee8 and 090a3d7.

📒 Files selected for processing (11)
  • docs/l3-l2-message-queue-design.md
  • examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/aiv/kernel_queue_transform.cpp
  • examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/kernels/orchestration/l3_l2_message_queue_orch.cpp
  • examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/l3_l2_message_queue.py
  • examples/a2a3/tensormap_and_ringbuffer/l3_l2_message_queue/test_l3_l2_message_queue.py
  • python/simpler/l3_l2_message_queue.py
  • python/simpler/orchestrator.py
  • src/common/platform/include/aicpu/l3_l2_message_queue.h
  • tests/ut/cpp/CMakeLists.txt
  • tests/ut/cpp/common/test_l3_l2_message_queue.cpp
  • tests/ut/py/test_worker/test_l3_l2_message_queue.py

Comment on lines +207 to +225
layout = make_l3_l2_queue_layout(depth, input_arena_bytes, output_arena_bytes)
region = orch.create_l3_l2_region(
worker_id=int(worker_id),
payload_bytes=layout.payload_bytes,
counter_bytes=layout.counter_bytes,
)
desc_fields = orch.alloc([24], DataType.UINT8)
desc_seq = orch.alloc([8], DataType.UINT8)
desc_read = orch.alloc([L3L2_QUEUE_DESC_SLOT_BYTES], DataType.UINT8)
for offset in (
layout.input_desc_tail_offset,
layout.input_desc_head_offset,
layout.output_desc_tail_offset,
layout.output_desc_head_offset,
layout.l3_abort_flag_offset,
layout.l2_abort_flag_offset,
):
region.counter(offset).notify(0, NotifyOp.Set)
return L3L2Queue(orch, region, layout, desc_fields, desc_seq, desc_read)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟠 Major | ⚡ Quick win

Clean up the region on partial queue-construction failures.

After orch.create_l3_l2_region(...) succeeds, any later failure in orch.alloc(...) or counter initialization exits this factory without giving the caller a handle they can free(). That leaks the remote region on the error path.

Suggested fix
 def create_l3_l2_queue(
     orch: Any,
@@
 ) -> L3L2Queue:
     layout = make_l3_l2_queue_layout(depth, input_arena_bytes, output_arena_bytes)
     region = orch.create_l3_l2_region(
         worker_id=int(worker_id),
         payload_bytes=layout.payload_bytes,
         counter_bytes=layout.counter_bytes,
     )
-    desc_fields = orch.alloc([24], DataType.UINT8)
-    desc_seq = orch.alloc([8], DataType.UINT8)
-    desc_read = orch.alloc([L3L2_QUEUE_DESC_SLOT_BYTES], DataType.UINT8)
-    for offset in (
-        layout.input_desc_tail_offset,
-        layout.input_desc_head_offset,
-        layout.output_desc_tail_offset,
-        layout.output_desc_head_offset,
-        layout.l3_abort_flag_offset,
-        layout.l2_abort_flag_offset,
-    ):
-        region.counter(offset).notify(0, NotifyOp.Set)
-    return L3L2Queue(orch, region, layout, desc_fields, desc_seq, desc_read)
+    try:
+        desc_fields = orch.alloc([24], DataType.UINT8)
+        desc_seq = orch.alloc([8], DataType.UINT8)
+        desc_read = orch.alloc([L3L2_QUEUE_DESC_SLOT_BYTES], DataType.UINT8)
+        for offset in (
+            layout.input_desc_tail_offset,
+            layout.input_desc_head_offset,
+            layout.output_desc_tail_offset,
+            layout.output_desc_head_offset,
+            layout.l3_abort_flag_offset,
+            layout.l2_abort_flag_offset,
+        ):
+            region.counter(offset).notify(0, NotifyOp.Set)
+        return L3L2Queue(orch, region, layout, desc_fields, desc_seq, desc_read)
+    except Exception:
+        region.free()
+        raise
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
layout = make_l3_l2_queue_layout(depth, input_arena_bytes, output_arena_bytes)
region = orch.create_l3_l2_region(
worker_id=int(worker_id),
payload_bytes=layout.payload_bytes,
counter_bytes=layout.counter_bytes,
)
desc_fields = orch.alloc([24], DataType.UINT8)
desc_seq = orch.alloc([8], DataType.UINT8)
desc_read = orch.alloc([L3L2_QUEUE_DESC_SLOT_BYTES], DataType.UINT8)
for offset in (
layout.input_desc_tail_offset,
layout.input_desc_head_offset,
layout.output_desc_tail_offset,
layout.output_desc_head_offset,
layout.l3_abort_flag_offset,
layout.l2_abort_flag_offset,
):
region.counter(offset).notify(0, NotifyOp.Set)
return L3L2Queue(orch, region, layout, desc_fields, desc_seq, desc_read)
layout = make_l3_l2_queue_layout(depth, input_arena_bytes, output_arena_bytes)
region = orch.create_l3_l2_region(
worker_id=int(worker_id),
payload_bytes=layout.payload_bytes,
counter_bytes=layout.counter_bytes,
)
try:
desc_fields = orch.alloc([24], DataType.UINT8)
desc_seq = orch.alloc([8], DataType.UINT8)
desc_read = orch.alloc([L3L2_QUEUE_DESC_SLOT_BYTES], DataType.UINT8)
for offset in (
layout.input_desc_tail_offset,
layout.input_desc_head_offset,
layout.output_desc_tail_offset,
layout.output_desc_head_offset,
layout.l3_abort_flag_offset,
layout.l2_abort_flag_offset,
):
region.counter(offset).notify(0, NotifyOp.Set)
return L3L2Queue(orch, region, layout, desc_fields, desc_seq, desc_read)
except Exception:
region.free()
raise
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/simpler/l3_l2_message_queue.py` around lines 207 - 225, The L3L2 queue
factory leaks the remote region if construction fails after
`orch.create_l3_l2_region(...)` succeeds, because later `orch.alloc(...)` calls
or counter initialization can exit without a freeable handle. Update the queue
निर्माण path in `l3_l2_message_queue.py` around `make_l3_l2_queue_layout`,
`create_l3_l2_region`, and the `orch.alloc`/`region.counter(...).notify(...)`
sequence to use cleanup on exceptions: keep track of the created region and
ensure it is released if any subsequent step fails before returning `L3L2Queue`.

Comment on lines +121 to +127
static inline uint64_t l3_l2_queue_align_up(uint64_t value, uint64_t align) {
if (align == 0) {
return value;
}
uint64_t remainder = value % align;
return remainder == 0 ? value : value + (align - remainder);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🔴 Critical | ⚡ Quick win

Make the layout builder fail closed on intermediate overflow.

l3_l2_queue_make_layout() only checks the last addition. If output_desc_offset + desc_ring_bytes or input_arena_offset + input_arena_bytes wraps first, l3_l2_queue_validate_region() can accept a layout whose payload arenas overlap the descriptor rings, and OutputQueue::try_reserve() will hand out writable views into queue metadata. Please check each add/align step before populating L3L2QueueLayout.

Proposed fix
+static inline bool l3_l2_queue_align_up_checked(uint64_t value, uint64_t align, uint64_t *out) {
+    if (out == nullptr) {
+        return false;
+    }
+    if (align == 0) {
+        *out = value;
+        return true;
+    }
+    uint64_t remainder = value % align;
+    uint64_t bump = remainder == 0 ? 0 : align - remainder;
+    if (l3_l2_orch_comm_add_overflows(value, bump)) {
+        return false;
+    }
+    *out = value + bump;
+    return true;
+}
+
 static inline bool
 l3_l2_queue_make_layout(uint64_t depth, uint64_t input_arena_bytes, uint64_t output_arena_bytes, L3L2QueueLayout *out) {
@@
-    uint64_t input_arena_offset =
-        l3_l2_queue_align_up(output_desc_offset + desc_ring_bytes, L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT);
-    uint64_t output_arena_offset =
-        l3_l2_queue_align_up(input_arena_offset + input_arena_bytes, L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT);
-    if (l3_l2_orch_comm_add_overflows(output_arena_offset, output_arena_bytes)) {
+    uint64_t desc_end = 0;
+    uint64_t input_end = 0;
+    uint64_t input_arena_offset = 0;
+    uint64_t output_arena_offset = 0;
+    if (l3_l2_orch_comm_add_overflows(output_desc_offset, desc_ring_bytes) ||
+        !l3_l2_queue_align_up_checked(output_desc_offset + desc_ring_bytes,
+                                      L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT,
+                                      &input_arena_offset) ||
+        l3_l2_orch_comm_add_overflows(input_arena_offset, input_arena_bytes) ||
+        !l3_l2_queue_align_up_checked(input_arena_offset + input_arena_bytes,
+                                      L3L2_QUEUE_PAYLOAD_ARENA_ALIGNMENT,
+                                      &output_arena_offset) ||
+        l3_l2_orch_comm_add_overflows(output_arena_offset, output_arena_bytes)) {
         return false;
     }

Also applies to: 142-149

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/common/platform/include/aicpu/l3_l2_message_queue.h` around lines 121 -
127, The layout builder in l3_l2_queue_make_layout() can overflow on
intermediate offset/size calculations before the final bounds check. Add
fail-closed overflow validation for each add/align step when computing
output_desc_offset, input_desc_offset, input_arena_offset, and the aligned sizes
before writing into L3L2QueueLayout. Reuse l3_l2_queue_align_up() and ensure
l3_l2_queue_validate_region() only sees fully verified, non-wrapping offsets so
OutputQueue::try_reserve() cannot expose metadata as writable payload.

Comment on lines +322 to +350
*out = L3L2QueueInputHandle{slot.seq, opcode, slot.payload_offset, slot.payload_nbytes, view};
active_ = true;
active_seq_ = slot.seq;
return true;
}

bool release(const L3L2QueueInputHandle &handle) {
if (!parent_->ensure_live("input.release")) {
return false;
}
if (!active_ || handle.seq != active_seq_ || handle.seq != parent_->input_head_ + 1) {
parent_->poison(L3L2QueueErrorKind::OWNERSHIP, "input.release", "input handle is not active");
return false;
}
if (handle.payload_nbytes != 0) {
parent_->advance_payload_head(
parent_->input_payload_head_, handle.payload_offset, handle.payload_nbytes,
parent_->layout_.input_arena_offset, parent_->layout_.input_arena_bytes, "input.release"
);
if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
return false;
}
}
parent_->input_head_ += 1;
if (handle.opcode == L3L2QueueOpcode::STOP) {
stopped_ = true;
}
active_ = false;
active_seq_ = 0;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Don’t trust caller-mutated fields in input.release().

try_peek() remembers only active_seq_, but release() advances input_payload_head_ and toggles stopped_ from the caller-provided handle. Mutating handle.payload_offset, handle.payload_nbytes, or handle.opcode between peek and release can corrupt arena replay or synthesize a STOP. Cache the active descriptor fields inside InputQueue and release against that cached state instead.

Proposed fix
             *out = L3L2QueueInputHandle{slot.seq, opcode, slot.payload_offset, slot.payload_nbytes, view};
             active_ = true;
             active_seq_ = slot.seq;
+            active_opcode_ = opcode;
+            active_payload_offset_ = slot.payload_offset;
+            active_payload_nbytes_ = slot.payload_nbytes;
             return true;
         }
@@
-            if (!active_ || handle.seq != active_seq_ || handle.seq != parent_->input_head_ + 1) {
+            if (!active_ || handle.seq != active_seq_ || handle.seq != parent_->input_head_ + 1 ||
+                handle.opcode != active_opcode_ || handle.payload_offset != active_payload_offset_ ||
+                handle.payload_nbytes != active_payload_nbytes_) {
                 parent_->poison(L3L2QueueErrorKind::OWNERSHIP, "input.release", "input handle is not active");
                 return false;
             }
-            if (handle.payload_nbytes != 0) {
+            if (active_payload_nbytes_ != 0) {
                 parent_->advance_payload_head(
-                    parent_->input_payload_head_, handle.payload_offset, handle.payload_nbytes,
+                    parent_->input_payload_head_, active_payload_offset_, active_payload_nbytes_,
                     parent_->layout_.input_arena_offset, parent_->layout_.input_arena_bytes, "input.release"
                 );
                 if (parent_->error_.kind != L3L2QueueErrorKind::NONE) {
                     return false;
                 }
             }
             parent_->input_head_ += 1;
-            if (handle.opcode == L3L2QueueOpcode::STOP) {
+            if (active_opcode_ == L3L2QueueOpcode::STOP) {
                 stopped_ = true;
             }
             active_ = false;
             active_seq_ = 0;
+            active_opcode_ = L3L2QueueOpcode::INVALID;
+            active_payload_offset_ = 0;
+            active_payload_nbytes_ = 0;
             return parent_->notify_counter(
                 parent_->layout_.input_desc_head_offset, static_cast<int32_t>(parent_->input_head_), "input.release"
             );
@@
         bool active_{false};
         uint64_t active_seq_{0};
+        L3L2QueueOpcode active_opcode_{L3L2QueueOpcode::INVALID};
+        uint64_t active_payload_offset_{0};
+        uint64_t active_payload_nbytes_{0};
         bool stopped_{false};

Also applies to: 356-360

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/common/platform/include/aicpu/l3_l2_message_queue.h` around lines 322 -
350, The input.release path in L3L2QueueInputQueue is trusting mutable fields
from L3L2QueueInputHandle, which lets callers alter payload offsets, sizes, or
opcode after try_peek() and before release(). Cache the active descriptor state
inside InputQueue when the handle is acquired, then have release() validate and
advance using the cached values instead of the caller-provided handle fields;
keep active_seq_ as the ownership check, but use the cached payload and opcode
data to update input_payload_head_ and stopped_ safely.

@ccyywwen ccyywwen force-pushed the l3-l2-message-queue-example branch 4 times, most recently from 104e54b to e931689 Compare June 30, 2026 06:45
ccyywwen added 2 commits June 30, 2026 15:26
- Fail closed on queue layout uint64 overflow in C++ and Python mirror calculations

- Validate cached L2 input handle metadata before release and use cached descriptor state

- Gate C++ spin-loop timer reads and clean up Python regions on partial construction failure
- Add the user-facing L3-L2 message queue documentation.

- Link the primitive L3-L2 orchestration communication doc to the queue wrapper doc.

- Remove the design-stage document from the branch while leaving the local copy available for follow-up work.
@ccyywwen ccyywwen force-pushed the l3-l2-message-queue-example branch from e931689 to fc2b8d1 Compare June 30, 2026 08:05
ccyywwen added 4 commits July 1, 2026 11:02
- Add strict payload and counter size checks to the L3-L2 queue task args.

- Validate L2 input payload offsets before exposing payload views.

- Document timeout, layout, and queue free semantics, and expand no-hardware tests.
- Allow ordinary host buffers through lazy queue staging while keeping
  registered tensors on the no-staging fast path.
- Add a tensormap_and_ringbuffer example that streams DATA and STOP
  messages through the L3-L2 queue on sim and onboard platforms.
- Keep dynamic tensor-like host buffer access explicit for pyright.
- Build the example child callable across both positional and
  arg_index binding variants.
@ccyywwen ccyywwen force-pushed the l3-l2-message-queue-example branch from fc2b8d1 to 2dcf096 Compare July 1, 2026 03:44
@ccyywwen ccyywwen force-pushed the l3-l2-message-queue-example branch from 2dcf096 to 08cd240 Compare July 1, 2026 06:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant