Skip to content

Rewrite non-L3 Qwen3 kernels through L3 worker#22

Open
ndleslx wants to merge 1 commit into
hw-native-sys:mainfrom
ndleslx:runner-l3-worker
Open

Rewrite non-L3 Qwen3 kernels through L3 worker#22
ndleslx wants to merge 1 commit into
hw-native-sys:mainfrom
ndleslx:runner-l3-worker

Conversation

@ndleslx

@ndleslx ndleslx commented Jun 3, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • dispatch non-L3 compiled kernels through a wrapped Worker(level=3) using submit_next_level
  • defer child-memory tensor materialization into the L3 orchestration callback and pre-share host tensors before worker init
  • keep full KV host tensors tagged as INOUT for prefill/decode, without compact KV
  • add one-shot L3 worker discard cleanup to avoid the Simpler child-process lifecycle hang tracked in [Bug] L3 Worker leaves chip child defunct after submit_next_level run simpler#980
  • add focused unit tests for L3 worker submission, KV tagging, and runtime switching cleanup

Verification

  • python -m pytest tests/test_npu_runner_l3_worker.py tests/test_batching.py tests/test_cli.py -q
  • ruff check --config ruff.toml examples/model/qwen3_14b/runner/npu_runner.py python/runtime/worker.py tests/test_npu_runner_l3_worker.py
  • git diff --check
  • Offline generation pass with large ring settings:
task-submit --device auto --max-time 0 --run "PTO2_RING_HEAP=4294967296 PTO2_RING_TASK_WINDOW=1048576 PTO2_RING_DEP_POOL=1048576 python examples/model/qwen3_14b/npu_generate.py --model-dir /data/linyifan/models/Qwen3-14B --prompt 'Huawei is' --platform a2a3 --max-seq-len 512 --max-new-tokens 5"

Generated text:

text:  a Chinese company. The
token_ids: [264, 8453, 2813, 13, 576]
finish_reason: length

Notes

The smaller default ring settings still fail in prefill with AICPU 507018 for this path:

PTO2_RING_HEAP=536870912 PTO2_RING_TASK_WINDOW=131072 PTO2_RING_DEP_POOL=131072

@coderabbitai

coderabbitai Bot commented Jun 3, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

This PR implements L3 hierarchical orchestration support for chip program execution with deferred tensor allocation. The Worker class gains L3 submission APIs and orchestrator-aware memory operations. The npu_runner extends L2→L3 execution paths to defer child-memory tensor allocation/upload until inside L3 orchestration scope, enabling safer lifecycle management and post-submit device-to-host synchronization.

Changes

L3 Orchestration and Deferred Tensor Handling

Layer / File(s) Summary
Worker L3 orchestration foundation
python/runtime/worker.py
Exposes chip_contexts passthrough; adds submit_next_level(...) and run_next_level(...) for L3 submission from inside orchestration callbacks; updates malloc, free, copy_to, copy_from to accept optional orchestrator parameter for delegation; implements discard_l3_children() for best-effort cleanup of level-3 worker resources after chip child exit (mailbox shutdown, PID wait, shared memory teardown).
Deferred tensor data model and L3 resolution
examples/model/qwen3_14b/runner/npu_runner.py
Adds _DeferredChildTensor and _PostSubmitCopy dataclasses to represent CPU-backed tensors and sync-back records; implements _resolve_l3_child_tensor(...) to allocate/reuse and validate tensors inside L3 scope with optional post-submit copy tracking; adds _share_l2_host_args(...) to pre-L3 storage sharing; updates close()/worker teardown to track and free runtime-local child allocations and cached program handles.
L2 program orchestration refactor
examples/model/qwen3_14b/runner/npu_runner.py
Reworks _run_l2_program(...) to execute compiled chip callables through L3 orchestration: shares host arg mappings before L3 init, ensures/registers callable on correct runtime, builds orchestration task arguments with deferred tensor resolution, submits chip task with post-submit copy handling, and discards one-shot L3 worker state.
Callable registration and child tensor API
examples/model/qwen3_14b/runner/npu_runner.py
Updates L2 program registration flow to initialize L3 workers on first use and pre-register known compiled callables for that runtime; changes _l2_child_tensor(...) to return _DeferredChildTensor with new require_existing control instead of immediate WorkerTensor allocation.
Task argument construction from deferred tensors
examples/model/qwen3_14b/runner/npu_runner.py
Extends _build_l2_orch_args(...) to build TaskArgs using parameter metadata, routing scalars/tensors by kind and recording post-submit copies for deferred inout tensors; adds _l2_tensor_arg_type(...) to compute Simpler tensor argument tags with special INOUT handling for cache tensors.
Decode path integration with deferred tensors
examples/model/qwen3_14b/runner/npu_runner.py
Updates run_decode(...) to remove refresh_kv_cache conditional logic and route KV tensors through _run_l2_program using deferred mechanism; refactors _project_logits(...) to avoid direct L2 child allocation/freeing and conditionally use deferred tensors when final_rms and lm_head share a runtime.
L3 worker behavior test coverage
tests/test_npu_runner_l3_worker.py
Introduces fake L3 worker and orchestrator implementations with operation recording and lifecycle tracking; tests non-L3 programs execute through L3 with correct operation sequence and tensor argument tags; validates KV host tensors use INOUT semantics without extra copyback; confirms runtime switching closes previous L3 worker while creating new ones.

Sequence Diagram(s)

sequenceDiagram
  participant Runner as Qwen314B<br/>ModelRunner
  participant Worker as L3<br/>Worker
  participant Orch as Orchestrator<br/>Callback
  participant Simpler as Simpler<br/>Worker
  Runner->>Runner: _run_l2_program(...)
  Runner->>Runner: _share_l2_host_args(args)
  Runner->>Runner: _ensure_l2_program(callable_spec)
  Runner->>Worker: initialize L3 worker
  Runner->>Simpler: register_known_callables()
  Runner->>Simpler: register_callable(callable_id)
  Runner->>Runner: _build_l2_orch_args(spec, args, worker, orchestrator)
  Runner->>Runner: _resolve_l3_child_tensor(deferred, worker, orchestrator)
  Runner->>Worker: run_next_level(callable_id, orch_args)
  Worker->>Orch: submit_next_level(callable_id, orch_args, orchestrator)
  Orch->>Simpler: run(orch_args)
  Simpler-->>Orch: result
  Orch-->>Worker: orchestration complete
  Worker-->>Runner: result
  Runner->>Runner: post_submit_copies: copy_from(dst, src, nbytes)
  Runner->>Worker: discard_l3_children()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

  • hw-native-sys/pypto-serving#14: Both PRs modify examples/model/qwen3_14b/runner/npu_runner.py's logits projection path (_project_logits) and its handling/requirements for final_rms/lm_head, so the changes are directly coupled.

Poem

🐰 A rabbit hops through L3 queues,
Deferring tensors 'til they're used,
Child workers tidy, orchards bloom,
Where once lived chaos, order looms!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 54.39% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Rewrite non-L3 Qwen3 kernels through L3 worker' directly and accurately summarizes the main change: dispatching non-L3 compiled kernels through a wrapped L3 worker, which is the primary objective of this PR.
Description check ✅ Passed The description provides clear, actionable details about the changes including deferred tensor handling, KV tagging, worker cleanup, and test coverage, all of which directly relate to the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the Qwen3-14B NPU runner to support deferred child tensor materialization and post-submit device-to-host copies within L3 orchestration scopes, alongside adding cleanup mechanisms for one-shot L3 workers. The review feedback highlights three critical issues: first, the reuse of the deferred child tensor normed is broken because the worker and its cached allocations are discarded at the end of each _run_l2_program execution; second, the blocking os.waitpid call during worker cleanup can cause the main process to hang indefinitely if a child process is deadlocked; and third, the type check for identifying the final output tensor fails to account for _DeferredChildTensor and WorkerTensor types, incorrectly classifying them as inputs.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +328 to 334
if isinstance(normed_arg, _DeferredChildTensor):
lm_head_input = self._l2_child_tensor(
compiled.lm_head.runtime_name,
normed,
self._l2_child_tensor(compiled.lm_head.runtime_name, compiled.padded_lm_head_weight),
logits_padded,
upload=False,
require_existing=True,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The reuse of the deferred child tensor normed via require_existing=True is broken because _run_l2_program discards the worker at the end of every execution.

Specifically:

  1. _run_l2_program(compiled.final_rms, ...) is called with normed_arg (a _DeferredChildTensor).
  2. _resolve_l3_child_tensor allocates the tensor on the worker and caches it in self._l2_child_allocs.
  3. In the finally block of _run_l2_program, self._discard_l2_worker(handle.runtime_name) is called.
  4. _discard_l2_worker pops the worker from self._l2_workers, deletes the cached allocation from self._l2_child_allocs, and discards the worker's child processes (worker.discard_l3_children()).
  5. _run_l2_program(compiled.lm_head, ...) is called with lm_head_input (which has require_existing=True).
  6. _resolve_l3_child_tensor attempts to look up the existing allocation, but it has been deleted and the worker discarded. It then raises RuntimeError("missing worker-resident tensor allocation").

To fix this, the lifecycle of the L3 worker and its child allocations needs to persist across these dependent L2 program runs, or both operations should be submitted within a single orchestration session before discarding the worker.

Comment thread python/runtime/worker.py
Comment on lines +247 to +255
for pid in list(getattr(impl, "_sub_pids", [])) + list(getattr(impl, "_chip_pids", [])) + list(
getattr(impl, "_next_level_pids", [])
):
try:
os.waitpid(int(pid), 0)
except ChildProcessError:
pass
except OSError:
pass

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling os.waitpid(int(pid), 0) is a blocking call. If any of the child processes (sub-workers, chip workers, or next-level workers) are hung or deadlocked (which is the exact scenario tracked in simpler#980), the main process will hang indefinitely here.

To prevent the main process from hanging, consider using os.waitpid(int(pid), os.WNOHANG) in a loop with a timeout, or sending a termination signal (e.g., SIGTERM or SIGKILL) to the process if it does not exit gracefully within a reasonable timeframe.

Suggested change
for pid in list(getattr(impl, "_sub_pids", [])) + list(getattr(impl, "_chip_pids", [])) + list(
getattr(impl, "_next_level_pids", [])
):
try:
os.waitpid(int(pid), 0)
except ChildProcessError:
pass
except OSError:
pass
for pid in list(getattr(impl, "_sub_pids", [])) + list(getattr(impl, "_chip_pids", [])) + list(
getattr(impl, "_next_level_pids", [])
):
try:
pid_val = int(pid)
res, _ = os.waitpid(pid_val, os.WNOHANG)
if res == 0:
try:
os.kill(pid_val, 9)
os.waitpid(pid_val, 0)
except OSError:
pass
except ChildProcessError:
pass
except OSError:
pass

Comment on lines +650 to +651
if isinstance(arg, torch.Tensor) and arg_index == arg_count - 1:
return tensor_arg_type.OUTPUT_EXISTING

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The check isinstance(arg, torch.Tensor) will evaluate to False if the last argument is a _DeferredChildTensor or a WorkerTensor. This means that if the last argument is one of these types, it will not be automatically classified as OUTPUT_EXISTING and will instead fall back to INPUT, which can cause runtime errors or incorrect execution.

We should update the check to include _DeferredChildTensor and WorkerTensor.

Suggested change
if isinstance(arg, torch.Tensor) and arg_index == arg_count - 1:
return tensor_arg_type.OUTPUT_EXISTING
if isinstance(arg, (torch.Tensor, _DeferredChildTensor, WorkerTensor)) and arg_index == arg_count - 1:
return tensor_arg_type.OUTPUT_EXISTING

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@python/runtime/worker.py`:
- Around line 247-255: The current cleanup loop calling os.waitpid(pid, 0) can
block indefinitely; change it to poll non‑blocking using os.waitpid(pid,
os.WNOHANG) inside a bounded retry/timeout loop: for each pid from
impl._sub_pids/_chip_pids/_next_level_pids call os.waitpid with WNOHANG and if
it returns (0, 0) sleep briefly and retry until a deadline (use time.time()),
then if still alive attempt os.kill(pid, signal.SIGTERM) and after another short
deadline escalate to os.kill(pid, signal.SIGKILL); keep handling
ChildProcessError/OSError as before and ensure the loop breaks when waitpid
reports the child reaped or errors.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e5ffe8af-c374-42ac-b48c-1ac1af8d7d60

📥 Commits

Reviewing files that changed from the base of the PR and between 970312c and b2080dc.

📒 Files selected for processing (3)
  • examples/model/qwen3_14b/runner/npu_runner.py
  • python/runtime/worker.py
  • tests/test_npu_runner_l3_worker.py

Comment thread python/runtime/worker.py
Comment on lines +247 to +255
for pid in list(getattr(impl, "_sub_pids", [])) + list(getattr(impl, "_chip_pids", [])) + list(
getattr(impl, "_next_level_pids", [])
):
try:
os.waitpid(int(pid), 0)
except ChildProcessError:
pass
except OSError:
pass

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

os.waitpid(..., 0) blocks indefinitely if a child process hangs.

If a chip child process becomes unresponsive (deadlock, infinite loop, etc.), this cleanup will block the caller forever. Consider using os.WNOHANG with a bounded retry/timeout loop, or wrapping in a timeout mechanism.

🛠️ Suggested non-blocking approach
         for pid in list(getattr(impl, "_sub_pids", [])) + list(getattr(impl, "_chip_pids", [])) + list(
             getattr(impl, "_next_level_pids", [])
         ):
             try:
-                os.waitpid(int(pid), 0)
+                # Non-blocking check; child may already be reaped or still running.
+                os.waitpid(int(pid), os.WNOHANG)
             except ChildProcessError:
                 pass
             except OSError:
                 pass
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@python/runtime/worker.py` around lines 247 - 255, The current cleanup loop
calling os.waitpid(pid, 0) can block indefinitely; change it to poll
non‑blocking using os.waitpid(pid, os.WNOHANG) inside a bounded retry/timeout
loop: for each pid from impl._sub_pids/_chip_pids/_next_level_pids call
os.waitpid with WNOHANG and if it returns (0, 0) sleep briefly and retry until a
deadline (use time.time()), then if still alive attempt os.kill(pid,
signal.SIGTERM) and after another short deadline escalate to os.kill(pid,
signal.SIGKILL); keep handling ChildProcessError/OSError as before and ensure
the loop breaks when waitpid reports the child reaped or errors.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant