diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index cec8c78a0..283854198 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -260,6 +260,7 @@ add_library(dflash_common STATIC src/common/moe_hybrid_storage.cpp src/common/spark_corpus.cpp src/common/moe_hybrid_ffn_eval.cpp + src/common/moe_hybrid_stream.cpp src/common/cold_ffn_cpu.cpp src/common/moe_hybrid_swap_manager.cpp src/qwen35/layer_split_forward.cpp @@ -692,6 +693,11 @@ if(DFLASH27B_TESTS) target_include_directories(bench_laguna_spark PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) target_link_libraries(bench_laguna_spark PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET}) endif() + if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/bench_moe_stream.cpp" AND DFLASH27B_GPU_BACKEND STREQUAL "cuda") + add_executable(bench_moe_stream test/bench_moe_stream.cpp) + target_include_directories(bench_moe_stream PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS} ${CUDAToolkit_INCLUDE_DIRS}) + target_link_libraries(bench_moe_stream PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} CUDA::cudart) + endif() if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_laguna_daemon.cpp") add_executable(test_laguna_daemon test/test_laguna_daemon.cpp) target_include_directories(test_laguna_daemon PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS}) diff --git a/server/docs/moe_hybrid.md b/server/docs/moe_hybrid.md new file mode 100644 index 000000000..2b1ee5c90 --- /dev/null +++ b/server/docs/moe_hybrid.md @@ -0,0 +1,396 @@ +# MoE Hybrid Expert Offload + +Model-agnostic Mixture-of-Experts (MoE) hybrid offload subsystem that splits experts between GPU ("hot") and CPU ("cold") to fit large MoE models (e.g. Qwen3.5-MoE, Laguna) on consumer GPUs with limited VRAM. + +## Overview + +In a standard MoE forward pass, all experts reside on GPU. The hybrid mode instead: + +1. **Profiles** which experts are activated most frequently (routing statistics). +2. **Places** the most-used experts on GPU (hot), the rest on CPU (cold). +3. 
**Evaluates** hot experts on GPU and cold experts on CPU concurrently, then combines results. +4. **Swaps** experts between hot/cold at request boundaries based on shifting workload patterns. + +This allows running 27B+ MoE models on a single RTX 3090 (24 GB) that would otherwise require 40+ GB. + +## File Layout + +| File | Purpose | +|------|---------| +| `moe_hybrid_types.h` | Core types: `MoeHybridConfig`, `MoeLayerDesc` | +| `moe_hybrid_types_impl.h` | Model-specific → generic conversion helpers (qwen35, laguna) | +| `moe_hybrid_routing_stats.{h,cpp}` | Per-layer expert activation counters and ranking | +| `moe_hybrid_placement.{h,cpp}` | Hot/cold assignment: greedy budget allocation from stats | +| `moe_hybrid_swap_manager.{h,cpp}` | Runtime expert promotion/demotion between requests | +| `moe_hybrid_storage.{h,cpp}` | GPU/CPU buffer management for split expert tensors | +| `moe_hybrid_ffn_eval.{h,cpp}` | FFN execution: hot on GPU, cold on CPU, result combination | + +## Key Types + +### `MoeHybridConfig` + +Model-agnostic architecture descriptor: + +```cpp +struct MoeHybridConfig { + int n_embd; // hidden dimension + int n_expert; // total experts per layer + int n_expert_used; // top-k selected per token + int n_ff_exp; // routed expert intermediate dim + int n_ff_shexp; // shared expert intermediate dim (0 = none) + int n_layer; // number of MoE layers + int first_moe_layer; // first MoE layer index +}; +``` + +### `MoeLayerDesc` + +Uniform view of per-layer expert tensors regardless of model backend: + +- Routed expert weight tensors (gate, up, down, optional fused gate_up) +- Shared expert tensors (optional) +- Per-tensor quantization scale factors (for NVFP4 models) + +### `MoeHybridPlacement` + +Specifies which experts are hot per layer: + +- `hot_counts[layer]` — number of hot experts in each layer +- `hot_expert_ids[layer]` — ranked list of hot expert indices +- Serializable to/from JSON for persistence across runs + +### `MoeHybridLayerStorage` + 
+Per-layer split buffer state: + +- `hot_ctx/hot_buf` — GPU-resident tensors for hot experts +- `cold_ctx/cold_buf` — CPU-resident tensors for cold experts +- `hot_local_by_global[expert_id]` — maps global expert index → local hot index (-1 if cold) +- `cold_local_by_global[expert_id]` — maps global expert index → local cold index (-1 if hot) +- `CachedFfnGraph hot_graph/cold_graph` — pre-built ggml compute graphs (avoids per-token rebuild) + +## Algorithms + +### Placement (Budget Allocation) + +Two placement strategies: + +1. **Count-based** (`build_from_stats`): Given a total hot expert budget (count), greedily assigns the next-most-activated expert across all layers until budget exhausted. + +2. **Byte-budget** (`build_from_stats_with_layer_bytes`): Same greedy approach but accounts for varying expert sizes across layers. Maximizes activation-count-per-byte (value = count / bytes). + +Both respect a `min_hot_per_layer` floor guarantee. + +### Routing Statistics + +`MoeHybridRoutingStats` maintains a flattened `[n_layer][n_expert]` activation count matrix. Observations come from: + +- Direct `observe(layer, expert_ids, n)` calls during decoding +- `observe_selected_tensor()` for reading router outputs from GPU tensors + +Statistics can be saved/loaded as CSV for offline analysis or warm-starting placement decisions. + +### Swap Manager + +At request boundaries, `build_moe_hybrid_swap_plan` identifies profitable swaps: + +1. For each layer, find the weakest hot expert and strongest cold expert. +2. If the cold expert's activation count exceeds the hot expert's by `min_promote_gain`, propose a swap. +3. Rank all candidates by gain delta, apply up to `max_swaps_total`. + +This adapts placement to workload drift without full re-profiling. + +## FFN Evaluation Modes + +### `eval_moe_hybrid_ffn_single` + +Single-token decode path: + +1. **Partition** selected expert IDs into hot (GPU-local) and cold (CPU-local) subsets using the local-by-global index maps. +2. 
**GPU path**: Run hot routed experts + shared expert in a single fused ggml graph (`run_hot_and_shared_ffn_gpu`). +3. **CPU path**: Run cold routed experts via `run_routed_subset` on CPU backend. +4. **Combine**: Sum hot+shared and cold results on host. + +Telemetry reports wall time and per-phase breakdown (partition, hot, cold, shared, combine). + +### `eval_moe_hybrid_ffn_gpu_resident` + +Optimized single-token path that keeps activations on GPU: + +- Reads only router IDs to CPU for hot/cold partitioning. +- Uses `GpuResidentState` with a pre-built `ResidualCombineGraph` (residual + hot + cold correction). +- Uses `CachedFfnGraph` to avoid per-token graph rebuilds. +- Cold expert output is uploaded back to GPU for the residual combine. + +### `eval_moe_batched_prefill_ffn` + +Batched prefill (all experts on GPU, no hybrid split). Used when all tokens can be processed at once on GPU. + +### `eval_moe_hybrid_ffn_batched` + +Batched hybrid prefill: splits the batch FFN into hot (GPU) and cold (CPU) subgraphs computed concurrently, then combines. Supports reusable allocators (`p_hot_alloc`, `p_cold_alloc`) for amortizing allocation cost across layers. + +## Cached FFN Graphs + +To avoid per-token ggml graph construction overhead, `CachedFfnGraph` pre-builds the computation graph for a fixed expert count: + +- `build_cached_hot_graph` — hot experts + shared expert, fused into one GPU graph +- `build_cached_cold_graph` — cold experts on CPU + +At inference time, only input/ids/weights tensors are updated and `ggml_backend_graph_compute` is called on the pre-allocated graph. + +## Storage Construction + +Two loading paths: + +1. **From GPU tensors** (`build_moe_hybrid_storage`): Reads expert slices from already-loaded full stacked tensors on GPU. Copies hot slices to a compact GPU buffer, cold slices to CPU. + +2. 
**From file** (`build_moe_hybrid_storage_from_file`): Reads expert slices directly from mmap'd GGUF file data, avoiding the need to load all experts to GPU first. Useful when VRAM is insufficient for the full model. + +Both paths produce the same `MoeHybridStorage` containing per-layer split buffers ready for evaluation. + +## Model Integration + +The subsystem is model-agnostic. Model backends integrate via: + +1. Include `moe_hybrid_types_impl.h` after their internal weight struct header. +2. Call `make_moe_hybrid_config(weights)` and `make_moe_layer_desc(layer)` to convert model-specific types to the generic interface. +3. Use the generic placement, storage, and eval APIs. + +Currently integrated with: +- **qwen35moe** — all layers are MoE (`first_moe_layer = 0`), supports fused gate_up, shared expert gating, NVFP4 scales. +- **laguna** — layer 0 is dense (`first_moe_layer = n_layer_dense_lead`), no fused gate_up, no shared-expert gate, no NVFP4. + +## Quantization Support + +The `MoeLayerDesc` carries per-tensor scale factors (`*_s` fields) for NVFP4 quantization. The `apply_scale2()` helper in the eval code multiplies matmul results by the scale when non-unity, making the FFN evaluation transparently handle both standard quantized and NVFP4 models. + +--- + +## Deep Dive: Overhead Reduction When All Experts Are Hot (GPU-Resident) + +When all selected experts for a given token happen to reside on GPU, the system eliminates almost all overhead through several mechanisms: + +### 1. Zero-Copy GPU→GPU Input Path + +In `eval_moe_hybrid_ffn_gpu_resident`, the activation tensor (`ffn_post_gpu`) is already on GPU from the preceding attention layer. The input is fed to the hot graph via: + +```cpp +ggml_backend_tensor_copy(ffn_post_gpu, storage.hot_graph.inp); // GPU→GPU, no PCIe +``` + +This is a device-local memcpy (≈500 GB/s on RTX 3090 HBM-equivalent bandwidth), not a PCIe transfer. The activation never touches host memory. + +### 2. 
Pre-Built Cached Graph (No Per-Token Construction) + +The `CachedFfnGraph` is lazily built on first use and reused for every subsequent token with the same expert count: + +```cpp +if (!storage.hot_graph.valid() || storage.hot_graph.n_hot != n_hot) { + build_cached_hot_graph(...); // only runs once per expert-count change +} +// Per-token: just update inputs and dispatch +ggml_backend_tensor_set(storage.hot_graph.ids, hot_ids.data(), ...); +ggml_backend_graph_compute_async(gpu_backend, storage.hot_graph.gf); +``` + +This avoids: +- `ggml_init` / `ggml_free` per token (context allocation overhead) +- Graph construction (`ggml_new_graph_custom`, `ggml_build_forward_expand`) +- Buffer allocation (`ggml_gallocr_alloc_graph`) — the most expensive part, involving memory planning and pointer assignment for every intermediate tensor + +In practice, building a graph costs 50–200 µs. With cached graphs, the per-token hot path is just tensor_set (IDs + weights, ~32 bytes each, ~64 bytes total) + kernel launch. + +### 3. Cold Path Skipped Entirely + +When all experts are hot, `cold_ids` is empty: + +```cpp +const bool has_cold = !cold_ids.empty(); // false when all hot +``` + +The entire cold branch is skipped — no CPU graph compute, no CPU→GPU upload of cold results. The `combine.cold_in` tensor retains its pre-initialized zeros. + +### 4. Residual Combine Stays on GPU + +The final combination (`output = residual + hot + cold`) runs as a pre-built `ResidualCombineGraph` on GPU: + +```cpp +ggml_backend_tensor_copy(storage.hot_graph.output, gpu_state.combine.hot_in); // GPU→GPU +ggml_backend_graph_compute(gpu_backend, gpu_state.combine.gf); // GPU kernel +ggml_backend_tensor_copy(gpu_state.combine.output, gpu_state.act_cur); // GPU→GPU +``` + +The entire data flow for the all-hot case is: **GPU → GPU → GPU** with zero PCIe round-trips for activations. + +### 5. 
Minimal Host-Side Data + +The only host→device transfers when all experts are hot: +- Router IDs: `n_expert_used` × sizeof(int32_t) = typically 8 × 4 = **32 bytes** +- Router weights: `n_expert_used` × sizeof(float) = typically 8 × 4 = **32 bytes** + +Total PCIe payload: ~64 bytes per token — negligible vs. the 14+ GB/s available bandwidth. + +### Net Effect + +For a Qwen3.5-MoE token where all 8 selected experts are hot, the MoE FFN evaluation path degenerates to: +1. One GPU→GPU tensor copy (activation input) +2. One pre-built GPU kernel dispatch (routed + shared FFN) +3. One GPU→GPU tensor copy (to combine input) +4. One pre-built GPU kernel dispatch (residual add) +5. One GPU→GPU tensor copy (to persistent state) + +Wall time: **~100–200 µs** on RTX 3090 — comparable to a non-hybrid monolithic FFN dispatch with equivalent FLOPS. + +--- + +## Deep Dive: Handling Experts in System RAM (Cold Path) + +When some selected experts are cold (CPU-resident), the system uses a concurrent execution strategy to hide CPU latency behind GPU work. + +### Memory Layout + +Cold expert tensors are stored in CPU-backend buffers (`cold_buf`) allocated via `ggml_backend_cpu_init()`. They use the same quantization format as their GPU counterparts (Q4_K_M, etc.) — no format conversion is needed. The CPU backend is configured with limited threads: + +```cpp +ggml_backend_cpu_set_n_threads(out.cpu_backend, std::max(1, std::min(cfg.n_expert_used, 8))); +``` + +This caps CPU threads to avoid starving the GPU driver thread and other system work. + +### Concurrent GPU/CPU Execution + +The key insight: **launch GPU kernels first (async), then run CPU work while GPU is busy.** + +``` +Timeline: + GPU: ┤███ hot FFN + shared ███████████████████████├─ sync ─┤ + CPU: ┤██ cold FFN (overlaps GPU) ██├ + ┤ combine ├ +``` + +In code (`eval_moe_hybrid_ffn_single`): + +```cpp +// 1. Launch GPU async (returns immediately) +ggml_backend_graph_compute_async(gpu_backend, storage.hot_graph.gf); + +// 2. 
Run cold on CPU (blocking, but GPU is running in parallel) +ggml_backend_graph_compute(cpu_backend, storage.cold_graph.gf); + +// 3. Sync GPU (usually already done by the time CPU finishes) +ggml_backend_synchronize(gpu_backend); + +// 4. Combine results +out[i] = hot_and_shared[i] + cold[i]; +``` + +### Why CPU Evaluation (Not GPU Streaming Over PCIe)? + +A naive alternative would keep cold experts in system RAM but stream them to GPU for computation. This is **strictly worse** for single-token decode: + +| Metric | CPU-local compute | GPU via PCIe stream | +|--------|-------------------|---------------------| +| Bandwidth to weights | 40–60 GB/s (DDR5) | 15.75 GB/s (PCIe 4.0 x16) | +| Transfer overhead | 0 (weights are local) | Must DMA entire expert (~6 MB for Q4_K_M) | +| GPU utilization | GPU runs hot experts in parallel | GPU stalls waiting for PCIe DMA | +| Latency per cold expert | ~100 µs (matmul on Zen4 AVX-512) | ~380 µs (DMA) + kernel time | + +**The fundamental problem**: PCIe 4.0 x16 bandwidth (15.75 GB/s) is **roughly 3× lower** than DDR5 memory bandwidth (≈50 GB/s). Loading a single Q4_K_M Qwen expert (gate + up + down ≈ 6 MB) over PCIe takes ~380 µs just for the transfer. Meanwhile, the CPU can complete the entire matmul chain in ~100–150 µs using the same data that's already in L3/RAM. + +Additionally: +- **PCIe is half-duplex for bulk transfers**: You cannot overlap upload of expert weights with download of results efficiently. +- **GPU kernel launch overhead**: Even after DMA completes, the GPU needs to dispatch a kernel for a single-token matmul on just 1–2 cold experts — an inefficient use of GPU SMs. +- **Bubble injection**: Streaming cold experts to GPU introduces pipeline bubbles. The GPU must wait for DMA → compute → return result, during which the SMs sit idle or context-switch. With CPU-local compute, the GPU is 100% occupied on hot experts. 
+ +### When Streaming *Would* Make Sense + +PCIe streaming is only beneficial for **large batch prefill** where: +- Many tokens need the same cold expert (amortizing DMA cost over batch) +- GPU FLOPS utilization is high enough to offset transfer time +- The batch size makes CPU compute bandwidth-bound + +This is why `eval_moe_batched_prefill_ffn` uses the full GPU expert stack (all experts on GPU) for prefill — during prefill, the model is loaded fully and the batch dimension makes GPU compute dominant. + +### Cold Path Overhead Budget + +For a typical decode token with 2 out of 8 selected experts cold: + +| Phase | Duration | Notes | +|-------|----------|-------| +| Partition | ~1 µs | Index lookup in `hot_local_by_global` | +| Cold graph setup | 0 µs (cached) | Pre-built, reused | +| Cold tensor_set (inp) | ~2 µs | n_embd × 4 = ~14 KB for 3584-dim | +| Cold graph compute | ~100–150 µs | 2 experts × (gate + up + SwiGLU + down) | +| Cold tensor_get (out) | ~2 µs | n_embd × 4 = ~14 KB | +| **Total cold latency** | **~105–155 µs** | Overlaps with GPU hot path | + +Since the GPU hot path (6 experts + shared) takes ~100–200 µs, the cold path is **fully hidden** in the overlap window in most cases. + +--- + +## Deep Dive: Why Not Load Data Over PCIe (Let CUDA Do All Work) + +### The PCIe Bottleneck Argument + +A seemingly simpler design would keep all expert weights in system RAM (unified virtual memory or explicit staging) and let CUDA handle everything: + +``` +Naive approach: System RAM → PCIe → GPU VRAM → CUDA kernel → result +``` + +This fails catastrophically for MoE decode for several reasons: + +### 1. 
PCIe Bandwidth Is the Binding Constraint (Not Compute) + +For a single-token MoE FFN step on Qwen3.5-MoE-27B (Q4_K_M): +- 8 selected experts × ~6 MB each = **48 MB** of weight data +- PCIe 4.0 x16: 15.75 GB/s → 48 MB takes **3.05 ms** +- Actual GPU compute for those matmuls: **~200 µs** + +The system would spend **93% of time waiting for PCIe** and **7% doing useful compute**. Token generation throughput drops from ~50 tok/s to ~5 tok/s — a 10× regression. + +### 2. PCIe Transfers Are Not Free (Even With Async DMA) + +CUDA DMA engines can transfer while kernels run, but: +- **Pinned memory required**: System RAM must be page-locked for async DMA, which pressures the OS memory manager and increases allocation latency. +- **DMA engine contention**: Most consumer GPUs have 1–2 copy engines. With 8 expert transfers queued, serialization and scheduling overhead accumulates. +- **TLB/IOMMU overhead**: Each DMA transfer requires address translation through the IOMMU (Intel VT-d / AMD-Vi), adding ~1–5 µs per transfer setup. +- **Cache pollution on return**: Results copied back from GPU flush CPU caches that were holding useful data for the next layer's attention computation. + +### 3. Hybrid CPU Compute Exploits a Free Resource + +The CPU cores are otherwise **idle** during GPU decode. Between two GPU kernel dispatches (attention → FFN → next-layer attention), the CPU driver thread submits work and waits. Running cold expert matmuls on CPU during this window is pure throughput gain at zero opportunity cost: + +``` +Without hybrid: GPU: [attention][wait for PCIe][FFN][attention]... + CPU: [idle.........................idle........] + +With hybrid: GPU: [attention][hot FFN async]....[combine]... + CPU: [ cold FFN ]...[idle]...... +``` + +### 4. 
Quantized Matmuls Are CPU-Efficient + +ggml's Q4_K_M dequant+matmul kernels are highly optimized for x86 (AVX2/AVX-512): +- A single Qwen expert forward (gate + up + SwiGLU + down) on 8 cores: ~50–75 µs +- Memory bandwidth utilization on DDR5-5600: ~85% of theoretical peak +- The CPU is compute-limited on a single token, not memory-limited — ideal workload balance + +### 5. Total System Throughput Comparison + +| Strategy | MoE FFN latency (8 experts, 2 cold) | Bottleneck | +|----------|--------------------------------------|------------| +| All-GPU (requires 40+ GB VRAM) | ~200 µs | GPU compute | +| PCIe streaming (all from RAM) | ~3050 µs | PCIe bandwidth | +| Hybrid (6 hot GPU + 2 cold CPU) | ~200 µs | max(GPU, CPU) | + +The hybrid approach matches all-GPU performance on the common case (most experts hot) while gracefully degrading on cold hits — and it fits in 24 GB VRAM. + +### 6. The Residual Upload Is Tiny + +The only mandatory CPU→GPU transfer in the hybrid path is the cold expert result: +- Size: `n_embd × sizeof(float)` = 3584 × 4 = **14 KB** +- Transfer time at PCIe 4.0: **~1 µs** (negligible) + +Compare this to the 48 MB that would be needed to stream all expert weights. The hybrid design reduces PCIe traffic by **3400×** compared to full PCIe streaming. diff --git a/server/scripts/bench_moe_prefill_streaming.py b/server/scripts/bench_moe_prefill_streaming.py new file mode 100644 index 000000000..c13ef85ae --- /dev/null +++ b/server/scripts/bench_moe_prefill_streaming.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +""" +bench_moe_prefill_streaming.py — End-to-end benchmark comparing prefill +throughput with and without MoE streaming. 
+ +Usage: + # Against a running server (streaming enabled): + python bench_moe_prefill_streaming.py --url http://localhost:8080 + + # Compare two servers (streaming vs baseline): + python bench_moe_prefill_streaming.py \ + --url-streaming http://localhost:8080 \ + --url-baseline http://localhost:8081 + +Measures prompt eval tok/s at varying prompt lengths. +""" + +import argparse +import json +import time +import sys +from urllib.request import urlopen, Request +from urllib.error import URLError + + +def generate_prompt(n_tokens: int) -> str: + """Generate a prompt that's approximately n_tokens long.""" + # Average ~1.3 tokens per word in English + words_needed = max(1, int(n_tokens / 1.3)) + base = "The quick brown fox jumps over the lazy dog. " + words = base.split() + prompt_words = [] + for i in range(words_needed): + prompt_words.append(words[i % len(words)]) + return " ".join(prompt_words) + + +def bench_prefill(url: str, prompt: str, max_tokens: int = 1) -> dict: + """Send one non-streaming chat completion and time it; returns {"error": ...} if the request fails, times out, or the reply is not valid JSON.""" + payload = { + "model": "default", + "messages": [{"role": "user", "content": prompt}], + "max_tokens": max_tokens, + "temperature": 0.0, + "stream": False, + } + + req = Request( + f"{url}/v1/chat/completions", + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + + t0 = time.perf_counter() + try: + with urlopen(req, timeout=120) as resp: + body = json.loads(resp.read()) + except (URLError, OSError, ValueError) as e:  # OSError: socket timeout/reset while reading the body; ValueError: malformed URL or invalid JSON reply + return {"error": str(e)} + t1 = time.perf_counter() + + usage = body.get("usage", {}) + prompt_tokens = usage.get("prompt_tokens", 0) + # The server reports timings in the response if available; otherwise fall back to wall-clock time + timings = body.get("timings", {}) + prefill_ms = timings.get("prompt_ms", (t1 - t0) * 1000) + + return { + "prompt_tokens": prompt_tokens, + "wall_ms": (t1 - t0) * 1000, + "prefill_ms": prefill_ms, + "tok_per_s": prompt_tokens / (prefill_ms / 1000) if prefill_ms > 0 else 0, + } + + +def run_sweep(url: str, 
prompt_lengths: list, n_warmup: int = 1, n_iter: int = 3) -> list: + """Run prefill benchmark at various prompt lengths; returns one averaged result dict per length that had at least one successful request.""" + results = [] + + for n_tok in prompt_lengths: + prompt = generate_prompt(n_tok) + + # Warmup (results and any errors are discarded) + for _ in range(n_warmup): + bench_prefill(url, prompt) + + # Measure; the first failed request ends this length, keeping the samples gathered so far + measurements = [] + for _ in range(n_iter): + r = bench_prefill(url, prompt) + if "error" in r: + print(f" ERROR at T={n_tok}: {r['error']}", file=sys.stderr) + break + measurements.append(r) + + if measurements:  # lengths with no successful sample are omitted from the results + avg_prefill_ms = sum(m["prefill_ms"] for m in measurements) / len(measurements) + avg_tok_s = sum(m["tok_per_s"] for m in measurements) / len(measurements) + actual_tokens = measurements[0]["prompt_tokens"] + results.append({ + "target_tokens": n_tok, + "actual_tokens": actual_tokens, + "avg_prefill_ms": avg_prefill_ms, + "avg_tok_per_s": avg_tok_s, + }) + print(f" T={n_tok:5d} actual={actual_tokens:5d} " + f"prefill={avg_prefill_ms:8.1f} ms " + f"tok/s={avg_tok_s:8.1f}") + + return results + + +def main(): + parser = argparse.ArgumentParser(description="MoE prefill streaming benchmark") + parser.add_argument("--url", type=str, help="Server URL to benchmark") + parser.add_argument("--url-streaming", type=str, help="Server URL with streaming enabled") + parser.add_argument("--url-baseline", type=str, help="Server URL with baseline (CPU cold)") + parser.add_argument("--prompt-lengths", type=str, default="128,256,512,1024,2048,4096,8192", + help="Comma-separated prompt lengths to test") + parser.add_argument("--n-iter", type=int, default=3, help="Iterations per measurement") + parser.add_argument("--n-warmup", type=int, default=1, help="Warmup iterations") + args = parser.parse_args() + + prompt_lengths = [int(x) for x in args.prompt_lengths.split(",")] + + if args.url:  # single-server mode; takes precedence over the comparison flags + print(f"\n=== Benchmarking: {args.url} ===") + results = run_sweep(args.url, prompt_lengths, args.n_warmup, args.n_iter) + print(f"\n{'T':>8} {'Prefill ms':>12} {'tok/s':>10}") + print("-" * 35) + for r 
in results: + print(f"{r['actual_tokens']:>8} {r['avg_prefill_ms']:>12.1f} {r['avg_tok_per_s']:>10.1f}") + + elif args.url_streaming and args.url_baseline:  # comparison mode: sweep both servers, then print a side-by-side table + print(f"\n=== Baseline: {args.url_baseline} ===") + baseline = run_sweep(args.url_baseline, prompt_lengths, args.n_warmup, args.n_iter) + + print(f"\n=== Streaming: {args.url_streaming} ===") + streaming = run_sweep(args.url_streaming, prompt_lengths, args.n_warmup, args.n_iter) + + print(f"\n{'T':>8} {'Baseline ms':>12} {'Stream ms':>12} {'Speedup':>8}") + print("-" * 50) + stream_map = {s["target_tokens"]: s for s in streaming}  # index streaming results by requested prompt length + for b in baseline: + s = stream_map.get(b["target_tokens"]) + if not s:  # no successful streaming sample at this length + continue + speedup = b["avg_prefill_ms"] / s["avg_prefill_ms"] if s["avg_prefill_ms"] > 0 else 0  # > 1 means streaming prefill was faster + print(f"{b['actual_tokens']:>8} {b['avg_prefill_ms']:>12.1f} " + f"{s['avg_prefill_ms']:>12.1f} {speedup:>7.2f}x") + else: + parser.error("Provide --url or both --url-streaming and --url-baseline") + + +if __name__ == "__main__": + main() diff --git a/server/src/common/gguf_mmap.h b/server/src/common/gguf_mmap.h index 37416f31c..3ddb76008 100644 --- a/server/src/common/gguf_mmap.h +++ b/server/src/common/gguf_mmap.h @@ -10,6 +10,7 @@ #pragma once #include +#include #include namespace dflash::common { @@ -36,6 +37,10 @@ class GgufMmap { size_t size() const; // 0 when not open bool is_open() const; + // Advise the kernel to read ahead the given byte range into page cache. + // No-op if not open or range is invalid. Safe to call from any thread. + void advise_willneed(size_t offset, size_t length) const; + // Transfer ownership of the mmap'd region to the caller. // After release() this object is empty (is_open() == false). 
// The caller is responsible for unmapping on POSIX or UnmapViewOfFile on @@ -196,6 +201,26 @@ inline const void * GgufMmap::data() const { return data_; } inline size_t GgufMmap::size() const { return size_; } inline bool GgufMmap::is_open() const { return data_ != nullptr; } +inline void GgufMmap::advise_willneed(size_t offset, size_t length) const { // best-effort page-cache prefetch hint; the OS call's result is ignored + if (!data_ || offset >= size_) return; + if (length > size_ - offset) length = size_ - offset; // overflow-safe clamp to the mapping (offset < size_ is guaranteed above; offset + length could wrap) + if (length == 0) return; +#if defined(_WIN32) + // PrefetchVirtualMemory (Windows 8+) + WIN32_MEMORY_RANGE_ENTRY entry{}; + entry.VirtualAddress = const_cast<char *>(static_cast<const char *>(data_)) + offset; + entry.NumberOfBytes = length; + PrefetchVirtualMemory(GetCurrentProcess(), 1, &entry, 0); +#else + // madvise needs a page-aligned start: round the offset down and grow the length by the same amount + const size_t page_size = (size_t)sysconf(_SC_PAGESIZE); + const size_t aligned_offset = (offset / page_size) * page_size; + const size_t aligned_length = length + (offset - aligned_offset); + ::madvise(const_cast<char *>(static_cast<const char *>(data_)) + aligned_offset, + aligned_length, MADV_WILLNEED); +#endif +} + inline GgufMmap::OwnedRegion GgufMmap::release() { OwnedRegion r{}; r.data = data_; diff --git a/server/src/common/moe_hybrid_ffn_eval.cpp b/server/src/common/moe_hybrid_ffn_eval.cpp index 7806bb1e7..12a854d37 100644 --- a/server/src/common/moe_hybrid_ffn_eval.cpp +++ b/server/src/common/moe_hybrid_ffn_eval.cpp @@ -1,3 +1,4 @@ +#include #include "moe_hybrid_ffn_eval.h" #include "ggml-alloc.h" @@ -11,8 +12,6 @@ namespace dflash::common { -namespace { - // NVFP4 scale2: if weight has a per-tensor scale, multiply the matmul result // by that scale. No-op when scale==1.0f (non-NVFP4 models). 
inline ggml_tensor * apply_scale2(ggml_context * ctx, ggml_tensor * mm_result, float scale) { @@ -26,6 +25,28 @@ static uint64_t elapsed_us(HybridClock::time_point start, HybridClock::time_poin return (uint64_t) std::chrono::duration_cast(end - start).count(); } +// Build the shared-expert FFN subgraph (gate and up matmuls -> ggml_swiglu_split -> down matmul, each matmul passed through apply_scale2) in `ctx`, reading from `inp`. +// Returns the output tensor, multiplied by sigmoid(ffn_gate_inp_shexp * inp) when that gate tensor is set, or nullptr if any of the up/gate/down shared-expert weights is missing. +static ggml_tensor * build_shared_expert_subgraph( + ggml_context * ctx, const MoeLayerDesc & desc, ggml_tensor * inp) { + if (!desc.ffn_up_shexp || !desc.ffn_gate_shexp || !desc.ffn_down_shexp) + return nullptr; // no complete shared expert in this layer; nothing is added to ctx + ggml_tensor * sh_gate = apply_scale2(ctx, + ggml_mul_mat(ctx, desc.ffn_gate_shexp, inp), desc.ffn_gate_shexp_s); + ggml_tensor * sh_up = apply_scale2(ctx, + ggml_mul_mat(ctx, desc.ffn_up_shexp, inp), desc.ffn_up_shexp_s); + ggml_tensor * sh_gu = ggml_swiglu_split(ctx, sh_gate, sh_up); + ggml_tensor * shared = apply_scale2(ctx, + ggml_mul_mat(ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); + if (desc.ffn_gate_inp_shexp) { // optional gate on the shared-expert output + ggml_tensor * shared_gate = apply_scale2(ctx, + ggml_mul_mat(ctx, desc.ffn_gate_inp_shexp, inp), desc.ffn_gate_inp_shexp_s); + shared_gate = ggml_sigmoid(ctx, shared_gate); + shared = ggml_mul(ctx, shared, shared_gate); + } + return shared; +} + // Run routed expert subset on a given backend (GPU or CPU). 
static bool run_routed_subset(ggml_backend_t backend, ggml_tensor * gate_tensor, @@ -174,15 +195,10 @@ static bool run_shared_ffn_gpu(ggml_backend_t backend, ggml_tensor * inp = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, n_embd, 1); ggml_set_input(inp); - ggml_tensor * sh_gate = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_gate_shexp, inp), desc.ffn_gate_shexp_s); - ggml_tensor * sh_up = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_up_shexp, inp), desc.ffn_up_shexp_s); - ggml_tensor * sh_gu = ggml_swiglu_split(ctx, sh_gate, sh_up); - ggml_tensor * shared = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); - if (desc.ffn_gate_inp_shexp) { - ggml_tensor * shared_gate = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_gate_inp_shexp, inp), desc.ffn_gate_inp_shexp_s); - shared_gate = ggml_sigmoid(ctx, shared_gate); - shared = ggml_mul(ctx, shared, shared_gate); + ggml_tensor * shared = build_shared_expert_subgraph(ctx, desc, inp); + if (!shared) { + ggml_free(ctx); + return true; } ggml_cgraph * gf = ggml_new_graph_custom(ctx, 512, false); @@ -294,19 +310,7 @@ static bool run_hot_and_shared_ffn_gpu( } } - ggml_tensor * shared = nullptr; - if (has_shared) { - ggml_tensor * sh_gate = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_gate_shexp, inp), desc.ffn_gate_shexp_s); - ggml_tensor * sh_up = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_up_shexp, inp), desc.ffn_up_shexp_s); - ggml_tensor * sh_gu = ggml_swiglu_split(ctx, sh_gate, sh_up); - shared = apply_scale2(ctx, ggml_mul_mat(ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); - if (desc.ffn_gate_inp_shexp) { - ggml_tensor * shared_gate = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_gate_inp_shexp, inp), desc.ffn_gate_inp_shexp_s); - shared_gate = ggml_sigmoid(ctx, shared_gate); - shared = ggml_mul(ctx, shared, shared_gate); - } - } + ggml_tensor * shared = build_shared_expert_subgraph(ctx, desc, inp); // Combine hot routed + shared into a single output tensor ggml_tensor * 
combined = nullptr; @@ -404,8 +408,6 @@ static bool build_batched_routed_graph( return true; } -} // namespace (anon) - // ── Public API ────────────────────────────────────────────────────────────────── bool build_cached_hot_graph( @@ -498,20 +500,7 @@ bool build_cached_hot_graph( } } - ggml_tensor * shared = nullptr; - const bool has_shared = (desc.ffn_up_shexp && desc.ffn_gate_shexp && desc.ffn_down_shexp); - if (has_shared) { - ggml_tensor * sh_gate = apply_scale2(out.ctx, ggml_mul_mat(out.ctx, desc.ffn_gate_shexp, out.inp), desc.ffn_gate_shexp_s); - ggml_tensor * sh_up = apply_scale2(out.ctx, ggml_mul_mat(out.ctx, desc.ffn_up_shexp, out.inp), desc.ffn_up_shexp_s); - ggml_tensor * sh_gu = ggml_swiglu_split(out.ctx, sh_gate, sh_up); - shared = apply_scale2(out.ctx, ggml_mul_mat(out.ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); - if (desc.ffn_gate_inp_shexp) { - ggml_tensor * shared_gate = apply_scale2(out.ctx, - ggml_mul_mat(out.ctx, desc.ffn_gate_inp_shexp, out.inp), desc.ffn_gate_inp_shexp_s); - shared_gate = ggml_sigmoid(out.ctx, shared_gate); - shared = ggml_mul(out.ctx, shared, shared_gate); - } - } + ggml_tensor * shared = build_shared_expert_subgraph(out.ctx, desc, out.inp); if (routed && shared) { out.output = ggml_add(out.ctx, routed, shared); @@ -613,6 +602,62 @@ bool build_cached_cold_graph( return true; } +bool build_cached_hot_batched_graph( + CachedHotBatchedGraph & out, + ggml_backend_t gpu_backend, + MoeHybridLayerStorage & storage, + const MoeLayerDesc & desc, + const MoeHybridConfig & cfg, + int n_tokens) { + + out.free(); + out.n_tokens = n_tokens; + + const int n_embd = cfg.n_embd; + const int n_used = cfg.n_expert_used; + const int n_ff_exp = cfg.n_ff_exp; + + ggml_init_params ip{}; + ip.mem_size = 128 * 1024 * 1024; + ip.mem_buffer = nullptr; + ip.no_alloc = true; + out.ctx = ggml_init(ip); + if (!out.ctx) return false; + + out.inp = ggml_new_tensor_2d(out.ctx, GGML_TYPE_F32, n_embd, n_tokens); + ggml_set_input(out.inp); + 
out.sel = ggml_new_tensor_2d(out.ctx, GGML_TYPE_I32, n_used, n_tokens); + ggml_set_input(out.sel); + out.wts = ggml_new_tensor_2d(out.ctx, GGML_TYPE_F32, n_used, n_tokens); + ggml_set_input(out.wts); + + ggml_tensor * routed = nullptr; + build_batched_routed_graph(out.ctx, + storage.gate_hot, storage.up_hot, storage.down_hot, storage.gate_up_hot, + desc.ffn_gate_exps_s, desc.ffn_up_exps_s, desc.ffn_down_exps_s, desc.ffn_gate_up_exps_s, + out.inp, out.sel, out.wts, n_embd, n_ff_exp, n_used, n_tokens, &routed); + + // Shared expert (always on GPU) + ggml_tensor * combined = routed; + ggml_tensor * shared = build_shared_expert_subgraph(out.ctx, desc, out.inp); + if (shared) { + combined = combined ? ggml_add(out.ctx, combined, shared) : shared; + } + + if (!combined) { out.free(); return false; } + out.output = combined; + + out.gf = ggml_new_graph_custom(out.ctx, 4096, false); + ggml_set_output(out.output); + ggml_build_forward_expand(out.gf, out.output); + out.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); + if (!ggml_gallocr_alloc_graph(out.alloc, out.gf)) { + out.free(); + return false; + } + return true; +} + bool eval_moe_hybrid_ffn_single( ggml_backend_t gpu_backend, const MoeHybridConfig & cfg, @@ -841,21 +886,9 @@ bool eval_moe_batched_prefill_ffn( // Shared expert ggml_tensor * combined = routed; - if (desc.ffn_up_shexp && desc.ffn_gate_shexp && desc.ffn_down_shexp) { - ggml_tensor * sh_gate = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_gate_shexp, inp), desc.ffn_gate_shexp_s); - ggml_tensor * sh_up = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_up_shexp, inp), desc.ffn_up_shexp_s); - ggml_tensor * sh_gu = ggml_swiglu_split(ctx, sh_gate, sh_up); - ggml_tensor * shared = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); - if (desc.ffn_gate_inp_shexp) { - ggml_tensor * shared_gate = apply_scale2(ctx, - ggml_mul_mat(ctx, desc.ffn_gate_inp_shexp, inp), desc.ffn_gate_inp_shexp_s); - 
shared_gate = ggml_sigmoid(ctx, shared_gate); - shared = ggml_mul(ctx, shared, shared_gate); - } - combined = ggml_add(ctx, routed, shared); + ggml_tensor * shared = build_shared_expert_subgraph(ctx, desc, inp); + if (shared) { + combined = combined ? ggml_add(ctx, combined, shared) : shared; } ggml_cgraph * gf = ggml_new_graph_custom(ctx, 4096, false); @@ -887,6 +920,21 @@ bool eval_moe_batched_prefill_ffn( return true; } +// MMQ full-batch mul_mat_id on a reduced hot stack is only stable for large +// batches. Small batches (spec verify/replay, <=~24 tokens) spread n_used*n_tokens +// slots over thousands of hot experts; that extreme imbalance hits an unbounded +// stream-k tile load in the MMQ kernel and faults (observed on sm_86, not just +// sm_75). Prefill chunks (>=64 tokens) are dense enough and run clean, so keep +// the sm_80+ fast path for them and route small batches through the proven +// <=4-token MMVQ sub-batch path. +static bool mmq_full_batch_ok(const MoeHybridConfig & cfg, int n_tokens) { + static const int min_tokens = [](){ + const char * v = std::getenv("DFLASH_MMQ_FULL_BATCH_MIN"); + return v ? 
std::atoi(v) : 64; + }(); + return cfg.mmq_safe_full_batch && n_tokens >= min_tokens; +} + static bool eval_moe_hybrid_ffn_batched_core( ggml_backend_t gpu_backend, ggml_backend_t cpu_backend, @@ -979,20 +1027,8 @@ static bool eval_moe_hybrid_ffn_batched_core( // Shared expert (always on GPU) ggml_tensor * combined = routed; - if (has_shared) { - ggml_tensor * sh_gate = apply_scale2(hot_ctx, - ggml_mul_mat(hot_ctx, desc.ffn_gate_shexp, inp), desc.ffn_gate_shexp_s); - ggml_tensor * sh_up = apply_scale2(hot_ctx, - ggml_mul_mat(hot_ctx, desc.ffn_up_shexp, inp), desc.ffn_up_shexp_s); - ggml_tensor * sh_gu = ggml_swiglu_split(hot_ctx, sh_gate, sh_up); - ggml_tensor * shared = apply_scale2(hot_ctx, - ggml_mul_mat(hot_ctx, desc.ffn_down_shexp, sh_gu), desc.ffn_down_shexp_s); - if (desc.ffn_gate_inp_shexp) { - ggml_tensor * shared_gate = apply_scale2(hot_ctx, - ggml_mul_mat(hot_ctx, desc.ffn_gate_inp_shexp, inp), desc.ffn_gate_inp_shexp_s); - shared_gate = ggml_sigmoid(hot_ctx, shared_gate); - shared = ggml_mul(hot_ctx, shared, shared_gate); - } + ggml_tensor * shared = build_shared_expert_subgraph(hot_ctx, desc, inp); + if (shared) { combined = combined ? ggml_add(hot_ctx, combined, shared) : shared; } hot_output = combined; @@ -1116,6 +1152,178 @@ static bool eval_moe_hybrid_ffn_batched_core( return true; } +// ── Hot-Only Batched Prefill ── +// When all selected experts are in VRAM, skip cold entirely: no CPU graph, +// no partition into hot/cold, no merge loop. Pure GPU. 
+ +bool eval_moe_hot_only_batched( + ggml_backend_t gpu_backend, + const MoeHybridConfig & cfg, + const MoeLayerDesc & desc, + MoeHybridLayerStorage & storage, + const float * cur_host, + const int32_t * selected_ids, + const float * selected_weights, + int n_tokens, + std::vector & out, + std::string * err, + ggml_gallocr_t * p_hot_alloc) { + + const int n_embd = cfg.n_embd; + const int n_used = cfg.n_expert_used; + const int n_ff_exp = cfg.n_ff_exp; + out.assign((size_t)n_embd * (size_t)n_tokens, 0.0f); + if (n_tokens <= 0) return true; + + // Workaround for ggml-cuda MMQ mul_mat_id bug on sm_75/gfx1151: when the + // hot stack is smaller than n_expert, slice into <=4-token sub-batches to + // route through the stable MMVQ path. Skipped on sm_80+ where MMQ is safe. + const int n_hot_stack = storage.gate_up_hot ? (int)storage.gate_up_hot->ne[2] + : storage.gate_hot ? (int)storage.gate_hot->ne[2] + : 0; + static const int MMQ_SAFE_SUB_BATCH = 4; + if (!mmq_full_batch_ok(cfg, n_tokens) + && n_hot_stack > 0 && n_hot_stack < cfg.n_expert && n_tokens > MMQ_SAFE_SUB_BATCH) { + std::vector sub_out; + for (int t0 = 0; t0 < n_tokens; t0 += MMQ_SAFE_SUB_BATCH) { + const int tc = std::min(MMQ_SAFE_SUB_BATCH, n_tokens - t0); + if (!eval_moe_hot_only_batched( + gpu_backend, cfg, desc, storage, + cur_host + (size_t)t0 * (size_t)n_embd, + selected_ids + (size_t)t0 * (size_t)n_used, + selected_weights + (size_t)t0 * (size_t)n_used, + tc, sub_out, err, p_hot_alloc)) { + return false; + } + std::memcpy(out.data() + (size_t)t0 * (size_t)n_embd, + sub_out.data(), + sizeof(float) * (size_t)n_embd * (size_t)tc); + } + return true; + } + + // Remap global expert IDs → hot-local IDs + const int total_slots = n_used * n_tokens; + std::vector hot_sel(total_slots); + for (int i = 0; i < total_slots; ++i) { + const int32_t gid = selected_ids[i]; + if (gid < 0 || gid >= (int32_t)storage.hot_local_by_global.size()) { + hot_sel[i] = 0; + } else { + hot_sel[i] = 
storage.hot_local_by_global[(size_t)gid]; + } + } + + // ── Fast path: use cached graph (avoids rebuild + realloc) ── + auto & cached = storage.hot_batched_graph; + if (cached.n_tokens == n_tokens && cached.valid()) { + // Reuse pre-built graph: just upload data and compute + ggml_backend_tensor_set(cached.inp, cur_host, 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + ggml_backend_tensor_set(cached.sel, hot_sel.data(), 0, sizeof(int32_t) * (size_t)total_slots); + ggml_backend_tensor_set(cached.wts, selected_weights, 0, sizeof(float) * (size_t)total_slots); + + auto st = ggml_backend_graph_compute(gpu_backend, cached.gf); + if (st != GGML_STATUS_SUCCESS) { + if (err) *err = "hot_only cached compute failed"; + return false; + } + ggml_backend_tensor_get(cached.output, out.data(), 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + return true; + } + + // ── Slow path: build graph (first call or size mismatch) ── + // Try to build and cache for this n_tokens size. + // Cache when: sub-batch size (legacy), full stack (all hot), or full-batch safe (sm_80+). 
+ if (mmq_full_batch_ok(cfg, n_tokens) || n_tokens == MMQ_SAFE_SUB_BATCH + || (n_hot_stack == 0 || n_hot_stack >= cfg.n_expert)) { + if (build_cached_hot_batched_graph(cached, gpu_backend, storage, desc, cfg, n_tokens)) { + // Successfully cached — use it immediately + ggml_backend_tensor_set(cached.inp, cur_host, 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + ggml_backend_tensor_set(cached.sel, hot_sel.data(), 0, sizeof(int32_t) * (size_t)total_slots); + ggml_backend_tensor_set(cached.wts, selected_weights, 0, sizeof(float) * (size_t)total_slots); + + auto st = ggml_backend_graph_compute(gpu_backend, cached.gf); + if (st != GGML_STATUS_SUCCESS) { + if (err) *err = "hot_only cached compute failed (first)"; + return false; + } + ggml_backend_tensor_get(cached.output, out.data(), 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + return true; + } + // Fall through to uncached path if build fails + } + + // ── Uncached fallback (remainder sub-batches with n_tokens < MMQ_SAFE_SUB_BATCH) ── + ggml_init_params ip{}; + ip.mem_size = 128 * 1024 * 1024; + ip.mem_buffer = nullptr; + ip.no_alloc = true; + ggml_context * ctx = ggml_init(ip); + if (!ctx) { if (err) *err = "hot_only ggml_init failed"; return false; } + + ggml_tensor * inp = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, n_embd, n_tokens); + ggml_set_input(inp); + ggml_tensor * sel = ggml_new_tensor_2d(ctx, GGML_TYPE_I32, n_used, n_tokens); + ggml_set_input(sel); + ggml_tensor * wts = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, n_used, n_tokens); + ggml_set_input(wts); + + ggml_tensor * routed = nullptr; + build_batched_routed_graph(ctx, + storage.gate_hot, storage.up_hot, storage.down_hot, storage.gate_up_hot, + desc.ffn_gate_exps_s, desc.ffn_up_exps_s, desc.ffn_down_exps_s, desc.ffn_gate_up_exps_s, + inp, sel, wts, n_embd, n_ff_exp, n_used, n_tokens, &routed); + + // Shared expert (always on GPU) + ggml_tensor * combined = routed; + ggml_tensor * shared = build_shared_expert_subgraph(ctx, desc, inp); + if 
(shared) { + combined = combined ? ggml_add(ctx, combined, shared) : shared; + } + + if (!combined) { + ggml_free(ctx); + if (err) *err = "hot_only: no routed or shared output"; + return false; + } + + ggml_cgraph * gf = ggml_new_graph_custom(ctx, 4096, false); + ggml_set_output(combined); + ggml_build_forward_expand(gf, combined); + + ggml_gallocr_t alloc = nullptr; + if (p_hot_alloc) { + if (!*p_hot_alloc) + *p_hot_alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); + alloc = *p_hot_alloc; + } else { + alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); + } + if (!ggml_gallocr_alloc_graph(alloc, gf)) { + if (err) *err = "hot_only gallocr failed"; + if (!p_hot_alloc) ggml_gallocr_free(alloc); + ggml_free(ctx); + return false; + } + + ggml_backend_tensor_set(inp, cur_host, 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + ggml_backend_tensor_set(sel, hot_sel.data(), 0, sizeof(int32_t) * (size_t)total_slots); + ggml_backend_tensor_set(wts, selected_weights, 0, sizeof(float) * (size_t)total_slots); + + auto st = ggml_backend_graph_compute(gpu_backend, gf); + if (st != GGML_STATUS_SUCCESS) { + if (err) *err = "hot_only compute failed"; + if (!p_hot_alloc) ggml_gallocr_free(alloc); + ggml_free(ctx); + return false; + } + + ggml_backend_tensor_get(combined, out.data(), 0, sizeof(float) * (size_t)n_embd * (size_t)n_tokens); + if (!p_hot_alloc) ggml_gallocr_free(alloc); + ggml_free(ctx); + return true; +} + // ── GPU-Resident Residual State ── // Public entry. Workaround for a ggml-cuda/HIP defect: the MMQ mul_mat_id @@ -1143,7 +1351,8 @@ bool eval_moe_hybrid_ffn_batched( : storage.gate_hot ? 
(int)storage.gate_hot->ne[2] : 0; static const int MMQ_SAFE_SUB_BATCH = 4; - if (n_hot_stack > 0 && n_hot_stack < cfg.n_expert && n_tokens > MMQ_SAFE_SUB_BATCH) { + if (!mmq_full_batch_ok(cfg, n_tokens) + && n_hot_stack > 0 && n_hot_stack < cfg.n_expert && n_tokens > MMQ_SAFE_SUB_BATCH) { const int n_embd = cfg.n_embd; const int n_used = cfg.n_expert_used; out.assign((size_t)n_embd * (size_t)n_tokens, 0.0f); diff --git a/server/src/common/moe_hybrid_ffn_eval.h b/server/src/common/moe_hybrid_ffn_eval.h index 174583ec5..591017bba 100644 --- a/server/src/common/moe_hybrid_ffn_eval.h +++ b/server/src/common/moe_hybrid_ffn_eval.h @@ -140,6 +140,21 @@ bool eval_moe_hybrid_ffn_batched( ggml_gallocr_t * p_hot_alloc = nullptr, ggml_gallocr_t * p_cold_alloc = nullptr); +// Hot-only batched prefill: all selected experts are in VRAM. +// Skips cold graph build, CPU compute, and merge — pure GPU path. +bool eval_moe_hot_only_batched( + ggml_backend_t gpu_backend, + const MoeHybridConfig & cfg, + const MoeLayerDesc & desc, + MoeHybridLayerStorage & storage, + const float * cur_host, + const int32_t * selected_ids, + const float * selected_weights, + int n_tokens, + std::vector & out, + std::string * err = nullptr, + ggml_gallocr_t * p_hot_alloc = nullptr); + // GPU-resident single-token hybrid FFN: keeps data on GPU, only reads router // IDs to CPU for hot/cold partitioning. bool eval_moe_hybrid_ffn_gpu_resident( @@ -190,4 +205,13 @@ bool build_cached_cold_graph( int n_ff_exp, int n_cold); +// Build cached hot-only batched graph for prefill (n_tokens=MMQ_SAFE_SUB_BATCH). 
+bool build_cached_hot_batched_graph( + CachedHotBatchedGraph & out, + ggml_backend_t gpu_backend, + MoeHybridLayerStorage & storage, + const MoeLayerDesc & desc, + const MoeHybridConfig & cfg, + int n_tokens); + } // namespace dflash::common diff --git a/server/src/common/moe_hybrid_storage.cpp b/server/src/common/moe_hybrid_storage.cpp index dcecc165f..a8613b02a 100644 --- a/server/src/common/moe_hybrid_storage.cpp +++ b/server/src/common/moe_hybrid_storage.cpp @@ -1,4 +1,5 @@ #include "moe_hybrid_storage.h" +#include "moe_hybrid_types.h" #include "ggml-cpu.h" #include "ggml-backend.h" @@ -7,8 +8,31 @@ #include #include +#if defined(DFLASH27B_BACKEND_CUDA) +#include +#endif + +#if !defined(_WIN32) +#include +#else +#include +#endif + namespace dflash::common { +int query_gpu_compute_sm() { +#if defined(DFLASH27B_BACKEND_CUDA) + int device = -1; + if (cudaGetDevice(&device) != cudaSuccess || device < 0) return 0; + cudaDeviceProp prop{}; + if (cudaGetDeviceProperties(&prop, device) != cudaSuccess) return 0; + return prop.major * 10 + prop.minor; +#else + // HIP/gfx1151 has the same MMQ bug — keep sub-batch workaround active. + return 0; +#endif +} + void CachedFfnGraph::free() { if (alloc) { ggml_gallocr_free(alloc); alloc = nullptr; } if (ctx) { ggml_free(ctx); ctx = nullptr; } @@ -20,6 +44,17 @@ void CachedFfnGraph::free() { n_hot = 0; } +void CachedHotBatchedGraph::free() { + if (alloc) { ggml_gallocr_free(alloc); alloc = nullptr; } + if (ctx) { ggml_free(ctx); ctx = nullptr; } + gf = nullptr; + inp = nullptr; + sel = nullptr; + wts = nullptr; + output = nullptr; + n_tokens = 0; +} + namespace { static bool read_expert_slices(ggml_backend_t backend, @@ -124,6 +159,17 @@ MoeHybridStorage::~MoeHybridStorage() { ggml_backend_free(cpu_backend); cpu_backend = nullptr; } + if (mmap_data) { +#if !defined(_WIN32) + ::munmap(const_cast(mmap_data), mmap_size); +#else + // On Windows, the mapping is unmapped when the view handle is closed. 
+ // mmap_data was mapped via MapViewOfFile; UnmapViewOfFile is the correct cleanup. + ::UnmapViewOfFile(mmap_data); +#endif + mmap_data = nullptr; + mmap_size = 0; + } } bool MoeHybridStorage::matches(const MoeHybridConfig & cfg) const { @@ -188,6 +234,13 @@ bool build_moe_hybrid_storage(const MoeHybridConfig & cfg, } } + // Populate VRAM bitmask from hot expert IDs + std::memset(dst.expert_vram_mask, 0, sizeof(dst.expert_vram_mask)); + for (int32_t eid : dst.hot_expert_ids) { + if (eid >= 0 && eid < 256) + dst.expert_vram_mask[eid >> 6] |= (1ULL << (eid & 63)); + } + dst.fused_gate_up = desc.has_fused_gate_up(); if (!validate_expert_tensor(desc.ffn_gate_exps, cfg.n_expert, &dst.gate_expert_bytes, err) || !validate_expert_tensor(desc.ffn_up_exps, cfg.n_expert, &dst.up_expert_bytes, err) || @@ -364,6 +417,13 @@ bool build_moe_hybrid_storage_from_file( } } + // Populate VRAM bitmask from hot expert IDs + std::memset(dst.expert_vram_mask, 0, sizeof(dst.expert_vram_mask)); + for (int32_t eid : dst.hot_expert_ids) { + if (eid >= 0 && eid < 256) + dst.expert_vram_mask[eid >> 6] |= (1ULL << (eid & 63)); + } + dst.fused_gate_up = desc.has_fused_gate_up(); if (!validate_expert_tensor(desc.ffn_gate_exps, cfg.n_expert, &dst.gate_expert_bytes, err) || !validate_expert_tensor(desc.ffn_up_exps, cfg.n_expert, &dst.up_expert_bytes, err) || @@ -523,6 +583,7 @@ int moe_hybrid_cache_swap_in(MoeHybridLayerStorage & st, int global_expert, if (slot < 0) return -1; const int evicted = st.spare_global[(size_t)slot]; if (evicted >= 0) st.hot_local_by_global[(size_t)evicted] = -1; // evicted -> served cold again + if (evicted >= 0 && evicted < 256) st.expert_vram_mask[evicted >> 6] &= ~(1ULL << (evicted & 63)); const int hslot = st.hot_active + slot; // hot-local index of the spare slot auto copy_slice = [&](ggml_tensor * cold_t, ggml_tensor * hot_t, size_t ebytes) { @@ -541,6 +602,7 @@ int moe_hybrid_cache_swap_in(MoeHybridLayerStorage & st, int global_expert, } 
st.hot_local_by_global[(size_t)global_expert] = hslot; + if (global_expert < 256) st.expert_vram_mask[global_expert >> 6] |= 1ULL << (global_expert & 63); st.spare_global[(size_t)slot] = global_expert; st.spare_lru[(size_t)slot] = ++st.lru_clock; return hslot; @@ -570,4 +632,61 @@ MoeSparkBudget spark_budget_split(uint64_t expert_budget, uint64_t total_expert_ return r; } +bool build_moe_hybrid_storage_from_file_with_mmap( + const MoeHybridConfig & cfg, + ggml_backend_t gpu_backend, + const MoeHybridPlacement & placement, + const std::vector & layer_descs, + const std::vector & file_data, + const void * mmap_base, + size_t mmap_total_size, + MoeHybridStorage & out, + std::string * err, + int cache_slots) { + + // First build storage normally (hot GPU + cold CPU buffers). + if (!build_moe_hybrid_storage_from_file(cfg, gpu_backend, placement, layer_descs, file_data, out, err, cache_slots)) { + return false; + } + + // Store mmap metadata for streaming prefill. + out.mmap_data = mmap_base; + out.mmap_size = mmap_total_size; + + // Compute per-layer expert file regions (offsets relative to mmap base). 
+ const auto * base = static_cast(mmap_base); + out.layer_regions.resize((size_t)cfg.n_layer); + for (int il = 0; il < cfg.n_layer; ++il) { + const auto & fd = file_data[(size_t)il]; + auto & reg = out.layer_regions[(size_t)il]; + + if (fd.gate_exps.data && fd.gate_exps.size > 0) { + reg.gate_exps.offset = (size_t)(fd.gate_exps.data - base); + reg.gate_exps.size = fd.gate_exps.size; + } + if (fd.up_exps.data && fd.up_exps.size > 0) { + reg.up_exps.offset = (size_t)(fd.up_exps.data - base); + reg.up_exps.size = fd.up_exps.size; + } + if (fd.down_exps.data && fd.down_exps.size > 0) { + reg.down_exps.offset = (size_t)(fd.down_exps.data - base); + reg.down_exps.size = fd.down_exps.size; + } + if (fd.gate_up_exps.data && fd.gate_up_exps.size > 0) { + reg.gate_up_exps.offset = (size_t)(fd.gate_up_exps.data - base); + reg.gate_up_exps.size = fd.gate_up_exps.size; + } + + // Copy per-expert byte sizes from layer storage (already computed) + const auto & ls = out.layers[(size_t)il]; + reg.expert_bytes_gate = ls.gate_expert_bytes; + reg.expert_bytes_up = ls.up_expert_bytes; + reg.expert_bytes_down = ls.down_expert_bytes; + reg.expert_bytes_gate_up = ls.gate_up_expert_bytes; + reg.fused_gate_up = ls.fused_gate_up; + } + + return true; +} + } // namespace dflash::common diff --git a/server/src/common/moe_hybrid_storage.h b/server/src/common/moe_hybrid_storage.h index 281696d04..3485c69ff 100644 --- a/server/src/common/moe_hybrid_storage.h +++ b/server/src/common/moe_hybrid_storage.h @@ -14,6 +14,25 @@ namespace dflash::common { +// File region for one expert tensor (offset into mmap). +struct ExpertFileRegion { + size_t offset = 0; + size_t size = 0; +}; + +// Per-layer file regions for all expert tensors (used by streaming prefill). 
+struct LayerExpertRegions { + ExpertFileRegion gate_exps; + ExpertFileRegion up_exps; + ExpertFileRegion down_exps; + ExpertFileRegion gate_up_exps; // optional fused + size_t expert_bytes_gate = 0; + size_t expert_bytes_up = 0; + size_t expert_bytes_down = 0; + size_t expert_bytes_gate_up = 0; + bool fused_gate_up = false; +}; + // Cached FFN graph for a fixed number of selected experts. // Built once, reused every token to avoid per-call graph rebuild overhead. struct CachedFfnGraph { @@ -35,6 +54,22 @@ struct CachedFfnGraph { void free(); }; +// Cached batched FFN graph for hot-only prefill (n_tokens = MMQ_SAFE_SUB_BATCH). +// Eliminates per-call graph rebuild + gallocr planning overhead. +struct CachedHotBatchedGraph { + ggml_context * ctx = nullptr; + ggml_cgraph * gf = nullptr; + ggml_gallocr_t alloc = nullptr; + ggml_tensor * inp = nullptr; // [n_embd, n_tokens] F32 input + ggml_tensor * sel = nullptr; // [n_used, n_tokens] I32 hot-local IDs + ggml_tensor * wts = nullptr; // [n_used, n_tokens] F32 expert weights + ggml_tensor * output = nullptr; // [n_embd, n_tokens] F32 output + int n_tokens = 0; + + bool valid() const { return ctx && gf && alloc && output; } + void free(); +}; + struct MoeHybridLayerStorage { ggml_context * hot_ctx = nullptr; ggml_backend_buffer_t hot_buf = nullptr; @@ -65,6 +100,21 @@ struct MoeHybridLayerStorage { std::vector spare_lru; // [cache_slots] last-use tick uint64_t lru_clock = 0; + // Bitmask: bit set = expert is in VRAM (hot). Supports up to 256 experts. + uint64_t expert_vram_mask[4] = {}; + + // Fast check: are ALL routed experts in VRAM for this batch? + // selected_ids has n_slots entries (n_tokens * n_expert_used). 
+ bool all_routed_are_hot(const int32_t * selected_ids, int n_slots) const { + for (int i = 0; i < n_slots; ++i) { + const int g = selected_ids[i]; + if (g < 0 || g >= 256) continue; + if (!((expert_vram_mask[g >> 6] >> (g & 63)) & 1ULL)) + return false; + } + return true; + } + bool fused_gate_up = false; size_t gate_expert_bytes = 0; size_t up_expert_bytes = 0; @@ -79,6 +129,9 @@ struct MoeHybridLayerStorage { // Cached FFN graphs for common-case expert counts. CachedFfnGraph hot_graph; CachedFfnGraph cold_graph; + + // Cached batched hot-only graph for prefill sub-batches (n_tokens=4). + CachedHotBatchedGraph hot_batched_graph; }; struct MoeHybridStorage { @@ -93,8 +146,18 @@ struct MoeHybridStorage { MoeHybridPlacement placement; std::vector layers; + // Persistent mmap for streaming prefill (nullptr if not available). + // When set, the streaming engine can DMA cold experts directly from here. + const void * mmap_data = nullptr; + size_t mmap_size = 0; + int mmap_fd = -1; // POSIX fd for madvise; -1 on Windows or if not available + + // Per-layer file region metadata for streaming (populated when mmap is active). + std::vector layer_regions; + bool matches(const MoeHybridConfig & cfg) const; bool empty() const; + bool has_mmap() const { return mmap_data != nullptr && mmap_size > 0; } }; // Expert tensor file data for split loading (one entry per expert tensor). @@ -144,4 +207,21 @@ MoeSparkBudget spark_budget_split(uint64_t expert_budget, uint64_t total_expert_ int n_expert, uint64_t core_kv_safety, uint64_t target_bytes); +// Build hybrid storage from file AND retain mmap for streaming prefill. +// The caller must keep the mmap region alive for the lifetime of the storage. +// mmap_base: pointer to start of mmap'd file. +// mmap_total_size: total file size. +// This variant populates out.layer_regions for use by MoeHybridStreamEngine. 
+bool build_moe_hybrid_storage_from_file_with_mmap( + const MoeHybridConfig & cfg, + ggml_backend_t gpu_backend, + const MoeHybridPlacement & placement, + const std::vector & layer_descs, + const std::vector & file_data, + const void * mmap_base, + size_t mmap_total_size, + MoeHybridStorage & out, + std::string * err = nullptr, + int cache_slots = 0); + } // namespace dflash::common diff --git a/server/src/common/moe_hybrid_stream.cpp b/server/src/common/moe_hybrid_stream.cpp new file mode 100644 index 000000000..0b8cd74ff --- /dev/null +++ b/server/src/common/moe_hybrid_stream.cpp @@ -0,0 +1,470 @@ +// MoE hybrid prefill streaming engine — implementation. + +#include "moe_hybrid_stream.h" +#include "gpu_runtime_compat.h" + +#include "ggml-backend.h" + +#include +#include +#include + +#if !defined(_WIN32) +#include +#include +#endif + +namespace dflash::common { + +MoeHybridStreamEngine::~MoeHybridStreamEngine() { + destroy(); +} + +MoeHybridStreamEngine::MoeHybridStreamEngine(MoeHybridStreamEngine && o) noexcept + : pinned_buf_(o.pinned_buf_), pinned_size_(o.pinned_size_), + gpu_scratch_(o.gpu_scratch_), scratch_size_(o.scratch_size_), + backend_(o.backend_), + scratch_gate_(o.scratch_gate_), scratch_up_(o.scratch_up_), + scratch_down_(o.scratch_down_), + last_gate_bytes_(o.last_gate_bytes_), last_up_bytes_(o.last_up_bytes_), + last_down_bytes_(o.last_down_bytes_) { + o.pinned_buf_ = nullptr; o.pinned_size_ = 0; + o.gpu_scratch_ = nullptr; o.scratch_size_ = 0; + o.backend_ = nullptr; + o.scratch_gate_ = nullptr; o.scratch_up_ = nullptr; o.scratch_down_ = nullptr; + o.last_gate_bytes_ = 0; o.last_up_bytes_ = 0; o.last_down_bytes_ = 0; +} + +MoeHybridStreamEngine & MoeHybridStreamEngine::operator=(MoeHybridStreamEngine && o) noexcept { + if (this != &o) { + destroy(); + pinned_buf_ = o.pinned_buf_; pinned_size_ = o.pinned_size_; + gpu_scratch_ = o.gpu_scratch_; scratch_size_ = o.scratch_size_; + backend_ = o.backend_; + scratch_gate_ = o.scratch_gate_; scratch_up_ = 
o.scratch_up_; + scratch_down_ = o.scratch_down_; + last_gate_bytes_ = o.last_gate_bytes_; last_up_bytes_ = o.last_up_bytes_; + last_down_bytes_ = o.last_down_bytes_; + o.pinned_buf_ = nullptr; o.pinned_size_ = 0; + o.gpu_scratch_ = nullptr; o.scratch_size_ = 0; + o.backend_ = nullptr; + o.scratch_gate_ = nullptr; o.scratch_up_ = nullptr; o.scratch_down_ = nullptr; + o.last_gate_bytes_ = 0; o.last_up_bytes_ = 0; o.last_down_bytes_ = 0; + } + return *this; +} + +bool MoeHybridStreamEngine::init(ggml_backend_t gpu_backend, size_t max_expert_bytes, + std::string * err) { + destroy(); + if (!gpu_backend || max_expert_bytes == 0) { + if (err) *err = "invalid arguments to stream engine init"; + return false; + } + + // Allocate pinned host staging buffer + cudaError_t cuda_err = cudaMallocHost(&pinned_buf_, max_expert_bytes); + if (cuda_err != cudaSuccess) { + if (err) *err = std::string("cudaMallocHost failed: ") + cudaGetErrorString(cuda_err); + return false; + } + pinned_size_ = max_expert_bytes; + + // Allocate GPU scratch buffer + cuda_err = cudaMalloc(&gpu_scratch_, max_expert_bytes); + if (cuda_err != cudaSuccess) { + if (err) *err = std::string("cudaMalloc scratch failed: ") + cudaGetErrorString(cuda_err); + cudaFreeHost(pinned_buf_); + pinned_buf_ = nullptr; + pinned_size_ = 0; + return false; + } + scratch_size_ = max_expert_bytes; + backend_ = gpu_backend; + return true; +} + +bool MoeHybridStreamEngine::is_ready() const { + return pinned_buf_ && gpu_scratch_ && backend_; +} + +void MoeHybridStreamEngine::destroy() { + if (gpu_scratch_) { + cudaFree(gpu_scratch_); + gpu_scratch_ = nullptr; + } + if (pinned_buf_) { + cudaFreeHost(pinned_buf_); + pinned_buf_ = nullptr; + } + pinned_size_ = 0; + scratch_size_ = 0; + backend_ = nullptr; + scratch_gate_ = nullptr; + scratch_up_ = nullptr; + scratch_down_ = nullptr; + last_gate_bytes_ = 0; + last_up_bytes_ = 0; + last_down_bytes_ = 0; +} + +void MoeHybridStreamEngine::prefetch_cold_experts(const void * mmap_data, 
size_t mmap_size, + const LayerExpertRegions & regions, + const int32_t * cold_expert_ids, + int n_cold) { + if (!mmap_data || mmap_size == 0 || !cold_expert_ids || n_cold <= 0) return; + +#if !defined(_WIN32) + auto do_advise = [&](size_t offset, size_t length) { + if (offset + length > mmap_size) return; + const size_t page_size = (size_t)sysconf(_SC_PAGESIZE); + const size_t aligned_offset = (offset / page_size) * page_size; + const size_t aligned_length = length + (offset - aligned_offset); + ::madvise(const_cast(static_cast(mmap_data)) + aligned_offset, + aligned_length, MADV_WILLNEED); + }; +#endif + + for (int i = 0; i < n_cold; ++i) { + const int32_t eid = cold_expert_ids[i]; +#if !defined(_WIN32) + if (regions.fused_gate_up) { + if (regions.gate_up_exps.size > 0) { + do_advise(regions.gate_up_exps.offset + (size_t)eid * regions.expert_bytes_gate_up, + regions.expert_bytes_gate_up); + } + } else { + if (regions.gate_exps.size > 0) { + do_advise(regions.gate_exps.offset + (size_t)eid * regions.expert_bytes_gate, + regions.expert_bytes_gate); + } + if (regions.up_exps.size > 0) { + do_advise(regions.up_exps.offset + (size_t)eid * regions.expert_bytes_up, + regions.expert_bytes_up); + } + } + if (regions.down_exps.size > 0) { + do_advise(regions.down_exps.offset + (size_t)eid * regions.expert_bytes_down, + regions.expert_bytes_down); + } +#else + (void)eid; + (void)regions; +#endif + } +} + +bool MoeHybridStreamEngine::stream_expert_sync(const void * mmap_data, size_t mmap_size, + const LayerExpertRegions & regions, + int expert_id, + ggml_backend_t gpu_backend, + std::string * err) { + if (!is_ready()) { + if (err) *err = "stream engine not initialized"; + return false; + } + if (!mmap_data || mmap_size == 0) { + if (err) *err = "mmap not available"; + return false; + } + + const auto * file_base = static_cast(mmap_data); + size_t staging_offset = 0; + + // Validate expert_id against region size + if (expert_id < 0) { + if (err) *err = "expert_id is 
negative"; + return false; + } + + // Copy gate (or fused gate_up) from mmap → pinned + if (regions.fused_gate_up) { + const size_t bytes = regions.expert_bytes_gate_up; + const size_t file_off = regions.gate_up_exps.offset + (size_t)expert_id * bytes; + if (file_off + bytes > mmap_size) { + if (err) *err = "gate_up expert out of file bounds"; + return false; + } + std::memcpy(static_cast(pinned_buf_) + staging_offset, + file_base + file_off, bytes); + last_gate_bytes_ = bytes; + last_up_bytes_ = 0; + staging_offset += bytes; + } else { + // gate + { + const size_t bytes = regions.expert_bytes_gate; + const size_t file_off = regions.gate_exps.offset + (size_t)expert_id * bytes; + if (file_off + bytes > mmap_size) { + if (err) *err = "gate expert out of file bounds"; + return false; + } + std::memcpy(static_cast(pinned_buf_) + staging_offset, + file_base + file_off, bytes); + last_gate_bytes_ = bytes; + staging_offset += bytes; + } + // up + { + const size_t bytes = regions.expert_bytes_up; + const size_t file_off = regions.up_exps.offset + (size_t)expert_id * bytes; + if (file_off + bytes > mmap_size) { + if (err) *err = "up expert out of file bounds"; + return false; + } + std::memcpy(static_cast(pinned_buf_) + staging_offset, + file_base + file_off, bytes); + last_up_bytes_ = bytes; + staging_offset += bytes; + } + } + + // down + { + const size_t bytes = regions.expert_bytes_down; + const size_t file_off = regions.down_exps.offset + (size_t)expert_id * bytes; + if (file_off + bytes > mmap_size) { + if (err) *err = "down expert out of file bounds"; + return false; + } + std::memcpy(static_cast(pinned_buf_) + staging_offset, + file_base + file_off, bytes); + last_down_bytes_ = bytes; + staging_offset += bytes; + } + + if (staging_offset > scratch_size_) { + if (err) *err = "expert exceeds scratch buffer size"; + return false; + } + + // DMA pinned → GPU scratch (synchronous for now; async pipeline in eval function) + cudaError_t cuda_err = cudaMemcpy(gpu_scratch_, 
pinned_buf_, staging_offset, + cudaMemcpyHostToDevice); + if (cuda_err != cudaSuccess) { + if (err) *err = std::string("cudaMemcpy H2D failed: ") + cudaGetErrorString(cuda_err); + return false; + } + + // Set pointers into scratch + auto * scratch_bytes = static_cast(gpu_scratch_); + size_t off = 0; + if (regions.fused_gate_up) { + scratch_gate_ = scratch_bytes + off; + off += last_gate_bytes_; + scratch_up_ = nullptr; + } else { + scratch_gate_ = scratch_bytes + off; + off += last_gate_bytes_; + scratch_up_ = scratch_bytes + off; + off += last_up_bytes_; + } + scratch_down_ = scratch_bytes + off; + + return true; +} + +// ── Streaming prefill evaluation ──────────────────────────────────────────── + +bool eval_moe_cold_experts_streaming( + MoeHybridStreamEngine & engine, + ggml_backend_t gpu_backend, + const void * mmap_data, + size_t mmap_size, + const MoeHybridConfig & cfg, + const MoeLayerDesc & desc, + const LayerExpertRegions & regions, + const MoeHybridLayerStorage & storage, + const float * cur_host, + const int32_t * selected_ids, + const float * selected_weights, + int n_tokens, + std::vector & out, + std::string * err) { + + const int n_embd = cfg.n_embd; + const int n_ff_exp = cfg.n_ff_exp; + const int n_used = cfg.n_expert_used; + const int total_slots = n_used * n_tokens; + + out.assign((size_t)n_embd * (size_t)n_tokens, 0.0f); + if (!engine.is_ready()) { + if (err) *err = "stream engine not ready"; + return false; + } + if (!mmap_data || mmap_size == 0) { + if (err) *err = "mmap not available"; + return false; + } + + // Identify unique cold experts needed across all tokens. 
+ std::vector cold_needed((size_t)cfg.n_expert, false); + for (int i = 0; i < total_slots; ++i) { + const int32_t gid = selected_ids[i]; + if (gid < 0 || gid >= cfg.n_expert) continue; + if (storage.hot_local_by_global[(size_t)gid] < 0) { + cold_needed[(size_t)gid] = true; + } + } + + std::vector unique_cold; + for (int e = 0; e < cfg.n_expert; ++e) { + if (cold_needed[(size_t)e]) unique_cold.push_back((int32_t)e); + } + + if (unique_cold.empty()) return true; + + // Prefetch all cold experts via madvise + engine.prefetch_cold_experts(mmap_data, mmap_size, regions, unique_cold.data(), (int)unique_cold.size()); + + // For each unique cold expert: stream to GPU, compute ALL tokens that selected it + // in a single batched matmul graph. + for (int32_t cold_eid : unique_cold) { + // Stream expert weights to GPU scratch + if (!engine.stream_expert_sync(mmap_data, mmap_size, regions, cold_eid, gpu_backend, err)) { + return false; + } + + // Gather all tokens that selected this expert + struct TokenHit { int ti; float weight; }; + std::vector hits; + hits.reserve((size_t)n_tokens); + for (int ti = 0; ti < n_tokens; ++ti) { + for (int k = 0; k < n_used; ++k) { + const int slot = ti * n_used + k; + if (selected_ids[slot] != cold_eid) continue; + const float w = selected_weights[slot]; + if (w != 0.0f) hits.push_back({ti, w}); + break; // each expert selected at most once per token + } + } + if (hits.empty()) continue; + + const int batch = (int)hits.size(); + + // Build batched input: [n_embd, batch] + std::vector batch_input((size_t)n_embd * (size_t)batch); + for (int i = 0; i < batch; ++i) { + const float * src = cur_host + (size_t)hits[(size_t)i].ti * (size_t)n_embd; + std::memcpy(batch_input.data() + (size_t)i * (size_t)n_embd, src, sizeof(float) * (size_t)n_embd); + } + + // Build single ggml graph for this expert with all tokens batched + ggml_init_params ip{}; + ip.mem_size = 32 * 1024 * 1024; + ip.mem_buffer = nullptr; + ip.no_alloc = true; + ggml_context * ctx = 
ggml_init(ip); + if (!ctx) { + if (err) *err = "ggml_init failed in streaming eval"; + return false; + } + + ggml_tensor * inp = ggml_new_tensor_2d(ctx, GGML_TYPE_F32, n_embd, batch); + ggml_set_input(inp); + + // Weight tensors pointing into GPU scratch (same for all tokens in batch) + ggml_tensor * gate_t = nullptr; + ggml_tensor * up_t = nullptr; + ggml_tensor * down_t = nullptr; + ggml_tensor * gate_up_t = nullptr; + + if (regions.fused_gate_up) { + gate_up_t = ggml_new_tensor_2d(ctx, desc.ffn_gate_up_exps->type, n_embd, 2 * n_ff_exp); + ggml_set_input(gate_up_t); + down_t = ggml_new_tensor_2d(ctx, desc.ffn_down_exps->type, n_ff_exp, n_embd); + ggml_set_input(down_t); + } else { + gate_t = ggml_new_tensor_2d(ctx, desc.ffn_gate_exps->type, n_embd, n_ff_exp); + ggml_set_input(gate_t); + up_t = ggml_new_tensor_2d(ctx, desc.ffn_up_exps->type, n_embd, n_ff_exp); + ggml_set_input(up_t); + down_t = ggml_new_tensor_2d(ctx, desc.ffn_down_exps->type, n_ff_exp, n_embd); + ggml_set_input(down_t); + } + + // FFN graph: out = down(silu(gate(x)) * up(x)) — batched over all tokens + ggml_tensor * gu = nullptr; + if (gate_up_t) { + ggml_tensor * gate_up_out = ggml_mul_mat(ctx, gate_up_t, inp); // [2*n_ff, batch] + if (desc.ffn_gate_up_exps_s != 1.0f) + gate_up_out = ggml_scale(ctx, gate_up_out, desc.ffn_gate_up_exps_s); + ggml_tensor * g_part = ggml_view_2d(ctx, gate_up_out, n_ff_exp, batch, + gate_up_out->nb[1], 0); + ggml_tensor * u_part = ggml_view_2d(ctx, gate_up_out, n_ff_exp, batch, + gate_up_out->nb[1], + (size_t)n_ff_exp * sizeof(float)); + g_part = ggml_cont(ctx, g_part); + u_part = ggml_cont(ctx, u_part); + gu = ggml_swiglu_split(ctx, g_part, u_part); + } else { + ggml_tensor * g = ggml_mul_mat(ctx, gate_t, inp); // [n_ff, batch] + if (desc.ffn_gate_exps_s != 1.0f) + g = ggml_scale(ctx, g, desc.ffn_gate_exps_s); + ggml_tensor * u = ggml_mul_mat(ctx, up_t, inp); // [n_ff, batch] + if (desc.ffn_up_exps_s != 1.0f) + u = ggml_scale(ctx, u, desc.ffn_up_exps_s); + gu = 
ggml_swiglu_split(ctx, g, u); + } + + ggml_tensor * expert_out = ggml_mul_mat(ctx, down_t, gu); // [n_embd, batch] + if (desc.ffn_down_exps_s != 1.0f) + expert_out = ggml_scale(ctx, expert_out, desc.ffn_down_exps_s); + + ggml_cgraph * gf = ggml_new_graph_custom(ctx, 512, false); + ggml_set_output(expert_out); + ggml_build_forward_expand(gf, expert_out); + + ggml_gallocr_t alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(gpu_backend)); + if (!ggml_gallocr_alloc_graph(alloc, gf)) { + if (err) *err = "streaming eval gallocr failed"; + ggml_gallocr_free(alloc); + ggml_free(ctx); + return false; + } + + // Upload batched input (host → GPU via ggml_backend_tensor_set) + ggml_backend_tensor_set(inp, batch_input.data(), 0, sizeof(float) * (size_t)n_embd * (size_t)batch); + + // Point weight tensors directly at GPU scratch (device-to-device, no copy needed). + // The gallocr allocated these tensors on the same GPU, but we override their data + // pointers to point at our pre-loaded scratch buffer. 
+ if (gate_up_t) { + gate_up_t->data = const_cast(engine.scratch_gate_data()); + down_t->data = const_cast(engine.scratch_down_data()); + } else { + gate_t->data = const_cast(engine.scratch_gate_data()); + up_t->data = const_cast(engine.scratch_up_data()); + down_t->data = const_cast(engine.scratch_down_data()); + } + + auto st = ggml_backend_graph_compute(gpu_backend, gf); + if (st != GGML_STATUS_SUCCESS) { + if (err) *err = "streaming eval compute failed"; + ggml_gallocr_free(alloc); + ggml_free(ctx); + return false; + } + + // Read batched result [n_embd, batch] and scatter-accumulate with weights + std::vector batch_result((size_t)n_embd * (size_t)batch); + ggml_backend_tensor_get(expert_out, batch_result.data(), 0, sizeof(float) * (size_t)n_embd * (size_t)batch); + + for (int i = 0; i < batch; ++i) { + const float w = hits[(size_t)i].weight; + const int ti = hits[(size_t)i].ti; + const float * res = batch_result.data() + (size_t)i * (size_t)n_embd; + float * out_tok = out.data() + (size_t)ti * (size_t)n_embd; + for (int j = 0; j < n_embd; ++j) { + out_tok[j] += w * res[(size_t)j]; + } + } + + ggml_gallocr_free(alloc); + ggml_free(ctx); + } + + return true; +} + +} // namespace dflash::common diff --git a/server/src/common/moe_hybrid_stream.h b/server/src/common/moe_hybrid_stream.h new file mode 100644 index 000000000..d5e929ae9 --- /dev/null +++ b/server/src/common/moe_hybrid_stream.h @@ -0,0 +1,112 @@ +// MoE hybrid prefill streaming engine — DMA pipeline for cold expert offload. +// +// Streams cold expert weight slices from mmap (page cache) through a pinned +// host staging buffer to GPU scratch memory, pipelined with GPU compute on +// previously-transferred experts. Used during prefill when T >= threshold. + +#pragma once + +#include "moe_hybrid_types.h" +#include "moe_hybrid_storage.h" + +#include "ggml.h" +#include "ggml-backend.h" + +#include +#include +#include +#include + +namespace dflash::common { + +// Configuration for the stream engine. 
+struct MoeStreamConfig { + int prefill_threshold = 8; // min n_tokens to activate streaming + int prefetch_layers = 2; // how many layers ahead to madvise +}; + +// Streaming engine: manages pinned staging buffer, GPU scratch, and DMA pipeline. +class MoeHybridStreamEngine { +public: + MoeHybridStreamEngine() = default; + ~MoeHybridStreamEngine(); + + MoeHybridStreamEngine(const MoeHybridStreamEngine &) = delete; + MoeHybridStreamEngine & operator=(const MoeHybridStreamEngine &) = delete; + MoeHybridStreamEngine(MoeHybridStreamEngine &&) noexcept; + MoeHybridStreamEngine & operator=(MoeHybridStreamEngine &&) noexcept; + + // Initialize the engine with a maximum expert size (bytes for one expert's + // gate+up+down tensors). Allocates pinned host buffer and GPU scratch. + bool init(ggml_backend_t gpu_backend, size_t max_expert_bytes, std::string * err = nullptr); + bool is_ready() const; + void destroy(); + + // Issue madvise(WILLNEED) for the specified cold experts in the given layer. + // Call this as early as possible (e.g. at start of layer or N layers ahead). + void prefetch_cold_experts(const void * mmap_data, size_t mmap_size, + const LayerExpertRegions & regions, + const int32_t * cold_expert_ids, + int n_cold); + + // Stream a single cold expert from mmap to GPU scratch and return a ggml + // tensor view over the scratch memory for each weight matrix. + // This is a BLOCKING operation (synchronous DMA). For pipelined usage, + // use the async variants below. + bool stream_expert_sync(const void * mmap_data, size_t mmap_size, + const LayerExpertRegions & regions, + int expert_id, + ggml_backend_t gpu_backend, + std::string * err = nullptr); + + // Get tensor pointers into the GPU scratch buffer after a successful stream. + // Valid until next stream call. Tensors are transient (not owned by any context). 
+ const void * scratch_gate_data() const { return scratch_gate_; } + const void * scratch_up_data() const { return scratch_up_; } + const void * scratch_down_data() const { return scratch_down_; } + size_t scratch_gate_bytes() const { return last_gate_bytes_; } + size_t scratch_up_bytes() const { return last_up_bytes_; } + size_t scratch_down_bytes() const { return last_down_bytes_; } + + // Total pinned buffer size. + size_t pinned_bytes() const { return pinned_size_; } + // Total GPU scratch size. + size_t scratch_bytes() const { return scratch_size_; } + +private: + void * pinned_buf_ = nullptr; // cudaMallocHost'd staging buffer + size_t pinned_size_ = 0; + + void * gpu_scratch_ = nullptr; // GPU device memory for one expert + size_t scratch_size_ = 0; + ggml_backend_t backend_ = nullptr; + + // Offsets into scratch for last-streamed expert + void * scratch_gate_ = nullptr; + void * scratch_up_ = nullptr; + void * scratch_down_ = nullptr; + size_t last_gate_bytes_ = 0; + size_t last_up_bytes_ = 0; + size_t last_down_bytes_ = 0; +}; + +// Evaluate cold experts by streaming from mmap to GPU, pipelined. +// Hot experts are already computed (result in hot_partial). +// Returns combined cold expert contribution in out (sized n_embd * n_tokens). 
+bool eval_moe_cold_experts_streaming( + MoeHybridStreamEngine & engine, + ggml_backend_t gpu_backend, + const void * mmap_data, + size_t mmap_size, + const MoeHybridConfig & cfg, + const MoeLayerDesc & desc, + const LayerExpertRegions & regions, + const MoeHybridLayerStorage & storage, + const float * cur_host, + const int32_t * selected_ids, + const float * selected_weights, + int n_tokens, + std::vector & out, + std::string * err = nullptr); + +} // namespace dflash::common diff --git a/server/src/common/moe_hybrid_types.h b/server/src/common/moe_hybrid_types.h index c3a15e6bd..ff061133c 100644 --- a/server/src/common/moe_hybrid_types.h +++ b/server/src/common/moe_hybrid_types.h @@ -13,6 +13,11 @@ namespace dflash::common { +// ─── GPU SM version query ─────────────────────────────────────────────── +// Returns the compute capability as major*10+minor (e.g. 86 for sm_86). +// Returns 0 if CUDA/HIP runtime is unavailable. +int query_gpu_compute_sm(); + // ─── MoE architecture config (model-agnostic) ────────────────────────── struct MoeHybridConfig { @@ -23,6 +28,12 @@ struct MoeHybridConfig { int n_ff_shexp = 0; // shared expert intermediate dimension (0 = no shared) int n_layer = 0; // number of MoE layers int first_moe_layer = 0; // index of first MoE layer (e.g., 0 for qwen35moe, 1 for laguna) + + // When true, MMQ mul_mat_id works correctly with reduced hot stacks + // (n_hot < n_expert). Safe on sm_80+ (Ampere/Ada/Hopper/Blackwell). + // On sm_75 (Turing) and gfx1151, the kernel has illegal memory accesses + // with reduced stacks, requiring the <=4-token sub-batch workaround. 
+ bool mmq_safe_full_batch = false; }; // ─── Per-layer expert tensor descriptor ───────────────────────────────── diff --git a/server/src/common/moe_hybrid_types_impl.h b/server/src/common/moe_hybrid_types_impl.h index ddb4355bc..0133a08d4 100644 --- a/server/src/common/moe_hybrid_types_impl.h +++ b/server/src/common/moe_hybrid_types_impl.h @@ -22,6 +22,9 @@ inline MoeHybridConfig make_moe_hybrid_config(const TargetWeights & w) { cfg.n_ff_shexp = w.n_ff_shexp; cfg.n_layer = w.n_layer; cfg.first_moe_layer = 0; // all layers are MoE in qwen35moe + // sm_80+ (Ampere and later): MMQ mul_mat_id is safe with reduced hot stacks + static const int sm = query_gpu_compute_sm(); + cfg.mmq_safe_full_batch = (sm >= 80); return cfg; } @@ -61,6 +64,9 @@ inline MoeHybridConfig make_moe_hybrid_config(const LagunaTargetWeights & w) { cfg.n_ff_shexp = w.n_ff_shexp; cfg.n_layer = w.n_layer; cfg.first_moe_layer = w.n_layer_dense_lead; // layer 0 is dense in laguna + // sm_80+ (Ampere and later): MMQ mul_mat_id is safe with reduced hot stacks + static const int sm = query_gpu_compute_sm(); + cfg.mmq_safe_full_batch = (sm >= 80); return cfg; } diff --git a/server/src/laguna/laguna_backend.cpp b/server/src/laguna/laguna_backend.cpp index cd74108ec..ab75ef5a8 100644 --- a/server/src/laguna/laguna_backend.cpp +++ b/server/src/laguna/laguna_backend.cpp @@ -735,6 +735,28 @@ bool LagunaBackend::init_hybrid_mode() { if (total_cold > 0) { hybrid_mode_ = true; std::printf("[laguna-hybrid] hybrid decode path active (%d cold experts)\n", total_cold); + + // Initialize streaming engine for prefill + if (moe_hybrid_->has_mmap()) { + size_t max_expert_bytes = 0; + for (int il = 0; il < w_.n_layer; ++il) { + const auto & layer = moe_hybrid_->layers[(size_t)il]; + size_t eb = layer.fused_gate_up + ? 
(size_t)layer.gate_up_expert_bytes + (size_t)layer.down_expert_bytes + : (size_t)layer.gate_expert_bytes + (size_t)layer.up_expert_bytes + + (size_t)layer.down_expert_bytes; + if (eb > max_expert_bytes) max_expert_bytes = eb; + } + std::string stream_err; + if (stream_engine_.init(backend_, max_expert_bytes, &stream_err)) { + std::printf("[laguna-hybrid] stream engine ready: pinned=%.1f MiB scratch=%.1f MiB\n", + stream_engine_.pinned_bytes() / 1024.0 / 1024.0, + stream_engine_.scratch_bytes() / 1024.0 / 1024.0); + } else { + std::fprintf(stderr, "[laguna-hybrid] stream engine init failed: %s (prefill will use CPU fallback)\n", + stream_err.c_str()); + } + } } else { hybrid_mode_ = true; // partial load: expert tensors only in hybrid storage std::printf("[laguna-hybrid] all experts hot — using hybrid path (all-hot)\n"); @@ -1444,13 +1466,57 @@ GenerateResult LagunaBackend::generate_hybrid(const GenerateRequest & req, MoeHybridConfig chunk_cfg = make_moe_hybrid_config(w_); MoeLayerDesc chunk_desc = make_moe_layer_desc(w_.layers[(size_t)il]); std::vector ffn_batch_out; - if (!eval_moe_hybrid_ffn_batched( - backend_, cpu_be, chunk_cfg, chunk_desc, storage, - chunk_post.data(), - chunk_selected.data(), - chunk_weights.data(), - chunk_len, ffn_batch_out, &result.error, - &ffn_hot_alloc, &ffn_cold_alloc)) { + bool ffn_ok = false; + + if (storage.cold_expert_ids.empty()) { + // All experts hot — use batched path directly + ffn_ok = eval_moe_hybrid_ffn_batched( + backend_, cpu_be, chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc, &ffn_cold_alloc); + } else if (storage.all_routed_are_hot(chunk_selected.data(), + chunk_len * n_expert_used)) { + // All selected experts happen to be in VRAM — pure GPU, no CPU + ffn_ok = eval_moe_hot_only_batched( + backend_, chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + 
chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc); + } else if (moe_hybrid_->has_mmap() && + !moe_hybrid_->layer_regions.empty() && + stream_engine_.is_ready() && chunk_len >= 16 && + !storage.cold_expert_ids.empty()) { + // Streaming prefill: prefetch cold data then batched eval (hot GPU + cold CPU) + const auto & regions = moe_hybrid_->layer_regions[(size_t)il]; + std::vector cold_ids_copy(storage.cold_expert_ids.begin(), + storage.cold_expert_ids.end()); + stream_engine_.prefetch_cold_experts(moe_hybrid_->mmap_data, moe_hybrid_->mmap_size, + regions, cold_ids_copy.data(), + (int)cold_ids_copy.size()); + ffn_ok = eval_moe_hybrid_ffn_batched( + backend_, cpu_be, chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc, &ffn_cold_alloc); + } else { + // Fallback: batched eval handles both hot+cold (CPU for cold) + ffn_ok = eval_moe_hybrid_ffn_batched( + backend_, cpu_be, chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc, &ffn_cold_alloc); + } + + if (!ffn_ok) { step_graph_destroy(prefill_sg); if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); if (ffn_cold_alloc) ggml_gallocr_free(ffn_cold_alloc); @@ -1676,11 +1742,15 @@ bool LagunaBackend::build_hybrid_storage_from_file( int cache_slots = 0; if (const char * cs = std::getenv("DFLASH_LAGUNA_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs)); else if (cache_slots_ >= 0) cache_slots = cache_slots_; - bool ok = build_moe_hybrid_storage_from_file(hybrid_cfg, backend_, placement, - layer_descs, layer_file_data, *hybrid, &err, cache_slots); - ::munmap(mmap_addr, file_size); + bool ok = build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend_, placement, + layer_descs, layer_file_data, + mmap_addr, file_size, *hybrid, &err, cache_slots); gguf_free(gctx); - if (!ok) return false; + if 
(!ok) { + ::munmap(mmap_addr, file_size); + return false; + } + // mmap ownership transferred to the storage (munmapped in ~MoeHybridStorage) out_storage = std::move(hybrid); return true; } diff --git a/server/src/laguna/laguna_backend.h b/server/src/laguna/laguna_backend.h index e64ab32d6..156c82e6b 100644 --- a/server/src/laguna/laguna_backend.h +++ b/server/src/laguna/laguna_backend.h @@ -14,6 +14,7 @@ #include "../common/moe_hybrid_storage.h" #include "../common/moe_hybrid_routing_stats.h" #include "../common/moe_hybrid_swap_manager.h" +#include "../common/moe_hybrid_stream.h" #include "ggml.h" #include "ggml-backend.h" @@ -94,6 +95,7 @@ class LagunaBackend : public ModelBackend { std::vector layer_expert_bytes_; // per-layer 1-expert bytes MoeHybridSwapPolicy swap_policy_; bool hybrid_telemetry_ = false; + MoeHybridStreamEngine stream_engine_; bool ensure_slot(int slot); diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 266c4263f..7f976c679 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -1,6 +1,7 @@ #include "qwen35moe_backend.h" #include "../common/moe_hybrid_placement.h" +#include "../common/moe_hybrid_stream.h" #include "../common/moe_hybrid_types.h" #include "../common/moe_hybrid_types_impl.h" #include "common/ggml_graph_precision.h" @@ -145,26 +146,54 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & auto hybrid = std::make_shared(); MoeHybridConfig hybrid_cfg = make_moe_hybrid_config(out); + if (hybrid_cfg.mmq_safe_full_batch) { + std::fprintf(stderr, "[qwen35moe] GPU sm_%d: MMQ full-batch enabled (no sub-batch workaround)\n", + query_gpu_compute_sm()); + } std::vector layer_descs((size_t)out.n_layer); for (int il = 0; il < out.n_layer; ++il) { layer_descs[(size_t)il] = make_moe_layer_desc(out.layers[(size_t)il]); } int cache_slots = 0; - if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) 
cache_slots = std::max(0, std::atoi(cs)); - else if (cache_slots_ >= 0) cache_slots = cache_slots_; - if (!build_moe_hybrid_storage_from_file(hybrid_cfg, backend, placement, layer_descs, layer_file_data, *hybrid, &err, cache_slots)) { + if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs)); + else if (cache_slots_ >= 0) cache_slots = cache_slots_; + if (!build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend, placement, layer_descs, layer_file_data, mmap_addr, file_size, *hybrid, &err, cache_slots)) { ::munmap(mmap_addr, file_size); gguf_free(gctx); set_last_error(std::string("qwen35moe hybrid storage build failed: ") + err); return false; } - ::munmap(mmap_addr, file_size); + // Keep mmap open for streaming prefill — do NOT munmap here. + // The mmap_data/mmap_size are stored in hybrid storage for lifetime management. gguf_free(gctx); out.moe_hybrid = std::move(hybrid); } + // Initialize streaming engine for prefill (if cold experts exist and mmap is available) + if (out.moe_hybrid && out.moe_hybrid->has_mmap() && !out.moe_hybrid->layers.empty()) { + // Compute max expert size across all layers + size_t max_expert_bytes = 0; + for (const auto & layer : out.moe_hybrid->layers) { + size_t per_expert = layer.fused_gate_up + ? 
layer.gate_up_expert_bytes + layer.down_expert_bytes + : layer.gate_expert_bytes + layer.up_expert_bytes + layer.down_expert_bytes; + max_expert_bytes = std::max(max_expert_bytes, per_expert); + } + if (max_expert_bytes > 0) { + std::string stream_err; + if (stream_engine_.init(backend, max_expert_bytes, &stream_err)) { + std::printf("[qwen35moe] streaming prefill engine ready (pinned=%.1f MiB, scratch=%.1f MiB)\n", + stream_engine_.pinned_bytes() / 1024.0 / 1024.0, + stream_engine_.scratch_bytes() / 1024.0 / 1024.0); + } else { + std::fprintf(stderr, "[qwen35moe] warning: streaming engine init failed: %s (prefill will use fallback)\n", + stream_err.c_str()); + } + } + } + int total_cold = 0; uint64_t hot_bytes = 0; uint64_t cold_bytes = 0; @@ -636,6 +665,7 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, const int n_layer = target_weights().n_layer; uint64_t build_us_total = 0, compute_us_total = 0, readback_us_total = 0, ffn_us_total = 0; + int hot_only_layers = 0, total_ffn_layers = 0; MoeHybridFfnTelemetry ffn_tel_accum{}; StepGraph logits_sg; // Persistent logits graph (used by spec-decode branch) @@ -812,8 +842,42 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, MoeLayerDesc chunk_desc = make_moe_layer_desc(target_weights().layers[(size_t)il]); std::vector ffn_batch_out; bool ffn_ok = false; + ++total_ffn_layers; if (storage.cold_expert_ids.empty()) { // All experts hot — safe to use batched path + ++hot_only_layers; + ffn_ok = eval_moe_hybrid_ffn_batched( + target_backend(), cpu_be, chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc, &ffn_cold_alloc); + } else if (storage.all_routed_are_hot(chunk_selected.data(), + chunk_len * n_expert_used)) { + // All selected experts happen to be in VRAM — pure GPU, no CPU + ++hot_only_layers; + ffn_ok = eval_moe_hot_only_batched( + target_backend(), 
chunk_cfg, chunk_desc, storage, + chunk_post.data(), + chunk_selected.data(), + chunk_weights.data(), + chunk_len, ffn_batch_out, &result.error, + &ffn_hot_alloc); + } else if (target_weights().moe_hybrid->has_mmap() && + !target_weights().moe_hybrid->layer_regions.empty() && + stream_engine_.is_ready() && chunk_len >= 16 && + !storage.cold_expert_ids.empty()) { + // Streaming prefill: batched eval handles hot on GPU + cold on CPU. + // The streaming engine's mmap keeps data paged in via madvise. + auto * hybrid = target_weights().moe_hybrid.get(); + const auto & regions = hybrid->layer_regions[(size_t)il]; + // Prefetch cold expert data from mmap for upcoming layers + std::vector cold_ids_copy(storage.cold_expert_ids.begin(), + storage.cold_expert_ids.end()); + stream_engine_.prefetch_cold_experts(hybrid->mmap_data, hybrid->mmap_size, + regions, cold_ids_copy.data(), + (int)cold_ids_copy.size()); ffn_ok = eval_moe_hybrid_ffn_batched( target_backend(), cpu_be, chunk_cfg, chunk_desc, storage, chunk_post.data(), @@ -1124,6 +1188,11 @@ GenerateResult Qwen35MoeBackend::generate_impl(const GenerateRequest & req, std::printf(" build=%.1fms compute=%.1fms readback=%.1fms ffn=%.1fms\n", build_us_total / 1000.0, compute_us_total / 1000.0, readback_us_total / 1000.0, ffn_us_total / 1000.0); + if (total_ffn_layers > 0) { + std::printf(" hot_only_layers=%d/%d (%.1f%% skip CPU)\n", + hot_only_layers, total_ffn_layers, + 100.0 * hot_only_layers / total_ffn_layers); + } const double prefill_total_us = (double)(build_us_total + compute_us_total + readback_us_total + ffn_us_total); if (prefill_total_us > 0) { std::printf(" pct: build=%.1f%% compute=%.1f%% readback=%.1f%% ffn=%.1f%%\n", @@ -1257,6 +1326,247 @@ bool Qwen35MoeBackend::hybrid_forward_one_token(int32_t tok, int kv_pos, return true; } +// ── Batched hybrid forward ───────────────────────────────────────────────── +// Processes all tokens layer-by-layer using the same approach as prefill: +// per layer: 
build_layer_prefn_step (DeltaNet + router) → MoE FFN (batched) +// Then project all tokens to logits and take argmax. +// This is ~22× fewer dispatches than sequential hybrid_forward_one_token. + +bool Qwen35MoeBackend::hybrid_forward_batch( + const int32_t * tokens, int n_tokens, int base_pos, + std::vector & act_cur, + std::vector & argmax_out, + bool capture_features) { + + const int hidden = target_weights().n_embd; + const int n_layer = target_weights().n_layer; + const int n_expert_used = target_weights().n_expert_used; + + // Embed all tokens + std::vector embed_all((size_t)n_tokens * (size_t)hidden); + for (int i = 0; i < n_tokens; ++i) { + if (!target_weights().embedder.embed(&tokens[i], 1, + embed_all.data() + (size_t)i * (size_t)hidden)) { + return false; + } + } + + // Process layer-by-layer (same as prefill) + StepGraph prefn_sg; + ggml_gallocr_t ffn_hot_alloc = nullptr; + + MoeHybridConfig chunk_cfg = make_moe_hybrid_config(target_weights()); + + for (int il = 0; il < n_layer; ++il) { + auto & storage = target_weights().moe_hybrid->layers[(size_t)il]; + + const bool with_mask = (cfg_.kq_stride_pad > KQ_MASK_PAD) || (n_tokens > 1); + + // Build pre-FFN graph (DeltaNet/attention + router) for all tokens + step_graph_free(prefn_sg); + if (!build_layer_prefn_step(prefn_sg, target_weights(), target_cache(), target_backend(), + il, /*kv_start=*/base_pos, n_tokens, + with_mask, /*fa_window=*/0, cfg_.kq_stride_pad)) { + step_graph_destroy(prefn_sg); + if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); + return false; + } + + // Upload embeddings + ggml_backend_tensor_set(prefn_sg.inp_embed, embed_all.data(), 0, + sizeof(float) * (size_t)n_tokens * (size_t)hidden); + + // Set positions for attention layers + if (prefn_sg.positions) { + std::vector pos_data((size_t)n_tokens * 4); + for (int i = 0; i < n_tokens; ++i) { + pos_data[(size_t)i * 4 + 0] = base_pos + i; + pos_data[(size_t)i * 4 + 1] = base_pos + i; + pos_data[(size_t)i * 4 + 2] = base_pos + i; + 
pos_data[(size_t)i * 4 + 3] = 0; + } + ggml_backend_tensor_set(prefn_sg.positions, pos_data.data(), 0, + sizeof(int32_t) * pos_data.size()); + } + + // Set causal mask + if (prefn_sg.attn_mask) { + const int kv_len = base_pos + n_tokens; + const int kv_pad_override = (int)prefn_sg.attn_mask->ne[0]; + std::vector mask_buf; + build_causal_mask(mask_buf, kv_len, n_tokens, /*kv_start=*/base_pos, + cfg_.kq_stride_pad, /*win_start=*/0, kv_pad_override); + ggml_backend_tensor_set(prefn_sg.attn_mask, mask_buf.data(), 0, + sizeof(uint16_t) * mask_buf.size()); + } + + // Compute pre-FFN (DeltaNet + router for all tokens in one dispatch) + auto st = ggml_backend_graph_compute(target_backend(), prefn_sg.gf); + if (st != GGML_STATUS_SUCCESS) { + step_graph_destroy(prefn_sg); + if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); + return false; + } + + // Readback results + std::vector chunk_residuals((size_t)n_tokens * (size_t)hidden); + std::vector chunk_post((size_t)n_tokens * (size_t)hidden); + std::vector chunk_selected((size_t)n_tokens * (size_t)n_expert_used); + std::vector chunk_weights((size_t)n_tokens * (size_t)n_expert_used); + + ggml_backend_tensor_get(prefn_sg.ffn_residual, chunk_residuals.data(), 0, + sizeof(float) * chunk_residuals.size()); + ggml_backend_tensor_get(prefn_sg.ffn_post, chunk_post.data(), 0, + sizeof(float) * chunk_post.size()); + + ggml_tensor * layer_selected = (!prefn_sg.moe_selected.empty() && (size_t)il < prefn_sg.moe_selected.size()) + ? 
prefn_sg.moe_selected[(size_t)il] : nullptr; + if (!layer_selected || !prefn_sg.moe_weights) { + step_graph_destroy(prefn_sg); + if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); + return false; + } + ggml_backend_tensor_get(layer_selected, chunk_selected.data(), 0, + sizeof(int32_t) * chunk_selected.size()); + ggml_backend_tensor_get(prefn_sg.moe_weights, chunk_weights.data(), 0, + sizeof(float) * chunk_weights.size()); + + // MoE FFN — batched + MoeLayerDesc chunk_desc = make_moe_layer_desc(target_weights().layers[(size_t)il]); + std::vector ffn_batch_out; + bool ffn_ok = false; + + if (storage.cold_expert_ids.empty()) { + // All-hot: use batched hot-only path + ffn_ok = eval_moe_hot_only_batched( + target_backend(), chunk_cfg, chunk_desc, storage, + chunk_post.data(), chunk_selected.data(), chunk_weights.data(), + n_tokens, ffn_batch_out, nullptr, &ffn_hot_alloc); + } else { + // Mixed hot/cold: use hybrid path + ffn_ok = eval_moe_hybrid_ffn_batched( + target_backend(), target_weights().moe_hybrid->cpu_backend, + chunk_cfg, chunk_desc, storage, + chunk_post.data(), chunk_selected.data(), chunk_weights.data(), + n_tokens, ffn_batch_out, nullptr, &ffn_hot_alloc, nullptr); + } + + if (!ffn_ok) { + // Per-token fallback + ffn_batch_out.assign((size_t)hidden * (size_t)n_tokens, 0.0f); + std::vector single_out; + for (int ti = 0; ti < n_tokens; ++ti) { + if (!eval_moe_hybrid_ffn_single( + target_backend(), chunk_cfg, chunk_desc, storage, + target_weights().moe_hybrid->cpu_backend, + chunk_post.data() + (size_t)ti * (size_t)hidden, + chunk_selected.data() + (size_t)ti * (size_t)n_expert_used, + chunk_weights.data() + (size_t)ti * (size_t)n_expert_used, + n_expert_used, single_out, nullptr)) { + step_graph_destroy(prefn_sg); + if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); + return false; + } + std::memcpy(ffn_batch_out.data() + (size_t)ti * (size_t)hidden, + single_out.data(), sizeof(float) * (size_t)hidden); + } + } + + // Combine FFN + residual → embed_all 
for next layer + for (int i = 0; i < n_tokens; ++i) { + const float * ffn = ffn_batch_out.data() + (size_t)i * (size_t)hidden; + const float * res = chunk_residuals.data() + (size_t)i * (size_t)hidden; + float * emb = embed_all.data() + (size_t)i * (size_t)hidden; + for (int j = 0; j < hidden; ++j) { + emb[j] = ffn[j] + res[j]; + } + + // Feature capture at capture layers + if (capture_features && target_cache().target_feat && cfg_.draft_path) { + int capture_idx = -1; + for (int k = 0; k < target_weights().n_capture_layers; k++) { + if (target_weights().capture_layer_ids[k] == il) { + capture_idx = k; + break; + } + } + if (capture_idx >= 0) { + const int token_pos = base_pos + i; + const int cap = target_cache().target_feat_cap; + const int slot = token_pos % cap; + const size_t elt = ggml_element_size(target_cache().target_feat); + const size_t col_stride = target_cache().target_feat->nb[1]; + const size_t offset = (size_t)slot * col_stride + + (size_t)capture_idx * (size_t)hidden * elt; + std::vector bf16_tmp((size_t)hidden); + ggml_fp32_to_bf16_row(emb, bf16_tmp.data(), hidden); + ggml_backend_tensor_set(target_cache().target_feat, bf16_tmp.data(), + offset, (size_t)hidden * elt); + } + } + } + } + step_graph_destroy(prefn_sg); + if (ffn_hot_alloc) ggml_gallocr_free(ffn_hot_alloc); + + // Store last token hidden state in act_cur + act_cur.assign(embed_all.data() + (size_t)(n_tokens - 1) * (size_t)hidden, + embed_all.data() + (size_t)n_tokens * (size_t)hidden); + + // Project ALL tokens to logits and get argmax for each + const int vocab = target_weights().n_vocab; + argmax_out.resize(n_tokens); + + StepGraph proj_sg; + ggml_init_params ip{}; + ip.mem_size = 64 * 1024 * 1024; + ip.mem_buffer = nullptr; + ip.no_alloc = true; + proj_sg.ctx = ggml_init(ip); + if (!proj_sg.ctx) return false; + + proj_sg.hidden_input = ggml_new_tensor_2d(proj_sg.ctx, GGML_TYPE_F32, hidden, n_tokens); + ggml_set_input(proj_sg.hidden_input); + proj_sg.gf = 
ggml_new_graph_custom(proj_sg.ctx, 1024, false); + ggml_tensor * normed = ggml_rms_norm(proj_sg.ctx, proj_sg.hidden_input, target_weights().rms_eps); + normed = ggml_mul(proj_sg.ctx, normed, target_weights().out_norm); + proj_sg.logits = ggml_mul_mat(proj_sg.ctx, target_weights().output, normed); + ggml_set_output(proj_sg.logits); + ggml_build_forward_expand(proj_sg.gf, proj_sg.logits); + proj_sg.alloc = ggml_gallocr_new(ggml_backend_get_default_buffer_type(target_backend())); + if (!ggml_gallocr_alloc_graph(proj_sg.alloc, proj_sg.gf)) { + step_graph_destroy(proj_sg); + return false; + } + ggml_backend_tensor_set(proj_sg.hidden_input, embed_all.data(), 0, + sizeof(float) * (size_t)n_tokens * (size_t)hidden); + auto proj_st = ggml_backend_graph_compute(target_backend(), proj_sg.gf); + if (proj_st != GGML_STATUS_SUCCESS) { + step_graph_destroy(proj_sg); + return false; + } + + // Read logits and compute argmax per token + std::vector logits_buf((size_t)vocab * (size_t)n_tokens); + ggml_backend_tensor_get(proj_sg.logits, logits_buf.data(), 0, + sizeof(float) * logits_buf.size()); + step_graph_destroy(proj_sg); + + for (int t = 0; t < n_tokens; ++t) { + const float * tok_logits = logits_buf.data() + (size_t)t * (size_t)vocab; + int32_t best_id = 0; + float best_val = tok_logits[0]; + for (int j = 1; j < vocab; ++j) { + if (tok_logits[j] > best_val) { + best_val = tok_logits[j]; + best_id = j; + } + } + argmax_out[t] = best_id; + } + return true; +} + bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, std::vector & out_tokens, const DaemonIO & io) { @@ -1366,19 +1676,13 @@ bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, } draft_tok[0] = last_tok; - // 4. Verify: snapshot recurrent state, then run each draft token through hybrid forward + // 4. 
Verify: snapshot recurrent state, then run ALL draft tokens batched snapshot_ssm_state(target_cache()); target_tok.resize(q_len); - bool verify_ok = true; - for (int i = 0; i < q_len; i++) { - int32_t argmax = -1; - if (!hybrid_forward_one_token(draft_tok[i], committed + i, act_cur, argmax)) { - verify_ok = false; - break; - } - target_tok[i] = argmax; - } + bool verify_ok = hybrid_forward_batch( + draft_tok.data(), q_len, committed, + act_cur, target_tok, /*capture_features=*/false); if (!verify_ok) { std::fprintf(stderr, "[hybrid-spec] verify failed\n"); restore_ssm_state(target_cache()); @@ -1407,18 +1711,15 @@ bool Qwen35MoeBackend::do_hybrid_spec_decode(int committed, int n_gen, replay_tok[i] = (i < accept_n) ? draft_tok[i] : bonus_tok; } - // Replay tokens through hybrid forward (captures features for next draft step) - int32_t replay_last = -1; - for (int i = 0; i < commit_n; i++) { - int32_t argmax = -1; - if (!hybrid_forward_one_token(replay_tok[i], committed + i, act_cur, argmax)) { - std::fprintf(stderr, "[hybrid-spec] replay failed\n"); - step_graph_destroy(draft_sg); - return false; - } - replay_last = argmax; + // Replay tokens through batched hybrid forward (captures features for next draft step) + std::vector replay_argmax; + if (!hybrid_forward_batch(replay_tok.data(), commit_n, committed, + act_cur, replay_argmax, /*capture_features=*/true)) { + std::fprintf(stderr, "[hybrid-spec] replay failed\n"); + step_graph_destroy(draft_sg); + return false; } - last_tok = replay_last; + last_tok = replay_argmax[commit_n - 1]; // 7. 
Sync features to mirror for next draft step if (feature_mirror().target_feat && target_cache().target_feat) { diff --git a/server/src/qwen35moe/qwen35moe_backend.h b/server/src/qwen35moe/qwen35moe_backend.h index 7e036bca6..bca729e5f 100644 --- a/server/src/qwen35moe/qwen35moe_backend.h +++ b/server/src/qwen35moe/qwen35moe_backend.h @@ -7,6 +7,7 @@ #include "qwen35moe_pipelined_decode.h" #include "../common/moe_hybrid_ffn_eval.h" #include "../common/moe_hybrid_storage.h" +#include "../common/moe_hybrid_stream.h" #include "../common/moe_hybrid_routing_stats.h" #include "../common/moe_hybrid_swap_manager.h" @@ -47,6 +48,7 @@ class Qwen35MoeBackend : public Qwen35Backend { bool rebuild_hybrid_from_placement(const MoeHybridPlacement & placement, std::string & err); MoeHybridSwapPolicy swap_policy_; bool hybrid_telemetry_ = false; + MoeHybridStreamEngine stream_engine_; void maybe_post_request_swap(); bool load_dynamic_placement(const char * hotness_path, @@ -67,6 +69,13 @@ class Qwen35MoeBackend : public Qwen35Backend { std::vector & act_cur, int32_t & argmax_out); + // Batched hybrid forward: processes all tokens layer-by-layer (like prefill). + // Returns argmax for each token. Much faster than sequential hybrid_forward_one_token. + bool hybrid_forward_batch(const int32_t * tokens, int n_tokens, int base_pos, + std::vector & act_cur, + std::vector & argmax_out, + bool capture_features); + // Pipelined decode: uses cached DeltaNet graphs + optimized FFN loop bool run_pipelined_decode_path(int committed, int n_gen, std::vector & out_tokens, diff --git a/server/test/bench_moe_stream.cpp b/server/test/bench_moe_stream.cpp new file mode 100644 index 000000000..074ed8103 --- /dev/null +++ b/server/test/bench_moe_stream.cpp @@ -0,0 +1,177 @@ +// Synthetic microbenchmark: MoE prefill streaming vs CPU cold path. 
+//
+// This benchmark creates fake expert data in a temporary mmap'd file,
+// builds layer regions, and measures DMA + GPU compute latency at various
+// batch sizes (T = number of tokens).
+//
+// Usage: bench_moe_stream [--n-expert 128] [--n-cold 4] [--hidden 5120] [--ffn 3584] [--n-expert-used 8]
+//
+// Requires: CUDA GPU.
+
+#include "../src/common/moe_hybrid_stream.h"
+#include "../src/common/moe_hybrid_storage.h"
+#include "../src/common/gpu_runtime_compat.h"
+
+#include "ggml.h"
+#include "ggml-backend.h"
+#include "ggml-cuda.h"
+
+#include <chrono>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <random>
+#include <vector>
+
+#if !defined(_WIN32)
+#include <fcntl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#endif
+
+using namespace dflash::common;
+using Clock = std::chrono::high_resolution_clock;
+
+// Wall-clock time from t0 to t1, in fractional milliseconds.
+static double elapsed_ms(Clock::time_point t0, Clock::time_point t1) {
+    return std::chrono::duration<double, std::milli>(t1 - t0).count();
+}
+
+// Approximate Q4_K byte size of a whole rows x cols weight matrix (total, not per row).
+static size_t q4km_bytes(int rows, int cols) {
+    // ggml block_q4_K: 144 bytes per 256 elements → 0.5625 B/elem (4.5 bits)
+    return (size_t)rows * (size_t)cols * 9 / 16;
+}
+
+// Parses the size flags, builds a fake expert file, and prints a prefetch/stream timing table.
+int main(int argc, char ** argv) {
+    int n_expert = 128;
+    int n_cold = 4;
+    int hidden = 5120;
+    int ffn = 3584;
+    int n_expert_used = 8;
+
+    for (int i = 1; i < argc; ++i) {
+        if (std::strcmp(argv[i], "--n-expert") == 0 && i + 1 < argc) n_expert = std::atoi(argv[++i]);
+        else if (std::strcmp(argv[i], "--n-cold") == 0 && i + 1 < argc) n_cold = std::atoi(argv[++i]);
+        else if (std::strcmp(argv[i], "--hidden") == 0 && i + 1 < argc) hidden = std::atoi(argv[++i]);
+        else if (std::strcmp(argv[i], "--ffn") == 0 && i + 1 < argc) ffn = std::atoi(argv[++i]);
+        else if (std::strcmp(argv[i], "--n-expert-used") == 0 && i + 1 < argc) n_expert_used = std::atoi(argv[++i]);
+    }
+
+    std::printf("bench_moe_stream: n_expert=%d n_cold=%d hidden=%d ffn=%d n_expert_used=%d\n",
+                n_expert, n_cold, hidden, ffn, n_expert_used);
+    // Non-positive sizes would wrap in the size_t math below; cold ids must index experts that exist in the file.
+    if (n_expert <= 0 || hidden <= 0 || ffn <= 0 || n_cold < 0 || n_cold > n_expert) {
+        std::fprintf(stderr, "bench_moe_stream: need n_expert/hidden/ffn > 0 and 0 <= n_cold <= n_expert\n");
+        return 1;
+    }
+
+    // Compute per-expert sizes (gate_up fused + down)
+    const size_t gate_up_bytes = q4km_bytes(ffn * 2, hidden); // fused gate+up
+    const size_t down_bytes = q4km_bytes(hidden, ffn);
+    const size_t expert_total_bytes = gate_up_bytes + down_bytes;
+    const size_t file_size = expert_total_bytes * (size_t)n_expert;
+
+    std::printf(" per_expert: gate_up=%.2f MiB down=%.2f MiB total=%.2f MiB\n",
+                gate_up_bytes / 1024.0 / 1024.0,
+                down_bytes / 1024.0 / 1024.0,
+                expert_total_bytes / 1024.0 / 1024.0);
+    std::printf(" file_size=%.2f MiB\n", file_size / 1024.0 / 1024.0);
+
+#if defined(_WIN32)
+    std::fprintf(stderr, "bench_moe_stream: Windows not yet supported in this benchmark\n");
+    return 1;
+#else
+    // Create a temporary file with random data
+    char tmppath[] = "/tmp/bench_moe_stream_XXXXXX";
+    int fd = mkstemp(tmppath);
+    if (fd < 0) { perror("mkstemp"); return 1; }
+    unlink(tmppath); // auto-delete on close
+    if (ftruncate(fd, (off_t)file_size) != 0) { perror("ftruncate"); close(fd); return 1; }
+    void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    if (mmap_addr == MAP_FAILED) { perror("mmap"); close(fd); return 1; }
+
+    // Fill with random data to ensure pages are faulted in (one 4-byte write per 4096-byte step)
+    std::printf(" filling temp file with random data...\n");
+    std::mt19937 rng(42);
+    auto * ptr = static_cast<uint8_t *>(mmap_addr);
+    for (size_t off = 0; off < file_size; off += 4096) {
+        uint32_t val = rng();
+        std::memcpy(ptr + off, &val, sizeof(val));
+    }
+
+    // Build a fake LayerExpertRegions: all gate_up slabs first, then all down slabs
+    LayerExpertRegions regions{};
+    regions.fused_gate_up = true;
+    regions.gate_up_exps.offset = 0;
+    regions.gate_up_exps.size = gate_up_bytes * (size_t)n_expert;
+    regions.expert_bytes_gate_up = gate_up_bytes;
+    regions.down_exps.offset = regions.gate_up_exps.size;
+    regions.down_exps.size = down_bytes * (size_t)n_expert;
+    regions.expert_bytes_down = down_bytes;
+
+    // Init CUDA backend
+    ggml_backend_t gpu_backend = ggml_backend_cuda_init(0);
+    if (!gpu_backend) {
+        std::fprintf(stderr, "Failed to init CUDA backend\n");
+        munmap(mmap_addr, file_size);
+        close(fd);
+        return 1;
+    }
+
+    // Init stream engine
+    MoeHybridStreamEngine engine;
+    std::string err;
+    if (!engine.init(gpu_backend, expert_total_bytes, &err)) {
+        std::fprintf(stderr, "Stream engine init failed: %s\n", err.c_str());
+        ggml_backend_free(gpu_backend);
+        munmap(mmap_addr, file_size);
+        close(fd);
+        return 1;
+    }
+    std::printf(" stream engine ready: pinned=%.1f MiB scratch=%.1f MiB\n",
+                engine.pinned_bytes() / 1024.0 / 1024.0,
+                engine.scratch_bytes() / 1024.0 / 1024.0);
+
+    // Benchmark: time prefetch + streaming of the cold experts. NOTE(review): T is only a row label; no engine call takes it.
+    std::vector<int> test_T = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048};
+    std::vector<int32_t> cold_ids(n_cold);
+    for (int i = 0; i < n_cold; ++i) cold_ids[i] = i; // first n_cold experts are "cold"
+
+    std::printf("\n%-8s %-12s %-12s %-12s\n", "T", "prefetch_ms", "stream_ms", "total_ms");
+    std::printf("-------- ------------ ------------ ------------\n");
+
+    for (int T : test_T) {
+        // Warm up
+        engine.prefetch_cold_experts(mmap_addr, file_size, regions, cold_ids.data(), n_cold);
+        const int n_iter = 5;
+        double prefetch_total = 0, stream_total = 0;
+        for (int iter = 0; iter < n_iter; ++iter) {
+            auto t0 = Clock::now();
+            engine.prefetch_cold_experts(mmap_addr, file_size, regions, cold_ids.data(), n_cold);
+            auto t1 = Clock::now();
+            // Simulate streaming all cold experts
+            for (int ci = 0; ci < n_cold; ++ci) {
+                engine.stream_expert_sync(mmap_addr, file_size, regions, cold_ids[ci], gpu_backend, nullptr);
+            }
+            auto t2 = Clock::now();
+            prefetch_total += elapsed_ms(t0, t1);
+            stream_total += elapsed_ms(t1, t2);
+        }
+        double avg_prefetch = prefetch_total / n_iter;
+        double avg_stream = stream_total / n_iter;
+        std::printf("%-8d %-12.3f %-12.3f %-12.3f\n", T, avg_prefetch, avg_stream, avg_prefetch + avg_stream);
+    }
+
+    // Cleanup
+    engine.destroy();
+    ggml_backend_free(gpu_backend);
+    munmap(mmap_addr, file_size);
+    close(fd);
+
+    std::printf("\nDone.\n");
+    return 0;
+#endif
+}