dsv4 MoE: gate perf + drop EP=1, inline dispatch/combine #660
📝 Walkthrough
This PR removes the single-card DeepSeek-V4 MoE EP=1 path entirely (deleting combine.py and moe_ep1-related code), restricts the EP world size to {2, 4, 8}, and introduces an in-file distributed dispatch/combine implementation in moe.py. Separately, gate.py's tiling constants, gate matmul logic, and CLI options are refactored, and EP_ROUTING_GLOBAL is removed from config.py.
Changes
EP1 removal and distributed MoE dispatch/combine
Estimated code review effort: 4 (Complex) | ~60 minutes
Gate routing refactor
Estimated code review effort: 4 (Complex) | ~45 minutes
Sequence Diagram(s)
sequenceDiagram
participant Rank as dispatch/combine (rank)
participant Peer as Remote rank
participant Window as Receive window / routed_y_buf
Rank->>Rank: build routing histogram
Rank->>Peer: notify(count_done, exact counts)
Rank->>Peer: wait(count_done)
Rank->>Window: remote-store x/scale/weights/r_route
Rank->>Peer: notify(data_done)
Window->>Rank: copy rows into output tensors
Rank->>Window: remote-put recv_y keyed by r_route
Rank->>Peer: notify(combine_done)
Rank->>Peer: wait(combine_done)
Window->>Rank: reduce TOPK contributions (FP32)
Rank->>Rank: write BF16 ffn_out
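The flow in the diagram can be approximated in a single process. The sketch below is not the kernel from moe.py: ranks are simulated with plain dictionaries, `expert_fn` is a hypothetical stand-in expert, the sizes are illustrative, and applying the routing weight on the expert side is an assumption made only to keep the example short.

```python
from collections import defaultdict

N_RANKS, N_LOCAL, TOPK = 2, 2, 2  # illustrative sizes, not the PR's values


def expert_fn(eid, x):
    """Hypothetical stand-in expert: scales the token by (eid + 1)."""
    return x * (eid + 1)


def dispatch_combine(tokens, indices, weights):
    """tokens[r][t] is a float; indices[r][t] and weights[r][t] hold TOPK entries."""
    # 1. Routing histogram: rows each source rank sends to each global expert.
    counts = defaultdict(int)
    for r in range(N_RANKS):
        for t in range(len(tokens[r])):
            for k in range(TOPK):
                counts[(r, indices[r][t][k])] += 1

    # 2. Once counts are exchanged, each source gets a base slot per expert
    #    (prefix sum over lower-numbered source ranks).
    base = {}
    for eid in range(N_RANKS * N_LOCAL):
        running = 0
        for r in range(N_RANKS):
            base[(r, eid)] = running
            running += counts[(r, eid)]

    # 3. Dispatch: store x, the weight, and the return route in the receive
    #    window of the rank that owns the expert.
    window = defaultdict(dict)  # window[dst][(loc_e, slot)] = (x, w, route)
    cursor = defaultdict(int)
    for r in range(N_RANKS):
        for t, x in enumerate(tokens[r]):
            for k in range(TOPK):
                eid = indices[r][t][k]
                dst, loc_e = divmod(eid, N_LOCAL)
                slot = base[(r, eid)] + cursor[(r, eid)]
                cursor[(r, eid)] += 1
                window[dst][(loc_e, slot)] = (x, weights[r][t][k], (r, t, k))

    # 4. Expert compute on the owning rank, then put each result back to the
    #    source rank, keyed by the route recorded during dispatch.
    routed_y = defaultdict(dict)  # routed_y[src][(t, k)] = weighted output
    for dst in range(N_RANKS):
        for (loc_e, _slot), (x, w, (src, t, k)) in window[dst].items():
            routed_y[src][(t, k)] = w * expert_fn(dst * N_LOCAL + loc_e, x)

    # 5. Combine: each source rank reduces its TOPK contributions per token.
    return [
        [sum(routed_y[r][(t, k)] for k in range(TOPK)) for t in range(len(tokens[r]))]
        for r in range(N_RANKS)
    ]
```

The notify/wait pairs in the diagram have no counterpart here because the steps run sequentially; in a real multi-rank run they are what makes steps 2, 4, and 5 safe to start.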
🚥 Pre-merge checks | ✅ 4 passed | ❌ 1 failed (1 warning)
Code Review
This pull request removes the legacy single-card (EP=1) execution path and its associated standalone files (combine.py and dispatch.py), refactoring the DeepSeek-V4 MoE implementation to focus entirely on multi-rank distributed execution. The dispatch and combine kernels are now inlined directly into moe.py, and the gate.py module has been optimized by transitioning from pl.parallel to pl.spmd loops and parallelizing the gate matmul over expert columns. Feedback on the changes suggests further optimizing the gate matmul loop in gate.py by replacing pl.range with pl.pipeline to enable pipelining of global memory loads on Ascend hardware.
for kb in pl.range(0, D // GATE_D_TILE):
    gd_kd = kb * GATE_D_TILE
    gd_x = x_norm_gate_buf[t1 : t1 + GATE_M_TILE, gd_kd : gd_kd + GATE_D_TILE]
    gd_w = gate_w[n0 : n0 + GATE_N_TILE, gd_kd : gd_kd + GATE_D_TILE]
    if gd_kd == 0:
        gate_logits_tile = pl.matmul(gd_x, gd_w, out_dtype=pl.FP32, b_trans=True)
    else:
        gate_logits_tile = pl.matmul_acc(gate_logits_tile, gd_x, gd_w, b_trans=True)
The gate matmul loop currently uses pl.range to iterate over the hidden dimension chunks. On CANN/Ascend hardware, using pl.pipeline with a conditional check inside the loop is preferred over a standard range loop. This allows the compiler to pipeline the global memory loads (gd_x and gd_w) and overlap them with the matrix multiplication computation, significantly improving performance on the critical path.
Suggested change
- for kb in pl.range(0, D // GATE_D_TILE):
-     gd_kd = kb * GATE_D_TILE
-     gd_x = x_norm_gate_buf[t1 : t1 + GATE_M_TILE, gd_kd : gd_kd + GATE_D_TILE]
-     gd_w = gate_w[n0 : n0 + GATE_N_TILE, gd_kd : gd_kd + GATE_D_TILE]
-     if gd_kd == 0:
-         gate_logits_tile = pl.matmul(gd_x, gd_w, out_dtype=pl.FP32, b_trans=True)
-     else:
-         gate_logits_tile = pl.matmul_acc(gate_logits_tile, gd_x, gd_w, b_trans=True)
+ for gd_kd in pl.pipeline(0, D, GATE_D_TILE, stage=2):
+     gd_x = x_norm_gate_buf[t1 : t1 + GATE_M_TILE, gd_kd : gd_kd + GATE_D_TILE]
+     gd_w = gate_w[n0 : n0 + GATE_N_TILE, gd_kd : gd_kd + GATE_D_TILE]
+     if gd_kd == 0:
+         gate_logits_tile = pl.matmul(gd_x, gd_w, out_dtype=pl.FP32, b_trans=True)
+     else:
+         gate_logits_tile = pl.matmul_acc(gate_logits_tile, gd_x, gd_w, b_trans=True)
References
- In PyPTO on CANN/Ascend hardware, keeping a conditional check (e.g., if db == 0) inside a pl.pipeline loop can be preferred over peeling the first iteration. This allows the first chunk's load to overlap with the rest of the pipeline rather than running as an un-pipelined prologue, provided the compiler successfully pipelines the loop-index branch.
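To make the loop structure under discussion concrete, here is a plain-Python sketch of accumulating `x @ w.T` over chunks of the hidden dimension with the first-chunk branch kept inside the loop. It only illustrates that the in-loop branch yields the same result as one full matmul; it says nothing about pipelining or performance, and `matmul_bt`, `add`, and `tiled_gate_logits` are hypothetical helper names rather than PyPTO APIs.

```python
def matmul_bt(x, w):
    """x: [M][K], w: [N][K] -> x @ w.T as nested lists (mirrors b_trans=True)."""
    return [[sum(a * b for a, b in zip(xr, wr)) for wr in w] for xr in x]


def add(a, b):
    """Element-wise sum of two equally shaped nested lists."""
    return [[p + q for p, q in zip(ra, rb)] for ra, rb in zip(a, b)]


def tiled_gate_logits(x, w, k_tile):
    """Accumulate x @ w.T over K chunks, branching on the first chunk in-loop."""
    k_total = len(x[0])
    assert k_total % k_tile == 0, "K must be a multiple of the tile size"
    acc = None
    # The loop variable is the element offset, as in the suggested
    # pl.pipeline(0, D, GATE_D_TILE, ...) form, not a chunk index.
    for k0 in range(0, k_total, k_tile):
        x_chunk = [row[k0:k0 + k_tile] for row in x]
        w_chunk = [row[k0:k0 + k_tile] for row in w]
        if k0 == 0:
            acc = matmul_bt(x_chunk, w_chunk)            # role of pl.matmul
        else:
            acc = add(acc, matmul_bt(x_chunk, w_chunk))  # role of pl.matmul_acc
    return acc
```

For example, `tiled_gate_logits(x, w, 2)` on a 2x4 input and a 3x4 weight gives the same 2x3 result as `matmul_bt(x, w)`.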
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@models/deepseek/v4/moe.py`:
- Around line 178-206: The dynamic routing in moe.py uses `RECV_MAX` as a
per-expert slot stride, but `payload_push` can still compute a `slot` from
`cursor[bucket] + my_slot_at_dst[bucket]` that exceeds the allocated capacity.
Add an explicit capacity check or hard fail in the `payload_push` loop before
computing `row` so `slot` never crosses `RECV_MAX`, and apply the same guard in
the corresponding remote store/copy path referenced by the same routing logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8b087a29-a20d-46c0-ace3-d4e47ddb406f
📒 Files selected for processing (5)
- models/deepseek/v4/combine.py
- models/deepseek/v4/config.py
- models/deepseek/v4/dispatch.py
- models/deepseek/v4/gate.py
- models/deepseek/v4/moe.py
💤 Files with no reviewable changes (3)
- models/deepseek/v4/dispatch.py
- models/deepseek/v4/combine.py
- models/deepseek/v4/config.py
for e in pl.range(N_LOCAL):
    acc = pl.const(0, pl.INT32)
    for s in pl.range(N_RANKS):
        acc = acc + pl.read(pub_counts, [s * N_RANKS + my_rank, e])
    pl.write(recv_count_out, [e, 0], acc)

# ---------- payload_push: 4 channels per (t, k) ----------
cursor = pl.array.create(N_RANKS * N_LOCAL, pl.INT32)
for d in pl.range(N_RANKS):
    for e in pl.range(N_LOCAL):
        cursor[d * N_LOCAL + e] = 0

# Pad tiles, zero-initialised once; only column 0 is overwritten per
# push (UB tile + remote_store is the proven path for runtime-computed
# scalars — a GM pack table written by scalar pl.write corrupts).
scale_tile = pl.tile.full([1, W_PAD], dtype=pl.FP32, value=0.0)
w_tile = pl.tile.full([1, W_PAD], dtype=pl.FP32, value=0.0)
idx_tile = pl.tile.full([1, IDX_PAD], dtype=pl.INT32, value=0)

for t in pl.range(active_tokens):
    for k in pl.range(TOPK):
        eid = pl.read(indices, [t, k])
        dst = eid // N_LOCAL
        loc_e = eid - dst * N_LOCAL
        bucket = dst * N_LOCAL + loc_e
        cur_val = cursor[bucket]
        slot_off = my_slot_at_dst[bucket]
        slot = slot_off + cur_val
        row = loc_e * RECV_MAX + slot
🗄️ Data Integrity & Integration | 🔴 Critical | ⚡ Quick win
Guard RECV_MAX before using dynamic route counts as slots.
A skewed gate can make acc/slot exceed RECV_MAX; then row = loc_e * RECV_MAX + slot writes into another expert’s region or past the distributed window. Add a hard capacity contract here: provision worst-case capacity, enforce gate capacity, or fail before the remote stores and scalar copies.
Also applies to: 258-264
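One way to picture the requested guard is a host-side reference in plain Python that mirrors the slot arithmetic and fails before any row index can cross into a neighbouring expert's region. This is a sketch only: `assign_rows` and its parameters are hypothetical names, and how a hard failure would be expressed inside the device kernel is not something the review states.

```python
def assign_rows(expert_ids, slot_off, n_local, recv_max):
    """Return (dst, row) per routed entry, failing before any row would overflow.

    slot_off[bucket] is the base slot this rank was granted at the destination;
    all names are illustrative and mirror the snippet under review.
    """
    cursor = {}
    rows = []
    for eid in expert_ids:
        dst, loc_e = divmod(eid, n_local)
        bucket = dst * n_local + loc_e
        slot = slot_off.get(bucket, 0) + cursor.get(bucket, 0)
        # Capacity contract: a slot must stay inside the expert's own region.
        if slot >= recv_max:
            raise OverflowError(
                f"expert {eid}: slot {slot} exceeds RECV_MAX={recv_max}"
            )
        cursor[bucket] = cursor.get(bucket, 0) + 1
        rows.append((dst, loc_e * recv_max + slot))
    return rows
```

With `recv_max=4`, a rank whose base slot for expert 0 is 2 can place two rows; a third raises instead of writing at row 4, which would belong to the next local expert.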
…tiles
Convert the four pl.parallel+pl.at loops (ffn_norm / x_norm_quant / gate / route) to pl.spmd. Fan the gate matmul over expert columns (GATE_N_TILE=16) so each spmd block lands on its own cube core; move the [N_EXPERTS,SCORE_PAD) pad-init to a separate gate_pad_init scope since a conditional write to an internal buffer inside the spmd body breaks orch return mapping.
x_norm_quant reads the bf16 x_norm output instead of the fp32 buffer (same values, half the bytes) and widens QUANT_TILE 32->256; ffn_norm widens D_TILE 128->256.
decode_layer a2a3 ep2 (layer 10 / CSA, sort routing) swimlane, per layer: gate 69->~18us, x_norm_quant 43->~12us, ffn_norm 17.7->~11.4us.
Standalone gate.py (hash + sort) and decode_layer x_next/kv_cache PASS.
Also: default --layer-id 10 (matches decode_layer's CSA layer) and switch --enable-l2-swimlane to the int 0/1/2 form.
- Remove dispatch.py and combine.py; move the distributed dispatch and combine definitions into moe.py, renaming N_LOCAL_EXPERTS/EP_WORLD_SIZE to moe's N_LOCAL/N_RANKS and adding the X_STAGE_ROWS constant.
- Delete the single-card EP=1 path (moe_ep1, moe_ep1_test, golden_moe_ep1, build_tensor_specs_ep1) and the *_ep1 dispatch/combine variants; --ep choices are now (2, 4, 8) and __main__ only runs the distributed l3_moe path.
- Remove the now-dead EP_ROUTING_GLOBAL flag: gate.py always routes over the full global expert set (N_EXPERTS = M.n_routed_experts), which moe shrinks to 32*EP before import, so the distributed path is unchanged.
- Drop the dead EP_RANK / EXPERTS_START_IDX constants left over from the removed EP=1 dispatch.
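The restricted `--ep` choices and the `32*EP` expert count mentioned above could look roughly like the following argparse sketch. It is not taken from moe.py: `parse_args`, the parser description, the `n_experts` attribute, and the default of 2 are assumptions for illustration; only the choices (2, 4, 8) and the `32*EP` relation come from the commit message.

```python
import argparse


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="distributed MoE runner (sketch)")
    # Only multi-rank expert parallelism remains, so argparse rejects --ep 1.
    # The default value here is an assumption, not the PR's.
    parser.add_argument("--ep", type=int, choices=(2, 4, 8), default=2)
    args = parser.parse_args(argv)
    # The commit message says moe shrinks the routed expert count to 32 * EP.
    args.n_experts = 32 * args.ep
    return args
```

Calling `parse_args(["--ep", "4"])` yields `n_experts == 128`, while `--ep 1` makes argparse exit with a usage error.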
Summary
- Convert the gate loops to pl.spmd, fan the gate matmul across experts (1 core/block), read bf16 x_norm with QUANT_TILE=256, and widen the ffn-norm D_TILE to 256.
- Remove dispatch.py and combine.py; move the distributed dispatch/combine definitions into moe.py (renamed to moe's N_LOCAL/N_RANKS, added X_STAGE_ROWS).
- Delete the single-card EP=1 path (moe_ep1, moe_ep1_test, golden_moe_ep1, build_tensor_specs_ep1, the *_ep1 dispatch/combine variants); --ep choices are now (2, 4, 8) and __main__ only runs the distributed l3_moe path.
- Remove the now-dead EP_ROUTING_GLOBAL flag: gate always routes over the full global expert set (N_EXPERTS = M.n_routed_experts), which moe shrinks to 32*EP before import, so the distributed path is bit-for-bit unchanged.
Related Issues
N/A