Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ add_library(dflash_common STATIC
src/common/moe_hybrid_storage.cpp
src/common/spark_corpus.cpp
src/common/moe_hybrid_ffn_eval.cpp
src/common/moe_hybrid_stream.cpp
src/common/cold_ffn_cpu.cpp
src/common/moe_hybrid_swap_manager.cpp
src/qwen35/layer_split_forward.cpp
Expand Down Expand Up @@ -692,6 +693,11 @@ if(DFLASH27B_TESTS)
target_include_directories(bench_laguna_spark PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS})
target_link_libraries(bench_laguna_spark PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET})
endif()
# bench_moe_stream: optional benchmark executable.
# Configured only when test/bench_moe_stream.cpp exists in the source tree AND
# the selected GPU backend is "cuda". It adds the CUDA toolkit include
# directories and links the CUDA runtime (CUDA::cudart) in addition to the
# common library, ggml and the backend target.
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/bench_moe_stream.cpp" AND DFLASH27B_GPU_BACKEND STREQUAL "cuda")
add_executable(bench_moe_stream test/bench_moe_stream.cpp)
target_include_directories(bench_moe_stream PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS} ${CUDAToolkit_INCLUDE_DIRS})
target_link_libraries(bench_moe_stream PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} CUDA::cudart)
endif()
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/test/test_laguna_daemon.cpp")
add_executable(test_laguna_daemon test/test_laguna_daemon.cpp)
target_include_directories(test_laguna_daemon PRIVATE ${DFLASH27B_SRC_INCLUDE_DIRS})
Expand Down
396 changes: 396 additions & 0 deletions server/docs/moe_hybrid.md

Large diffs are not rendered by default.

156 changes: 156 additions & 0 deletions server/scripts/bench_moe_prefill_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""
bench_moe_prefill_streaming.py — End-to-end benchmark comparing prefill
throughput with and without MoE streaming.

Usage:
# Against a running server (streaming enabled):
python bench_moe_prefill_streaming.py --url http://localhost:8080

# Compare two servers (streaming vs baseline):
python bench_moe_prefill_streaming.py \
--url-streaming http://localhost:8080 \
--url-baseline http://localhost:8081

Measures prompt eval tok/s at varying prompt lengths.
"""

import argparse
import json
import time
import sys
from urllib.request import urlopen, Request
from urllib.error import URLError


def generate_prompt(n_tokens: int) -> str:
    """Build filler text that is roughly ``n_tokens`` tokens long.

    The words of a fixed pangram are repeated cyclically and joined with
    single spaces. The word count is derived from ``n_tokens`` using an
    assumed ratio of ~1.3 tokens per English word, and is never less than one.

    Args:
        n_tokens: Target prompt length in tokens (approximate).

    Returns:
        The generated prompt string.
    """
    vocabulary = "The quick brown fox jumps over the lazy dog. ".split()
    # Heuristic: ~1.3 tokens per English word; always emit at least one word.
    word_count = max(1, int(n_tokens / 1.3))
    return " ".join(vocabulary[idx % len(vocabulary)] for idx in range(word_count))


def bench_prefill(url: str, prompt: str, max_tokens: int = 1) -> dict:
    """Send one chat-completion request and measure prompt-processing time.

    Args:
        url: Base server URL; a trailing ``/`` is tolerated.
        prompt: User message content to send.
        max_tokens: Number of tokens to request (kept small so the request
            time is dominated by prefill).

    Returns:
        On success, a dict with ``prompt_tokens``, ``wall_ms`` (client-side
        round-trip time), ``prefill_ms`` (the server-reported
        ``timings.prompt_ms`` when present, otherwise ``wall_ms``) and
        ``tok_per_s``. On any transport or decoding failure, a dict with a
        single ``error`` key, so that callers can skip the sample instead of
        crashing.
    """
    payload = {
        "model": "default",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": 0.0,
        "stream": False,
    }

    req = Request(
        # Strip trailing slashes so "http://host/" does not become "//v1/...".
        f"{url.rstrip('/')}/v1/chat/completions",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )

    t0 = time.perf_counter()
    try:
        with urlopen(req, timeout=120) as resp:
            body = json.loads(resp.read())
    except (OSError, ValueError) as e:
        # OSError covers URLError/HTTPError as well as read timeouts and
        # connection resets, which are NOT wrapped in URLError; ValueError
        # covers a non-JSON / non-UTF-8 response body.
        return {"error": str(e)}
    t1 = time.perf_counter()

    if not isinstance(body, dict):
        return {"error": f"unexpected response type: {type(body).__name__}"}

    wall_ms = (t1 - t0) * 1000

    # Treat JSON null the same as a missing field.
    usage = body.get("usage") or {}
    prompt_tokens = usage.get("prompt_tokens") or 0
    # The server reports timings in the response if available
    timings = body.get("timings") or {}
    prefill_ms = timings.get("prompt_ms")
    if prefill_ms is None:
        prefill_ms = wall_ms

    return {
        "prompt_tokens": prompt_tokens,
        "wall_ms": wall_ms,
        "prefill_ms": prefill_ms,
        "tok_per_s": prompt_tokens / (prefill_ms / 1000) if prefill_ms > 0 else 0,
    }


def run_sweep(url: str, prompt_lengths: list, n_warmup: int = 1, n_iter: int = 3) -> list:
    """Benchmark prefill against ``url`` for each requested prompt length.

    For every length a prompt is generated, ``n_warmup`` unmeasured requests
    are sent, and then up to ``n_iter`` measured requests. The first failing
    measured request is logged to stderr and ends measurement for that length;
    samples collected before the failure are still averaged. Lengths with no
    successful sample produce no entry.

    Returns:
        One dict per successfully measured length with ``target_tokens``,
        ``actual_tokens`` (taken from the first sample), ``avg_prefill_ms``
        and ``avg_tok_per_s``.
    """
    sweep = []

    for n_tok in prompt_lengths:
        text = generate_prompt(n_tok)

        # Unmeasured warmup requests; their results are discarded.
        for _ in range(n_warmup):
            bench_prefill(url, text)

        # Measured requests.
        samples = []
        for _ in range(n_iter):
            r = bench_prefill(url, text)
            if "error" in r:
                print(f" ERROR at T={n_tok}: {r['error']}", file=sys.stderr)
                break
            samples.append(r)

        if not samples:
            continue

        count = len(samples)
        avg_prefill_ms = sum(sample["prefill_ms"] for sample in samples) / count
        avg_tok_s = sum(sample["tok_per_s"] for sample in samples) / count
        actual_tokens = samples[0]["prompt_tokens"]

        row = {
            "target_tokens": n_tok,
            "actual_tokens": actual_tokens,
            "avg_prefill_ms": avg_prefill_ms,
            "avg_tok_per_s": avg_tok_s,
        }
        sweep.append(row)
        print(f" T={n_tok:5d} actual={actual_tokens:5d} "
              f"prefill={avg_prefill_ms:8.1f} ms "
              f"tok/s={avg_tok_s:8.1f}")

    return sweep


def main():
    """Command-line entry point.

    Two modes are supported:
      * ``--url``: sweep a single server and print a prefill table.
      * ``--url-streaming`` together with ``--url-baseline``: sweep both
        servers and print a side-by-side table with the speedup.

    If ``--url`` is given it takes precedence over the comparison options.
    Invalid arguments are reported through ``parser.error`` (exit status 2).
    """
    parser = argparse.ArgumentParser(description="MoE prefill streaming benchmark")
    parser.add_argument("--url", type=str, help="Server URL to benchmark")
    parser.add_argument("--url-streaming", type=str, help="Server URL with streaming enabled")
    parser.add_argument("--url-baseline", type=str, help="Server URL with baseline (CPU cold)")
    parser.add_argument("--prompt-lengths", type=str, default="128,256,512,1024,2048,4096,8192",
                        help="Comma-separated prompt lengths to test")
    parser.add_argument("--n-iter", type=int, default=3, help="Iterations per measurement")
    parser.add_argument("--n-warmup", type=int, default=1, help="Warmup iterations")
    args = parser.parse_args()

    # Report malformed lists as a usage error rather than a ValueError
    # traceback; blank entries (e.g. a trailing comma) are ignored.
    try:
        prompt_lengths = [int(x) for x in args.prompt_lengths.split(",") if x.strip()]
    except ValueError:
        parser.error(
            f"--prompt-lengths must be comma-separated integers, got {args.prompt_lengths!r}"
        )
    if not prompt_lengths:
        parser.error("--prompt-lengths must contain at least one value")

    if args.url:
        print(f"\n=== Benchmarking: {args.url} ===")
        results = run_sweep(args.url, prompt_lengths, args.n_warmup, args.n_iter)
        print(f"\n{'T':>8} {'Prefill ms':>12} {'tok/s':>10}")
        print("-" * 35)
        for r in results:
            print(f"{r['actual_tokens']:>8} {r['avg_prefill_ms']:>12.1f} {r['avg_tok_per_s']:>10.1f}")

    elif args.url_streaming and args.url_baseline:
        print(f"\n=== Baseline: {args.url_baseline} ===")
        baseline = run_sweep(args.url_baseline, prompt_lengths, args.n_warmup, args.n_iter)

        print(f"\n=== Streaming: {args.url_streaming} ===")
        streaming = run_sweep(args.url_streaming, prompt_lengths, args.n_warmup, args.n_iter)

        print(f"\n{'T':>8} {'Baseline ms':>12} {'Stream ms':>12} {'Speedup':>8}")
        print("-" * 50)
        # Pair rows by requested length; lengths missing from either sweep are skipped.
        stream_map = {s["target_tokens"]: s for s in streaming}
        for b in baseline:
            s = stream_map.get(b["target_tokens"])
            if not s:
                continue
            speedup = b["avg_prefill_ms"] / s["avg_prefill_ms"] if s["avg_prefill_ms"] > 0 else 0
            print(f"{b['actual_tokens']:>8} {b['avg_prefill_ms']:>12.1f} "
                  f"{s['avg_prefill_ms']:>12.1f} {speedup:>7.2f}x")
    else:
        parser.error("Provide --url or both --url-streaming and --url-baseline")


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
25 changes: 25 additions & 0 deletions server/src/common/gguf_mmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include <cstddef>
#include <cstdint>
#include <string>

namespace dflash::common {
Expand All @@ -36,6 +37,10 @@ class GgufMmap {
size_t size() const; // 0 when not open
bool is_open() const;

// Advise the kernel to read ahead the given byte range into page cache.
// No-op if not open or range is invalid. Safe to call from any thread.
void advise_willneed(size_t offset, size_t length) const;

// Transfer ownership of the mmap'd region to the caller.
// After release() this object is empty (is_open() == false).
// The caller is responsible for unmapping on POSIX or UnmapViewOfFile on
Expand Down Expand Up @@ -196,6 +201,26 @@ inline const void * GgufMmap::data() const { return data_; }
inline size_t GgufMmap::size() const { return size_; }
inline bool GgufMmap::is_open() const { return data_ != nullptr; }

inline void GgufMmap::advise_willneed(size_t offset, size_t length) const {
if (!data_ || offset >= size_) return;
if (offset + length > size_) length = size_ - offset;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Unsigned integer overflow in range validation: offset + length on size_t can wrap around, causing the clamp to be skipped and an invalid oversized range to be passed to madvise/PrefetchVirtualMemory. Use if (length > size_ - offset) instead.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At server/src/common/gguf_mmap.h, line 205:

<comment>Unsigned integer overflow in range validation: `offset + length` on `size_t` can wrap around, causing the clamp to be skipped and an invalid oversized range to be passed to `madvise`/`PrefetchVirtualMemory`. Use `if (length > size_ - offset)` instead.</comment>

<file context>
@@ -196,6 +200,26 @@ inline const void * GgufMmap::data() const { return data_; }
 
+inline void GgufMmap::advise_willneed(size_t offset, size_t length) const {
+    if (!data_ || offset >= size_) return;
+    if (offset + length > size_) length = size_ - offset;
+    if (length == 0) return;
+#if defined(_WIN32)
</file context>
Suggested change
if (offset + length > size_) length = size_ - offset;
if (length > size_ - offset) length = size_ - offset;

if (length == 0) return;
#if defined(_WIN32)
// PrefetchVirtualMemory (Windows 8+)
WIN32_MEMORY_RANGE_ENTRY entry{};
entry.VirtualAddress = const_cast<uint8_t *>(static_cast<const uint8_t *>(data_)) + offset;
entry.NumberOfBytes = length;
PrefetchVirtualMemory(GetCurrentProcess(), 1, &entry, 0);
#else
// Align to page boundary for madvise
const size_t page_size = (size_t)sysconf(_SC_PAGESIZE);
const size_t aligned_offset = (offset / page_size) * page_size;
const size_t aligned_length = length + (offset - aligned_offset);
::madvise(const_cast<uint8_t *>(static_cast<const uint8_t *>(data_)) + aligned_offset,
aligned_length, MADV_WILLNEED);
#endif
}

inline GgufMmap::OwnedRegion GgufMmap::release() {
OwnedRegion r{};
r.data = data_;
Expand Down
Loading
Loading