728 changes: 728 additions & 0 deletions PR_1090_CHIP_CALLABLE_ASYNC_REVISED_PLAN.md

Large diffs are not rendered by default.

135 changes: 111 additions & 24 deletions docs/callable-identity-registration.md
@@ -399,6 +399,42 @@ The L3+ orchestration function captures `CallableHandle` values and passes
them to `orch.submit_next_level` or `orch.submit_sub`. Hashid does not add a
new top-level registration requirement for `Worker.run`.

Async worker APIs are level-specific:

```python
# L2 direct chip worker
handle = worker.register(chip_callable)
pending_handle = worker.register_async(chip_callable)
run_handle = worker.run_async(handle, args, config)
unregister_handle = worker.unregister_async(handle)

# L3+ orchestration worker
handle = worker.register(chip_callable)
pending_handle = worker.register_async(chip_callable)
dag_handle = worker.run_async(orch_fn, args, config)
unregister_handle = worker.unregister_async(handle)
```

For L2, `run_async(handle, ...)` submits one chip callable run to the local
chip run lane and returns a run completion handle. Synchronous `run(handle,
...)` submits to the same lane and waits, so sync and async L2 runs share one
ordering queue.

For L3+, `run_async(orch_fn, ...)` is async relative to the caller but runs the
same orchestration DAG body as `run(orch_fn, ...)`. The current DAG scheduler
inside the run remains synchronous: `orch.submit_next_level(...)` uses the
existing ready-task dispatch path, and the DAG run drains before its handle
completes. Synchronous `run(orch_fn, ...)` submits to the same DAG run queue
and waits, so a later sync run cannot overtake an earlier async run.
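
The shared-queue ordering described above can be illustrated with a minimal sketch. It assumes a single worker thread drains the lane; `RunLane` and `record` are hypothetical names for illustration, not the project's classes.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


class RunLane:
    """Hypothetical single-consumer FIFO lane (illustrative only)."""

    def __init__(self) -> None:
        # One worker thread drains the queue, so runs execute in submit order.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def run_async(self, fn, *args) -> Future:
        # Returns immediately; the Future stands in for a run completion handle.
        return self._executor.submit(fn, *args)

    def run(self, fn, *args):
        # The sync path enqueues on the same lane, then waits for its own result.
        return self.run_async(fn, *args).result()

    def close(self) -> None:
        self._executor.shutdown(wait=True)


order = []


def record(name: str, delay: float) -> str:
    time.sleep(delay)
    order.append(name)
    return name


lane = RunLane()
first = lane.run_async(record, "async", 0.05)  # submitted first, finishes slowly
lane.run(record, "sync", 0.0)                  # submitted second, must not overtake
first.result()
lane.close()
```

Because both entry points feed one queue, the slow async run completes before the later sync run starts, which is the ordering guarantee the text describes.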

`register_async(...)` and `unregister_async(...)` are chip-callable APIs in the
current implementation. Passing a Python callable, `RemoteCallable`, or a
non-chip handle raises `TypeError`. Generic `register(...)` remains
synchronous and still supports the existing callable kinds; for `ChipCallable`
it delegates to `register_async(...).wait()`. Generic `unregister(...)`
remains synchronous; for chip handles it delegates to
`unregister_async(...).wait()`.
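
A minimal sketch of the type guard and the sync-to-async delegation, assuming stand-in classes: `ToyWorker`, and the toy `ChipCallable` and `RegisterHandle` defined here, are hypothetical and only mirror the names used in the text.

```python
class ChipCallable:
    """Stand-in for the real chip callable type."""


class RegisterHandle:
    """Stand-in pending handle; wait() returns the public handle."""

    def __init__(self, public_handle):
        self._public_handle = public_handle

    def wait(self):
        return self._public_handle


class ToyWorker:
    def register_async(self, target):
        # Async registration is chip-only; anything else is rejected up front.
        if not isinstance(target, ChipCallable):
            raise TypeError("register_async supports ChipCallable only")
        return RegisterHandle(("chip", id(target)))

    def register(self, target):
        # Chip callables delegate to the async path and wait on it.
        if isinstance(target, ChipCallable):
            return self.register_async(target).wait()
        # Other callable kinds keep a purely synchronous path.
        return ("python", id(target))


worker = ToyWorker()
chip = ChipCallable()
chip_handle = worker.register(chip)

try:
    worker.register_async(lambda args: None)
    rejected = False
except TypeError:
    rejected = True
```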

### Registry Contract

Each target namespace records local identity state. In the current local
@@ -435,22 +471,30 @@ when the refcount reaches zero.
Current local slot reuse rule:

- A child resolves `hashid -> local_slot` immediately before execution.
- Each endpoint has one local mailbox operation in flight at a time.
- Parent-side dispatch and control operations to the same endpoint are
serialized by the per-WorkerThread mailbox lock.
- Final unregister removes the hashid from resolution and releases the private
slot only after the current mailbox operation has completed.
- A run submit holds an in-flight reference to the resolved private slot before
it enters the run lane.
- Async run/register/unregister controls may overlap a chip child task after
the child has copied that task's args and published `TASK_RUNNING`.
- Memory and CommDomain controls still wait for an in-flight task dispatch to
finish before claiming the mailbox.
- Final chip unregister tombstones the identity, rejects new runs through that
public handle, and releases executable state only when the slot is both
tombstoned and has no in-flight runs.

This rule prevents stale slot reuse without exposing any extra public field.
A future remote or multi-flight control channel must add explicit
A future remote or multi-flight control channel must preserve the same
`INSTALLED` / `TOMBSTONED` / `FAILED` target states, sequence numbers, and
in-flight user draining before it can reuse private slots safely.

### Registration Failure Contract

Registration remains synchronous and whole-scope. For a given
`target_namespace`, the scope is every active child endpoint in the current
`Worker`'s corresponding resolver domain at register start.
Synchronous registration remains whole-scope. For a given `target_namespace`,
the scope is every active child endpoint in the current `Worker`'s
corresponding resolver domain at register start. `register_async` for a
`ChipCallable` returns after the async prepare/register request has been
submitted; its `RegisterHandle.wait()` is the whole-scope completion barrier
that either returns the public `CallableHandle` or raises the registration
error.

1. Parent builds the canonical descriptor and computes the `hashid`.
2. Parent allocates an unpublished parent-side registration entry and handle
@@ -461,7 +505,8 @@ Registration remains synchronous and whole-scope. For a given
5. Target installs `hashid -> local_slot`, or increments `ref_count` when the
same descriptor and payload are already installed.
6. Parent returns the `CallableHandle` only after every target in the scope
reports success.
reports success. For `register_async`, this happens from
`RegisterHandle.wait()`.
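
The whole-scope rule in the steps above can be sketched as follows. This is an illustration under stated assumptions, not the implementation: `register_whole_scope`, `RegistrationError`, and the `install`/`uninstall` callbacks are hypothetical, and rolling back already-installed targets on failure is an assumed cleanup policy.

```python
class RegistrationError(RuntimeError):
    """Hypothetical error type raised when any target in scope fails."""


def register_whole_scope(targets, hashid, install, uninstall):
    installed = []
    for target in targets:
        try:
            install(target, hashid)
        except Exception as exc:
            # Assumed policy: undo the targets that already succeeded.
            for done in reversed(installed):
                uninstall(done, hashid)
            raise RegistrationError(f"target {target!r} failed") from exc
        installed.append(target)
    # Only now is the identity published; the hashid stands in for a handle.
    return hashid


state = {"chip0": set(), "chip1": set()}


def install(target, hashid):
    if target == "chip1":
        raise OSError("simulated install failure")
    state[target].add(hashid)


def uninstall(target, hashid):
    state[target].discard(hashid)


try:
    register_whole_scope(["chip0", "chip1"], "h", install, uninstall)
    published = True
except RegistrationError:
    published = False
```

No handle is returned when one target fails, and the target that had already installed the hashid is cleaned up.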

If any target fails or times out:

@@ -488,10 +533,18 @@ Parent-side scheduling assumes the handle's `hashid` is installed on every
active target in its registration scope. Dispatch choices are constrained by
the handle namespace, submit-time affinity, and tensor/buffer accessibility.
Submit-time live validation is a preflight check only. It does not pin the
target identity through later drain or child dispatch. Callers must not
concurrently unregister a handle while `Worker.run()` or any in-flight task may
submit or use that handle; wait for the relevant run/drain to return before
unregistering it.
target identity through later drain or child dispatch. For chip callables, a
run submit pins the resolved slot before enqueueing work. A later
`unregister_async(handle)` removes that public handle from live resolution and
tombstones the slot only when the final public reference is removed. Runs that
already acquired the slot continue to completion; new runs through the
tombstoned handle are rejected. `UnregisterHandle.wait()` is the cleanup
barrier for target-local executable state, not a run result barrier. Call
`RunHandle.wait()` separately when the caller needs run timing or run errors.

Non-chip callable unregister remains synchronous in this PR. Async unregister
for Python callables or remote task dispatcher handles is not part of the
current contract.

Parent-side `TaskSlotState` stores the submitted callable's stable identity:
the 32-byte `sha256` digest plus parent-side scheduling metadata such as
@@ -597,15 +650,22 @@ Target unregister sequence:

1. Decrement the target-local refcount for `hashid`.
2. If the refcount remains nonzero, keep the mapping installed.
3. If the refcount reaches zero, stop new local resolutions from `hashid` to
private slot.
4. Clear executable state.
5. Release the private slot for reuse.
6. Remove or archive the `hashid` entry.
3. If the refcount reaches zero, remove the public digest resolution and mark
the private slot tombstoned.
4. If no run holds that private slot, call the target-local native unregister
path immediately and then release the slot for reuse.
5. If any run already holds the private slot, leave executable state installed
and attach the unregister state to that tombstone.
6. Each run completion decrements the private slot's in-flight count. The
completion that observes `tombstoned && inflight == 0` performs the native
unregister/free and completes the unregister state.

This sequence is the concrete unregister form of the target-local slot reuse
rule for the current single-flight local mailbox. A future multi-flight target
must insert a tombstone/drain phase before clearing executable state.
rule. Final unregister is non-blocking at submit time for chip callables:
`unregister_async(handle)` returns an `UnregisterHandle` after the tombstone is
submitted, and `UnregisterHandle.wait()` is the cleanup barrier for native
unregister/free. It does not replace `RunHandle.wait()` when the caller needs
the run's timing or error result.
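
A minimal single-threaded sketch of the tombstone and in-flight accounting described in this sequence. `Slot` and `TargetRegistry` are hypothetical names, the `freed` flag stands in for the native unregister/free, and a real target would need locking around these updates.

```python
from dataclasses import dataclass


@dataclass
class Slot:
    ref_count: int = 1
    inflight: int = 0
    tombstoned: bool = False
    freed: bool = False  # stands in for native unregister/free


class TargetRegistry:
    def __init__(self) -> None:
        self._by_hashid = {}  # public digest -> private Slot

    def install(self, hashid: str) -> Slot:
        slot = self._by_hashid.get(hashid)
        if slot is None:
            slot = self._by_hashid[hashid] = Slot()
        else:
            slot.ref_count += 1
        return slot

    def acquire_for_run(self, hashid: str) -> Slot:
        # Raises KeyError once the public resolution has been removed.
        slot = self._by_hashid[hashid]
        slot.inflight += 1
        return slot

    def release_after_run(self, slot: Slot) -> None:
        slot.inflight -= 1
        self._maybe_free(slot)

    def unregister(self, hashid: str) -> Slot:
        slot = self._by_hashid[hashid]
        slot.ref_count -= 1
        if slot.ref_count == 0:
            del self._by_hashid[hashid]  # stop new resolutions
            slot.tombstoned = True
            self._maybe_free(slot)
        return slot

    @staticmethod
    def _maybe_free(slot: Slot) -> None:
        if slot.tombstoned and slot.inflight == 0:
            slot.freed = True


registry = TargetRegistry()
registry.install("abc")
running = registry.acquire_for_run("abc")
tombstone = registry.unregister("abc")
freed_while_running = tombstone.freed
registry.release_after_run(running)
```

The final unregister returns while the run is still in flight; the slot is freed only by the completion that observes the tombstone with no remaining users.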

If failed-register cleanup cannot be confirmed, the parent must not dispatch
that hashid to the uncertain target again during the current Worker lifetime.
@@ -625,9 +685,18 @@ The implementation provides canonical descriptor and hash helpers:
The public API is handle-based:

- `Worker.register` returns `CallableHandle`.
- `Worker.register_async` returns `RegisterHandle` for `ChipCallable`
targets only; non-chip targets raise `TypeError`.
- `Worker.unregister_async` returns `UnregisterHandle` for chip handles only;
non-chip handles raise `TypeError`.
- `CallableHandle` validation rejects forged, stale, mutated, or
wrong-namespace handles.
- L3+ `Worker.run(raw_orch_fn, ...)` behavior is unchanged.
- L2 `Worker.run_async(handle, ...)` submits to the local chip run lane.
- L3+ `Worker.run_async(raw_orch_fn, ...)` submits one whole DAG run to the
Worker-level DAG run lane.
- L3+ `Worker.run(raw_orch_fn, ...)` behavior is unchanged except that it
delegates to `run_async(...).wait()` so sync and async DAG runs share one
ordering queue.
- Integer execution slots remain private to the target child process.

Each target owns identity state:
@@ -649,10 +718,23 @@ Local mailbox task frames are hashid-based:

- The local mailbox task payload is prefixed with the 32-byte `sha256` digest.
- The existing `TaskArgs` blob follows the digest prefix.
- Chip and sub child loops resolve `hashid -> local_slot` immediately before
execution.
- Chip child loops resolve `hashid -> local_slot`, copy the args blob, pin the
private slot, enqueue work on their run lane, and publish `TASK_RUNNING`.
- Sub and Worker-child loops keep the historical synchronous path and publish
`TASK_DONE` after their Python callable or inner `Worker.run()` returns.
- `ChipWorker.run(local_slot)` remains private to the child process.
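
The digest-prefixed frame layout can be sketched as below. This is a simplified illustration: `encode_frame` and `decode_frame` are hypothetical helpers, the descriptor bytes are a placeholder, and the real mailbox carries additional header fields that are omitted here.

```python
import hashlib

DIGEST_LEN = 32  # sha256 digest size in bytes


def encode_frame(descriptor: bytes, args_blob: bytes) -> bytes:
    # Assumption: the hashid is the sha256 of some canonical descriptor bytes.
    digest = hashlib.sha256(descriptor).digest()
    return digest + args_blob


def decode_frame(frame: bytes) -> tuple[bytes, bytes]:
    if len(frame) < DIGEST_LEN:
        raise ValueError("frame shorter than digest prefix")
    return frame[:DIGEST_LEN], frame[DIGEST_LEN:]


frame = encode_frame(b"example-descriptor", b"\x01\x02\x03")
digest, blob = decode_frame(frame)
```

The child would use `digest` as the lookup key for its private slot and treat `blob` as the unchanged `TaskArgs` payload.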

Async control overlap is chip-specific:

- `CTRL_REGISTER_ASYNC`, `CTRL_WAIT_REGISTER`, `CTRL_RUN_ASYNC`,
`CTRL_WAIT_RUN`, `CTRL_UNREGISTER_ASYNC`, and `CTRL_WAIT_UNREGISTER` may
claim the mailbox while the child is in `TASK_RUNNING`.
- Memory and CommDomain controls still wait for the task to publish
`TASK_DONE` before claiming the mailbox.
- The control command restores the previous `TASK_RUNNING` state after
`CONTROL_DONE`, so the parent dispatch path can keep waiting for the
original task completion.
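
The overlap rule above can be modeled as a small state machine. This is a sketch under assumptions: `ToyMailbox`, the `IDLE` and `CONTROL_REQUEST` states, and the `CTRL_MALLOC` command name are hypothetical; only the six async control names and the `TASK_*` / `CONTROL_DONE` states come from the text.

```python
from enum import Enum, auto


class MailboxState(Enum):
    IDLE = auto()
    TASK_READY = auto()
    TASK_RUNNING = auto()
    CONTROL_REQUEST = auto()
    CONTROL_DONE = auto()
    TASK_DONE = auto()


ASYNC_CONTROLS = frozenset({
    "CTRL_REGISTER_ASYNC", "CTRL_WAIT_REGISTER",
    "CTRL_RUN_ASYNC", "CTRL_WAIT_RUN",
    "CTRL_UNREGISTER_ASYNC", "CTRL_WAIT_UNREGISTER",
})


class ToyMailbox:
    def __init__(self) -> None:
        self.state = MailboxState.IDLE

    def try_control(self, command: str, handler) -> bool:
        previous = self.state
        if previous is MailboxState.TASK_READY:
            return False  # child has not copied the task payload yet
        if previous is MailboxState.TASK_RUNNING and command not in ASYNC_CONTROLS:
            return False  # e.g. memory controls wait for TASK_DONE
        self.state = MailboxState.CONTROL_REQUEST
        handler(command)
        self.state = MailboxState.CONTROL_DONE
        # Restore the earlier state so a waiting dispatch still sees TASK_RUNNING.
        self.state = previous
        return True


seen = []
mailbox = ToyMailbox()
mailbox.state = MailboxState.TASK_RUNNING
overlapped = mailbox.try_control("CTRL_RUN_ASYNC", seen.append)
deferred = mailbox.try_control("CTRL_MALLOC", seen.append)
```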

Register failure cleanup is conservative:

- Handles are not published until every target in scope installed the hashid.
@@ -681,6 +763,11 @@ Required tests:
| Pre-start register | Startup hashid mappings are visible after ready. |
| Partial register failure | No public handle is returned. |
| Cleanup uncertainty | Unconfirmed cleanup blocks that target/hashid pair. |
| L2 run queue | Sync and async direct chip runs share FIFO ordering. |
| L3 DAG run queue | Sync and async DAG runs share FIFO ordering. |
| Async API type guard | Non-chip async register/unregister inputs raise `TypeError`. |
| Task/control overlap | Chip `TASK_RUNNING` permits async register/run/unregister controls. |
| Deferred unregister | Final unregister tombstones before native free. |
| Unregister cleanup | Hashid resolution stops before final slot cleanup. |
| Unsupported kind | Target rejects unsupported kind before install. |
| Hashid format fuzz | Bad prefix, length, or hex encoding is rejected. |
14 changes: 9 additions & 5 deletions docs/hierarchical_level_runtime.md
@@ -115,11 +115,15 @@ See [scheduler.md](scheduler.md) for the dispatch loop and coordination.
The **execution layer**. `WorkerManager` holds two pools of `WorkerThread`s
(next-level pool and sub pool). Each `WorkerThread` owns one std::thread that
encodes `(callable, config, args_blob)` into a `MAILBOX_SIZE`-byte shared
memory region, signals the pre-forked Python child, and spin-polls
`TASK_DONE`, returning an explicit completion outcome to the Scheduler.
memory region, signals the pre-forked Python child, and waits for
`TASK_DONE`, returning an explicit completion outcome to the Scheduler.
Chip children may first publish `TASK_RUNNING` after copying the payload and
enqueueing the run on their child-local run lane; selected async controls can
use the mailbox during that running window.

- Next-level (chip) children run `_chip_process_loop`, which constructs a
`ChipWorker` and dispatches each kernel through it.
`ChipWorker`, owns child-local run/register lanes, and dispatches each
kernel through the run lane.
- SUB children run `_sub_worker_loop`, which decodes the args blob into a
`TaskArgs` and calls the registered Python callable as `fn(args)`. There
is no C++ `SubWorker` class — SUB workers exist only as a worker-type
@@ -149,8 +153,8 @@ what flows through `ChipWorker::run`.
│ │ pop ready_queue
│ │ pick idle WorkerThread
│ │ wt.dispatch(slot_id) ──────► WorkerThread
│ │ encode mailbox → spin-poll TASK_DONE
│ │ (blocking; child runs the kernel)
│ │ encode mailbox → wait TASK_DONE
│ │ (chip child may publish TASK_RUNNING)
│ │◄── completion_queue ────── on_complete_(completion)
│ │ on_task_complete:
│ │ success → COMPLETED
7 changes: 5 additions & 2 deletions docs/remote-l3-worker-design.md
@@ -131,8 +131,11 @@ Relevant code paths:
- `_child_worker_loop()` runs a nested `Worker` child via shm mailbox.
- `_run_chip_main_loop()` handles task and control mailbox states.
- `src/common/hierarchical/worker_manager.{h,cpp}`
- `WorkerThread` owns one local mailbox and blocks until `TASK_DONE`.
- Control commands share the same mailbox and serialize on `mailbox_mu_`.
- `WorkerThread` owns one local mailbox and waits until `TASK_DONE`.
- The mailbox lock serializes payload writes and task acknowledgement.
After a chip child publishes `TASK_RUNNING`, selected async controls can
temporarily claim the same mailbox and restore `TASK_RUNNING` after
`CONTROL_DONE`.
- Errors are reported through `MAILBOX_OFF_ERROR` and
`MAILBOX_OFF_ERROR_MSG`.
- `src/common/hierarchical/orchestrator.{h,cpp}`
40 changes: 24 additions & 16 deletions docs/task-flow.md
@@ -58,7 +58,7 @@ to C++:

| Context | Namespace | How it's consumed |
| ------- | --------- | ----------------- |
| `w3.submit_next_level(handle, …)` dispatched to a chip child | `LOCAL_CHIP` | child resolves digest to its private chip slot, then calls `ChipWorker::run(local_slot, …)` |
| `w3.submit_next_level(handle, …)` dispatched to a chip child | `LOCAL_CHIP` | child resolves digest to its private chip slot, copies args, and enqueues the run on its chip run lane |
| `w4.submit_next_level(handle, …)` dispatched to an L3 `Worker` child | `LOCAL_PYTHON` | child resolves digest to an orchestration function and calls `inner_worker.run(orch_fn, …)` |
| remote `w4.submit_next_level(handle, …)` dispatched to remote L3 | `REMOTE_TASK_DISPATCHER` | remote endpoint resolves digest in its dispatcher registry and calls its embedded L3 Worker |
| `w3.submit_sub(handle, …)` dispatched to a SUB child | `LOCAL_PYTHON` | child resolves digest to a Python callable and calls `fn(args)` |
@@ -188,7 +188,8 @@ View does **not** own memory. Valid for the duration of a single
│ child decodes header → builds TaskArgsView over the blob bytes
child resolves digest -> local slot
ChipWorker::run(local_slot, view, config) (in the forked child)
child copies args and enqueues run_prepared_from_blob(local_slot, ...)
on the child-local chip run lane

│ (L2 ABI edge)
@@ -381,11 +382,17 @@ reclaim independently of outer-scope tasks. See

## 8. Data flow on completion

When the child finishes the kernel, it writes `TASK_DONE` to the mailbox;
`LocalMailboxEndpoint::run` exits its spin-poll, reads the mailbox error
fields, and returns a `WorkerCompletion`. `MAILBOX_OFF_ERROR == 0` maps to
success; a non-zero child error maps to task failure. The parent
`WorkerThread` pushes that completion onto `Scheduler::completion_queue_`.
For chip children, the child copies the task payload out of the mailbox,
pins the private callable slot, enqueues the work on its chip run lane, and
publishes `TASK_RUNNING`. `LocalMailboxEndpoint::run` still waits until the
same mailbox reaches `TASK_DONE`. When the chip run lane finishes the kernel,
it writes `TASK_DONE`; the parent reads the mailbox error fields and returns a
`WorkerCompletion`. `MAILBOX_OFF_ERROR == 0` maps to success; a non-zero child
error maps to task failure. The parent `WorkerThread` pushes that completion
onto `Scheduler::completion_queue_`.

Sub-worker and Worker-child dispatches publish `TASK_DONE` directly after the
Python callable or inner `Worker.run()` returns.

At this point:

@@ -473,7 +480,7 @@ L4 parent process
| 1 | L4 parent Python | `w4.run(my_l4_orch)` → `scope_begin` → `my_l4_orch(orch4, ...)` |
| 2 | L4 `Orchestrator.submit_next_level` | the L3 callable handle digest is stored in the slot's callable identity; slot pushed to L4's ready queue |
| 3 | L4 Scheduler | pop slot; pick idle WorkerThread → the L3 child's mailbox |
| 4 | L4 WorkerThread (PROCESS) | encode `(callable digest, config, args_blob)` into mailbox; write `TASK_READY`; spin-poll |
| 4 | L4 WorkerThread (PROCESS) | encode `(callable digest, config, args_blob)` into mailbox; write `TASK_READY`; wait for completion |
| 5 | L3 child `_child_worker_loop` | wake on `TASK_READY`; read digest → child-local slot → `my_l3_orch` |
| 6 | L3 child | `inner_worker.run(my_l3_orch, args, cfg)` → `scope_begin` → `my_l3_orch(orch3, ...)` |
| 7 | L3 `Orchestrator.submit_sub` | `l3_sub_handle` digest dispatched to L3's own sub worker child |
@@ -524,14 +531,15 @@ Step-by-step (one chip worker):
| 2 | `Worker::run` | `scope_begin` → call `my_orch(&orch_, args.view(), cfg)` |
| 3 | `Orchestrator::submit_next_level` | `slot = ring.alloc()`; move `chip_args` into `slot.task_args`; walk tags → `tensormap.lookup(a.data)`, `tensormap.lookup(b.data)`, `tensormap.insert(c.data, slot)`; push ready |
| 4 | Scheduler thread | pop `slot`; `wt = manager.pick_idle(NEXT_LEVEL)` (WT_chip_0); `wt->dispatch(slot)` |
| 5 | WT_chip_0 parent side | encode mailbox: write reserved callable field, `config`, digest prefix, `write_blob` of task_args; set `TASK_READY`; spin-poll |
| 6 | chip_0 child process | wake on `TASK_READY`; resolve digest to local slot; `read_blob` → `view`; call `ChipWorker::run(local_slot, view, cfg)` |
| 7 | `ChipWorker::run` | assemble `ChipStorageTaskArgs` POD (memcpy view); call `pto2_run_runtime(local_slot, &chip_storage, &cfg)` |
| 8 | runtime.so | translate host ptrs → device ptrs; dispatch AICPU / AICore; write output into `c`'s shm |
| 9 | chip_0 child | `run` returns; write `TASK_DONE` |
| 10 | WT_chip_0 parent | see `TASK_DONE`; push success completion |
| 11 | Scheduler | mark slot COMPLETED; fanout release (none in this DAG); scope_end will release scope ref |
| 12 | `Worker::run` returns | user's `w3.run(...)` returns; `c` contains result in shm, visible to user |
| 5 | WT_chip_0 parent side | encode mailbox: write reserved callable field, `config`, digest prefix, `write_blob` of task_args; set `TASK_READY`; wait for completion |
| 6 | chip_0 child process | wake on `TASK_READY`; resolve digest to local slot; copy `args_blob`; enqueue run; publish `TASK_RUNNING` |
| 7 | chip run lane | call `run_prepared_from_blob(local_slot, copied_args, cfg)` |
| 8 | `ChipWorker::run` path | assemble `ChipStorageTaskArgs` POD (memcpy view); call `pto2_run_runtime(local_slot, &chip_storage, &cfg)` |
| 9 | runtime.so | translate host ptrs → device ptrs; dispatch AICPU / AICore; write output into `c`'s shm |
| 10 | chip run lane | `run` returns; release in-flight slot ref; write `TASK_DONE` |
| 11 | WT_chip_0 parent | see `TASK_DONE`; push success completion |
| 12 | Scheduler | mark slot COMPLETED; fanout release (none in this DAG); scope_end will release scope ref |
| 13 | `Worker::run` returns | user's `w3.run(...)` returns; `c` contains result in shm, visible to user |

---
