diff --git a/docs/comm-domain.md b/docs/comm-domain.md index 9d93635e6..21ad9979d 100644 --- a/docs/comm-domain.md +++ b/docs/comm-domain.md @@ -148,7 +148,101 @@ with orch.allocate_domain(...) as handle: --- -## 6. Examples +## 6. Host tensor visibility for `worker.run` + +A host tensor passed to `worker.run(...)` / `orch.submit_next_level(...)` is +ultimately dereferenced from the forked chip child, not the parent. For the +design rationale behind the choices below (copy vs zero-copy, explicit +registration, the procfs/inode classification, and the alternatives that were +deferred) see [`host-buffer-registration-design.md`](host-buffer-registration-design.md). +Either the +tensor's own storage must already be reachable there (fork-inherited), or +`worker.register_host_buffer(...)` must provide a child-visible shm mirror — in +the registered case the tensor storage itself need not be child-visible. Three +sources are legal: + +| Source | How | Why it works | +| ------ | --- | ------------ | +| **fork-inherited** | `tensor.share_memory_()` **before the chip children are forked** (i.e. before the first `Worker.run()`) | the child inherits the MAP_SHARED page at fork | +| **registered post-fork** | `worker.register_host_buffer(tensor)` after the chips exist | maps a shm into every child for the buffer's lifetime | +| anything else | — | raises an actionable error before dispatch | + +The chip children are forked lazily on the **first** `run()`. A host tensor +created after that — the natural dynamic-shape serving pattern — is invisible to +the children unless registered: + +```python +worker = Worker(level=3, ...); worker.register(chip); worker.init() +worker.run(orch0, ...) # forks the chips + +hidden = torch.empty((tokens, hidden_size)).share_memory_() # created post-fork +out = torch.empty((batch, vocab)).share_memory_() +h_hidden = worker.register_host_buffer(hidden) # map into every chip child +h_out = worker.register_host_buffer(out) +try: + for batch in batches: + fill(hidden); worker.run(orch, ...) # H2D copy-in, D2H copy-out per run + use(out) +finally: + worker.unregister_host_buffer(h_hidden) + worker.unregister_host_buffer(h_out) +``` + +**Register once, reuse many runs.** Registration maps a shm into each child and +keeps it mapped; every `run` mirrors the tensor through it (H2D copy-in before +the task, D2H copy-out after the run drains). Register the buffer — or a +fixed-size superset you sub-slice — once and reuse it; re-registering per run +pays a map/unmap broadcast each time. A sub-view (slice) of a registered buffer +is resolved automatically. + +**Error path.** An unregistered post-fork host tensor raises before any dispatch: + +> Host tensor 0x… is not visible to the L3 chip child (created after fork, not +> registered). Call worker.register_host_buffer(tensor) before run(), or allocate +> it with .share_memory_() before init(). + +### Scope / limits (v1) + +- **memcpy, not zero-copy.** A registered buffer is a *separate* shm; each run + copies `tensor → shm` (in) and `shm → tensor` (out). For a large hot-path + tensor this is a double copy. True zero-copy (mapping the tensor's own storage) + is a later optimization. +- **`orch.copy_to` is the unmanaged low-level path.** Registration covers the + `run` / `submit_next_level` host-tensor args (the post-fork host-tensor + scenario). The + explicit `orch.copy_to(src=tensor.data_ptr())` staging path (§5) is *not* + translated or validated by `register_host_buffer` — its `src` must still be + fork-inherited (`.share_memory_()` before the chip children are forked, i.e. + before the first `run()`). +- **Anonymous post-fork heap.** A *large* post-fork `torch.empty` (its own mmap) + is correctly rejected. A *small* non-shared tensor the allocator sub-slices out + of a fork-time heap arena can slip past the check (anonymous, inside a fork + range) and read stale data in the child — always `share_memory_` or register + host tensors used for chip dispatch. +- **Non-procfs platforms (e.g. macOS).** The reachability check reads + `/proc/self/maps`; where it is unavailable the fork snapshot is empty, so an + unregistered host tensor cannot be classified and is **passed through + unvalidated** (rather than rejected) — a fork-inherited tensor is the common + legitimate case and must keep working. The first such pass-through emits a + one-time `UserWarning`; the caller is then responsible for ensuring the tensor + is fork-inherited or registered. Error path C above is enforced only where + procfs exists (Linux, including onboard). +- **No in-run produce-then-consume of the same registered buffer.** Copy-in + (`tensor → shm`) runs per `submit_next_level`, while tasks may already be in + flight before `drain()`. If one task writes a registered buffer and a later + dependent task in the *same* `run` reads it, the consumer's copy-in can overwrite + the producer's result with the stale parent tensor. Use a registered buffer as a + run input *or* a run output, not as an intermediate handed between tasks within + one run; chain results through device buffers instead. +- **Fork-inherited anonymous memory is copy-on-write, hence stale.** Even a tensor + the child legitimately inherited is only useful as a *live* input if it is + MAP_SHARED: anonymous (non-`share_memory_`) pages are COW, so writes the parent + makes *after* fork do not reach the child. A live input must be file-backed + (`.share_memory_()` before `init()`) or registered. + +--- + +## 7. Examples - `examples/workers/l3/allreduce_distributed/` — single domain, PTO-ISA remote reads over the window. diff --git a/python/simpler/orchestrator.py b/python/simpler/orchestrator.py index 87ec02e16..c64c9522c 100644 --- a/python/simpler/orchestrator.py +++ b/python/simpler/orchestrator.py @@ -187,6 +187,19 @@ def submit_next_level( raise TypeError("RemoteTensorRef is only supported for RemoteCallable NEXT_LEVEL submits") remote_sidecar = None _validate_remote_sidecar_access(c_args, remote_sidecar) + # Mirror any registered post-fork host buffers into their shm (H2D) and + # reject host tensors the chip child cannot see (issue #1027). Only the + # LOCAL_CHIP path dereferences raw host pointers in the forked child. + if target_namespace == "LOCAL_CHIP" and self._worker is not None: + staged_len = len(self._worker._pending_host_copyback) + try: + self._worker._stage_host_buffers_for_chip_submit(c_args) + except BaseException: + # Staging appends D2H copybacks as it walks; if it aborts the + # submit never happens, so drop the copybacks it queued or a later + # clean drain would flush stale shm into those outputs. + del self._worker._pending_host_copyback[staged_len:] + raise final_worker_ids = _remote_data_eligible_worker_ids(remote_sidecar, eligible_worker_ids) cpp_worker_id = int(worker) captured_refs = self._worker._capture_remote_sidecar_refs(remote_sidecar) if self._worker is not None else [] @@ -249,6 +262,18 @@ def submit_next_level_group( if remote_sidecars is not None: for c_args, remote_sidecar in zip(c_args_list, remote_sidecars): _validate_remote_sidecar_access(c_args, remote_sidecar) + # Stage + validate registered/post-fork host buffers for chip dispatch + # (issue #1027), same as the single submit path. + if target_namespace == "LOCAL_CHIP" and self._worker is not None: + staged_len = len(self._worker._pending_host_copyback) + try: + for c_args in c_args_list: + self._worker._stage_host_buffers_for_chip_submit(c_args) + except BaseException: + # Roll back copybacks queued for every c_args staged so far; the + # group submit never happens, so none of them must survive. + del self._worker._pending_host_copyback[staged_len:] + raise worker_id_sets = ( [ _remote_data_eligible_worker_ids(remote_sidecar, eligible_worker_ids) diff --git a/python/simpler/worker.py b/python/simpler/worker.py index 3b7b92c3e..3384e02ce 100644 --- a/python/simpler/worker.py +++ b/python/simpler/worker.py @@ -59,6 +59,7 @@ def my_l4_orch(orch, args, config): from __future__ import annotations +import bisect import ctypes import importlib import json @@ -70,6 +71,7 @@ def my_l4_orch(orch, args, config): import sys import threading import uuid +import warnings from dataclasses import dataclass from multiprocessing.shared_memory import SharedMemory from typing import Any @@ -112,6 +114,7 @@ def my_l4_orch(orch, args, config): RemoteBufferExport, RemoteBufferHandle, TaskArgs, + TensorArgType, _Worker, ) @@ -208,6 +211,32 @@ def my_l4_orch(orch, args, config): _CTRL_PY_UNREGISTER = 11 _CTRL_PY_IMPORT_REGISTER = 12 _CTRL_L3_L2_ORCH_COMM_INIT = 13 +# Host-buffer registration. MAP_HOST maps a named host-buffer shm +# into every chip child *post-fork* and keeps it mapped so later runs can copy +# through it; UNMAP_HOST drops one. The child also records the parent VA range +# the shm stands in for, so the per-task blob's host pointers (raw parent VAs) +# can be rewritten to the child's own mapping before the runtime dereferences +# them. Unlike _CTRL_REGISTER (one-shot H2D then close), these mappings persist +# for the buffer's registered lifetime — see docs/comm-domain.md. +_CTRL_MAP_HOST = 14 +_CTRL_UNMAP_HOST = 15 + +# MAP_HOST payload: token (u64), parent_va (u64), nbytes (u64), then the +# NUL-free host-buffer shm name as the trailing bytes. UNMAP_HOST payload is the +# token alone. +_HOST_BUF_MAP_HEADER = struct.Struct(" list[tuple[int, int, int]]: + """Sorted ``(lo, hi, inode)`` for every mapping in this process. + + ``inode`` is the backing file's inode (0 for anonymous mappings: heap, stack, + the HeapRing). Distinguishing a fork-inherited host buffer from a post-fork + one needs the inode, not just the VA: torch frees a pre-fork ``share_memory_`` + tensor and the allocator hands its address back to a *new* shared buffer, so + the VA alone false-matches the fork snapshot — but the new buffer is backed by + a different shm inode (error path C). + + Returns an empty list when ``/proc/self/maps`` is unavailable (non-Linux, or a + sandbox without procfs). Callers treat an empty fork snapshot as "reachability + cannot be classified" and pass an unregistered host tensor through unvalidated + (fork-inheritance is the legitimate common case there); error path C is only + enforced where procfs exists. Registration does not depend on this snapshot. + """ + try: + with open("/proc/self/maps", "rb") as fh: + data = fh.read() + except OSError: + return [] + # The parse (a few hundred lines, three int() conversions each) dominates; + # reading the file does not. In steady state the mapping table is byte-for-byte + # identical between submits, so reuse the previous parse whenever the raw bytes + # are unchanged and re-parse only when something actually mmap'd/unmapped. Any + # byte change invalidates the cache, so a freed-and-reused VA (error path + # C) is never served a stale inode. Callers only read the returned + # list, so sharing the cached object is safe. + if data == _SELF_MAPS_CACHE["raw"]: + return _SELF_MAPS_CACHE["parsed"] + maps: list[tuple[int, int, int]] = [] + for raw in data.splitlines(): + # Fields: address(lo-hi) perms offset dev inode pathname. Cap the split at + # the inode column (maxsplit=5) so the pathname tail — the longest, + # sometimes space-containing column — is never tokenized. + fields = raw.split(None, 5) + if len(fields) < 5: + continue + addr_range = fields[0] + dash = addr_range.find(b"-") + if dash < 0: + continue + try: + lo = int(addr_range[:dash], 16) + hi = int(addr_range[dash + 1 :], 16) + inode = int(fields[4]) + except ValueError: + continue + maps.append((lo, hi, inode)) + maps.sort() + _SELF_MAPS_CACHE["raw"] = data + _SELF_MAPS_CACHE["parsed"] = maps + return maps + + +def _va_lookup_inode(maps: list[tuple[int, int, int]], addr: int) -> int | None: + """Inode of the mapping containing ``addr``, or None if ``addr`` is unmapped.""" + if not maps: + return None + idx = bisect.bisect_right(maps, (addr, _UINT64_MAX, _UINT64_MAX)) - 1 + if idx < 0: + return None + lo, hi, inode = maps[idx] + return inode if lo <= addr < hi else None + + +def _fork_range_covers(maps: list[tuple[int, int, int]], addr: int, nbytes: int, inode: int) -> bool: + """True if ``[addr, addr+nbytes)`` lies entirely within one fork-captured + mapping whose inode equals ``inode`` (error path C). + + Matching the inode alone is not enough: a post-fork re-mmap of a fork-inherited + file shares the inode but lands at a VA the child never inherited. Requiring + full VA coverage rejects that, and treats file-backed and anonymous (inode 0) + mappings the same. ``maps`` is sorted and non-overlapping, so the candidate + containing ``addr`` is unique. + """ + if not maps: + return False + idx = bisect.bisect_right(maps, (addr, _UINT64_MAX, _UINT64_MAX)) - 1 + if idx < 0: + return False + lo, hi, finode = maps[idx] + return finode == inode and lo <= addr and addr + nbytes <= hi + + +def _rewrite_blob_host_addrs(buf: memoryview, blob_off: int, ranges: list[tuple[int, int, int]]) -> None: + """Redirect registered host pointers in a task-args blob to child mappings. + + ``ranges`` is ``(parent_lo, parent_hi, child_base)`` for each host buffer the + child has mapped via _CTRL_MAP_HOST. For every tensor whose ``buffer.addr`` + (a parent VA) lands in a registered range, rewrite it in place to + ``child_base + (addr - parent_lo)`` so the runtime dereferences the child's + own mapping. Tensors outside every range (fork-inherited or child-allocated) + are left untouched. See _BLOB_TENSOR_STRIDE for the wire layout. + """ + tensor_count = struct.unpack_from(" str: + """Decode the staged-payload shm name a broadcast_control_all left at _OFF_ARGS.""" + raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) + nul = raw.find(b"\x00") + return raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + + +def _shm_base_addr(shm: SharedMemory) -> int: + """Mapped base address of ``shm``. The mapping outlives the temporary buffer + view, so the address stays valid until ``shm.close()``.""" + view = shm.buf + assert view is not None + exporter = ctypes.c_char.from_buffer(view) + addr = ctypes.addressof(exporter) + del exporter + return addr + + +def _rebuild_host_buf_ranges( + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], host_buf_ranges: list[tuple[int, int, int]] +) -> None: + host_buf_ranges.clear() + for _shm, lo, hi, base in host_buf_table.values(): + host_buf_ranges.append((lo, hi, base)) + + +def _handle_ctrl_map_host( + buf: memoryview, + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], + host_buf_ranges: list[tuple[int, int, int]], +) -> None: + """Child handler for _CTRL_MAP_HOST: persist a host-buffer mapping. + + The staged payload is ``token, parent_va, nbytes`` followed by the host + buffer's shm name. Map that shm and remember the parent VA range it stands + in for so the per-task blob rewrite can redirect host pointers to this base. + """ + payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] + staged = SharedMemory(name=_read_ctrl_staged_shm_name(buf)) + try: + staged_buf = staged.buf + assert staged_buf is not None + payload = bytes(staged_buf[:payload_size]) + finally: + staged.close() + token, parent_va, nbytes = _HOST_BUF_MAP_HEADER.unpack_from(payload, 0) + host_shm_name = payload[_HOST_BUF_MAP_HEADER.size :].decode("utf-8") + prior = host_buf_table.pop(token, None) + if prior is not None: + prior[0].close() + host_shm = SharedMemory(name=host_shm_name) + host_buf_table[token] = (host_shm, parent_va, parent_va + nbytes, _shm_base_addr(host_shm)) + _rebuild_host_buf_ranges(host_buf_table, host_buf_ranges) + + +def _handle_ctrl_unmap_host( + buf: memoryview, + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]], + host_buf_ranges: list[tuple[int, int, int]], +) -> None: + """Child handler for _CTRL_UNMAP_HOST: drop a host-buffer mapping by token.""" + payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] + staged = SharedMemory(name=_read_ctrl_staged_shm_name(buf)) + try: + staged_buf = staged.buf + assert staged_buf is not None + token = _HOST_BUF_UNMAP.unpack_from(bytes(staged_buf[:payload_size]), 0)[0] + finally: + staged.close() + entry = host_buf_table.pop(token, None) + if entry is not None: + entry[0].close() + _rebuild_host_buf_ranges(host_buf_table, host_buf_ranges) + + def _allocate_local_slot(registry: dict[int, Any]) -> int: for i in range(MAX_REGISTERED_CALLABLE_IDS): if i not in registry: @@ -974,6 +1221,12 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ """ prepared: set[int] = set() l3_l2_control_shms: list[SharedMemory] = [] + # Post-fork host buffers mapped into this child. `host_buf_table` + # owns the mmap per token (for unmap + teardown); `host_buf_ranges` is the + # parent-VA → child-VA translation table the per-task blob rewrite consults, + # rebuilt from the table on every map/unmap. + host_buf_table: dict[int, tuple[SharedMemory, int, int, int]] = {} # token -> (shm, lo, hi, child_base) + host_buf_ranges: list[tuple[int, int, int]] = [] # (parent_lo, parent_hi, child_base) try: while True: state = _mailbox_load_i32(state_addr) @@ -997,6 +1250,11 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ f"chip_process dev={device_id}: cid {cid} not prepared before TASK_READY " f"(register via _CTRL_PREPARE first)" ) + # Redirect any registered host pointer (a parent VA) in the + # blob to this child's own mapping before the runtime reads it. + # No-op when nothing is registered. + if host_buf_ranges: + _rewrite_blob_host_addrs(buf, _OFF_TASK_ARGS_BLOB, host_buf_ranges) # Hand the mailbox bytes straight to C++ (zero-copy zero-decode): # the blob layout is what `write_blob` already wrote, so re-parsing # it in Python is N×40B of avoidable work and a permanent @@ -1050,9 +1308,7 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ elif sub_cmd == _CTRL_REGISTER: digest = _read_control_digest(buf) payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] - raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) - nul = raw.find(b"\x00") - shm_name = raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + shm_name = _read_ctrl_staged_shm_name(buf) shm = SharedMemory(name=shm_name) shm_buf = shm.buf assert shm_buf is not None @@ -1111,6 +1367,10 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ _handle_ctrl_comm_init(cw, buf) elif sub_cmd == _CTRL_L3_L2_ORCH_COMM_INIT: l3_l2_control_shms.append(_handle_ctrl_l3_l2_orch_comm_init(cw, buf)) + elif sub_cmd == _CTRL_MAP_HOST: + _handle_ctrl_map_host(buf, host_buf_table, host_buf_ranges) + elif sub_cmd == _CTRL_UNMAP_HOST: + _handle_ctrl_unmap_host(buf, host_buf_table, host_buf_ranges) else: raise RuntimeError(f"unknown control sub-command {int(sub_cmd)}") except Exception as e: # noqa: BLE001 @@ -1139,6 +1399,11 @@ def _run_chip_main_loop( # noqa: PLR0912, PLR0913, PLR0915 -- unified TASK_READ control_shm.close() except Exception: # noqa: BLE001 pass + for host_shm, _lo, _hi, _base in host_buf_table.values(): + try: + host_shm.close() + except Exception: # noqa: BLE001 + pass def _chip_process_loop( @@ -1281,9 +1546,7 @@ def _child_worker_loop( if sub_cmd == _CTRL_REGISTER: digest = _read_control_digest(buf) payload_size = struct.unpack_from("Q", buf, _CTRL_OFF_ARG0)[0] - raw = bytes(buf[_OFF_ARGS : _OFF_ARGS + _CTRL_SHM_NAME_BYTES]) - nul = raw.find(b"\x00") - shm_name = raw[: nul if nul >= 0 else _CTRL_SHM_NAME_BYTES].decode("utf-8", "replace") + shm_name = _read_ctrl_staged_shm_name(buf) callable_obj = _read_chip_callable_from_shm(shm_name, int(payload_size)) inner_registered = False try: @@ -1432,6 +1695,46 @@ def __init__( self._live_l3_l2_regions: list[Any] = [] self._l3_l2_orch_comm_host_buffers: dict[int, int] = {} + # Post-fork host-buffer registration. Keyed by the user + # tensor's data_ptr; each entry maps a separate named shm into every chip + # child so a tensor created after the children were forked can still be + # used by a later run. ``_fork_maps`` is the address space the children + # inherited at fork — the authority for distinguishing a fork-inherited + # host pointer (legal, unregistered) from a post-fork one (must be + # registered). ``_pending_host_copyback`` collects the D2H mirror + # (shm → tensor) deferred until after the run drains. + self._host_buf_registry: dict[int, _HostBufEntry] = {} + # ``data_ptr`` keys of ``_host_buf_registry`` kept sorted so the per-task + # submit lookup (``_find_host_buf_entry``) is a bisect instead of a linear + # scan over every registration. Registered buffers are distinct, non- + # overlapping allocations, so the unique candidate for an address is the + # entry with the greatest base <= addr. Mutated under ``_registry_lock`` + # alongside the dict. + self._host_buf_sorted_ptrs: list[int] = [] + self._host_buf_token_counter: int = 0 + # Sorted ``(lo, hi, inode)`` mappings the chip children inherited at fork, + # used to reject an unregistered post-fork host tensor (error path C). A + # submitted tensor is reachable iff its whole ``[addr, addr+nbytes)`` range + # is covered by one of these mappings with a matching inode — matching the + # inode alone is not enough, since a post-fork re-mmap of a fork-inherited + # file lands at a VA the child never inherited (see _fork_range_covers). + # Captured once, immediately before the chip fork. + self._fork_maps: list[tuple[int, int, int]] | None = None + # Per-run cache of the current /proc/self/maps, built lazily the first + # time a run hits an unregistered host tensor and reset each run. + self._submit_maps: list[tuple[int, int, int]] | None = None + self._pending_host_copyback: list[tuple[int, int, int]] = [] + # Parent-VA ranges of registered buffers written by an earlier task in the + # CURRENT run (OUTPUT/INOUT/OUTPUT_EXISTING). A later task reading an + # overlapping range must NOT have the stale parent bytes copied back over + # the producer's device output — the registered shm is the live child↔child + # medium and the OverlapMap already orders the on-device accesses. Reset per + # run alongside ``_pending_host_copyback``. + self._staged_output_ranges: list[tuple[int, int]] = [] + # Latched once we pass an unregistered host tensor through unvalidated on a + # no-procfs platform, so the visibility warning is emitted only once. + self._warned_no_procfs_passthrough: bool = False + def _comm_plan_rootinfo_path(self) -> str: """Per-Worker rootinfo path used by HCCL/sim base comm_init. @@ -1618,6 +1921,32 @@ def _host_ptr_value(ptr: Any) -> int: except AttributeError as exc: raise TypeError("host_ptr must be an integer address, ctypes object, or object with data_ptr()") from exc + @staticmethod + def _host_nbytes(obj: Any) -> int: + """Byte size of a host buffer, duck-typed (torch / numpy / buffer protocol). + + Kept torch-free like ``_host_ptr_value`` — simpler must not import torch. + Callers that hold a buffer whose size cannot be inferred pass ``nbytes`` + to ``register_host_buffer`` explicitly. + """ + # numpy ndarray / memoryview expose a non-callable ``nbytes``; recent + # torch.Tensor does too. Prefer it when present and not a method. + nb = getattr(obj, "nbytes", None) + if nb is not None and not callable(nb): + return int(nb) + # torch.Tensor (and look-alikes): element_size() * number-of-elements. + elem = getattr(obj, "element_size", None) + numel = getattr(obj, "nelement", None) or getattr(obj, "numel", None) + if callable(elem) and callable(numel): + return int(elem() * numel()) + # Anything supporting the buffer protocol (bytes, bytearray, array, …). + try: + return memoryview(obj).nbytes + except TypeError as exc: + raise TypeError( + "register_host_buffer: cannot infer byte size of the host buffer; pass nbytes=" + ) from exc + def _require_live_remote_buffer(self, handle: RemoteBufferHandle) -> None: if not isinstance(handle, RemoteBufferHandle): raise TypeError("expected a RemoteBufferHandle returned by Worker.remote_malloc/import") @@ -3011,6 +3340,17 @@ def _start_hierarchical(self) -> None: # noqa: PLR0912 -- three parallel fork l # lazily on first ``orch.allocate_domain`` via CTRL_COMM_INIT. chip_log_level, chip_log_info_v = _simpler_log.get_current_config() if device_ids: + # Snapshot what the chip children are about to inherit (error + # path C), captured *immediately before* the fork so a + # parent-only mapping created later by dw.init()/prewarm cannot leak + # into it. Full (lo, hi, inode) ranges: error-path-C requires a + # submitted tensor's whole byte range to be covered by one inherited + # mapping of the same inode (see _fork_range_covers). Empty when + # procfs is unavailable — callers then pass unregistered tensors + # through unvalidated. Registration does not depend on this snapshot. + fork_maps = _read_self_maps() + if fork_maps: + self._fork_maps = fork_maps for idx, dev_id in enumerate(device_ids): pid = os.fork() if pid == 0: @@ -3786,6 +4126,373 @@ def copy_from(self, dst: int, src: int, size: int, worker_id: int = 0) -> None: assert self._orch is not None self._orch.copy_from(worker_id, dst, src, size) + # ------------------------------------------------------------------ + # Post-fork host-buffer registration + # ------------------------------------------------------------------ + + def register_host_buffer(self, tensor: Any, *, nbytes: int | None = None) -> HostBufferHandle: + """Make a host ``tensor`` created after the chip children were forked + usable by a later ``run()``. + + L3 chip children are forked lazily on the first ``run()``; a host tensor + created afterwards is not in their address space, so passing it to + ``orch.submit_next_level`` would otherwise read unmapped/stale memory. + This maps a separate named shared-memory buffer into every chip child and + keeps it mapped; each subsequent ``run()`` mirrors the tensor through it + (H2D copy-in before the task, D2H copy-out after the run drains). Call + ``unregister_host_buffer`` with the returned handle to release it. + + ``tensor`` is a host buffer whose address ``_host_ptr_value`` accepts — + a torch tensor (``data_ptr()``), a raw integer address, or a ctypes + object — same contract as the other host-pointer APIs. Its byte size is + inferred when possible; pass ``nbytes`` explicitly otherwise (required + for a raw address, and for a numpy array, where you pass + ``arr.ctypes.data`` with ``nbytes=arr.nbytes``). The chips are started on + demand, so this is valid any time after ``init()``. + + This call blocks until every chip child has mapped the buffer, so once it + returns the buffer is fully visible to ``run()``. It is not thread-safe + against a concurrent ``run`` / ``register`` / ``unregister`` on the same + Worker, though: drive registration and runs from one thread (the usual + register-then-run pattern), as the L3 worker is otherwise. + """ + if self.level < 3: + raise TypeError("register_host_buffer requires a level >= 3 Worker") + if not self._initialized: + raise RuntimeError("register_host_buffer requires Worker.init() before registration") + self._start_hierarchical() + if not self._chip_shms: + raise RuntimeError("register_host_buffer requires forked chip children (none are configured)") + assert self._worker is not None + + data_ptr = self._host_ptr_value(tensor) + nbytes = int(nbytes) if nbytes is not None else self._host_nbytes(tensor) + if nbytes <= 0: + raise ValueError("register_host_buffer: tensor has no bytes") + + # Allocate the token and reserve the registry slot under the lock so + # concurrent registrations cannot collide on the counter or both pass the + # duplicate-pointer check (mirrors Worker.register's _registry_lock + # discipline). The slow broadcast runs *outside* the lock — wire-level + # concurrency is serialized at the C++ mailbox, not here. + with self._registry_lock: + if data_ptr in self._host_buf_registry: + raise ValueError(f"register_host_buffer: host pointer 0x{data_ptr:x} is already registered") + token = self._host_buf_token_counter + self._host_buf_token_counter += 1 + shm = SharedMemory(create=True, size=nbytes) + entry = _HostBufEntry( + token=token, + data_ptr=data_ptr, + nbytes=nbytes, + shm=shm, + shm_name=shm.name, + # Cached for per-run memcpy; valid until shm.close() unmaps it. + shm_base=_shm_base_addr(shm), + tensor=tensor, + ) + self._host_buf_registry[data_ptr] = entry + bisect.insort(self._host_buf_sorted_ptrs, data_ptr) + + payload = _HOST_BUF_MAP_HEADER.pack(token, data_ptr, nbytes) + shm.name.encode("utf-8") + try: + results = self._worker.broadcast_control_all( + WorkerType.NEXT_LEVEL, + int(_CTRL_MAP_HOST), + payload, + None, + timeout_s=self._py_control_timeout_s, + ) + errors = self._control_errors(list(results)) + if errors: + raise RuntimeError( + f"register_host_buffer: MAP_HOST failed on {len(errors)} chip children; first error: {errors[0]}" + ) + except BaseException: + # Roll back on any failure — partial-map errors *or* an exception + # raised by the broadcast itself (which would otherwise leak the shm): + # unmap any child that took it, drop the reservation, free the shm. + try: + self._broadcast_host_unmap(token) + finally: + with self._registry_lock: + if self._host_buf_registry.pop(data_ptr, None) is not None: + self._host_buf_sorted_ptrs.remove(data_ptr) + shm.close() + shm.unlink() + raise + + return HostBufferHandle(token=token, data_ptr=data_ptr) + + def unregister_host_buffer(self, handle: HostBufferHandle) -> None: + """Release a registration created by ``register_host_buffer``. + + Unmaps the buffer from every chip child and frees the parent shm. + Best-effort, mirroring ``unregister``: a child-side error is reported but + the parent still drops the entry so the buffer's resources are reclaimed. + """ + if not isinstance(handle, HostBufferHandle): + raise TypeError("unregister_host_buffer expects a HostBufferHandle from register_host_buffer") + # Drop the entry under the lock; do the slow unmap broadcast + shm release + # outside it (narrow-lock discipline, same as register). + with self._registry_lock: + # Match on token, not just data_ptr: a stale handle whose buffer was + # already unregistered must not drop a newer registration that happens + # to have reused the same pointer. A mismatch means this handle's + # buffer is already gone — return silently (idempotent unregister). + entry = self._host_buf_registry.get(handle.data_ptr) + if entry is None or entry.token != handle.token: + return + self._host_buf_registry.pop(handle.data_ptr, None) + self._host_buf_sorted_ptrs.remove(handle.data_ptr) + errors: list[str] = [] + try: + if self._worker is not None and getattr(self, "_hierarchical_started", False): + errors = self._broadcast_host_unmap(entry.token) + except Exception as exc: # noqa: BLE001 + # Broadcast itself failed; still free the parent shm below so it is + # never leaked, and surface the failure as a warning. + errors = [str(exc)] + finally: + try: + entry.shm.close() + entry.shm.unlink() + except FileNotFoundError: + pass + if errors: + sys.stderr.write( + f"[worker pid={os.getpid()}] WARN: unregister_host_buffer token={entry.token} " + f"failed on {len(errors)} chip children; first error: {errors[0]}\n" + ) + sys.stderr.flush() + + def _release_all_host_buffers(self) -> None: + """Unmap + free every still-registered host buffer (called from close()).""" + with self._registry_lock: + entries = list(self._host_buf_registry.values()) + self._host_buf_registry.clear() + self._host_buf_sorted_ptrs.clear() + for entry in entries: + try: + if self._worker is not None and getattr(self, "_hierarchical_started", False): + self._broadcast_host_unmap(entry.token) + except Exception: # noqa: BLE001 + # A failed unmap broadcast must not strand the parent shm. + pass + finally: + try: + entry.shm.close() + entry.shm.unlink() + except FileNotFoundError: + pass + + def _broadcast_host_unmap(self, token: int) -> list[str]: + """Broadcast _CTRL_UNMAP_HOST for ``token`` to every chip child.""" + if self._worker is None: + return [] + results = self._worker.broadcast_control_all( + WorkerType.NEXT_LEVEL, + int(_CTRL_UNMAP_HOST), + _HOST_BUF_UNMAP.pack(token), + None, + timeout_s=self._py_control_timeout_s, + ) + return self._control_errors(list(results)) + + def _stage_host_buffers_for_chip_submit(self, args: Any) -> None: + """Validate + H2D copy-in for the host tensors of one chip submit. + + Called from ``Orchestrator.submit_next_level`` on the LOCAL_CHIP path, + before the task is dispatched. For each host tensor in ``args``: + + * registered (its base, or a sub-view inside a registered buffer) → + mirror the live bytes into the buffer's shm at the matching offset and + queue the D2H copy-out for after drain; + * fork-inherited → leave it alone; + * neither → raise the actionable error (error path C). + + The copy-in is skipped when the range was already written by an earlier + task in this run: the registered *host* shm is the live child↔child medium + (across devices too — each device's HBM is independent and transient, the + rendezvous is always this host shm mapped into every child). The producer + task's runtime D2H-mirrors its device output into the shm before TASK_DONE; + the consumer task H2Ds it back. The OverlapMap keys deps on the host + address (device-agnostic) and orders the consumer's read after the + producer's write, so re-mirroring stale parent bytes would clobber that + output (see ``_classify_staged_overlap``). A NO_DEP read of such a range, + or a partial overlap, raises instead — neither has safe semantics here. + + The child rewrites registered host pointers to its own mapping; see + _rewrite_blob_host_addrs. + + Registered buffers are always staged; the unregistered-tensor reachability + check (error path C) needs the fork snapshot. When it is unavailable (no + procfs — see _read_self_maps) reachability cannot be classified, so an + unregistered tensor is passed through unvalidated with a one-time warning + (a fork-inherited tensor is the legitimate common case); error path C is + enforced only where procfs exists. + """ + for i in range(args.tensor_count()): + tensor = args.tensor(i) + if tensor.child_memory: + continue + addr = int(tensor.data) + if addr == 0: + continue + tensor_nbytes = int(tensor.nbytes()) + entry = self._find_host_buf_entry(addr, tensor_nbytes) + if entry is not None: + # The submitted tensor may be a sub-view of the registered buffer + # (addr = base + offset); mirror at the same offset. The child + # rewrite uses the same base, so its read lands at child_base + + # offset. + offset = addr - entry.data_ptr + shm_dst = entry.shm_base + offset + tag = args.tag(i) + lo, hi = addr, addr + tensor_nbytes + reads = tag in (TensorArgType.INPUT, TensorArgType.INOUT, TensorArgType.NO_DEP) + if reads: + # If an earlier task in this run already wrote this range, its + # runtime D2H-mirrored the device output into this host shm and + # the OverlapMap (keyed on the host address, device-agnostic) + # orders the consumer's read after it. Copying the stale parent + # bytes in now would clobber that output before the consumer + # reads it, so skip the copy-in for an already-produced range + # (submit order == dependency order, so the producer is recorded + # by now). + overlap = self._classify_staged_overlap(lo, hi) + if overlap == "partial": + raise RuntimeError( + f"Host tensor 0x{addr:x} (+{tensor_nbytes} B) partially overlaps a " + f"buffer written earlier in this run; per-byte copy-in is not modelled. " + f"Use matching tensor views so reads either fully reuse the producer's " + f"output or are disjoint from it." + ) + if overlap == "covered": + if tag == TensorArgType.NO_DEP: + # NO_DEP skips the OverlapMap, so the runtime never orders + # this read after the in-run producer's write — neither + # copy-in nor skip is safe. Fail loudly. + raise RuntimeError( + f"NO_DEP host tensor 0x{addr:x} (+{tensor_nbytes} B) overlaps a " + f"buffer written earlier in this run; NO_DEP carries no dependency " + f"so the read is unordered against the producer. Tag it " + f"INPUT/INOUT to order it." + ) + # producer's D2H already wrote this host shm — skip the + # stale copy-in. + else: + ctypes.memmove(shm_dst, addr, tensor_nbytes) + if tag in (TensorArgType.OUTPUT, TensorArgType.INOUT, TensorArgType.OUTPUT_EXISTING): + self._staged_output_ranges.append((lo, hi)) + # Copy-out for everything but a pure INPUT. NO_DEP is a no-publish + # existing tensor that declares no read/write direction (tensor.h), + # so mirror both ways: if only read, copy-out rewrites the bytes + # copy-in already staged (a no-op); if written, the result reaches + # the user tensor. + if tag != TensorArgType.INPUT: + self._pending_host_copyback.append((addr, shm_dst, tensor_nbytes)) + continue + # Unregistered host pointer: confirm the child can actually reach it. + # Without the fork snapshot (no procfs, e.g. macOS) reachability cannot + # be classified, so we pass the pointer through unvalidated (with a + # one-time warning): a fork-inherited tensor is the legitimate common + # case and must keep working. Error path C is therefore only enforced + # where procfs exists (Linux, including onboard); non-procfs platforms + # are sim/dev-only. + if self._fork_maps is None: + if not self._warned_no_procfs_passthrough: + self._warned_no_procfs_passthrough = True + warnings.warn( + "L3 host-tensor visibility cannot be validated on this platform " + "(/proc/self/maps unavailable, e.g. macOS): an unregistered host tensor " + "is passed through to the chip child unchecked. This is safe for a " + "fork-inherited tensor (.share_memory_() before the chips fork), but a " + "post-fork tensor that was not registered with worker.register_host_buffer() " + "is not visible to the child and will read stale/unmapped memory. " + "Register post-fork host tensors to be safe.", + UserWarning, + stacklevel=2, + ) + continue + # Build the current map view once per run (registered-only runs never + # pay this read). + if self._submit_maps is None: + self._submit_maps = _read_self_maps() + inode = _va_lookup_inode(self._submit_maps, addr) + # Reachable iff the whole [addr, addr+nbytes) range sits inside one + # mapping the children inherited at fork *with the same inode*. A + # file-backed pointer that merely shares an inode with a fork mapping + # but lives at a post-fork VA (a re-mmap of the same file) is rejected; + # so is an anonymous pointer (inode 0: heap, HeapRing, a fresh + # post-fork ``torch.empty``) whose VA was not mapped at fork. + if inode is not None and _fork_range_covers(self._fork_maps, addr, tensor_nbytes, inode): + continue + raise RuntimeError( + f"Host tensor 0x{addr:x} is not visible to the L3 chip child (created after fork, not " + "registered). Call worker.register_host_buffer(tensor) before run(), or allocate it with " + ".share_memory_() before init()." + ) + + def _find_host_buf_entry(self, addr: int, nbytes: int) -> _HostBufEntry | None: + """Registered buffer whose ``[data_ptr, data_ptr+nbytes)`` contains the + whole ``[addr, addr+nbytes)`` view, or None. Raises if a view starts + inside a registered buffer but runs past its end (would read past the + shm in the child). + + Registered buffers are distinct, non-overlapping allocations, so the only + candidate for ``addr`` is the entry with the greatest base ``<= addr`` — + found by bisecting ``_host_buf_sorted_ptrs`` (kept sorted on every + register/unregister) so this stays log-time on the per-submit hot path + rather than scanning every registration. + + Sub-view matching assumes the blob's ``Tensor.buffer.addr`` is the + contiguous base of the host buffer (``make_tensor_arg`` builds tensors with + ``start_offset == 0``); a non-zero ``start_offset`` would shift ``addr`` + and is not modelled here. + """ + idx = bisect.bisect_right(self._host_buf_sorted_ptrs, addr) - 1 + if idx < 0: + return None + # The submit path reads the registry without _registry_lock; a concurrent + # unregister between the bisect and this lookup must degrade to None, not a + # KeyError (mirrors the lock-free read of the old values() scan). + entry = self._host_buf_registry.get(self._host_buf_sorted_ptrs[idx]) + if entry is None or addr >= entry.data_ptr + entry.nbytes: + return None + if addr + nbytes > entry.data_ptr + entry.nbytes: + raise RuntimeError( + f"Host tensor 0x{addr:x} (+{nbytes} B) overruns its registered buffer " + f"0x{entry.data_ptr:x} (+{entry.nbytes} B); register a buffer at least as large." + ) + return entry + + def _classify_staged_overlap(self, lo: int, hi: int) -> str: + """How ``[lo, hi)`` relates to registered ranges already written this run. + + Returns ``"none"`` (disjoint from every staged output → safe to copy the + parent bytes in), ``"covered"`` (fully inside one staged output → the + producer fills the shm on device, skip the copy-in), or ``"partial"`` + (straddles a staged-output boundary → copy-in would clobber the produced + part while a skip would drop the fresh part; the caller raises rather than + silently corrupt). + """ + intersects = False + for o_lo, o_hi in self._staged_output_ranges: + if lo < o_hi and o_lo < hi: + intersects = True + if o_lo <= lo and hi <= o_hi: + return "covered" + return "partial" if intersects else "none" + + def _flush_host_buffer_copyback(self) -> None: + """D2H mirror (shm → tensor) for this run's registered output buffers.""" + if not self._pending_host_copyback: + return + for dst_addr, shm_base, nbytes in self._pending_host_copyback: + ctypes.memmove(dst_addr, shm_base, nbytes) + self._pending_host_copyback.clear() + # ------------------------------------------------------------------ # run — uniform entry point # ------------------------------------------------------------------ @@ -3828,6 +4535,13 @@ def run(self, callable, args=None, config=None) -> None: # poked it. self._orch._clear_error() self._orch._scope_begin() + # Reset the registered-host-buffer D2H mirror list for this run; submit + # appends the output buffers it staged, flushed below once the run drains + # cleanly. Clearing here also drops anything a prior failed + # run left behind. ``_submit_maps`` is rebuilt lazily within the run. + self._pending_host_copyback.clear() + self._staged_output_ranges.clear() + self._submit_maps = None try: callable(self._orch, args, cfg) finally: @@ -3864,6 +4578,9 @@ def run(self, callable, args=None, config=None) -> None: self._execute_pending_domain_releases() if self._live_domains: self._release_all_live_domains() + # Run drained cleanly (no exception propagated): mirror registered output + # buffers back from their shm into the user tensors. + self._flush_host_buffer_copyback() # L3+ returns None like every other worker level; per-L2-child timing # is emitted as `[STRACE]` markers from each simpler_run. return None @@ -3912,6 +4629,10 @@ def close(self) -> None: # noqa: PLR0912 -- parallel teardown for _worker + sub sys.stderr.write(f"Worker.close(): remote buffer cleanup reported error (continuing): {exc}\n") sys.stderr.flush() + # Release any host buffers the user never unregistered. Must run while + # the chip mailboxes are still usable (before _worker.close()). + self._release_all_host_buffers() + if self.level == 2: if self._chip_worker: self._chip_worker.finalize() diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py new file mode 100644 index 000000000..0b69dc02b --- /dev/null +++ b/tests/st/a2a3/tensormap_and_ringbuffer/test_l3_host_buffer_registration.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""L3 host-buffer registration (issue #1027). + +A host tensor created *after* the chip children are forked (lazily on the +first ``Worker.run()``) is not visible to those children: the orch fn runs in +the parent and ``orch.copy_to`` carries a raw parent VA that is unmapped (or +stale) in the child. ``Worker.register_host_buffer`` maps a named shm into +each child post-fork so a later run can copy through it. + +Covers the mechanism end-to-end (B — register a post-fork buffer, run, get the +correct result). The unregistered-tensor error path (C) is a pure host-side +classifier with no kernel/device dependency, unit-tested in +``tests/ut/py/test_worker/test_host_buffer_registration.py``. + +a2a3sim: ``register_host_buffer`` is pure host-side (POSIX shm + a control +broadcast to the forked chip children) with no platform branching, so the sim +backend exercises the full mechanism without needing a device. The +vector_example orchestration kernels exist only for a2a3. +""" + +import torch +from simpler.task_interface import ArgDirection as D +from simpler.task_interface import CallConfig, TaskArgs, TensorArgType + +from simpler_setup import SceneTestCase, make_tensor_arg, scene_test + +KERNELS_BASE = "../../../../examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels" + +SIZE = 128 * 128 + + +def _golden(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor: + s = a + b + return (s + 1) * (s + 2) + s + + +def _one_task_orch(chip_handle, a, b, out): + def orch_fn(orch, _args, cfg): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(a), TensorArgType.INPUT) + ta.add_tensor(make_tensor_arg(b), TensorArgType.INPUT) + ta.add_tensor(make_tensor_arg(out), TensorArgType.OUTPUT_EXISTING) + orch.submit_next_level(chip_handle, ta, cfg, worker=0) + + return orch_fn + + +@scene_test(level=3, runtime="tensormap_and_ringbuffer") +class TestPostForkHostBufferRegistration(SceneTestCase): + """Post-fork host-buffer registration on a single L3 worker (issue #1027).""" + + CALLABLE = { + "callables": [ + { + "name": "vector", + "orchestration": { + "source": f"{KERNELS_BASE}/orchestration/example_orchestration.cpp", + "function_name": "aicpu_orchestration_entry", + "signature": [D.IN, D.IN, D.OUT], + }, + "incores": [ + { + "func_id": 0, + "source": f"{KERNELS_BASE}/aiv/kernel_add.cpp", + "core_type": "aiv", + "signature": [D.IN, D.IN, D.OUT], + }, + { + "func_id": 1, + "source": f"{KERNELS_BASE}/aiv/kernel_add_scalar.cpp", + "core_type": "aiv", + "signature": [D.IN, D.OUT], + }, + { + "func_id": 2, + "source": f"{KERNELS_BASE}/aiv/kernel_mul.cpp", + "core_type": "aiv", + "signature": [D.IN, D.IN, D.OUT], + }, + ], + }, + ], + } + + CASES = [ + {"name": "post_fork_registration", "platforms": ["a2a3sim"]}, + ] + + def _force_fork(self, worker, chip_handle): + """Run once with pre-fork shared tensors so the chip children get forked.""" + a = torch.full((SIZE,), 2.0, dtype=torch.float32).share_memory_() + b = torch.full((SIZE,), 3.0, dtype=torch.float32).share_memory_() + out = torch.zeros(SIZE, dtype=torch.float32).share_memory_() + worker.run(_one_task_orch(chip_handle, a, b, out), args=None, config=CallConfig()) + assert torch.allclose(out, _golden(a, b), rtol=self.RTOL, atol=self.ATOL) + + def test_run(self, st_worker): + """Mechanism B: a host tensor created AFTER the fork and registered with + ``register_host_buffer`` is visible to the chip child and round-trips to the + correct result. + + Overrides the default ``generate_args``/``compute_golden`` flow: issue #1027 + is about *timing* — the buffer must be created AFTER the chip children are + forked, which the standard pre-fork arg-generation path cannot express. The + ``st_worker`` L3 fixture owns the worker lifecycle (build/init/close). + """ + worker = st_worker + chip_handle = type(self)._st_chip_handles["vector"] + + self._force_fork(worker, chip_handle) + + # Created AFTER the fork — invisible to the child until register maps it in. + a = torch.full((SIZE,), 5.0, dtype=torch.float32).share_memory_() + b = torch.full((SIZE,), 7.0, dtype=torch.float32).share_memory_() + out = torch.zeros(SIZE, dtype=torch.float32).share_memory_() + ha = worker.register_host_buffer(a) + hb = worker.register_host_buffer(b) + hout = worker.register_host_buffer(out) + try: + worker.run(_one_task_orch(chip_handle, a, b, out), args=None, config=CallConfig()) + assert torch.allclose(out, _golden(a, b), rtol=self.RTOL, atol=self.ATOL) + finally: + worker.unregister_host_buffer(ha) + worker.unregister_host_buffer(hb) + worker.unregister_host_buffer(hout) + + +if __name__ == "__main__": + SceneTestCase.run_module(__name__) diff --git a/tests/ut/py/test_worker/test_host_buffer_registration.py b/tests/ut/py/test_worker/test_host_buffer_registration.py new file mode 100644 index 000000000..60dc3f0e3 --- /dev/null +++ b/tests/ut/py/test_worker/test_host_buffer_registration.py @@ -0,0 +1,230 @@ +# Copyright (c) PyPTO Contributors. +# This program is free software, you can redistribute it and/or modify it under the terms and conditions of +# CANN Open Software License Agreement Version 2.0 (the "License"). +# Please refer to the License for details. You may not use this file except in compliance with the License. +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. +# See LICENSE in the root of the software repository for the full text of the License. +# ----------------------------------------------------------------------------------------------------------- +"""Host-side reachability classifier for post-fork host buffers. + +Error path C: a host tensor created *after* the chip children fork is invisible +to them unless ``register_host_buffer`` maps it in. ``submit_next_level`` calls +``_stage_host_buffers_for_chip_submit`` *before any dispatch*; for an +unregistered post-fork tensor it must raise an actionable error rather than +silently submit a pointer the child cannot read. + +These are pure host-side unit tests: the classifier depends only on the fork +snapshot (``_fork_maps``) and ``/proc/self/maps``, so we inject that state +directly and never fork, compile a kernel, or touch a device (mirrors the +white-box style of ``test_host_worker.py``). The end-to-end registered +round-trip (mechanism B) lives in the a2a3sim scene test. +""" + +from __future__ import annotations + +import ctypes +import warnings + +import pytest +import torch +from simpler.task_interface import TaskArgs, TensorArgType +from simpler.worker import Worker, _HostBufEntry, _read_self_maps, _va_lookup_inode + +from simpler_setup import make_tensor_arg + +_SIZE = 128 * 128 +# Element count whose float32 allocation (64 MiB) is above glibc's max dynamic +# mmap threshold (32 MiB on 64-bit), so it is always its own anonymous mmap (a +# fresh post-fork VA), never served from a fork-time heap arena. +_BIG_ELEMS = 16 * 1024 * 1024 + +# Tests that inject a *real* fork snapshot (or read a tensor's real inode) only +# make sense where /proc/self/maps exists. On a non-procfs platform (macOS CI) +# _read_self_maps() is [], so skip them; the no-procfs behaviour itself is +# covered by test_passes_through_without_procfs. +_requires_procfs = pytest.mark.skipif(not _read_self_maps(), reason="/proc/self/maps unavailable") + + +def _classifier_worker(*, fork_maps): + """A ``Worker`` with only the classifier state populated. + + ``_stage_host_buffers_for_chip_submit`` reads nothing else, so we bypass the + real init/fork and inject the fork snapshot (sorted ``(lo, hi, inode)`` + ranges, or ``None`` for "no procfs") by hand. + """ + w = Worker.__new__(Worker) + w._host_buf_registry = {} + w._host_buf_sorted_ptrs = [] + w._fork_maps = fork_maps + w._submit_maps = None + w._pending_host_copyback = [] + w._warned_no_procfs_passthrough = False + return w + + +def _one_input_arg(tensor): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(tensor), TensorArgType.INPUT) + return ta + + +class TestErrorPathCRejection: + """Unregistered post-fork tensors are refused at submit, before dispatch.""" + + def test_post_fork_anonymous_tensor_rejected(self): + # inode 0 (anonymous mmap); VA not in the (empty) fork snapshot. + w = _classifier_worker(fork_maps=[]) + anon = torch.zeros(_BIG_ELEMS, dtype=torch.float32) + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) + + def test_post_fork_shared_tensor_rejected(self): + # File-backed (share_memory_) tensor whose inode was not inherited at fork. + w = _classifier_worker(fork_maps=[]) + shared = torch.zeros(_SIZE, dtype=torch.float32).share_memory_() + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(shared)) + + @_requires_procfs + def test_fork_inherited_tensor_passes(self): + # The legitimate common case: a tensor whose real mapping is in the fork + # snapshot (so the child inherited the exact VA + inode) is accepted + # without raising. Guards against the coverage check over-rejecting. + t = torch.zeros(_SIZE, dtype=torch.float32) + w = _classifier_worker(fork_maps=_read_self_maps()) + w._stage_host_buffers_for_chip_submit(_one_input_arg(t)) # no raise + + @_requires_procfs + def test_same_inode_other_va_rejected(self): + # Regression for the inode-only gap: a file-backed tensor whose inode WAS + # present at fork but at a *different* VA (a post-fork re-mmap of the same + # file) must still be rejected — the child never inherited this VA, so + # matching the inode alone is not enough. The old inode-membership check + # accepted it; the range-coverage check rejects it. + shared = torch.zeros(_SIZE, dtype=torch.float32).share_memory_() + addr = shared.data_ptr() + inode = _va_lookup_inode(_read_self_maps(), addr) + assert inode and inode != 0, "share_memory_ tensor must be file-backed" + # Same inode, but a VA range entirely below the tensor — does not cover it. + fork_maps = [(addr - 0x200000, addr - 0x100000, inode)] + w = _classifier_worker(fork_maps=fork_maps) + with pytest.raises(RuntimeError, match="register_host_buffer"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(shared)) + + def test_passes_through_without_procfs(self): + # _fork_maps is None => no fork snapshot (no procfs, e.g. macOS): + # reachability cannot be classified, so an unregistered tensor is passed + # through unvalidated rather than rejected — a fork-inherited tensor is the + # legitimate common case and must keep working. Error path C is only + # enforced where procfs exists. The pass-through emits a one-time warning so + # the caller knows visibility went unverified. Regression guard for macOS. + w = _classifier_worker(fork_maps=None) + anon = torch.zeros(_BIG_ELEMS, dtype=torch.float32) + with pytest.warns(UserWarning, match="visibility cannot be validated"): + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) # no raise + # Latched: a second pass-through is silent. + with warnings.catch_warnings(): + warnings.simplefilter("error") # any warning would raise + w._stage_host_buffers_for_chip_submit(_one_input_arg(anon)) + + +def _registered_worker(entry): + """A ``Worker`` with only the registered-buffer staging state populated. + + The registered path in ``_stage_host_buffers_for_chip_submit`` returns before + the fork-snapshot check, so it reads only ``_host_buf_registry``, + ``_host_buf_sorted_ptrs``, ``_staged_output_ranges`` and + ``_pending_host_copyback``. + """ + w = Worker.__new__(Worker) + w._host_buf_registry = {entry.data_ptr: entry} + w._host_buf_sorted_ptrs = [entry.data_ptr] + w._staged_output_ranges = [] + w._pending_host_copyback = [] + return w + + +def _entry_for(parent, shm_buf): + """A registered entry mapping ``parent``'s VA onto a caller-owned ``shm_buf`` + (a ctypes buffer standing in for the child-visible shm).""" + return _HostBufEntry( + token=1, + data_ptr=parent.data_ptr(), + nbytes=parent.numel() * parent.element_size(), + shm=None, # type: ignore[arg-type] # staging reads only shm_base + shm_name="", + shm_base=ctypes.addressof(shm_buf), + tensor=parent, + ) + + +def _arg(tensor, tag): + ta = TaskArgs() + ta.add_tensor(make_tensor_arg(tensor), tag) + return ta + + +class TestInRunProducerConsumer: + """Copy-in must not clobber a buffer an earlier task in the same run wrote. + + The registered shm is the live child↔child medium: producer task A writes its + device output into the shm, the OverlapMap orders consumer task B's read after + it, and B's host-side copy-in (stale parent → shm) is the only thing that can + race that output. Submit order == dependency order, so by the time B is staged + A's range is already recorded. + """ + + def test_consumer_input_does_not_clobber_producer_output(self): + # Stale parent = 7.0; shm pre-filled with 3.0 stands in for A's device + # output already mirrored into the shm. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)(*([3.0] * _SIZE)) + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.OUTPUT)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.INPUT)) + # copy-in skipped: the producer's output survives (3.0), not stale 7.0. + assert list(shm_buf[:4]) == [3.0, 3.0, 3.0, 3.0] + + def test_plain_input_still_copies_in(self): + # No earlier task wrote the range, so the parent snapshot must reach the shm. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)(*([3.0] * _SIZE)) + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.INPUT)) + assert list(shm_buf[:4]) == [7.0, 7.0, 7.0, 7.0] + + def test_no_dep_over_produced_range_raises(self): + # NO_DEP skips the OverlapMap, so the read is unordered against the + # in-run producer — neither copy-in nor skip is safe. + parent = torch.full((_SIZE,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * _SIZE)() + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.OUTPUT)) + with pytest.raises(RuntimeError, match="NO_DEP"): + w._stage_host_buffers_for_chip_submit(_arg(parent, TensorArgType.NO_DEP)) + + def test_partial_overlap_raises(self): + # Producer writes [0, 64); consumer reads [32, 96) — straddles the + # boundary, so neither a full copy-in nor a full skip is correct. + parent = torch.full((256,), 7.0, dtype=torch.float32) + shm_buf = (ctypes.c_float * 256)() + w = _registered_worker(_entry_for(parent, shm_buf)) + w._stage_host_buffers_for_chip_submit(_arg(parent[0:64], TensorArgType.OUTPUT)) + with pytest.raises(RuntimeError, match="partially overlaps"): + w._stage_host_buffers_for_chip_submit(_arg(parent[32:96], TensorArgType.INPUT)) + + +class TestReadSelfMaps: + def test_returns_empty_when_procfs_absent(self, monkeypatch): + # _read_self_maps must degrade to [] (not FileNotFoundError) where + # /proc/self/maps does not exist — the bug that broke the macOS jobs. + real_open = open + + def fake_open(path, *args, **kwargs): + if str(path) == "/proc/self/maps": + raise FileNotFoundError(2, "No such file or directory", "/proc/self/maps") + return real_open(path, *args, **kwargs) + + monkeypatch.setattr("builtins.open", fake_open) + assert _read_self_maps() == []