From 2e85364e415bb9b62d777e3e22e06b5184a41a47 Mon Sep 17 00:00:00 2001 From: majin0824 Date: Mon, 29 Jun 2026 14:35:11 +0800 Subject: [PATCH] feat(runtime): L3 post-fork host-buffer registration (#1027) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Host tensors created after the L3 chip children are forked (lazily on the first run) were invisible to those children, forcing serving to preallocate all buffers before worker creation. Add Worker.register_host_buffer / unregister_host_buffer: a named shm is mapped into every chip child and the per-task mailbox blob's host pointers are rewritten to the child's own mapping before the runtime dereferences them, so a later run can H2D/D2H through it. Pure Python (worker.py / orchestrator.py) — no runtime C++ change. submit_next_level validates host-tensor visibility and raises an actionable error for an unregistered post-fork tensor (fork-inherited vs post-fork is decided by backing inode + fork VA snapshot). Lifetime/visibility contract documented in comm-domain.md; one a2a3 scene test covers the mechanism and the error path. --- docs/comm-domain.md | 96 ++- python/simpler/orchestrator.py | 25 + python/simpler/worker.py | 733 +++++++++++++++++- .../test_l3_host_buffer_registration.py | 137 ++++ .../test_host_buffer_registration.py | 230 ++++++ 5 files changed, 1214 insertions(+), 7 deletions(-) create mode 100644 tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py create mode 100644 tests/ut/py/test_worker/test_host_buffer_registration.py diff --git a/docs/comm-domain.md b/docs/comm-domain.md index 9d93635e6..21ad9979d 100644 --- a/docs/comm-domain.md +++ b/docs/comm-domain.md @@ -148,7 +148,101 @@ with orch.allocate_domain(...) as handle: --- -## 6. Examples +## 6. Host tensor visibility for `worker.run` + +A host tensor passed to `worker.run(...)` / `orch.submit_next_level(...)` is +ultimately dereferenced from the forked chip child, not the parent. For the +design rationale behind the choices below (copy vs zero-copy, explicit +registration, the procfs/inode classification, and the alternatives that were +deferred) see [`host-buffer-registration-design.md`](host-buffer-registration-design.md). +Either the +tensor's own storage must already be reachable there (fork-inherited), or +`worker.register_host_buffer(...)` must provide a child-visible shm mirror — in +the registered case the tensor storage itself need not be child-visible. Three +sources are legal: + +| Source | How | Why it works | +| ------ | --- | ------------ | +| **fork-inherited** | `tensor.share_memory_()` **before the chip children are forked** (i.e. before the first `Worker.run()`) | the child inherits the MAP_SHARED page at fork | +| **registered post-fork** | `worker.register_host_buffer(tensor)` after the chips exist | maps a shm into every child for the buffer's lifetime | +| anything else | — | raises an actionable error before dispatch | + +The chip children are forked lazily on the **first** `run()`. A host tensor +created after that — the natural dynamic-shape serving pattern — is invisible to +the children unless registered: + +```python +worker = Worker(level=3, ...); worker.register(chip); worker.init() +worker.run(orch0, ...) # forks the chips + +hidden = torch.empty((tokens, hidden_size)).share_memory_() # created post-fork +out = torch.empty((batch, vocab)).share_memory_() +h_hidden = worker.register_host_buffer(hidden) # map into every chip child +h_out = worker.register_host_buffer(out) +try: + for batch in batches: + fill(hidden); worker.run(orch, ...) # H2D copy-in, D2H copy-out per run + use(out) +finally: + worker.unregister_host_buffer(h_hidden) + worker.unregister_host_buffer(h_out) +``` + +**Register once, reuse many runs.** Registration maps a shm into each child and +keeps it mapped; every `run` mirrors the tensor through it (H2D copy-in before +the task, D2H copy-out after the run drains). Register the buffer — or a +fixed-size superset you sub-slice — once and reuse it; re-registering per run +pays a map/unmap broadcast each time. A sub-view (slice) of a registered buffer +is resolved automatically. + +**Error path.** An unregistered post-fork host tensor raises before any dispatch: + +> Host tensor 0x… is not visible to the L3 chip child (created after fork, not +> registered). Call worker.register_host_buffer(tensor) before run(), or allocate +> it with .share_memory_() before init(). + +### Scope / limits (v1) + +- **memcpy, not zero-copy.** A registered buffer is a *separate* shm; each run + copies `tensor → shm` (in) and `shm → tensor` (out). For a large hot-path + tensor this is a double copy. True zero-copy (mapping the tensor's own storage) + is a later optimization. +- **`orch.copy_to` is the unmanaged low-level path.** Registration covers the + `run` / `submit_next_level` host-tensor args (the post-fork host-tensor + scenario). The + explicit `orch.copy_to(src=tensor.data_ptr())` staging path (§5) is *not* + translated or validated by `register_host_buffer` — its `src` must still be + fork-inherited (`.share_memory_()` before the chip children are forked, i.e. + before the first `run()`). +- **Anonymous post-fork heap.** A *large* post-fork `torch.empty` (its own mmap) + is correctly rejected. A *small* non-shared tensor the allocator sub-slices out + of a fork-time heap arena can slip past the check (anonymous, inside a fork + range) and read stale data in the child — always `share_memory_` or register + host tensors used for chip dispatch. +- **Non-procfs platforms (e.g. macOS).** The reachability check reads + `/proc/self/maps`; where it is unavailable the fork snapshot is empty, so an + unregistered host tensor cannot be classified and is **passed through + unvalidated** (rather than rejected) — a fork-inherited tensor is the common + legitimate case and must keep working. The first such pass-through emits a + one-time `UserWarning`; the caller is then responsible for ensuring the tensor + is fork-inherited or registered. Error path C above is enforced only where + procfs exists (Linux, including onboard). +- **No in-run produce-then-consume of the same registered buffer.** Copy-in + (`tensor → shm`) runs per `submit_next_level`, while tasks may already be in + flight before `drain()`. If one task writes a registered buffer and a later + dependent task in the *same* `run` reads it, the consumer's copy-in can overwrite + the producer's result with the stale parent tensor. Use a registered buffer as a + run input *or* a run output, not as an intermediate handed between tasks within + one run; chain results through device buffers instead. +- **Fork-inherited anonymous memory is copy-on-write, hence stale.** Even a tensor + the child legitimately inherited is only useful as a *live* input if it is + MAP_SHARED: anonymous (non-`share_memory_`) pages are COW, so writes the parent + makes *after* fork do not reach the child. A live input must be file-backed + (`.share_memory_()` before `init()`) or registered. + +--- + +## 7. Examples - `examples/workers/l3/allreduce_distributed/` — single domain, PTO-ISA remote reads over the window. diff --git a/python/simpler/orchestrator.py b/python/simpler/orchestrator.py index 87ec02e16..c64c9522c 100644 --- a/python/simpler/orchestrator.py +++ b/python/simpler/orchestrator.py @@ -187,6 +187,19 @@ def submit_next_level( raise TypeError("RemoteTensorRef is only supported for RemoteCallable NEXT_LEVEL submits") remote_sidecar = None _validate_remote_sidecar_access(c_args, remote_sidecar) + # Mirror any registered post-fork host buffers into their shm (H2D) and + # reject host tensors the chip child cannot see (issue #1027). Only the + # LOCAL_CHIP path dereferences raw host pointers in the forked child. + if target_namespace == "LOCAL_CHIP" and self._worker is not None: + staged_len = len(self._worker._pending_host_copyback) + try: + self._worker._stage_host_buffers_for_chip_submit(c_args) + except BaseException: + # Staging appends D2H copybacks as it walks; if it aborts the + # submit never happens, so drop the copybacks it queued or a later + # clean drain would flush stale shm into those outputs. + del self._worker._pending_host_copyback[staged_len:] + raise final_worker_ids = _remote_data_eligible_worker_ids(remote_sidecar, eligible_worker_ids) cpp_worker_id = int(worker) captured_refs = self._worker._capture_remote_sidecar_refs(remote_sidecar) if self._worker is not None else [] @@ -249,6 +262,18 @@ def submit_next_level_group( if remote_sidecars is not None: for c_args, remote_sidecar in zip(c_args_list, remote_sidecars): _validate_remote_sidecar_access(c_args, remote_sidecar) + # Stage + validate registered/post-fork host buffers for chip dispatch + # (issue #1027), same as the single submit path. + if target_namespace == "LOCAL_CHIP" and self._worker is not None: + staged_len = len(self._worker._pending_host_copyback) + try: + for c_args in c_args_list: + self._worker._stage_host_buffers_for_chip_submit(c_args) + except BaseException: + # Roll back copybacks queued for every c_args staged so far; the + # group submit never happens, so none of them must survive. + del self._worker._pending_host_copyback[staged_len:] + raise worker_id_sets = ( [ _remote_data_eligible_worker_ids(remote_sidecar, eligible_worker_ids) diff --git a/python/simpler/worker.py b/python/simpler/worker.py index 3b7b92c3e..3384e02ce 100644 --- a/python/simpler/worker.py +++ b/python/simpler/worker.py @@ -59,6 +59,7 @@ def my_l4_orch(orch, args, config): from __future__ import annotations +import bisect import ctypes import importlib import json @@ -70,6 +71,7 @@ def my_l4_orch(orch, args, config): import sys import threading import uuid +import warnings from dataclasses import dataclass from multiprocessing.shared_memory import SharedMemory from typing import Any @@ -112,6 +114,7 @@ def my_l4_orch(orch, args, config): RemoteBufferExport, RemoteBufferHandle, TaskArgs, + TensorArgType, _Worker, ) @@ -208,6 +211,32 @@ def my_l4_orch(orch, args, config): _CTRL_PY_UNREGISTER = 11 _CTRL_PY_IMPORT_REGISTER = 12 _CTRL_L3_L2_ORCH_COMM_INIT = 13 +# Host-buffer registration. MAP_HOST maps a named host-buffer shm +# into every chip child *post-fork* and keeps it mapped so later runs can copy +# through it; UNMAP_HOST drops one. The child also records the parent VA range +# the shm stands in for, so the per-task blob's host pointers (raw parent VAs) +# can be rewritten to the child's own mapping before the runtime dereferences +# them. Unlike _CTRL_REGISTER (one-shot H2D then close), these mappings persist +# for the buffer's registered lifetime — see docs/comm-domain.md. +_CTRL_MAP_HOST = 14 +_CTRL_UNMAP_HOST = 15 + +# MAP_HOST payload: token (u64), parent_va (u64), nbytes (u64), then the +# NUL-free host-buffer shm name as the trailing bytes. UNMAP_HOST payload is the +# token alone. +_HOST_BUF_MAP_HEADER = struct.Struct(" list[tuple[int, int, int]]: + """Sorted ``(lo, hi, inode)`` for every mapping in this process. + + ``inode`` is the backing file's inode (0 for anonymous mappings: heap, stack, + the HeapRing). Distinguishing a fork-inherited host buffer from a post-fork + one needs the inode, not just the VA: torch frees a pre-fork ``share_memory_`` + tensor and the allocator hands its address back to a *new* shared buffer, so + the VA alone false-matches the fork snapshot — but the new buffer is backed by + a different shm inode (error path C). + + Returns an empty list when ``/proc/self/maps`` is unavailable (non-Linux, or a + sandbox without procfs). Callers treat an empty fork snapshot as "reachability + cannot be classified" and pass an unregistered host tensor through unvalidated + (fork-inheritance is the legitimate common case there); error path C is only + enforced where procfs exists. Registration does not depend on this snapshot. + """ + try: + with open("/proc/self/maps", "rb") as fh: + data = fh.read() + except OSError: + return [] + # The parse (a few hundred lines, three int() conversions each) dominates; + # reading the file does not. In steady state the mapping table is byte-for-byte + # identical between submits, so reuse the previous parse whenever the raw bytes + # are unchanged and re-parse only when something actually mmap'd/unmapped. Any + # byte change invalidates the cache, so a freed-and-reused VA (error path + # C) is never served a stale inode. Callers only read the returned + # list, so sharing the cached object is safe. + if data == _SELF_MAPS_CACHE["raw"]: + return _SELF_MAPS_CACHE["parsed"] + maps: list[tuple[int, int, int]] = [] + for raw in data.splitlines(): + # Fields: address(lo-hi) perms offset dev inode pathname. Cap the split at + # the inode column (maxsplit=5) so the pathname tail — the longest, + # sometimes space-containing column — is never tokenized. + fields = raw.split(None, 5) + if len(fields) < 5: + continue + addr_range = fields[0] + dash = addr_range.find(b"-") + if dash < 0: + continue + try: + lo = int(addr_range[:dash], 16) + hi = int(addr_range[dash + 1 :], 16) + inode = int(fields[4]) + except ValueError: + continue + maps.append((lo, hi, inode)) + maps.sort() + _SELF_MAPS_CACHE["raw"] = data + _SELF_MAPS_CACHE["parsed"] = maps + return maps + + +def _va_lookup_inode(maps: list[tuple[int, int, int]], addr: int) -> int | None: + """Inode of the mapping containing ``addr``, or None if ``addr`` is unmapped.""" + if not maps: + return None + idx = bisect.bisect_right(maps, (addr, _UINT64_MAX, _UINT64_MAX)) - 1 + if idx < 0: + return None + lo, hi, inode = maps[idx] + return inode if lo <= addr < hi else None + + +def _fork_range_covers(maps: list[tuple[int, int, int]], addr: int, nbytes: int, inode: int) -> bool: + """True if ``[addr, addr+nbytes)`` lies entirely within one fork-captured + mapping whose inode equals ``inode`` (error path C). + + Matching the inode alone is not enough: a post-fork re-mmap of a fork-inherited + file shares the inode but lands at a VA the child never inherited. Requiring + full VA coverage rejects that, and treats file-backed and anonymous (inode 0) + mappings the same. ``maps`` is sorted and non-overlapping, so the candidate + containing ``addr`` is unique. + """ + if not maps: + return False + idx = bisect.bisect_right(maps, (addr, _UINT64_MAX, _UINT64_MAX)) - 1 + if idx < 0: + return False + lo, hi, finode = maps[idx] + return finode == inode and lo <= addr and addr + nbytes <= hi + + +def _rewrite_blob_host_addrs(buf: memoryview, blob_off: int, ranges: list[tuple[int, int, int]]) -> None: + """Redirect registered host pointers in a task-args blob to child mappings. + + ``ranges`` is ``(parent_lo, parent_hi, child_base)`` for each host buffer the + child has mapped via _CTRL_MAP_HOST. For every tensor whose ``buffer.addr`` + (a parent VA) lands in a registered range, rewrite it in place to + ``child_base + (addr - parent_lo)`` so the runtime dereferences the child's + own mapping. Tensors outside every range (fork-inherited or child-allocated) + are left untouched. See _BLOB_TENSOR_STRIDE for the wire layout. + """ + tensor_count = struct.unpack_from(" str: + """Decode the staged-payload shm name a broadcast_control_all left at _OFF_ARGS.""" + raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) + nul = raw.find(b"\x00") + return raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + + +def _shm_base_addr(shm: SharedMemory) -> int: + """Mapped base address of ``shm``. The mapping outlives the temporary buffer + view, so the address stays valid until ``shm.close()``.""" + view = shm.buf + assert view is not None + exporter = ctypes.c_char.from_buffer(view) + addr = ctypes.addressof(exporter) + del exporter + return addr + + +def _rebuild_host_buf_ranges( + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], host_buf_ranges: list[tuple[int, int, int]] +) -> None: + host_buf_ranges.clear() + for _shm, lo, hi, base in host_buf_table.values(): + host_buf_ranges.append((lo, hi, base)) + + +def _handle_ctrl_map_host( + buf: memoryview, + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], + host_buf_ranges: list[tuple[int, int, int]], +) -> None: + """Child handler for _CTRL_MAP_HOST: persist a host-buffer mapping. + + The staged payload is ``token, parent_va, nbytes`` followed by the host + buffer's shm name. Map that shm and remember the parent VA range it stands + in for so the per-task blob rewrite can redirect host pointers to this base. + """ + payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] + staged = SharedMemory(name=_read_ctrl_staged_shm_name(buf)) + try: + staged_buf = staged.buf + assert staged_buf is not None + payload = bytes(staged_buf[:payload_size]) + finally: + staged.close() + token, parent_va, nbytes = _HOST_BUF_MAP_HEADER.unpack_from(payload, 0) + host_shm_name = payload[_HOST_BUF_MAP_HEADER.size :].decode("utf-8") + prior = host_buf_table.pop(token, None) + if prior is not None: + prior[0].close() + host_shm = SharedMemory(name=host_shm_name) + host_buf_table[token] = (host_shm, parent_va, parent_va + nbytes, _shm_base_addr(host_shm)) + _rebuild_host_buf_ranges(host_buf_table, host_buf_ranges) + + +def _handle_ctrl_unmap_host( + buf: memoryview, + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], + host_buf_ranges: list[tuple[int, int, int]], +) -> None: + """Child handler for _CTRL_UNMAP_HOST: drop a host-buffer mapping by token.""" + payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] + staged = SharedMemory(name=_read_ctrl_staged_shm_name(buf)) + try: + staged_buf = staged.buf + assert staged_buf is not None + token = _HOST_BUF_UNMAP.unpack_from(bytes(staged_buf[:payload_size]), 0)[0] + finally: + staged.close() + entry = host_buf_table.pop(token, None) + if entry is not None: + entry[0].close() + _rebuild_host_buf_ranges(host_buf_table, host_buf_ranges) + + def _allocate_local_slot(registry: dict[int, Any]) -> int: for i in range(MAX_REGISTERED_CALLABLE_IDS): if i not in registry: @@ -974,6 +1221,12 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ """ prepared: set[int] = set() l3_l2_control_shms: list[SharedMemory] = [] + # Post-fork host buffers mapped into this child. `host_buf_table` + # owns the mmap per token (for unmap + teardown); `host_buf_ranges` is the + # parent-VA → child-VA translation table the per-task blob rewrite consults, + # rebuilt from the table on every map/unmap. + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]] = {} # token -> (shm, lo, hi, child_base) + host_buf_ranges: list[tuple[int, int, int]] = [] # (parent_lo, parent_hi, child_base) try: while True: state = _mailbox_load_i32(state_addr) @@ -997,6 +1250,11 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ f"chip_process dev={device_id}: cid {cid} not prepared before TASK_READY " f"(register via _CTRL_PREPARE first)" ) + # Redirect any registered host pointer (a parent VA) in the + # blob to this child's own mapping before the runtime reads it. + # No-op when nothing is registered. + if host_buf_ranges: + _rewrite_blob_host_addrs(buf, _OFF_TASK_ARGS_BLOB, host_buf_ranges) # Hand the mailbox bytes straight to C++ (zero-copy zero-decode): # the blob layout is what `write_blob` already wrote, so re-parsing # it in Python is N×40B of avoidable work and a permanent @@ -1050,9 +1308,7 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ elif sub_cmd == _CTRL_REGISTER: digest = _read_control_digest(buf) payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] - raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) - nul = raw.find(b"\x00") - shm_name = raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + shm_name = _read_ctrl_staged_shm_name(buf) shm = SharedMemory(name=shm_name) shm_buf = shm.buf assert shm_buf is not None @@ -1111,6 +1367,10 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ _handle_ctrl_comm_init(cw, buf) elif sub_cmd == _CTRL_L3_L2_ORCH_COMM_INIT: l3_l2_control_shms.append(_handle_ctrl_l3_l2_orch_comm_init(cw, buf)) + elif sub_cmd == _CTRL_MAP_HOST: + _handle_ctrl_map_host(buf, host_buf_table, host_buf_ranges) + elif sub_cmd == _CTRL_UNMAP_HOST: + _handle_ctrl_unmap_host(buf, host_buf_table, host_buf_ranges) else: raise RuntimeError(f"unknown control sub-command {int(sub_cmd)}") except Exception as e: # noqa: BLE001 @@ -1139,6 +1399,11 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ control_shm.close() except Exception: # noqa: BLE001 pass + for host_shm, _lo, _hi, _base in host_buf_table.values(): + try: + host_shm.close() + except Exception: # noqa: BLE001 + pass def _chip_process_loop( @@ -1281,9 +1546,7 @@ def _child_worker_loop( if sub_cmd == _CTRL_REGISTER: digest = _read_control_digest(buf) payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] - raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) - nul = raw.find(b"\x00") - shm_name = raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + shm_name = _read_ctrl_staged_shm_name(buf) callable_obj = _read_chip_callable_from_shm(shm_name, int(payload_size)) inner_registered = False try: @@ -1432,6 +1695,46 @@ def __init__( self._live_l3_l2_regions: list[Any] = [] self._l3_l2_orch_comm_host_buffers: dict[int, int] = {} + # Post-fork host-buffer registration. Keyed by the user + # tensor's data_ptr; each entry maps a separate named shm into every chip + # child so a tensor created after the children were forked can still be + # used by a later run. ``_fork_maps`` is the address space the children + # inherited at fork — the authority for distinguishing a fork-inherited + # host pointer (legal, unregistered) from a post-fork one (must be + # registered). ``_pending_host_copyback`` collects the D2H mirror + # (shm → tensor) deferred until after the run drains. + self._host_buf_registry: dict[int, _HostBufEntry] = {} + # ``data_ptr`` keys of ``_host_buf_registry`` kept sorted so the per-task + # submit lookup (``_find_host_buf_entry``) is a bisect instead of a linear + # scan over every registration. Registered buffers are distinct, non- + # overlapping allocations, so the unique candidate for an address is the + # entry with the greatest base <= addr. Mutated under ``_registry_lock`` + # alongside the dict. + self._host_buf_sorted_ptrs: list[int] = [] + self._host_buf_token_counter: int = 0 + # Sorted ``(lo, hi, inode)`` mappings the chip children inherited at fork, + # used to reject an unregistered post-fork host tensor (error path C). A + # submitted tensor is reachable iff its whole ``[addr, addr+nbytes)`` range + # is covered by one of these mappings with a matching inode — matching the + # inode alone is not enough, since a post-fork re-mmap of a fork-inherited + # file lands at a VA the child never inherited (see _fork_range_covers). + # Captured once, immediately before the chip fork. + self._fork_maps: list[tuple[int, int, int]] | None = None + # Per-run cache of the current /proc/self/maps, built lazily the first + # time a run hits an unregistered host tensor and reset each run. + self._submit_maps: list[tuple[int, int, int]] | None = None + self._pending_host_copyback: list[tuple[int, int, int]] = [] + # Parent-VA ranges of registered buffers written by an earlier task in the + # CURRENT run (OUTPUT/INOUT/OUTPUT_EXISTING). A later task reading an + # overlapping range must NOT have the stale parent bytes copied back over + # the producer's device output — the registered shm is the live child↔child + # medium and the OverlapMap already orders the on-device accesses. Reset per + # run alongside ``_pending_host_copyback``. + self._staged_output_ranges: list[tuple[int, int]] = [] + # Latched once we pass an unregistered host tensor through unvalidated on a + # no-procfs platform, so the visibility warning is emitted only once. + self._warned_no_procfs_passthrough: bool = False + def _comm_plan_rootinfo_path(self) -> str: """Per-Worker rootinfo path used by HCCL/sim base comm_init. @@ -1618,6 +1921,32 @@ def _host_ptr_value(ptr: Any) -> int: except AttributeError as exc: raise TypeError("host_ptr must be an integer address, ctypes object, or object with data_ptr()") from exc + @staticmethod + def _host_nbytes(obj: Any) -> int: + """Byte size of a host buffer, duck-typed (torch / numpy / buffer protocol). + + Kept torch-free like ``_host_ptr_value`` — simpler must not import torch. + Callers that hold a buffer whose size cannot be inferred pass ``nbytes`` + to ``register_host_buffer`` explicitly. + """ + # numpy ndarray / memoryview expose a non-callable ``nbytes``; recent + # torch.Tensor does too. Prefer it when present and not a method. + nb = getattr(obj, "nbytes", None) + if nb is not None and not callable(nb): + return int(nb) + # torch.Tensor (and look-alikes): element_size() * number-of-elements. + elem = getattr(obj, "element_size", None) + numel = getattr(obj, "nelement", None) or getattr(obj, "numel", None) + if callable(elem) and callable(numel): + return int(elem() * numel()) + # Anything supporting the buffer protocol (bytes, bytearray, array, …). + try: + return memoryview(obj).nbytes + except TypeError as exc: + raise TypeError( + "register_host_buffer: cannot infer byte size of the host buffer; pass nbytes=" + ) from exc + def _require_live_remote_buffer(self, handle: RemoteBufferHandle) -> None: if not isinstance(handle, RemoteBufferHandle): raise TypeError("expected a RemoteBufferHandle returned by Worker.remote_malloc/import") @@ -3011,6 +3340,17 @@ def _start_hierarchical(self) -> None: # noqa: PLR0912 -- three parallel fork l # lazily on first ``orch.allocate_domain`` via CTRL_COMM_INIT. chip_log_level, chip_log_info_v = _simpler_log.get_current_config() if device_ids: + # Snapshot what the chip children are about to inherit (error + # path C), captured *immediately before* the fork so a + # parent-only mapping created later by dw.init()/prewarm cannot leak + # into it. Full (lo, hi, inode) ranges: error-path-C requires a + # submitted tensor's whole byte range to be covered by one inherited + # mapping of the same inode (see _fork_range_covers). Empty when + # procfs is unavailable — callers then pass unregistered tensors + # through unvalidated. Registration does not depend on this snapshot. + fork_maps = _read_self_maps() + if fork_maps: + self._fork_maps = fork_maps for idx, dev_id in enumerate(device_ids): pid = os.fork() if pid == 0: @@ -3786,6 +4126,373 @@ def copy_from(self, dst: int, src: int, size: int, worker_id: int = 0) -> None: assert self._orch is not None self._orch.copy_from(worker_id, dst, src, size) + # ------------------------------------------------------------------ + # Post-fork host-buffer registration + # ------------------------------------------------------------------ + + def register_host_buffer(self, tensor: Any, *, nbytes: int | None = None) -> HostBufferHandle: + """Make a host ``tensor`` created after the chip children were forked + usable by a later ``run()``. + + L3 chip children are forked lazily on the first ``run()``; a host tensor + created afterwards is not in their address space, so passing it to + ``orch.submit_next_level`` would otherwise read unmapped/stale memory. + This maps a separate named shared-memory buffer into every chip child and + keeps it mapped; each subsequent ``run()`` mirrors the tensor through it + (H2D copy-in before the task, D2H copy-out after the run drains). Call + ``unregister_host_buffer`` with the returned handle to release it. + + ``tensor`` is a host buffer whose address ``_host_ptr_value`` accepts — + a torch tensor (``data_ptr()``), a raw integer address, or a ctypes + object — same contract as the other host-pointer APIs. Its byte size is + inferred when possible; pass ``nbytes`` explicitly otherwise (required + for a raw address, and for a numpy array, where you pass + ``arr.ctypes.data`` with ``nbytes=arr.nbytes``). The chips are started on + demand, so this is valid any time after ``init()``. + + This call blocks until every chip child has mapped the buffer, so once it + returns the buffer is fully visible to ``run()``. It is not thread-safe + against a concurrent ``run`` / ``register`` / ``unregister`` on the same + Worker, though: drive registration and runs from one thread (the usual + register-then-run pattern), as the L3 worker is otherwise. + """ + if self.level < 3: + raise TypeError("register_host_buffer requires a level >= 3 Worker") + if not self._initialized: + raise RuntimeError("register_host_buffer requires Worker.init() before registration") + self._start_hierarchical() + if not self._chip_shms: + raise RuntimeError("register_host_buffer requires forked chip children (none are configured)") + assert self._worker is not None + + data_ptr = self._host_ptr_value(tensor) + nbytes = int(nbytes) if nbytes is not None else self._host_nbytes(tensor) + if nbytes <= 0: + raise ValueError("register_host_buffer: tensor has no bytes") + + # Allocate the token and reserve the registry slot under the lock so + # concurrent registrations cannot collide on the counter or both pass the + # duplicate-pointer check (mirrors Worker.register's _registry_lock + # discipline). The slow broadcast runs *outside* the lock — wire-level + # concurrency is serialized at the C++ mailbox, not here. + with self._registry_lock: + if data_ptr in self._host_buf_registry: + raise ValueError(f"register_host_buffer: host pointer 0x{data_ptr:x} is already registered") + token = self._host_buf_token_counter + self._host_buf_token_counter += 1 + shm = SharedMemory(create=True, size=nbytes) + entry = _HostBufEntry( + token=token, + data_ptr=data_ptr, + nbytes=nbytes, + shm=shm, + shm_name=shm.name, + # Cached for per-run memcpy; valid until shm.close() unmaps it. + shm_base=_shm_base_addr(shm), + tensor=tensor, + ) + self._host_buf_registry[data_ptr] = entry + bisect.insort(self._host_buf_sorted_ptrs, data_ptr) + + payload = _HOST_BUF_MAP_HEADER.pack(token, data_ptr, nbytes) + shm.name.encode("utf-8") + try: + results = self._worker.broadcast_control_all( + WorkerType.NEXT_LEVEL, + int(_CTRL_MAP_HOST), + payload, + None, + timeout_s=self._py_control_timeout_s, + ) + errors = self._control_errors(list(results)) + if errors: + raise RuntimeError( + f"register_host_buffer: MAP_HOST failed on {len(errors)} chip children; first error: {errors[0]}" + ) + except BaseException: + # Roll back on any failure — partial-map errors *or* an exception + # raised by the broadcast itself (which would otherwise leak the shm): + # unmap any child that took it, drop the reservation, free the shm. + try: + self._broadcast_host_unmap(token) + finally: + with self._registry_lock: + if self._host_buf_registry.pop(data_ptr, None) is not None: + self._host_buf_sorted_ptrs.remove(data_ptr) + shm.close() + shm.unlink() + raise + + return HostBufferHandle(token=token, data_ptr=data_ptr) + + def unregister_host_buffer(self, handle: HostBufferHandle) -> None: + """Release a registration created by ``register_host_buffer``. + + Unmaps the buffer from every chip child and frees the parent shm. + Best-effort, mirroring ``unregister``: a child-side error is reported but + the parent still drops the entry so the buffer's resources are reclaimed. + """ + if not isinstance(handle, HostBufferHandle): + raise TypeError("unregister_host_buffer expects a HostBufferHandle from register_host_buffer") + # Drop the entry under the lock; do the slow unmap broadcast + shm release + # outside it (narrow-lock discipline, same as register). + with self._registry_lock: + # Match on token, not just data_ptr: a stale handle whose buffer was + # already unregistered must not drop a newer registration that happens + # to have reused the same pointer. A mismatch means this handle's + # buffer is already gone — return silently (idempotent unregister). + entry = self._host_buf_registry.get(handle.data_ptr) + if entry is None or entry.token != handle.token: + return + self._host_buf_registry.pop(handle.data_ptr, None) + self._host_buf_sorted_ptrs.remove(handle.data_ptr) + errors: list[str] = [] + try: + if self._worker is not None and getattr(self, "_hierarchical_started", False): + errors = self._broadcast_host_unmap(entry.token) + except Exception as exc: # noqa: BLE001 + # Broadcast itself failed; still free the parent shm below so it is + # never leaked, and surface the failure as a warning. + errors = [str(exc)] + finally: + try: + entry.shm.close() + entry.shm.unlink() + except FileNotFoundError: + pass + if errors: + sys.stderr.write( + f"[worker pid={os.getpid()}] WARN: unregister_host_buffer token={entry.token} " + f"failed on {len(errors)} chip children; first error: {errors[0]}\n" + ) + sys.stderr.flush() + + def _release_all_host_buffers(self) -> None: + """Unmap + free every still-registered host buffer (called from close()).""" + with self._registry_lock: + entries = list(self._host_buf_registry.values()) + self._host_buf_registry.clear() + self._host_buf_sorted_ptrs.clear() + for entry in entries: + try: + if self._worker is not None and getattr(self, "_hierarchical_started", False): + self._broadcast_host_unmap(entry.token) + except Exception: # noqa: BLE001 + # A failed unmap broadcast must not strand the parent shm. + pass + finally: + try: + entry.shm.close() + entry.shm.unlink() + except FileNotFoundError: + pass + + def _broadcast_host_unmap(self, token: int) -> list[str]: + """Broadcast _CTRL_UNMAP_HOST for ``token`` to every chip child.""" + if self._worker is None: + return [] + results = self._worker.broadcast_control_all( + WorkerType.NEXT_LEVEL, + int(_CTRL_UNMAP_HOST), + _HOST_BUF_UNMAP.pack(token), + None, + timeout_s=self._py_control_timeout_s, + ) + return self._control_errors(list(results)) + + def _stage_host_buffers_for_chip_submit(self, args: Any) -> None: + """Validate + H2D copy-in for the host tensors of one chip submit. + + Called from ``Orchestrator.submit_next_level`` on the LOCAL_CHIP path, + before the task is dispatched. For each host tensor in ``args``: + + * registered (its base, or a sub-view inside a registered buffer) → + mirror the live bytes into the buffer's shm at the matching offset and + queue the D2H copy-out for after drain; + * fork-inherited → leave it alone; + * neither → raise the actionable error (error path C). + + The copy-in is skipped when the range was already written by an earlier + task in this run: the registered *host* shm is the live child↔child medium + (across devices too — each device's HBM is independent and transient, the + rendezvous is always this host shm mapped into every child). The producer + task's runtime D2H-mirrors its device output into the shm before TASK_DONE; + the consumer task H2Ds it back. The OverlapMap keys deps on the host + address (device-agnostic) and orders the consumer's read after the + producer's write, so re-mirroring stale parent bytes would clobber that + output (see ``_classify_staged_overlap``). A NO_DEP read of such a range, + or a partial overlap, raises instead — neither has safe semantics here. + + The child rewrites registered host pointers to its own mapping; see + _rewrite_blob_host_addrs. + + Registered buffers are always staged; the unregistered-tensor reachability + check (error path C) needs the fork snapshot. When it is unavailable (no + procfs — see _read_self_maps) reachability cannot be classified, so an + unregistered tensor is passed through unvalidated with a one-time warning + (a fork-inherited tensor is the legitimate common case); error path C is + enforced only where procfs exists. + """ + for i in range(args.tensor_count()): + tensor = args.tensor(i) + if tensor.child_memory: + continue + addr = int(tensor.data) + if addr == 0: + continue + tensor_nbytes = int(tensor.nbytes()) + entry = self._find_host_buf_entry(addr, tensor_nbytes) + if entry is not None: + # The submitted tensor may be a sub-view of the registered buffer + # (addr = base + offset); mirror at the same offset. The child + # rewrite uses the same base, so its read lands at child_base + + # offset. + offset = addr - entry.data_ptr + shm_dst = entry.shm_base + offset + tag = args.tag(i) + lo, hi = addr, addr + tensor_nbytes + reads = tag in (TensorArgType.INPUT, TensorArgType.INOUT, TensorArgType.NO_DEP) + if reads: + # If an earlier task in this run already wrote this range, its + # runtime D2H-mirrored the device output into this host shm and + # the OverlapMap (keyed on the host address, device-agnostic) + # orders the consumer's read after it. Copying the stale parent + # bytes in now would clobber that output before the consumer + # reads it, so skip the copy-in for an already-produced range + # (submit order == dependency order, so the producer is recorded + # by now). + overlap = self._classify_staged_overlap(lo, hi) + if overlap == "partial": + raise RuntimeError( + f"Host tensor 0x{addr:x} (+{tensor_nbytes} B) partially overlaps a " + f"buffer written earlier in this run; per-byte copy-in is not modelled. " + f"Use matching tensor views so reads either fully reuse the producer's " + f"output or are disjoint from it." + ) + if overlap == "covered": + if tag == TensorArgType.NO_DEP: + # NO_DEP skips the OverlapMap, so the runtime never orders + # this read after the in-run producer's write — neither + # copy-in nor skip is safe. Fail loudly. + raise RuntimeError( + f"NO_DEP host tensor 0x{addr:x} (+{tensor_nbytes} B) overlaps a " + f"buffer written earlier in this run; NO_DEP carries no dependency " + f"so the read is unordered against the producer. Tag it " + f"INPUT/INOUT to order it." + ) + # producer's D2H already wrote this host shm — skip the + # stale copy-in. + else: + ctypes.memmove(shm_dst, addr, tensor_nbytes) + if tag in (TensorArgType.OUTPUT, TensorArgType.INOUT, TensorArgType.OUTPUT_EXISTING): + self._staged_output_ranges.append((lo, hi)) + # Copy-out for everything but a pure INPUT. NO_DEP is a no-publish + # existing tensor that declares no read/write direction (tensor.h), + # so mirror both ways: if only read, copy-out rewrites the bytes + # copy-in already staged (a no-op); if written, the result reaches + # the user tensor. + if tag != TensorArgType.INPUT: + self._pending_host_copyback.append((addr, shm_dst, tensor_nbytes)) + continue + # Unregistered host pointer: confirm the child can actually reach it. + # Without the fork snapshot (no procfs, e.g. macOS) reachability cannot + # be classified, so we pass the pointer through unvalidated (with a + # one-time warning): a fork-inherited tensor is the legitimate common + # case and must keep working. Error path C is therefore only enforced + # where procfs exists (Linux, including onboard); non-procfs platforms + # are sim/dev-only. + if self._fork_maps is None: + if not self._warned_no_procfs_passthrough: + self._warned_no_procfs_passthrough = True + warnings.warn( + "L3 host-tensor visibility cannot be validated on this platform " + "(/proc/self/maps unavailable, e.g. macOS): an unregistered host tensor " + "is passed through to the chip child unchecked. This is safe for a " + "fork-inherited tensor (.share_memory_() before the chips fork), but a " + "post-fork tensor that was not registered with worker.register_host_buffer() " + "is not visible to the child and will read stale/unmapped memory. " + "Register post-fork host tensors to be safe.", + UserWarning, + stacklevel=2, + ) + continue + # Build the current map view once per run (registered-only runs never + # pay this read). + if self._submit_maps is None: + self._submit_maps = _read_self_maps() + inode = _va_lookup_inode(self._submit_maps, addr) + # Reachable iff the whole [addr, addr+nbytes) range sits inside one + # mapping the children inherited at fork *with the same inode*. A + # file-backed pointer that merely shares an inode with a fork mapping + # but lives at a post-fork VA (a re-mmap of the same file) is rejected; + # so is an anonymous pointer (inode 0: heap, HeapRing, a fresh + # post-fork ``torch.empty``) whose VA was not mapped at fork. + if inode is not None and _fork_range_covers(self._fork_maps, addr, tensor_nbytes, inode): + continue + raise RuntimeError( + f"Host tensor 0x{addr:x} is not visible to the L3 chip child (created after fork, not " + "registered). Call worker.register_host_buffer(tensor) before run(), or allocate it with " + ".share_memory_() before init()." + ) + + def _find_host_buf_entry(self, addr: int, nbytes: int) -> _HostBufEntry | None: + """Registered buffer whose ``[data_ptr, data_ptr+nbytes)`` contains the + whole ``[addr, addr+nbytes)`` view, or None. Raises if a view starts + inside a registered buffer but runs past its end (would read past the + shm in the child). + + Registered buffers are distinct, non-overlapping allocations, so the only + candidate for ``addr`` is the entry with the greatest base ``<= addr`` — + found by bisecting ``_host_buf_sorted_ptrs`` (kept sorted on every + register/unregister) so this stays log-time on the per-submit hot path + rather than scanning every registration. + + Sub-view matching assumes the blob's ``Tensor.buffer.addr`` is the + contiguous base of the host buffer (``make_tensor_arg`` builds tensors with + ``start_offset == 0``); a non-zero ``start_offset`` would shift ``addr`` + and is not modelled here. + """ + idx = bisect.bisect_right(self._host_buf_sorted_ptrs, addr) - 1 + if idx < 0: + return None + # The submit path reads the registry without _registry_lock; a concurrent + # unregister between the bisect and this lookup must degrade to None, not a + # KeyError (mirrors the lock-free read of the old values() scan). + entry = self._host_buf_registry.get(self._host_buf_sorted_ptrs[idx]) + if entry is None or addr >= entry.data_ptr + entry.nbytes: + return None + if addr + nbytes > entry.data_ptr + entry.nbytes: + raise RuntimeError( + f"Host tensor 0x{addr:x} (+{nbytes} B) overruns its registered buffer " + f"0x{entry.data_ptr:x} (+{entry.nbytes} B); register a buffer at least as large." + ) + return entry + + def _classify_staged_overlap(self, lo: int, hi: int) -> str: + """How ``[lo, hi)`` relates to registered ranges already written this run. + + Returns ``"none"`` (disjoint from every staged output → safe to copy the + parent bytes in), ``"covered"`` (fully inside one staged output → the + producer fills the shm on device, skip the copy-in), or ``"partial"`` + (straddles a staged-output boundary → copy-in would clobber the produced + part while a skip would drop the fresh part; the caller raises rather than + silently corrupt). + """ + intersects = False + for o_lo, o_hi in self._staged_output_ranges: + if lo < o_hi and o_lo < hi: + intersects = True + if o_lo <= lo and hi <= o_hi: + return "covered" + return "partial" if intersects else "none" + + def _flush_host_buffer_copyback(self) -> None: + """D2H mirror (shm → tensor) for this run's registered output buffers.""" + if not self._pending_host_copyback: + return + for dst_addr, shm_base, nbytes in self._pending_host_copyback: + ctypes.memmove(dst_addr, shm_base, nbytes) + self._pending_host_copyback.clear() + # ------------------------------------------------------------------ # run — uniform entry point # ------------------------------------------------------------------ @@ -3828,6 +4535,13 @@ def run(self, callable, args=None, config=None) -> None: # poked it. self._orch._clear_error() self._orch._scope_begin() + # Reset the registered-host-buffer D2H mirror list for this run; submit + # appends the output buffers it staged, flushed below once the run drains + # cleanly. Clearing here also drops anything a prior failed + # run left behind. ``_submit_maps`` is rebuilt lazily within the run. + self._pending_host_copyback.clear() + self._staged_output_ranges.clear() + self._submit_maps = None try: callable(self._orch, args, cfg) finally: @@ -3864,6 +4578,9 @@ def run(self, callable, args=None, config=None) -> None: self._execute_pending_domain_releases() if self._live_domains: self._release_all_live_domains() + # Run drained cleanly (no exception propagated): mirror registered output + # buffers back from their shm into the user tensors. + self._flush_host_buffer_copyback() # L3+ returns None like every other worker level; per-L2-child timing # is emitted as `[STRACE]` markers from each simpler_run. return None @@ -3912,6 +4629,10 @@ def close(self) -> None: # noqa: PLR0912 -- parallel teardown for _worker + sub sys.stderr.write(f"Worker.close(): remote buffer cleanup reported error (continuing): {exc}\n") sys.stderr.flush() + # Release any host buffers the user never unregistered. Must run while + # the chip mailboxes are still usable (before _worker.close()). + self._release_all_host_buffers() + if self.level == 2: if self._chip_worker: self._chip_worker.finalize() diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py new file mode 100644 index 000000000..0b69dc02b --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""L3 host-buffer registration (issue #1027). + +A host tensor created *after* the chip children are forked (lazily on the +first ``Worker.run()``) is not visible to those children: the orch fn runs in +the parent and ``orch.copy_to`` carries a raw parent VA that is unmapped (or +stale) in the child. ``Worker.register_host_buffer`` maps a named shm into +each child post-fork so a later run can copy through it. + +Covers the mechanism end-to-end (B — register a post-fork buffer, run, get the +correct result). The unregistered-tensor error path (C) is a pure host-side +classifier with no kernel/device dependency, unit-tested in +``tests/ut/py/test_worker/test_host_buffer_registration.py``. + +a2a3sim: ``register_host_buffer`` is pure host-side (POSIX shm + a control +broadcast to the forked chip children) with no platform branching, so the sim +backend exercises the full mechanism without needing a device. The +vector_example orchestration kernels exist only for a2a3. +""" + +import torch +from simpler.task_interface import ArgDirection as D +from simpler.task_interface import CallConfig, TaskArgs, TensorArgType + +from simpler_setup import SceneTestCase, make_tensor_arg, scene_test + +KERNELS_BASE = "../../../../examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels" + +SIZE = 128 * 128 + + +def _golden(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor: + s = a + b + return (s + 1) * (s + 2) + s + + +def _one_task_orch(chip_handle, a, b, out): + def orch_fn(orch, _args, cfg): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(a), TensorArgType.INPUT) + ta.add_tensor(make_tensor_arg(b), TensorArgType.INPUT) + ta.add_tensor(make_tensor_arg(out), TensorArgType.OUTPUT_EXISTING) + orch.submit_next_level(chip_handle, ta, cfg, worker=0) + + return orch_fn + + +@scene_test(level=3, runtime="tensormap_and_ringbuffer") +class TestPostForkHostBufferRegistration(SceneTestCase): + """Post-fork host-buffer registration on a single L3 worker (issue #1027).""" + + CALLABLE = { + "callables": [ + { + "name": "vector", + "orchestration": { + "source": f"{KERNELS_BASE}/orchestration/example_orchestration.cpp", + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.OUT], + }, + "incores": [ + { + "func_id": 0, + "source": f"{KERNELS_BASE}/aiv/kernel_add.cpp", + "core_type": "aiv", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 1, + "source": f"{KERNELS_BASE}/aiv/kernel_add_scalar.cpp", + "core_type": "aiv", + "signature": [D.IN, D.OUT], + }, + { + "func_id": 2, + "source": f"{KERNELS_BASE}/aiv/kernel_mul.cpp", + "core_type": "aiv", + "signature": [D.IN, D.IN, D.OUT], + }, + ], + }, + ], + } + + CASES = [ + {"name": "post_fork_registration", "platforms": ["a2a3sim"]}, + ] + + def _force_fork(self, worker, chip_handle): + """Run once with pre-fork shared tensors so the chip children get forked.""" + a = torch.full((SIZE,), 2.0, dtype=torch.float32).share_memory_() + b = torch.full((SIZE,), 3.0, dtype=torch.float32).share_memory_() + out = torch.zeros(SIZE, dtype=torch.float32).share_memory_() + worker.run(_one_task_orch(chip_handle, a, b, out), args=None, config=CallConfig()) + assert torch.allclose(out, _golden(a, b), rtol=self.RTOL, atol=self.ATOL) + + def test_run(self, st_worker): + """Mechanism B: a host tensor created AFTER the fork and registered with + ``register_host_buffer`` is visible to the chip child and round-trips to the + correct result. + + Overrides the default ``generate_args``/``compute_golden`` flow: issue #1027 + is about *timing* — the buffer must be created AFTER the chip children are + forked, which the standard pre-fork arg-generation path cannot express. The + ``st_worker`` L3 fixture owns the worker lifecycle (build/init/close). + """ + worker = st_worker + chip_handle = type(self)._st_chip_handles["vector"] + + self._force_fork(worker, chip_handle) + + # Created AFTER the fork — invisible to the child until register maps it in. + a = torch.full((SIZE,), 5.0, dtype=torch.float32).share_memory_() + b = torch.full((SIZE,), 7.0, dtype=torch.float32).share_memory_() + out = torch.zeros(SIZE, dtype=torch.float32).share_memory_() + ha = worker.register_host_buffer(a) + hb = worker.register_host_buffer(b) + hout = worker.register_host_buffer(out) + try: + worker.run(_one_task_orch(chip_handle, a, b, out), args=None, config=CallConfig()) + assert torch.allclose(out, _golden(a, b), rtol=self.RTOL, atol=self.ATOL) + finally: + worker.unregister_host_buffer(ha) + worker.unregister_host_buffer(hb) + worker.unregister_host_buffer(hout) + + +if __name__ == "__main__": + SceneTestCase.run_module(__name__) diff --git a/tests/ut/py/test_worker/test_host_buffer_registration.py b/tests/ut/py/test_worker/test_host_buffer_registration.py new file mode 100644 index 000000000..60dc3f0e3 --- /dev/null +++ b/tests/ut/py/test_worker/test_host_buffer_registration.py @@ -0,0 +1,230 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Host-side reachability classifier for post-fork host buffers. + +Error path C: a host tensor created *after* the chip children fork is invisible +to them unless ``register_host_buffer`` maps it in. ``submit_next_level`` calls +``_stage_host_buffers_for_chip_submit`` *before any dispatch*; for an +unregistered post-fork tensor it must raise an actionable error rather than +silently submit a pointer the child cannot read. + +These are pure host-side unit tests: the classifier depends only on the fork +snapshot (``_fork_maps``) and ``/proc/self/maps``, so we inject that state +directly and never fork, compile a kernel, or touch a device (mirrors the +white-box style of ``test_host_worker.py``). The end-to-end registered +round-trip (mechanism B) lives in the a2a3sim scene test. +""" + +from __future__ import annotations + +import ctypes +import warnings + +import pytest +import torch +from simpler.task_interface import TaskArgs, TensorArgType +from simpler.worker import Worker, _HostBufEntry, _read_self_maps, _va_lookup_inode + +from simpler_setup import make_tensor_arg + +_SIZE = 128 * 128 +# Element count whose float32 allocation (64 MiB) is above glibc's max dynamic +# mmap threshold (32 MiB on 64-bit), so it is always its own anonymous mmap (a +# fresh post-fork VA), never served from a fork-time heap arena. +_BIG_ELEMS = 16 * 1024 * 1024 + +# Tests that inject a *real* fork snapshot (or read a tensor's real inode) only +# make sense where /proc/self/maps exists. On a non-procfs platform (macOS CI) +# _read_self_maps() is [], so skip them; the no-procfs behaviour itself is +# covered by test_passes_through_without_procfs. +_requires_procfs = pytest.mark.skipif(not _read_self_maps(), reason="/proc/self/maps unavailable") + + +def _classifier_worker(*, fork_maps): + """A ``Worker`` with only the classifier state populated. + + ``_stage_host_buffers_for_chip_submit`` reads nothing else, so we bypass the + real init/fork and inject the fork snapshot (sorted ``(lo, hi, inode)`` + ranges, or ``None`` for "no procfs") by hand. + """ + w = Worker.__new__(Worker) + w._host_buf_registry = {} + w._host_buf_sorted_ptrs = [] + w._fork_maps = fork_maps + w._submit_maps = None + w._pending_host_copyback = [] + w._warned_no_procfs_passthrough = False + return w + + +def _one_input_arg(tensor): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(tensor), TensorArgType.INPUT) + return ta + + +class TestErrorPathCRejection: + """Unregistered post-fork tensors are refused at submit, before dispatch.""" + + def test_post_fork_anonymous_tensor_rejected(self): + # inode 0 (anonymous mmap); VA not in the (empty) fork snapshot. + w = _classifier_worker(fork_maps=[]) + anon = torch.zeros(_BIG_ELEMS, dtype=torch.float32) + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) + + def test_post_fork_shared_tensor_rejected(self): + # File-backed (share_memory_) tensor whose inode was not inherited at fork. + w = _classifier_worker(fork_maps=[]) + shared = torch.zeros(_SIZE, dtype=torch.float32).share_memory_() + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(shared)) + + @_requires_procfs + def test_fork_inherited_tensor_passes(self): + # The legitimate common case: a tensor whose real mapping is in the fork + # snapshot (so the child inherited the exact VA + inode) is accepted + # without raising. Guards against the coverage check over-rejecting. + t = torch.zeros(_SIZE, dtype=torch.float32) + w = _classifier_worker(fork_maps=_read_self_maps()) + w._stage_host_buffers_for_chip_submit(_one_input_arg(t)) # no raise + + @_requires_procfs + def test_same_inode_other_va_rejected(self): + # Regression for the inode-only gap: a file-backed tensor whose inode WAS + # present at fork but at a *different* VA (a post-fork re-mmap of the same + # file) must still be rejected — the child never inherited this VA, so + # matching the inode alone is not enough. The old inode-membership check + # accepted it; the range-coverage check rejects it. + shared = torch.zeros(_SIZE, dtype=torch.float32).share_memory_() + addr = shared.data_ptr() + inode = _va_lookup_inode(_read_self_maps(), addr) + assert inode and inode != 0, "share_memory_ tensor must be file-backed" + # Same inode, but a VA range entirely below the tensor — does not cover it. + fork_maps = [(addr - 0x200000, addr - 0x100000, inode)] + w = _classifier_worker(fork_maps=fork_maps) + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(shared)) + + def test_passes_through_without_procfs(self): + # _fork_maps is None => no fork snapshot (no procfs, e.g. macOS): + # reachability cannot be classified, so an unregistered tensor is passed + # through unvalidated rather than rejected — a fork-inherited tensor is the + # legitimate common case and must keep working. Error path C is only + # enforced where procfs exists. The pass-through emits a one-time warning so + # the caller knows visibility went unverified. Regression guard for macOS. + w = _classifier_worker(fork_maps=None) + anon = torch.zeros(_BIG_ELEMS, dtype=torch.float32) + with pytest.warns(UserWarning, match="visibility cannot be validated"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) # no raise + # Latched: a second pass-through is silent. + with warnings.catch_warnings(): + warnings.simplefilter("error") # any warning would raise + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) + + +def _registered_worker(entry): + """A ``Worker`` with only the registered-buffer staging state populated. + + The registered path in ``_stage_host_buffers_for_chip_submit`` returns before + the fork-snapshot check, so it reads only ``_host_buf_registry``, + ``_host_buf_sorted_ptrs``, ``_staged_output_ranges`` and + ``_pending_host_copyback``. + """ + w = Worker.__new__(Worker) + w._host_buf_registry = {entry.data_ptr: entry} + w._host_buf_sorted_ptrs = [entry.data_ptr] + w._staged_output_ranges = [] + w._pending_host_copyback = [] + return w + + +def _entry_for(parent, shm_buf): + """A registered entry mapping ``parent``'s VA onto a caller-owned ``shm_buf`` + (a ctypes buffer standing in for the child-visible shm).""" + return _HostBufEntry( + token=1, + data_ptr=parent.data_ptr(), + nbytes=parent.numel() * parent.element_size(), + shm=None, # type: ignore[arg-type] # staging reads only shm_base + shm_name="", + shm_base=ctypes.addressof(shm_buf), + tensor=parent, + ) + + +def _arg(tensor, tag): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(tensor), tag) + return ta + + +class TestInRunProducerConsumer: + """Copy-in must not clobber a buffer an earlier task in the same run wrote. + + The registered shm is the live child↔child medium: producer task A writes its + device output into the shm, the OverlapMap orders consumer task B's read after + it, and B's host-side copy-in (stale parent → shm) is the only thing that can + race that output. Submit order == dependency order, so by the time B is staged + A's range is already recorded. + """ + + def test_consumer_input_does_not_clobber_producer_output(self): + # Stale parent = 7.0; shm pre-filled with 3.0 stands in for A's device + # output already mirrored into the shm. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)(*([3.0] * _SIZE)) + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.OUTPUT)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.INPUT)) + # copy-in skipped: the producer's output survives (3.0), not stale 7.0. + assert list(shm_buf[:4]) == [3.0, 3.0, 3.0, 3.0] + + def test_plain_input_still_copies_in(self): + # No earlier task wrote the range, so the parent snapshot must reach the shm. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)(*([3.0] * _SIZE)) + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.INPUT)) + assert list(shm_buf[:4]) == [7.0, 7.0, 7.0, 7.0] + + def test_no_dep_over_produced_range_raises(self): + # NO_DEP skips the OverlapMap, so the read is unordered against the + # in-run producer — neither copy-in nor skip is safe. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)() + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.OUTPUT)) + with pytest.raises(RuntimeError, match="NO_DEP"): + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.NO_DEP)) + + def test_partial_overlap_raises(self): + # Producer writes [0, 64); consumer reads [32, 96) — straddles the + # boundary, so neither a full copy-in nor a full skip is correct. + parent = torch.full((256,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * 256)() + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent[0:64], TensorArgType.OUTPUT)) + with pytest.raises(RuntimeError, match="partially overlaps"): + w._stage_host_buffers_for_chip_submit(_arg(parent[32:96], TensorArgType.INPUT)) + + +class TestReadSelfMaps: + def test_returns_empty_when_procfs_absent(self, monkeypatch): + # _read_self_maps must degrade to [] (not FileNotFoundError) where + # /proc/self/maps does not exist — the bug that broke the macOS jobs. + real_open = open + + def fake_open(path, *args, **kwargs): + if str(path) == "/proc/self/maps": + raise FileNotFoundError(2, "No such file or directory", "/proc/self/maps") + return real_open(path, *args, **kwargs) + + monkeypatch.setattr("builtins.open", fake_open) + assert _read_self_maps() == []