From 06bdda7fe63e65cb065ccfe07ee7a44a61f20141 Mon Sep 17 00:00:00 2001 From: Javier Pazo Date: Sat, 9 May 2026 12:39:37 +0200 Subject: [PATCH] feat(dflash): daemon scripts improvements (GPU split, Windows, defaults) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bundle of small daemon-script improvements that piled up while running Lucebox in production. The cleanup is net negative (+260 / -583): most of it removes dead branches and stale defaults. Per CONTRIBUTING ("one concern per PR"): I tried to split this into three PRs but the changes are entangled across the same hunks of `server.py` / `server_tools.py` (e.g. argparse table changed in one place to add `--target-gpu`, `--prompt-file` and the new defaults together). I kept it as one cohesive PR. **Happy to split into three sequential PRs if you prefer**, listed below in the order I would split them — let me know: A. feat(dflash): add --target-gpu / --draft-gpu daemon flags - server.py / server_tools.py: launcher flags exposed - Pins CUDA_VISIBLE_DEVICES per process and translates back to native ordinals when invoking test_dflash. So `--target-gpu=1 --draft-gpu=0` produces CUDA_VISIBLE_DEVICES=1,0 with ordinals 0/1 inside. - DFLASH_TARGET_GPU / DFLASH_DRAFT_GPU env vars also honoured. - Complements PR #122 (open, weicj) which adds the same selection in the Python dual-GPU bench harness; this is the daemon side. B. chore(dflash): Windows-friendly daemon scripts - server.py / server_tools.py / run.py / _prefill_hook.py: argparse plumbing supports `--prompt-file ` so prompts with newlines / non-ASCII work without fragile shell quoting. - tokenize_prompt.py / detokenize.py: minor UTF-8 path fix. - bench_he.py / bench_llm.py: tokenizer override exposed (line with merged PR #93). C. feat(dflash): default to Qwen3.6-27B target + DFlash drafter - server.py / server_tools.py: when neither `--target-model` nor `--draft-model` is given, defaults pick the Qwen3.6-27B target + DFlash drafter with #79-style metadata. - Behaviour is unchanged for any invocation that already passes the flags explicitly. D. (already covered by PR #135) - --target-cache-slots and --draft-feature-mirror are exposed to the daemon launcher; this is a small tail of #135 and not strictly part of this PR's scope. If #135 lands first, these lines rebase cleanly; if this PR lands first, #135's slot scheduler has the wiring it needs at the daemon level. Validation: - py_compile for all 8 scripts. - End-to-end manual launch: python dflash/scripts/server.py \ --target-gpu 1 --draft-gpu 0 \ --target-cache-slots 2 --stream-tagged \ --prompt-file prompts/long.txt on Windows MSVC + CUDA 12.x, RTX 6000 Ada (sm_89) + RTX 4090. Daemon comes up, both GPUs are pinned correctly, the prompt file is read without escaping issues, slot 0 and slot 1 admit requests independently. - bench_he.py / bench_llm.py with a non-default tokenizer flag matches the behaviour PR #93 enabled. This is a correctness / config / UX change, not a kernel perf change. No kernel timings claimed. Verification vs existing community PRs: - COMP-COMPL with #110 (merged, "fix Sparce attention and Qwen loader on windows", touches `qwen3_0p6b_loader.cpp`). - COMP-COMPL with #126 (merged, "support Windows in DflashClient", touches `pflash/pflash/dflash_client.py`). - COMP-COMPL with #93 (merged, "make target tokenizer overrideable in bench_he and bench_llm"). - COMP-COMPL with #122 (open, weicj, "CUDA/HIP mixed backend placement" bench harness). - COMP-COMPL with #132 (open, server.py for codex tooling). Some surface overlap on `server.py`; happy to rebase if #132 lands first. - No prior art for the daemon-side --target-gpu/--draft-gpu plumbing or for the Windows --prompt-file path at this layer. Author: Javier Pazo --- dflash/scripts/_prefill_hook.py | 31 +-- dflash/scripts/bench_he.py | 88 ++----- dflash/scripts/bench_llm.py | 208 ++------------- dflash/scripts/detokenize.py | 2 +- dflash/scripts/run.py | 12 +- dflash/scripts/server.py | 404 +++++++++--------------------- dflash/scripts/server_tools.py | 94 ++++++- dflash/scripts/tokenize_prompt.py | 4 +- 8 files changed, 260 insertions(+), 583 deletions(-) diff --git a/dflash/scripts/_prefill_hook.py b/dflash/scripts/_prefill_hook.py index 7d7611631..10a14186f 100644 --- a/dflash/scripts/_prefill_hook.py +++ b/dflash/scripts/_prefill_hook.py @@ -59,7 +59,6 @@ class PrefillConfig: keep_ratio: float # 0.015..0.125 drafter_gguf: Optional[Path] # drafter weights (Qwen3-0.6B BF16 GGUF) drafter_tokenizer_id: str # HF repo ID for drafter vocab - skip_park: bool = False # skip park/unpark on >=32 GB GPUs @property def enabled(self) -> bool: @@ -93,11 +92,6 @@ def add_cli_flags(ap) -> None: ap.add_argument("--prefill-drafter-tokenizer", default="Qwen/Qwen3-0.6B", help="HF repo ID for the drafter tokenizer " "(default Qwen/Qwen3-0.6B).") - ap.add_argument("--prefill-skip-park", action="store_true", default=False, - help="Skip park/unpark/free-drafter on GPUs with enough VRAM " - "to hold target + draft + scorer simultaneously (e.g. " - "RTX 5090 32 GB). Keeps scorer resident for fast " - "subsequent compressions.") def config_from_args(args) -> PrefillConfig: @@ -115,7 +109,6 @@ def config_from_args(args) -> PrefillConfig: keep_ratio=args.prefill_keep_ratio, drafter_gguf=args.prefill_drafter, drafter_tokenizer_id=args.prefill_drafter_tokenizer, - skip_park=getattr(args, 'prefill_skip_park', False), ) @@ -128,17 +121,11 @@ def compress_text_via_daemon( drafter_tokenizer, cfg: PrefillConfig, prompt_text: str, - skip_park: bool = False, ) -> str: """Run the daemon's compress + memory dance, return the compressed text. Caller holds the daemon lock for the full duration. After this returns, the daemon has its target + draft restored and is ready for ``generate``. - - When ``skip_park`` is True (e.g. on 32 GB+ GPUs where all three models - fit in VRAM simultaneously), the park/unpark/free-drafter steps are - skipped. The daemon's compress handler will keep the scorer loaded for - subsequent requests, avoiding the ~2s reload penalty per request. """ # 1) drafter-tokenize the prompt drafter_ids = drafter_tokenizer(prompt_text, return_tensors=None, @@ -153,23 +140,21 @@ def compress_text_via_daemon( for t in drafter_ids: f.write(struct.pack(" str | None: +def _find_safetensors(root: Path) -> str | None: if root.is_file(): - return str(root) if root.suffix in (".safetensors", ".gguf") else None + return str(root) if not root.is_dir(): return None for st in root.rglob("model.safetensors"): return str(st) - for gguf in root.rglob("*.gguf"): - return str(gguf) return None def _resolve_draft() -> str: env = os.environ.get("DFLASH_DRAFT") if env: - found = _find_draft_file(Path(env)) + found = _find_safetensors(Path(env)) if found: return found - raise FileNotFoundError(f"DFLASH_DRAFT does not point to a draft file: {env}") + raise FileNotFoundError(f"DFLASH_DRAFT does not point to model.safetensors: {env}") for candidate in (_LOCAL_DRAFT_FILE, _LOCAL_DRAFT_ROOT): - found = _find_draft_file(candidate) + found = _find_safetensors(candidate) if found: return found raise FileNotFoundError( - "draft model file not found. Expected one of:\n" + "draft model.safetensors not found. Expected one of:\n" f" - {_LOCAL_DRAFT_FILE}\n" - "Download it as documented in the README, or set DFLASH_DRAFT to an explicit .safetensors/.gguf file or directory." + "Download it as documented in the README, or set DFLASH_DRAFT to an explicit file or directory." ) @@ -261,37 +259,17 @@ def run_test_dflash(prompt_path: Path, n_gen: int, fast_rollback: bool, print("STDERR:", r.stderr[-2000:]) raise RuntimeError(f"test_dflash exited {r.returncode}") - # Parse output. The target layer-split harness prints both prefill and - # decode lines, so avoid the older "first tok/s wins" regexp there. + # Parse output out = r.stdout - m_prefill = re.search( - r"\[target-split\] prefill tokens=(\d+) time=(\d+(?:\.\d+)?) s speed=(\d+(?:\.\d+)?) tok/s", - out, - ) - m_decode_split = re.search( - r"\[target-split-dflash\] decode tokens=(\d+) time=(\d+(?:\.\d+)?) s speed=(\d+(?:\.\d+)?) tok/s", - out, - ) - m_decode_default = re.search( - r"\[dflash\] generated \d+ tokens in \d+(?:\.\d+)? s\s+->\s+(\d+(?:\.\d+)?) tok/s", - out, - ) m_tps = re.search(r"(\d+(?:\.\d+)?)\s+tok/s", out) m_commit = re.search(r"avg commit/step=(\d+(?:\.\d+)?)", out) m_accept = re.search(r"accepted=(\d+)/(\d+) \((\d+(?:\.\d+)?)%", out) m_steps = re.search(r"(\d+) draft steps", out) - if not ((m_decode_split or m_decode_default or m_tps) and m_commit and m_accept and m_steps): + if not (m_tps and m_commit and m_accept and m_steps): print("STDOUT tail:", out[-2000:]) raise RuntimeError("failed to parse output") - if m_decode_split: - tok_s = float(m_decode_split.group(3)) - elif m_decode_default: - tok_s = float(m_decode_default.group(1)) - else: - tok_s = float(m_tps.group(1)) return { - "tok_s": tok_s, - "prefill_tok_s": float(m_prefill.group(3)) if m_prefill else None, + "tok_s": float(m_tps.group(1)), "commit_per_step": float(m_commit.group(1)), "accepted": int(m_accept.group(1)), "total_draft_pos": int(m_accept.group(2)), @@ -322,26 +300,12 @@ def main(): help="Visible CUDA device id for the target backend") ap.add_argument("--draft-gpu", type=int, default=None, help="Visible CUDA device id for the draft backend") - ap.add_argument("--target-gpus", default=None, - help="Comma-separated target GPU ids for the layer-split harness") - ap.add_argument("--target-layer-split", default=None, - help="Comma-separated layer split weights matching --target-gpus") - ap.add_argument("--target-split-load-draft", action="store_true", - help="Load the draft alongside the target layer-split harness") - ap.add_argument("--target-split-dflash", action="store_true", - help="Run chain DFlash decode through the target layer-split harness") - ap.add_argument("--max-ctx", type=int, default=None, - help="Forward --max-ctx=N to test_dflash") - ap.add_argument("--prefill-ubatch", type=int, default=None, - help="Set DFLASH27B_PREFILL_UBATCH for target split prefill") ap.add_argument("--cuda-visible-devices", default=None, help="Optional CUDA_VISIBLE_DEVICES override for test_dflash") ap.add_argument("--target-tokenizer", - default=os.environ.get("DFLASH_TOKENIZER", "Qwen/Qwen3.5-27B"), + default=os.environ.get("DFLASH_TOKENIZER", "Qwen/Qwen3.6-27B"), help="HuggingFace tokenizer repo for the target. Defaults to " - "$DFLASH_TOKENIZER, then Qwen/Qwen3.5-27B. Override for " - "Qwen3.6 or other variants, e.g. " - "--target-tokenizer Qwen/Qwen3.6-27B") + "$DFLASH_TOKENIZER, then Qwen/Qwen3.6-27B.") args = ap.parse_args() # Tokenized prompts are cached at TMPDIR/he_prompt__NN.bin so @@ -375,8 +339,8 @@ def main(): print(f"[bench] skipping tokenize (reusing {_prompt_path(0, tok_slug).parent})") print(f"\n[bench] mode={args.mode} n_gen={args.n_gen}") - print(f"{'prompt':28s} {'steps':>6s} {'AL':>6s} {'pct%':>6s} {'prefill':>8s} {'decode':>8s}") - print("-" * 72) + print(f"{'prompt':28s} {'steps':>6s} {'AL':>6s} {'pct%':>6s} {'tok/s':>8s}") + print("-" * 62) extra_args = [] if args.draft_feature_mirror: @@ -385,29 +349,17 @@ def main(): extra_args.append(f"--target-gpu={args.target_gpu}") if args.draft_gpu is not None: extra_args.append(f"--draft-gpu={args.draft_gpu}") - if args.target_gpus: - extra_args.append(f"--target-gpus={args.target_gpus}") - if args.target_layer_split: - extra_args.append(f"--target-layer-split={args.target_layer_split}") - if args.target_split_load_draft: - extra_args.append("--target-split-load-draft") - if args.target_split_dflash: - extra_args.append("--target-split-dflash") - if args.max_ctx is not None: - extra_args.append(f"--max-ctx={args.max_ctx}") extra_env = {} if args.cuda_visible_devices: extra_env["CUDA_VISIBLE_DEVICES"] = args.cuda_visible_devices - if args.prefill_ubatch is not None: - extra_env["DFLASH27B_PREFILL_UBATCH"] = str(args.prefill_ubatch) results = [] for i, (name, _) in enumerate(PROMPTS): path = _prompt_path(i, tok_slug) try: r = run_test_dflash(path, args.n_gen, - fast_rollback=(args.mode == "fast" and not args.target_split_dflash), + fast_rollback=(args.mode == "fast"), ddtree_budget=args.ddtree_budget, ddtree_temp=args.ddtree_temp, ddtree_no_chain_seed=args.ddtree_no_chain_seed, @@ -417,10 +369,9 @@ def main(): print(f" [{i:02d}] {name:26s} FAILED: {e}") continue results.append((name, r)) - prefill_s = f"{r['prefill_tok_s']:8.2f}" if r["prefill_tok_s"] is not None else f"{'n/a':>8s}" print( f" {name:26s} {r['steps']:6d} {r['commit_per_step']:6.2f} " - f"{r['pct']:6.1f} {prefill_s} {r['tok_s']:8.2f}" + f"{r['pct']:6.1f} {r['tok_s']:8.2f}" ) if not results: @@ -431,12 +382,9 @@ def main(): mean_al = sum(r["commit_per_step"] for _, r in results) / n mean_tps = sum(r["tok_s"] for _, r in results) / n mean_pct = sum(r["pct"] for _, r in results) / n - prefill_vals = [r["prefill_tok_s"] for _, r in results if r["prefill_tok_s"] is not None] - mean_prefill = sum(prefill_vals) / len(prefill_vals) if prefill_vals else None - print("-" * 72) - prefill_s = f"{mean_prefill:8.2f}" if mean_prefill is not None else f"{'n/a':>8s}" - print(f"{'MEAN':28s} {'':6s} {mean_al:6.2f} {mean_pct:6.1f} {prefill_s} {mean_tps:8.2f}") + print("-" * 62) + print(f"{'MEAN':28s} {'':6s} {mean_al:6.2f} {mean_pct:6.1f} {mean_tps:8.2f}") print() print(f"commit/step range: {min(r['commit_per_step'] for _,r in results):.2f} - " f"{max(r['commit_per_step'] for _,r in results):.2f}") diff --git a/dflash/scripts/bench_llm.py b/dflash/scripts/bench_llm.py index 05e317e40..788b2e79f 100644 --- a/dflash/scripts/bench_llm.py +++ b/dflash/scripts/bench_llm.py @@ -8,7 +8,7 @@ DFLASH_DRAFT path to draft model.safetensors DFLASH_BIN path to build/test_dflash DFLASH_BIN_AR path to build/test_generate - DFLASH_TOKENIZER HF tokenizer repo (default Qwen/Qwen3.5-27B; matches run.py) + DFLASH_TOKENIZER HF tokenizer repo (default Qwen/Qwen3.6-27B; matches run.py) """ import json import os @@ -29,7 +29,7 @@ DRAFT = None TEST_DFLASH = os.environ.get("DFLASH_BIN", str(ROOT / "build" / f"test_dflash{BIN_SUFFIX}")) TEST_GENERATE = os.environ.get("DFLASH_BIN_AR", str(ROOT / "build" / f"test_generate{BIN_SUFFIX}")) -TOKENIZER = os.environ.get("DFLASH_TOKENIZER", "Qwen/Qwen3.5-27B") +TOKENIZER = os.environ.get("DFLASH_TOKENIZER", "Qwen/Qwen3.6-27B") TMPDIR = Path(tempfile.gettempdir()) / "dflash_bench" TMPDIR.mkdir(parents=True, exist_ok=True) @@ -38,9 +38,9 @@ N_SAMPLE = 10 BENCHES = [ - ("HumanEval", "openai_humaneval", None, "test", lambda x: x["prompt"], None, N_GEN), - ("GSM8K", "gsm8k", "main", "test", lambda x: f"Question: {x['question']}\nAnswer: ", None, N_GEN), - ("Math500", "HuggingFaceH4/MATH-500", None, "test", lambda x: f"Problem: {x['problem']}\nSolution: ", lambda x: x["answer"], 2048), + ("HumanEval", "openai_humaneval", None, "test", lambda x: x["prompt"]), + ("GSM8K", "gsm8k", "main", "test", lambda x: f"Question: {x['question']}\nAnswer: "), + ("Math500", "HuggingFaceH4/MATH-500", None, "test", lambda x: f"Problem: {x['problem']}\nSolution: "), ] @@ -95,10 +95,10 @@ def tokenize(tok, p, path: Path): return len(ids) -def run_ar(path: Path, n_gen: int = N_GEN): +def run_ar(path: Path): out_bin = TMPDIR / "ar_out.bin" r = _run_checked( - [TEST_GENERATE, TARGET, str(path), str(n_gen), str(out_bin)], + [TEST_GENERATE, TARGET, str(path), str(N_GEN), str(out_bin)], timeout=300, label="test_generate", ) @@ -108,25 +108,25 @@ def run_ar(path: Path, n_gen: int = N_GEN): return float(m.group(1)) -def _auto_max_ctx(n_prompt, n_gen: int = N_GEN): +def _auto_max_ctx(n_prompt): # Auto-fit attention budget: prompt + gen + small verify pad, aligned to # FATTN_KQ_STRIDE=256. Oversizing max_ctx makes attention stride over # unused KV and can cost >20× prefill time (32K prompt + --kv-q4 + # max_ctx=131072 → 1035s vs 38s at max_ctx=32768). See scripts/run.py. pad = 64 # covers q_len=16 + ddtree budget up to 22 with margin - return ((n_prompt + n_gen + pad + 255) // 256) * 256 + return ((n_prompt + N_GEN + pad + 255) // 256) * 256 -def run_df(path: Path, n_prompt, n_gen: int = N_GEN): - max_ctx = _auto_max_ctx(n_prompt, n_gen) - out_bin = TMPDIR / f"df_out.bin" +def run_df(path: Path, n_prompt): + max_ctx = _auto_max_ctx(n_prompt) + out_bin = TMPDIR / "df_out.bin" r = _run_checked( [ TEST_DFLASH, TARGET, DRAFT, str(path), - str(n_gen), + str(N_GEN), str(out_bin), "--fast-rollback", "--ddtree", @@ -140,146 +140,7 @@ def run_df(path: Path, n_prompt, n_gen: int = N_GEN): al = re.search(r"avg commit/step=(\d+(?:\.\d+)?)", r.stdout) if not (tps and al): raise RuntimeError(f"test_dflash output parse failed: {r.stdout[-1500:]}") - return float(tps.group(1)), float(al.group(1)), out_bin - - -def _read_ids(path: Path): - """Read a binary file of packed int32 token IDs.""" - data = path.read_bytes() - return list(struct.unpack(f"<{len(data)//4}i", data)) - - -def _extract_boxed(text: str) -> str | None: - """Extract the last \\boxed{...} from a string, handling nested braces.""" - results = [] - i = 0 - while i < len(text): - idx = text.find("\\boxed{", i) - if idx == -1: - break - start = idx + len("\\boxed{") - depth = 1 - j = start - while j < len(text) and depth > 0: - if text[j] == "{": - depth += 1 - elif text[j] == "}": - depth -= 1 - j += 1 - if depth == 0: - results.append(text[start:j-1].strip()) - i = j - return results[-1] if results else None - - -def _normalize_math(s: str) -> str: - """Normalize a math answer string for comparison.""" - if s is None: - return "" - s = s.strip() - if s.startswith("$") and s.endswith("$"): - s = s[1:-1].strip() - s = re.sub(r"\\text\s*\{([^}]*)\}", r"\1", s) - s = re.sub(r"\\mathrm\s*\{([^}]*)\}", r"\1", s) - for cmd in [r"\left", r"\right", r"\displaystyle", r"\tfrac", r"\dfrac"]: - s = s.replace(cmd, "") - for unit in [" cm", " m", " km", " kg", " g", " s", " ms", - " degrees", " degree", "°", " inches", " feet", - " square units", " units", " dollars"]: - if s.lower().rstrip(".").endswith(unit): - s = s[:len(s) - len(unit) - (1 if s.endswith(".") else 0)] - s = re.sub(r"\s+", " ", s).strip() - s = s.rstrip(".,") - return s - - -def _math_equiv(pred: str, gold: str) -> bool: - """Check if two math answers are equivalent.""" - if pred is None or gold is None: - return False - p = _normalize_math(pred) - g = _normalize_math(gold) - if p == g: - return True - p_c = re.sub(r"\s*\\frac", r"\\frac", p) - g_c = re.sub(r"\s*\\frac", r"\\frac", g) - if p_c == g_c: - return True - try: - pf = float(p.replace(",", "")) - gf = float(g.replace(",", "")) - return abs(pf - gf) < 1e-6 - except (ValueError, TypeError): - pass - mixed_pat = re.compile(r"^(\d+)\s*\\frac\s*\{(\d+)\}\s*\{(\d+)\}$") - for s, other in [(p, g), (g, p)]: - m = mixed_pat.match(s) - if m: - try: - val = float(m.group(1)) + float(m.group(2)) / float(m.group(3)) - oval = float(other.replace(",", "")) - if abs(val - oval) < 1e-6: - return True - except (ValueError, ZeroDivisionError): - pass - frac_pat = re.compile(r"\\?frac\s*\{([^}]+)\}\s*\{([^}]+)\}") - for s, other in [(p, g), (g, p)]: - m = frac_pat.search(s) - if m: - try: - val = float(m.group(1)) / float(m.group(2)) - oval = float(other.replace(",", "")) - if abs(val - oval) < 1e-6: - return True - except (ValueError, ZeroDivisionError): - pass - return False - - -def score_math(output_bin: Path, gold_answer: str, tok) -> tuple[bool, str]: - """Score a Math500 output against the gold answer. - - Extracts \\boxed{} answers from model output (after for thinking - models), compares against gold with normalized string matching + numeric/ - fraction equivalence. Returns (correct, detail_str). - """ - ids = _read_ids(output_bin) - text = tok.decode(ids) - - think_end = text.rfind("") - answer_text = text[think_end + len(""):] if think_end >= 0 else text - - pred = _extract_boxed(answer_text) - if not pred: - pred = _extract_boxed(text) - if not pred: - pred = None - - # Fallback: "the answer is **X**" patterns - if pred is None: - bold_pattern = re.compile( - r'(?:answer\s+is|there\s+are|result\s+is|equals?|=)\s*\*\*(.+?)\*\*', - re.IGNORECASE) - m = bold_pattern.search(answer_text) - if m: - pred = m.group(1).strip().rstrip(".") - - # Fallback: last $...$ expression - if pred is None: - matches = re.findall(r'\$([^$]+)\$', answer_text) - if matches: - pred = matches[-1].strip() - - correct = _math_equiv(pred, gold_answer) - pred_short = (pred[:60] + "…") if pred and len(pred) > 60 else pred - gold_short = (gold_answer[:60] + "…") if len(gold_answer) > 60 else gold_answer - if correct: - detail = f"🎯 {pred_short}" - elif pred: - detail = f"✗ pred={pred_short} gold={gold_short}" - else: - detail = f"✗ no answer found, gold={gold_short}" - return correct, detail + return float(tps.group(1)), float(al.group(1)) def main(): @@ -300,57 +161,40 @@ def main(): tok = AutoTokenizer.from_pretrained(TOKENIZER, trust_remote_code=True) results = {} - for name, ds_name, cfg, split, extract, gold_extract, gen in BENCHES: - print(f"\n[bench] ==== {name} (n={N_SAMPLE}, n_gen={gen}) ====", flush=True) + for name, ds_name, cfg, split, extract in BENCHES: + print(f"\n[bench] ==== {name} (n={N_SAMPLE}) ====", flush=True) ds = load_dataset(ds_name, cfg, split=split) - ds_selected = ds.shuffle(seed=42).select(range(N_SAMPLE)) - prompt_list = [extract(s) for s in ds_selected] - gold_list = [gold_extract(s) for s in ds_selected] if gold_extract else [None] * len(prompt_list) - + ds = ds.shuffle(seed=42).select(range(N_SAMPLE)) ar_tps, df_tps, df_al = [], [], [] - n_score_correct, n_scored = 0, 0 - for i, (p, gold) in enumerate(zip(prompt_list, gold_list)): + for i, s in enumerate(ds): + p = extract(s) path = TMPDIR / f"b_{name}_{i:02d}.bin" n = tokenize(tok, p, path) if n == 0 or n > 3500: continue try: - ar = run_ar(path, gen) - df, al, df_bin = run_df(path, n, gen) + ar = run_ar(path) + df, al = run_df(path, n) except Exception as e: print(f" [{i+1:02d}/{N_SAMPLE}] n_tok={n:4d} FAILED: {e}", flush=True) continue - - score_detail = "" - if gold is not None: - correct, score_detail = score_math(df_bin, gold, tok) - n_scored += 1 - if correct: - n_score_correct += 1 - score_detail = f" {score_detail}" - if ar > 0: ar_tps.append(ar) if df > 0: df_tps.append(df) df_al.append(al) - print(f" [{i+1:02d}/{N_SAMPLE}] n_tok={n:4d} AR={ar:6.2f} DFlash={df:7.2f} AL={al:5.2f}{score_detail}", flush=True) + print(f" [{i+1:02d}/{N_SAMPLE}] n_tok={n:4d} AR={ar:6.2f} DFlash={df:7.2f} AL={al:5.2f}", flush=True) ar_m = sum(ar_tps) / len(ar_tps) if ar_tps else 0 df_m = sum(df_tps) / len(df_tps) if df_tps else 0 al_m = sum(df_al) / len(df_al) if df_al else 0 - score_str = f"{n_score_correct}/{n_scored}" if n_scored else "" results[name] = {"ar": ar_m, "dflash": df_m, "al": al_m, - "speedup": df_m / ar_m if ar_m else 0, - "score": score_str} - summary = f" {name} mean: AR={ar_m:.2f} DFlash={df_m:.2f} AL={al_m:.2f} {results[name]['speedup']:.2f}x" - if score_str: - summary += f" score={score_str} ({n_score_correct/n_scored*100:.0f}%)" - print(summary, flush=True) + "speedup": df_m / ar_m if ar_m else 0} + print(f" {name} mean: AR={ar_m:.2f} DFlash={df_m:.2f} AL={al_m:.2f} {results[name]['speedup']:.2f}x", flush=True) print("\n[bench] === SUMMARY ===") - print(f"{'Task':12s} {'AR':>8s} {'DFlash':>8s} {'AL':>6s} {'Speedup':>8s} {'Score':>8s}") + print(f"{'Task':12s} {'AR':>8s} {'DFlash':>8s} {'AL':>6s} {'Speedup':>8s}") for name, r in results.items(): - print(f"{name:12s} {r['ar']:8.2f} {r['dflash']:8.2f} {r['al']:6.2f} {r['speedup']:7.2f}x {r.get('score',''):>8s}") + print(f"{name:12s} {r['ar']:8.2f} {r['dflash']:8.2f} {r['al']:6.2f} {r['speedup']:7.2f}x") out_json = TMPDIR / "bench_llm_results.json" with open(out_json, "w") as f: diff --git a/dflash/scripts/detokenize.py b/dflash/scripts/detokenize.py index 0470cf9f1..ec808586a 100644 --- a/dflash/scripts/detokenize.py +++ b/dflash/scripts/detokenize.py @@ -7,7 +7,7 @@ def main(): ap = argparse.ArgumentParser() ap.add_argument("--in", dest="inp", required=True) - ap.add_argument("--model", default="Qwen/Qwen3.5-27B") + ap.add_argument("--model", default="Qwen/Qwen3.6-27B") ap.add_argument("--slice", default=None, help="optional 'start:end' (token indices, exclusive end)") args = ap.parse_args() diff --git a/dflash/scripts/run.py b/dflash/scripts/run.py index 108a7520b..2894d79cd 100644 --- a/dflash/scripts/run.py +++ b/dflash/scripts/run.py @@ -61,6 +61,8 @@ def main(): d = default_paths() ap = argparse.ArgumentParser() ap.add_argument("--prompt", type=str, default=None) + ap.add_argument("--prompt-file", type=str, default=None, + help="Read the prompt from a UTF-8 text file. Useful on Windows long-context runs where argv length is limited.") ap.add_argument("--n-gen", type=int, default=256) ap.add_argument("--target", type=str, default=d["target"]) ap.add_argument("--draft", type=str, default=d["draft"]) @@ -91,7 +93,12 @@ def main(): "because the kernel strides over unused KV.") args = ap.parse_args() - prompt_text = args.prompt if args.prompt else sys.stdin.read().strip() + if args.prompt_file: + prompt_text = Path(args.prompt_file).read_text(encoding="utf-8") + elif args.prompt: + prompt_text = args.prompt + else: + prompt_text = sys.stdin.read().strip() if not prompt_text: sys.exit("no prompt") @@ -108,7 +115,8 @@ def main(): msgs.append({"role": "system", "content": args.system}) msgs.append({"role": "user", "content": prompt_text}) text = tokenizer.apply_chat_template(msgs, tokenize=False, - add_generation_prompt=True) + add_generation_prompt=True, + enable_thinking=False) draft_path = resolve_draft(args.draft) im_end_id = tokenizer.encode("<|im_end|>", add_special_tokens=False) diff --git a/dflash/scripts/server.py b/dflash/scripts/server.py index 6786457a5..8256828f8 100644 --- a/dflash/scripts/server.py +++ b/dflash/scripts/server.py @@ -16,7 +16,6 @@ import argparse import json import os -import re import struct import subprocess import sys @@ -46,22 +45,25 @@ str(ROOT / "models" / "Qwen3.6-27B-Q4_K_M.gguf"), )) DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" -DEFAULT_BIN = ROOT / "build" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) +DEFAULT_BIN = ROOT / "build" / "Release" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) DEFAULT_BUDGET = 22 MODEL_NAME = "luce-dflash" -# Architecture strings stored in `general.architecture` of every GGUF this -# server can drive. test_dflash dispatches by GGUF arch internally: -# qwen35 / qwen36 -> existing DFlash + DDTree pipeline -# laguna -> dflash27b::run_laguna_daemon() (no spec-decode) -# server.py just needs to omit --draft + the DFlash/DDTree flags when the -# arch doesn't support speculative decoding yet. -_QWEN35_ARCHES = {"qwen35", "qwen36"} -_LAGUNA_ARCHES = {"laguna"} - _ALLOWED_TEMPLATE_KWARGS = frozenset({"enable_thinking", "tools", "add_generation_prompt"}) +def _resolve_gpu_arg(value: int | None, env_name: str) -> int | None: + if value is not None: + return value + raw = os.environ.get(env_name) + if raw is None or not raw.strip(): + return None + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{env_name} must be an integer, got {raw!r}") from exc + + def resolve_draft(root: Path) -> Path: for st in root.rglob("model.safetensors"): return st @@ -72,61 +74,26 @@ def resolve_draft(root: Path) -> Path: "Qwen3.5-27B": "Qwen/Qwen3.5-27B", "Qwen3.6-27B": "Qwen/Qwen3.6-27B", } -THINK_OPEN_TAG = "" -THINK_CLOSE_TAG = "" - -_LAGUNA_FAMILY_TOKENIZERS = { - "Laguna-XS.2": "poolside/Laguna-XS.2", - "Laguna-XS": "poolside/Laguna-XS.2", - "laguna-xs2": "poolside/Laguna-XS.2", -} - - -def _read_gguf_str(reader, key: str) -> str | None: - f = reader.fields.get(key) - if f is None or not f.data: - return None - import numpy as np - p = f.parts[f.data[0]] - if not isinstance(p, np.ndarray): - return None - try: - return bytes(p).decode("utf-8", errors="replace") - except Exception: - return None - - -def _arch_from_gguf(gguf_path: Path) -> str: - """Return the value of ``general.architecture`` from the GGUF, or 'unknown'. - - server.py uses this to dispatch between the qwen35 stack (test_dflash + - DFlash + DDTree) and the laguna stack (test_laguna_daemon, autoregressive - only). 'unknown' falls back to the qwen35 path so existing setups keep - working when the field is missing. - """ - try: - from gguf import GGUFReader # type: ignore - r = GGUFReader(str(gguf_path)) - v = _read_gguf_str(r, "general.architecture") - return v.lower() if v else "unknown" - except Exception: - return "unknown" def _tokenizer_id_from_gguf(gguf_path: Path) -> str: - default = "Qwen/Qwen3.5-27B" + default = "Qwen/Qwen3.6-27B" try: from gguf import GGUFReader # type: ignore r = GGUFReader(str(gguf_path)) - arch = (_read_gguf_str(r, "general.architecture") or "").lower() - family = _LAGUNA_FAMILY_TOKENIZERS if arch in _LAGUNA_ARCHES else _QWEN35_FAMILY_TOKENIZERS - if arch in _LAGUNA_ARCHES: - default = next(iter(_LAGUNA_FAMILY_TOKENIZERS.values())) for key in ("general.basename", "general.name"): - val = _read_gguf_str(r, key) - if val is None: + f = r.fields.get(key) + if f is None or not f.data: + continue + import numpy as np + p = f.parts[f.data[0]] + if not isinstance(p, np.ndarray): continue - for known, repo in family.items(): + try: + val = bytes(p).decode("utf-8", errors="replace") + except Exception: + continue + for known, repo in _QWEN35_FAMILY_TOKENIZERS.items(): if known.lower() in val.lower(): return repo except Exception: @@ -134,74 +101,6 @@ def _tokenizer_id_from_gguf(gguf_path: Path) -> str: return default -def parse_reasoning( - text: str, - thinking_enabled: bool = True, - started_in_thinking: bool = False, -) -> tuple[str, str | None]: - # Qwen chat templates can prefill `\n` into the prompt, so the - # generated output contains only the reasoning body plus ``. - parts = text.partition(THINK_OPEN_TAG) - saw_open_tag = bool(parts[1]) - rest = parts[2] if saw_open_tag else parts[0] - if THINK_CLOSE_TAG not in rest: - if thinking_enabled and (started_in_thinking or saw_open_tag): - return "", (rest.strip() or None) - return rest.strip(), None - reasoning, _, content = rest.partition(THINK_CLOSE_TAG) - return content.strip(), (reasoning.strip() or None) - - -def prompt_starts_in_thinking(prompt: str) -> bool: - return bool(re.search(r"\s*$", prompt)) - - -def consume_stream_piece(window: str, mode: str, piece: str): - outputs = [] - holdback = max(len(THINK_OPEN_TAG), len(THINK_CLOSE_TAG)) - window += piece - while True: - if mode == "reasoning": - idx = window.find(THINK_CLOSE_TAG) - if idx != -1: - pre = window[:idx] - if pre: - outputs.append(("reasoning_content", pre)) - window = window[idx + len(THINK_CLOSE_TAG):] - mode = "content" - continue - if len(window) > holdback: - safe = window[:-holdback] - if safe: - outputs.append(("reasoning_content", safe)) - window = window[-holdback:] - break - - idx = window.find(THINK_OPEN_TAG) - if idx != -1: - pre = window[:idx] - if pre: - outputs.append(("content", pre)) - window = window[idx + len(THINK_OPEN_TAG):] - mode = "reasoning" - continue - if len(window) > holdback: - safe = window[:-holdback] - if safe: - outputs.append(("content", safe)) - window = window[-holdback:] - break - - return outputs, window, mode - - -def flush_stream_deltas(window: str, mode: str): - if not window: - return [] - kind = "reasoning_content" if mode == "reasoning" else "content" - return [(kind, window)] - - # FIX 2: _content_to_str helper used for BOTH OpenAI and Anthropic message # content fields (str | list[dict]). Previously OpenAI list[dict] content # was passed raw to the tokenizer and caused a crash. @@ -226,11 +125,9 @@ class ChatRequest(BaseModel): messages: list[ChatMessage] stream: bool = False max_tokens: int = 512 - temperature: float | None = None # 0 = greedy, >0 = sample - seed: int | None = None # rng seed for sampling - top_p: float | None = None # nucleus, applied when temperature > 0 - top_k: int | None = None # top-k, applied when temperature > 0 - frequency_penalty: float | None = None # OAI -> rep_pen = 1 + freq_pen (sampling only) + temperature: float | None = None # accepted, ignored (greedy-only) + top_p: float | None = None # accepted, ignored + top_k: int | None = None # accepted, ignored stop: list[str] | str | None = None # FIX 3: accept stop field (Open WebUI sends it) chat_template_kwargs: dict | None = None @@ -248,32 +145,20 @@ class AnthropicMessagesRequest(BaseModel): stream: bool = False temperature: float | None = None top_p: float | None = None - seed: int | None = None - frequency_penalty: float | None = None stop_sequences: list[str] | None = None chat_template_kwargs: dict | None = None -def _samp_suffix(req) -> str: - # Render ` samp=temp,top_p,top_k,rep_pen[,seed]` tail when the request asks for - # non-greedy decoding. Empty string keeps the daemon protocol greedy-compatible. - t = float(getattr(req, "temperature", 0.0) or 0.0) - if t <= 0.0: - return "" - tp = float(getattr(req, "top_p", 1.0) or 1.0) - tk = int(getattr(req, "top_k", 0) or 0) - rp = float(getattr(req, "frequency_penalty", 0.0) or 0.0) + 1.0 - seed = int(getattr(req, "seed", 0) or 0) - return f" samp={t:.4f},{tp:.4f},{tk},{rp:.4f},{seed}" - - -def build_app(target: Path, draft: Path | None, bin_path: Path, budget: int, max_ctx: int, +def build_app(target: Path, draft: Path, bin_path: Path, budget: int, max_ctx: int, tokenizer: AutoTokenizer, stop_ids: set[int], prefill_cfg: PrefillConfig | None = None, drafter_tokenizer: AutoTokenizer | None = None, prefix_cache_slots: int = 4, prefill_cache_slots: int = 4, - arch: str = "qwen35") -> FastAPI: + target_cache_slots: int = 1, + draft_feature_mirror: bool = False, + target_gpu: int | None = None, + draft_gpu: int | None = None) -> FastAPI: import asyncio app = FastAPI(title="Luce DFlash OpenAI server") @@ -300,25 +185,24 @@ def build_app(target: Path, draft: Path | None, bin_path: Path, budget: int, max bin_abs = str(Path(bin_path).resolve()) dll_dir = str(Path(bin_abs).parent / "bin") env = {**os.environ} + if target_gpu is not None: + env["DFLASH_TARGET_GPU"] = str(target_gpu) + if draft_gpu is not None: + env["DFLASH_DRAFT_GPU"] = str(draft_gpu) if sys.platform == "win32": env["PATH"] = dll_dir + os.pathsep + str(Path(bin_abs).parent) + os.pathsep + env.get("PATH", "") - if arch in _LAGUNA_ARCHES: - # test_dflash detects arch=laguna from the GGUF and dispatches - # internally to dflash27b::run_laguna_daemon(). No --draft, no - # --fast-rollback, no --ddtree (no Laguna spec-decode draft yet). - # Tokens stream as int32 LE on stream_fd terminated by -1, byte- - # identical to the qwen35 path so SSE/stream consumers stay shared. - cmd = [bin_abs, str(target), "--daemon", - f"--max-ctx={max_ctx}", - f"--stream-fd={stream_fd_val}"] - else: - if draft is None: - raise SystemExit("qwen35 arch requires --draft model.safetensors") - cmd = [bin_abs, str(target), str(draft), "--daemon", - "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", - f"--max-ctx={max_ctx}", - f"--stream-fd={stream_fd_val}"] + cmd = [bin_abs, str(target), str(draft), "--daemon", + "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", + f"--max-ctx={max_ctx}", + f"--target-cache-slots={target_cache_slots}", + f"--stream-fd={stream_fd_val}"] + if draft_feature_mirror: + cmd.append("--draft-feature-mirror") + if target_gpu is not None: + cmd.append(f"--target-gpu={target_gpu}") + if draft_gpu is not None: + cmd.append(f"--draft-gpu={draft_gpu}") if sys.platform == "win32": daemon_proc = subprocess.Popen(cmd, close_fds=False, env=env, stdin=subprocess.PIPE, @@ -403,13 +287,8 @@ def _render_messages(msgs_list: list[dict], ``template_kwargs`` is passed through to ``apply_chat_template`` so callers can toggle template knobs like ``enable_thinking`` per-request. - - Thinking is disabled by default (enable_thinking=False) because Qwen3.6's - think mode wrecks DFlash acceptance rates. Clients can opt in by sending - ``"chat_template_kwargs": {"enable_thinking": true}`` in the request. """ - tpl_kwargs: dict = {"tokenize": False, "add_generation_prompt": True, - "enable_thinking": False} + tpl_kwargs: dict = {"tokenize": False, "add_generation_prompt": True} tpl_kwargs.update( {k: v for k, v in (template_kwargs or {}).items() if k in _ALLOWED_TEMPLATE_KWARGS} ) @@ -417,18 +296,12 @@ def _render_messages(msgs_list: list[dict], ids = tokenizer.encode(prompt, add_special_tokens=False) return _ids_to_bin(ids), ids, prompt - def _thinking_enabled(kwargs: dict | None) -> bool: - if kwargs: - return kwargs.get("enable_thinking", True) - return True - # FIX 2 applied: always call _content_to_str on message content - def _tokenize_prompt(req: ChatRequest) -> tuple[Path, list[int], list[dict], bool]: + def _tokenize_prompt(req: ChatRequest) -> tuple[Path, list[int], list[dict]]: msgs = [{"role": m.role, "content": _content_to_str(m.content)} for m in req.messages] - path, ids, prompt = _render_messages(msgs, req.chat_template_kwargs) - think = _thinking_enabled(req.chat_template_kwargs) and prompt_starts_in_thinking(prompt) - return path, ids, msgs, think + path, ids, _prompt = _render_messages(msgs, req.chat_template_kwargs) + return path, ids, msgs def _maybe_compress(msgs: list[dict], prompt_bin: Path, prompt_ids: list[int], template_kwargs: dict | None = None @@ -452,7 +325,6 @@ def _maybe_compress(msgs: list[dict], prompt_bin: Path, prompt_ids: list[int], drafter_tokenizer=drafter_tokenizer, cfg=prefill_cfg, prompt_text=long_text, - skip_park=prefill_cfg.skip_park, ) new_msgs = list(msgs) @@ -523,7 +395,7 @@ def _write_cmd(cmd_line: str): daemon_proc.stdin.write(cmd_line.encode("utf-8")) daemon_proc.stdin.flush() - def _build_cmd_line(req, cur_bin, cur_ids, gen_len, prefix_cache, + def _build_cmd_line(cur_bin, cur_ids, gen_len, prefix_cache, prompt_ids, full_snap_prep_ref: list, compression_fired: bool): """ @@ -535,12 +407,11 @@ def _build_cmd_line(req, cur_bin, cur_ids, gen_len, prefix_cache, if compression_fired: full_snap_prep = prefix_cache.prepare_full_snap(prompt_ids) full_snap_prep_ref[0] = full_snap_prep - samp = _samp_suffix(req) if full_snap_prep is not None: fslot, _ = full_snap_prep - return f"{cur_bin} {gen_len} snap={len(cur_ids)}:{fslot}" + samp + "\n", None + return f"{cur_bin} {gen_len} snap={len(cur_ids)}:{fslot}\n", None else: - return f"{cur_bin} {gen_len}" + samp + "\n", None + return f"{cur_bin} {gen_len}\n", None else: full_snap_prep_ref[0] = None hit = prefix_cache.lookup(cur_ids) @@ -552,7 +423,7 @@ def _build_cmd_line(req, cur_bin, cur_ids, gen_len, prefix_cache, cmd_line = f"{cur_bin} {gen_len}" if snap_prep: cmd_line += f" snap={snap_prep[1]}:{snap_prep[0]}" - return cmd_line + _samp_suffix(req) + "\n", snap_prep + return cmd_line + "\n", snap_prep def _gen_len_for(prompt_len: int, max_tokens: int) -> int: return min(max_tokens, max_ctx - prompt_len - 20) @@ -561,7 +432,7 @@ def _gen_len_for(prompt_len: int, max_tokens: int) -> int: @app.post("/v1/chat/completions") async def chat_completions(req: ChatRequest): - prompt_bin, prompt_ids, raw_msgs, started_in_thinking = _tokenize_prompt(req) + prompt_bin, prompt_ids, raw_msgs = _tokenize_prompt(req) completion_id = "chatcmpl-" + uuid.uuid4().hex[:24] created = int(time.time()) @@ -587,7 +458,7 @@ async def sse() -> AsyncIterator[str]: yield f"data: {json.dumps(err)}\n\n" yield "data: [DONE]\n\n" return - cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}" + _samp_suffix(req) + "\n" + cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}\n" else: cur_bin, cur_ids = await asyncio.to_thread( _maybe_compress, raw_msgs, prompt_bin, prompt_ids, @@ -606,7 +477,7 @@ async def sse() -> AsyncIterator[str]: return compression_fired = (cur_bin != prompt_bin) cmd_line, snap_prep = _build_cmd_line( - req, cur_bin, cur_ids, gen_len, prefix_cache, + cur_bin, cur_ids, gen_len, prefix_cache, prompt_ids, full_snap_prep_ref, compression_fired) # FIX 7: guard against dead daemon @@ -625,29 +496,15 @@ async def sse() -> AsyncIterator[str]: "finish_reason": None}], } yield f"data: {json.dumps(head)}\n\n" - window, mode = "", ("reasoning" if started_in_thinking else "content") try: async for tok_id in _astream_tokens(r_pipe, gen_len): - outputs, window, mode = consume_stream_piece( - window, mode, tokenizer.decode([tok_id])) - for kind, text in outputs: - chunk = { - "id": completion_id, - "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [{"index": 0, - "delta": {kind: text}, - "finish_reason": None}], - } - yield f"data: {json.dumps(chunk)}\n\n" - for kind, text in flush_stream_deltas(window, mode): chunk = { "id": completion_id, "object": "chat.completion.chunk", "created": created, "model": MODEL_NAME, "choices": [{"index": 0, - "delta": {kind: text}, + "delta": {"content": tokenizer.decode([tok_id])}, "finish_reason": None}], } yield f"data: {json.dumps(chunk)}\n\n" @@ -695,7 +552,7 @@ async def sse() -> AsyncIterator[str]: return JSONResponse( {"detail": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}, status_code=400) - cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}" + _samp_suffix(req) + "\n" + cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}\n" else: cur_bin, cur_ids = await asyncio.to_thread( _maybe_compress, raw_msgs, prompt_bin, prompt_ids, @@ -710,7 +567,7 @@ async def sse() -> AsyncIterator[str]: status_code=400) compression_fired = (cur_bin != prompt_bin) cmd_line, snap_prep = _build_cmd_line( - req, cur_bin, cur_ids, gen_len, prefix_cache, + cur_bin, cur_ids, gen_len, prefix_cache, prompt_ids, full_snap_prep_ref, compression_fired) try: @@ -736,14 +593,6 @@ async def sse() -> AsyncIterator[str]: except Exception: pass text = tokenizer.decode(tokens, skip_special_tokens=True) - cleaned, reasoning = parse_reasoning( - text, - thinking_enabled=_thinking_enabled(req.chat_template_kwargs), - started_in_thinking=started_in_thinking, - ) - msg = {"role": "assistant", "content": cleaned} - if reasoning: - msg["reasoning_content"] = reasoning return JSONResponse({ "id": completion_id, "object": "chat.completion", @@ -751,7 +600,7 @@ async def sse() -> AsyncIterator[str]: "model": MODEL_NAME, "choices": [{ "index": 0, - "message": msg, + "message": {"role": "assistant", "content": text}, "finish_reason": "stop", }], "usage": {"prompt_tokens": prompt_len, @@ -762,20 +611,19 @@ async def sse() -> AsyncIterator[str]: # ── Anthropic Messages API ────────────────────────────────────────────── def _tokenize_anthropic(req: AnthropicMessagesRequest - ) -> tuple[Path, list[int], list[dict], bool]: + ) -> tuple[Path, list[int], list[dict]]: msgs = [] system_text = _content_to_str(req.system) if req.system else None if system_text: msgs.append({"role": "system", "content": system_text}) for m in req.messages: msgs.append({"role": m.role, "content": _content_to_str(m.content)}) - path, ids, prompt = _render_messages(msgs, req.chat_template_kwargs) - think = _thinking_enabled(req.chat_template_kwargs) and prompt_starts_in_thinking(prompt) - return path, ids, msgs, think + path, ids, _prompt = _render_messages(msgs, req.chat_template_kwargs) + return path, ids, msgs @app.post("/v1/messages") async def anthropic_messages(req: AnthropicMessagesRequest): - prompt_bin, prompt_ids, raw_msgs, started_in_thinking = _tokenize_anthropic(req) + prompt_bin, prompt_ids, raw_msgs = _tokenize_anthropic(req) msg_id = "msg_" + uuid.uuid4().hex[:24] if req.stream: @@ -799,7 +647,7 @@ async def sse() -> AsyncIterator[str]: "message": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}} yield f"event: error\ndata: {json.dumps(err)}\n\n" return - cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}" + _samp_suffix(req) + "\n" + cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}\n" else: cur_bin, cur_ids = await asyncio.to_thread( _maybe_compress, raw_msgs, prompt_bin, prompt_ids, @@ -816,7 +664,7 @@ async def sse() -> AsyncIterator[str]: return compression_fired = (cur_bin != prompt_bin) cmd_line, snap_prep = _build_cmd_line( - req, cur_bin, cur_ids, gen_len, prefix_cache, + cur_bin, cur_ids, gen_len, prefix_cache, prompt_ids, full_snap_prep_ref, compression_fired) message_start = { @@ -829,6 +677,7 @@ async def sse() -> AsyncIterator[str]: }, } yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" + yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n" try: _write_cmd(cmd_line) @@ -837,42 +686,15 @@ async def sse() -> AsyncIterator[str]: return out_tokens = 0 - window, mode = "", ("reasoning" if started_in_thinking else "content") - block_index = 0 - active_kind = "thinking" if mode == "reasoning" else "text" - block = {"type": active_kind} - if active_kind == "thinking": - block["thinking"] = "" - else: - block["text"] = "" - yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': block})}\n\n" try: async for tok_id in _astream_tokens(r_pipe, gen_len): out_tokens += 1 - outputs, window, mode = consume_stream_piece( - window, mode, tokenizer.decode([tok_id])) - for kind, text in outputs: - target_kind = "thinking" if kind == "reasoning_content" else "text" - if target_kind != active_kind: - yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" - block_index += 1 - active_kind = target_kind - new_block = {"type": active_kind, active_kind: ""} - yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': new_block})}\n\n" - delta_type = "thinking_delta" if target_kind == "thinking" else "text_delta" - delta_key = "thinking" if target_kind == "thinking" else "text" - yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': block_index, 'delta': {'type': delta_type, delta_key: text}})}\n\n" - for kind, text in flush_stream_deltas(window, mode): - target_kind = "thinking" if kind == "reasoning_content" else "text" - if target_kind != active_kind: - yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" - block_index += 1 - active_kind = target_kind - new_block = {"type": active_kind, active_kind: ""} - yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': block_index, 'content_block': new_block})}\n\n" - delta_type = "thinking_delta" if target_kind == "thinking" else "text_delta" - delta_key = "thinking" if target_kind == "thinking" else "text" - yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': block_index, 'delta': {'type': delta_type, delta_key: text}})}\n\n" + delta = { + "type": "content_block_delta", "index": 0, + "delta": {"type": "text_delta", + "text": tokenizer.decode([tok_id])}, + } + yield f"event: content_block_delta\ndata: {json.dumps(delta)}\n\n" finally: if full_hit is None: try: cur_bin.unlink() @@ -888,7 +710,7 @@ async def sse() -> AsyncIterator[str]: elif snap_prep: prefix_cache.confirm_inline_snap(*snap_prep, cur_ids) - yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': block_index})}\n\n" + yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" msg_delta = { "type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": None}, @@ -918,7 +740,7 @@ async def sse() -> AsyncIterator[str]: {"type": "error", "error": {"type": "invalid_request_error", "message": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}}, status_code=400) - cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}" + _samp_suffix(req) + "\n" + cmd_line = f"RESTORE {slot} {cached_cur_bin} {gen_len}\n" else: cur_bin, cur_ids = await asyncio.to_thread( _maybe_compress, raw_msgs, prompt_bin, prompt_ids, @@ -934,7 +756,7 @@ async def sse() -> AsyncIterator[str]: status_code=400) compression_fired = (cur_bin != prompt_bin) cmd_line, snap_prep = _build_cmd_line( - req, cur_bin, cur_ids, gen_len, prefix_cache, + cur_bin, cur_ids, gen_len, prefix_cache, prompt_ids, full_snap_prep_ref, compression_fired) try: @@ -961,20 +783,12 @@ async def sse() -> AsyncIterator[str]: except Exception: pass text = tokenizer.decode(tokens, skip_special_tokens=True) - cleaned, reasoning = parse_reasoning( - text, - thinking_enabled=_thinking_enabled(req.chat_template_kwargs), - started_in_thinking=started_in_thinking, - ) - content = [{"type": "text", "text": cleaned}] - if reasoning: - content.insert(0, {"type": "thinking", "thinking": reasoning}) return JSONResponse({ "id": msg_id, "type": "message", "role": "assistant", "model": req.model or MODEL_NAME, - "content": content, + "content": [{"type": "text", "text": text}], "stop_reason": "end_turn", "stop_sequence": None, "usage": {"input_tokens": prompt_len, @@ -1008,10 +822,29 @@ def main(): ap.add_argument("--tokenizer", type=str, default=None) ap.add_argument("--prefix-cache-slots", type=int, default=4) ap.add_argument("--prefill-cache-slots", type=int, default=4) + ap.add_argument("--target-cache-slots", "--cache-slots", dest="target_cache_slots", + type=int, default=1, + help="Native TargetCache slots per daemon process") + ap.add_argument("--draft-feature-mirror", action="store_true", + help="Mirror target features for split target/draft GPU experiments") + ap.add_argument("--gpu", type=int, default=None, + help="CUDA device ordinal to use for both target and draft") + ap.add_argument("--target-gpu", type=int, default=None, + help="CUDA device ordinal for target model/cache") + ap.add_argument("--draft-gpu", type=int, default=None, + help="CUDA device ordinal for draft model/cache") ap.add_argument("--daemon", action="store_true") add_cli_flags(ap) args = ap.parse_args() prefill_cfg = config_from_args(args) + target_gpu = _resolve_gpu_arg( + args.target_gpu if args.target_gpu is not None else args.gpu, + "DFLASH_TARGET_GPU", + ) + draft_gpu = _resolve_gpu_arg( + args.draft_gpu if args.draft_gpu is not None else args.gpu, + "DFLASH_DRAFT_GPU", + ) if args.cache_type_k: os.environ["DFLASH27B_KV_K"] = args.cache_type_k @@ -1028,33 +861,14 @@ def main(): os.environ.setdefault("DFLASH27B_FA_WINDOW", "0") os.environ.setdefault("DFLASH_FP_USE_BSA", "1") os.environ.setdefault("DFLASH_FP_ALPHA", "0.85") - if prefill_cfg.skip_park: - os.environ["DFLASH_COMPRESS_NO_PARK"] = "1" + if not args.bin.is_file(): + raise SystemExit(f"binary not found at {args.bin}") if not args.target.is_file(): raise SystemExit(f"target GGUF not found at {args.target}") - - # Architecture detection. test_dflash itself dispatches by GGUF arch at - # main() entry, so server.py just needs to know enough to omit --draft + - # DFlash/DDTree flags on archs that lack a spec-decode draft. Same - # binary serves every arch. - arch = _arch_from_gguf(args.target) - - if not args.bin.is_file(): - raise SystemExit(f"binary not found at {args.bin} (arch={arch})") - - if arch in _LAGUNA_ARCHES: - # No DFlash draft model exists for laguna yet; test_dflash'́s - # internal arch dispatch reads general.architecture, accepts the - # no-draft argv layout, and routes to run_laguna_daemon(). PFlash - # compression and prefix-cache SNAPSHOT/RESTORE are both wired - # through the laguna daemon now, so --prefill-compression and - # --prefix-cache-slots behave the same as on the qwen35 path. - draft = None - else: - draft = resolve_draft(args.draft) if args.draft.is_dir() else args.draft - if not draft.is_file(): - raise SystemExit(f"draft safetensors not found at {args.draft}") + draft = resolve_draft(args.draft) if args.draft.is_dir() else args.draft + if not draft.is_file(): + raise SystemExit(f"draft safetensors not found at {args.draft}") tokenizer_id = args.tokenizer or _tokenizer_id_from_gguf(args.target) tokenizer = AutoTokenizer.from_pretrained(tokenizer_id, trust_remote_code=True) @@ -1074,16 +888,22 @@ def main(): drafter_tokenizer=drafter_tokenizer, prefix_cache_slots=args.prefix_cache_slots, prefill_cache_slots=args.prefill_cache_slots, - arch=arch) + target_cache_slots=args.target_cache_slots, + draft_feature_mirror=args.draft_feature_mirror, + target_gpu=target_gpu, + draft_gpu=draft_gpu) import uvicorn print(f"Luce DFlash OpenAI server on http://{args.host}:{args.port}") - print(f" arch = {arch}") print(f" target = {args.target}") print(f" draft = {draft}") print(f" bin = {args.bin}") print(f" budget = {args.budget}") print(f" max_ctx = {args.max_ctx}") + print(f" cache_slots= {args.target_cache_slots}") + print(f" draft_feature_mirror= {args.draft_feature_mirror}") + print(f" target_gpu= {target_gpu if target_gpu is not None else 'default'}") + print(f" draft_gpu = {draft_gpu if draft_gpu is not None else 'default'}") print(f" tokenizer = {tokenizer_id}") if prefill_cfg.enabled: print(f" pflash = {prefill_cfg.mode} · threshold={prefill_cfg.threshold} " diff --git a/dflash/scripts/server_tools.py b/dflash/scripts/server_tools.py index ba21784e3..1df5a4d6d 100644 --- a/dflash/scripts/server_tools.py +++ b/dflash/scripts/server_tools.py @@ -30,6 +30,7 @@ import re import struct import subprocess +import sys import tempfile import time import uuid @@ -56,11 +57,23 @@ str(ROOT / "models" / "Qwen3.6-27B-Q4_K_M.gguf"), )) DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" -DEFAULT_BIN = ROOT / "build" / "test_dflash" +DEFAULT_BIN = ROOT / "build" / "Release" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) DEFAULT_BUDGET = 22 MODEL_NAME = "luce-dflash" +def _resolve_gpu_arg(value: int | None, env_name: str) -> int | None: + if value is not None: + return value + raw = os.environ.get(env_name) + if raw is None or not raw.strip(): + return None + try: + return int(raw) + except ValueError as exc: + raise ValueError(f"{env_name} must be an integer, got {raw!r}") from exc + + def resolve_draft(root: Path) -> Path: for st in root.rglob("model.safetensors"): return st @@ -320,18 +333,52 @@ def build_app(target: Path, draft: Path, bin_path: Path, budget: int, prefill_cfg: PrefillConfig | None = None, drafter_tokenizer: AutoTokenizer | None = None, prefix_cache_slots: int = 4, - prefill_cache_slots: int = 4) -> FastAPI: + prefill_cache_slots: int = 4, + target_cache_slots: int = 1, + draft_feature_mirror: bool = False, + target_gpu: int | None = None, + draft_gpu: int | None = None) -> FastAPI: import asyncio app = FastAPI(title="Luce DFlash OpenAI server (tool-aware)") daemon_lock = asyncio.Lock() r_pipe, w_pipe = os.pipe() - cmd = [str(bin_path), str(target), str(draft), "--daemon", + if sys.platform == "win32": + import msvcrt + os.set_inheritable(w_pipe, True) + stream_fd_val = int(msvcrt.get_osfhandle(w_pipe)) + else: + stream_fd_val = w_pipe + + bin_abs = str(Path(bin_path).resolve()) + env = {**os.environ} + if target_gpu is not None: + env["DFLASH_TARGET_GPU"] = str(target_gpu) + if draft_gpu is not None: + env["DFLASH_DRAFT_GPU"] = str(draft_gpu) + if sys.platform == "win32": + bin_dir = str(Path(bin_abs).parent) + env["PATH"] = bin_dir + os.pathsep + env.get("PATH", "") + + cmd = [bin_abs, str(target), str(draft), "--daemon", "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", f"--max-ctx={max_ctx}", - f"--stream-fd={w_pipe}"] - daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), stdin=subprocess.PIPE, - stdout=subprocess.PIPE, bufsize=0) + f"--target-cache-slots={target_cache_slots}", + f"--stream-fd={stream_fd_val}"] + if draft_feature_mirror: + cmd.append("--draft-feature-mirror") + if target_gpu is not None: + cmd.append(f"--target-gpu={target_gpu}") + if draft_gpu is not None: + cmd.append(f"--draft-gpu={draft_gpu}") + if sys.platform == "win32": + daemon_proc = subprocess.Popen(cmd, close_fds=False, env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, bufsize=0) + else: + daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), env=env, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, bufsize=0) os.close(w_pipe) bus = DaemonStdoutBus(daemon_proc.stdout) @@ -399,7 +446,6 @@ def _maybe_compress_tool_chat(req: "ChatRequest", prompt_bin: Path, drafter_tokenizer=drafter_tokenizer, cfg=prefill_cfg, prompt_text=last_user.content, - skip_park=prefill_cfg.skip_park, ) new_msgs = [] @@ -923,7 +969,6 @@ def _maybe_compress_anthropic(prompt_bin: Path, prompt_len: int, drafter_tokenizer=drafter_tokenizer, cfg=prefill_cfg, prompt_text=long_text, - skip_park=prefill_cfg.skip_park, ) new_msgs = list(msgs) new_msgs[last_user_idx] = {"role": "user", "content": compressed_text} @@ -1193,8 +1238,8 @@ def main(): "attention. Default 2048 (set in C++); only kicks in " "once kv_cache > window. Trades attention range for " "long-context decode speed.") - ap.add_argument("--tokenizer", default="Qwen/Qwen3.5-27B", - help="HF tokenizer id; Qwen3.6 shares this tokenizer.") + ap.add_argument("--tokenizer", default="Qwen/Qwen3.6-27B", + help="HF tokenizer id for the target model.") add_cli_flags(ap) ap.add_argument("--prefix-cache-slots", type=int, default=4, help="Number of prefix-cache snapshot slots (0 to disable)") @@ -1202,8 +1247,27 @@ def main(): help="Number of full-compress-result cache slots (Option 3). " "Only active when --prefill-compression is enabled. " "prefix-cache-slots + prefill-cache-slots must not exceed 8.") + ap.add_argument("--target-cache-slots", "--cache-slots", dest="target_cache_slots", + type=int, default=1, + help="Native TargetCache slots per daemon process") + ap.add_argument("--draft-feature-mirror", action="store_true", + help="Mirror target features for split target/draft GPU experiments") + ap.add_argument("--gpu", type=int, default=None, + help="CUDA device ordinal to use for both target and draft") + ap.add_argument("--target-gpu", type=int, default=None, + help="CUDA device ordinal for target model/cache") + ap.add_argument("--draft-gpu", type=int, default=None, + help="CUDA device ordinal for draft model/cache") args = ap.parse_args() prefill_cfg = config_from_args(args) + target_gpu = _resolve_gpu_arg( + args.target_gpu if args.target_gpu is not None else args.gpu, + "DFLASH_TARGET_GPU", + ) + draft_gpu = _resolve_gpu_arg( + args.draft_gpu if args.draft_gpu is not None else args.gpu, + "DFLASH_DRAFT_GPU", + ) # Auto-enable TQ3_0 KV cache when the requested context exceeds what F16 fits. # setdefault so an explicit user DFLASH27B_KV_TQ3=0 still wins. @@ -1253,7 +1317,11 @@ def main(): prefill_cfg=prefill_cfg if prefill_cfg.enabled else None, drafter_tokenizer=drafter_tokenizer, prefix_cache_slots=args.prefix_cache_slots, - prefill_cache_slots=args.prefill_cache_slots) + prefill_cache_slots=args.prefill_cache_slots, + target_cache_slots=args.target_cache_slots, + draft_feature_mirror=args.draft_feature_mirror, + target_gpu=target_gpu, + draft_gpu=draft_gpu) import uvicorn print(f"Luce DFlash OpenAI server (tool-aware) on http://{args.host}:{args.port}") @@ -1262,6 +1330,10 @@ def main(): print(f" bin = {args.bin}") print(f" budget = {args.budget}") print(f" max_ctx= {args.max_ctx}") + print(f" cache_slots = {args.target_cache_slots}") + print(f" draft_feature_mirror = {args.draft_feature_mirror}") + print(f" target_gpu = {target_gpu if target_gpu is not None else 'default'}") + print(f" draft_gpu = {draft_gpu if draft_gpu is not None else 'default'}") print(f" tokenizer = {args.tokenizer}") if prefill_cfg.enabled: print(f" pflash = {prefill_cfg.mode} · threshold={prefill_cfg.threshold} " diff --git a/dflash/scripts/tokenize_prompt.py b/dflash/scripts/tokenize_prompt.py index c2721838c..995b25274 100644 --- a/dflash/scripts/tokenize_prompt.py +++ b/dflash/scripts/tokenize_prompt.py @@ -1,5 +1,5 @@ """ -Tokenize a prompt string using the Qwen3.5 HF tokenizer (via transformers) +Tokenize a prompt string using the Qwen3.6 HF tokenizer (via transformers) and emit the token IDs as a flat int32 binary file. We depend on Python only for the tokenizer — the C++ library consumes the @@ -19,7 +19,7 @@ def main(): ap = argparse.ArgumentParser() ap.add_argument("--out", required=True) ap.add_argument("--prompt", required=True) - ap.add_argument("--model", default="Qwen/Qwen3.5-27B", + ap.add_argument("--model", default="Qwen/Qwen3.6-27B", help="HF repo id whose tokenizer to use") ap.add_argument("--add-bos", action="store_true", help="Prepend BOS token") args = ap.parse_args()