From aee294282f304b57b86eae74fecc4379a0f9c19c Mon Sep 17 00:00:00 2001 From: Erik LaBianca Date: Mon, 27 Apr 2026 14:30:46 -0400 Subject: [PATCH 1/2] feat(server): derive model name from GGUF filename; default port 1236, ctx 128K /v1/models now returns the GGUF stem (e.g. Qwen3.5-27B-Q4_K_M) instead of the hardcoded "luce-dflash". Default port changed to 1236, max_ctx to 131072 (validated on RTX 5090 with TQ3_0 KV cache), and startup updated to use uv. Co-Authored-By: Claude Sonnet 4.6 --- dflash/scripts/server.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/dflash/scripts/server.py b/dflash/scripts/server.py index 8d1875450..623994c6d 100644 --- a/dflash/scripts/server.py +++ b/dflash/scripts/server.py @@ -1,8 +1,7 @@ """ OpenAI-compatible HTTP server on top of test_dflash. - pip install fastapi uvicorn transformers - python3 scripts/server.py # serves on :8000 + uv run scripts/server.py # serves on :1236 curl http://localhost:8000/v1/chat/completions \\ -H 'Content-Type: application/json' \\ @@ -12,8 +11,7 @@ OPENAI_API_BASE=http://localhost:8000/v1 OPENAI_API_KEY=sk-any Streams tokens as Server-Sent Events using the OpenAI delta format. -Model reloads per request (~10 s first-token latency). A daemon-mode -binary that keeps the model resident is a planned follow-up. +Model stays resident in daemon mode (default); context default is 128K. """ import argparse import json @@ -39,7 +37,6 @@ DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" DEFAULT_BIN = ROOT / "build" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) DEFAULT_BUDGET = 22 -MODEL_NAME = "luce-dflash" def resolve_draft(root: Path) -> Path: @@ -95,7 +92,7 @@ class ChatMessage(BaseModel): class ChatRequest(BaseModel): - model: str = MODEL_NAME + model: str = "" messages: list[ChatMessage] stream: bool = False max_tokens: int = 512 @@ -110,7 +107,7 @@ class AnthropicMessage(BaseModel): class AnthropicMessagesRequest(BaseModel): - model: str = MODEL_NAME + model: str = "" max_tokens: int messages: list[AnthropicMessage] system: str | list[dict] | None = None @@ -121,9 +118,11 @@ class AnthropicMessagesRequest(BaseModel): def build_app(target: Path, draft: Path, bin_path: Path, budget: int, max_ctx: int, - tokenizer: AutoTokenizer, stop_ids: set[int]) -> FastAPI: + tokenizer: AutoTokenizer, stop_ids: set[int], + model_name: str = "") -> FastAPI: import asyncio app = FastAPI(title="Luce DFlash OpenAI server") + MODEL_NAME = model_name daemon_lock = asyncio.Lock() r_pipe, w_pipe = os.pipe() @@ -429,21 +428,14 @@ async def sse() -> AsyncIterator[str]: def main(): ap = argparse.ArgumentParser() ap.add_argument("--host", default="0.0.0.0") - ap.add_argument("--port", type=int, default=8080) + ap.add_argument("--port", type=int, default=1236) ap.add_argument("--target", type=Path, default=DEFAULT_TARGET) ap.add_argument("--draft", type=Path, default=DEFAULT_DRAFT_ROOT) ap.add_argument("--bin", type=Path, default=DEFAULT_BIN) ap.add_argument("--budget", type=int, default=DEFAULT_BUDGET) - # Attention compute currently scales with --max-ctx, not the actual - # prompt+gen length (see https://github.com/Luce-Org/lucebox-hub/issues/10). - # Default 16384 fits most API workloads without the 20×+ slowdown users - # hit with --max-ctx=131072 on short requests. Bump via --max-ctx if you - # actually need long-context serving. - default_ctx = 16384 + default_ctx = 131072 ap.add_argument("--max-ctx", type=int, default=default_ctx, - help=f"Maximum context length (default: {default_ctx}; " - "oversizing this — e.g. 131072 on short prompts — " - "can slow attention 20×+ until issue #10 is fixed)") + help=f"Maximum context length (default: {default_ctx})") ap.add_argument("--kv-f16", action="store_true", help="Force F16 KV cache. When --max-ctx > 6144 the server " "auto-enables TQ3_0 KV to fit; pass --kv-f16 to opt out.") @@ -483,8 +475,9 @@ def main(): ids = tokenizer.encode(s, add_special_tokens=False) if ids: stop_ids.add(ids[0]) + model_name = args.target.stem app = build_app(args.target, draft, args.bin, args.budget, args.max_ctx, - tokenizer, stop_ids) + tokenizer, stop_ids, model_name=model_name) import uvicorn print(f"Luce DFlash OpenAI server on http://{args.host}:{args.port}") @@ -494,6 +487,7 @@ def main(): print(f" budget = {args.budget}") print(f" max_ctx = {args.max_ctx}") print(f" tokenizer = {tokenizer_id}") + print(f" model = {model_name}") uvicorn.run(app, host=args.host, port=args.port, log_level="info") From a8f9d3801fbce615e775bab7dcce53174cffdffe Mon Sep 17 00:00:00 2001 From: Erik LaBianca Date: Mon, 27 Apr 2026 15:09:16 -0400 Subject: [PATCH 2/2] refactor(server): proper Python package with tool-calling, tests - src/dflash/server/ replaces scripts/server.py + scripts/server_tools.py - server/parsing.py: Qwen3.x tool-call + reasoning parsers (pure fns, no deps) - server/schemas.py: pydantic models for OpenAI + Anthropic APIs - Entry point: dflash-server = "dflash.server:main" (uv run dflash-server) - tests/: 33 tests covering parsing and all HTTP endpoints - Consolidates tool calling, streaming state machine, GGUF model name, port 1236, 128K ctx defaults from server_tools.py into single package Co-Authored-By: Claude Sonnet 4.6 --- dflash/pyproject.toml | 25 + dflash/scripts/server.py | 495 ---------------- dflash/scripts/server_tools.py | 856 --------------------------- dflash/scripts/test_server.py | 105 ---- dflash/src/dflash/__init__.py | 0 dflash/src/dflash/server/__init__.py | 540 +++++++++++++++++ dflash/src/dflash/server/parsing.py | 158 +++++ dflash/src/dflash/server/schemas.py | 56 ++ dflash/tests/test_parsing.py | 152 +++++ dflash/tests/test_server.py | 196 ++++++ 10 files changed, 1127 insertions(+), 1456 deletions(-) create mode 100644 dflash/pyproject.toml delete mode 100644 dflash/scripts/server.py delete mode 100644 dflash/scripts/server_tools.py delete mode 100644 dflash/scripts/test_server.py create mode 100644 dflash/src/dflash/__init__.py create mode 100644 dflash/src/dflash/server/__init__.py create mode 100644 dflash/src/dflash/server/parsing.py create mode 100644 dflash/src/dflash/server/schemas.py create mode 100644 dflash/tests/test_parsing.py create mode 100644 dflash/tests/test_server.py diff --git a/dflash/pyproject.toml b/dflash/pyproject.toml new file mode 100644 index 000000000..4f3a98898 --- /dev/null +++ b/dflash/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "dflash" +version = "0.1.0" +description = "Python scripts for dflash inference, benchmarking, and serving" +requires-python = ">=3.10" +dependencies = [ + "transformers", + "numpy", + "gguf", + "fastapi", + "uvicorn[standard]", + "jinja2", + "pytest", + "httpx", + "datasets", +] + +[project.optional-dependencies] +oracle = ["torch"] + +[project.scripts] +dflash-server = "dflash.server:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/dflash/scripts/server.py b/dflash/scripts/server.py deleted file mode 100644 index 623994c6d..000000000 --- a/dflash/scripts/server.py +++ /dev/null @@ -1,495 +0,0 @@ -""" -OpenAI-compatible HTTP server on top of test_dflash. - - uv run scripts/server.py # serves on :1236 - - curl http://localhost:8000/v1/chat/completions \\ - -H 'Content-Type: application/json' \\ - -d '{"model":"luce-dflash","messages":[{"role":"user","content":"hi"}],"stream":true}' - -Drop-in for Open WebUI / LM Studio / Cline by setting - OPENAI_API_BASE=http://localhost:8000/v1 OPENAI_API_KEY=sk-any - -Streams tokens as Server-Sent Events using the OpenAI delta format. -Model stays resident in daemon mode (default); context default is 128K. -""" -import argparse -import json -import os -import struct -import subprocess -import sys -import tempfile -import time -import uuid -from pathlib import Path -from typing import AsyncIterator - -from fastapi import FastAPI, Request -from fastapi.responses import JSONResponse, StreamingResponse -from pydantic import BaseModel -from starlette.concurrency import iterate_in_threadpool -from transformers import AutoTokenizer - - -ROOT = Path(__file__).resolve().parent.parent -DEFAULT_TARGET = ROOT / "models" / "Qwen3.5-27B-Q4_K_M.gguf" -DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" -DEFAULT_BIN = ROOT / "build" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) -DEFAULT_BUDGET = 22 - - -def resolve_draft(root: Path) -> Path: - for st in root.rglob("model.safetensors"): - return st - raise FileNotFoundError(f"no model.safetensors under {root}") - - -# Models known to share the qwen35 GGUF arch + vocab. Verified via -# tokenizer.ggml.pre == "qwen35" and identical eos/pad/bos token IDs. -_QWEN35_FAMILY_TOKENIZERS = { - "Qwen3.5-27B": "Qwen/Qwen3.5-27B", - "Qwen3.6-27B": "Qwen/Qwen3.6-27B", -} - - -def _tokenizer_id_from_gguf(gguf_path: Path) -> str: - """Infer the HuggingFace tokenizer repo from a GGUF target file. - - The GGUF file encodes its own tokenizer so in principle we could use that - directly, but `test_dflash` drives generation through the HF tokenizer for - chat-template application. We match on `general.basename` / `general.name` - metadata; if anything goes wrong we fall back to the historical default - (Qwen/Qwen3.5-27B) so existing setups don't break. - """ - default = "Qwen/Qwen3.5-27B" - try: - from gguf import GGUFReader # type: ignore - r = GGUFReader(str(gguf_path)) - for key in ("general.basename", "general.name"): - f = r.fields.get(key) - if f is None or not f.data: - continue - import numpy as np - p = f.parts[f.data[0]] - if not isinstance(p, np.ndarray): - continue - try: - val = bytes(p).decode("utf-8", errors="replace") - except Exception: - continue - for known, repo in _QWEN35_FAMILY_TOKENIZERS.items(): - if known.lower() in val.lower(): - return repo - except Exception: - pass - return default - - -class ChatMessage(BaseModel): - role: str - content: str | list[dict] - - -class ChatRequest(BaseModel): - model: str = "" - messages: list[ChatMessage] - stream: bool = False - max_tokens: int = 512 - temperature: float | None = None # noted + ignored (greedy-only) - top_p: float | None = None - - -class AnthropicMessage(BaseModel): - role: str - # Anthropic allows either a plain string or a list of content blocks. - content: str | list[dict] - - -class AnthropicMessagesRequest(BaseModel): - model: str = "" - max_tokens: int - messages: list[AnthropicMessage] - system: str | list[dict] | None = None - stream: bool = False - temperature: float | None = None - top_p: float | None = None - stop_sequences: list[str] | None = None - - -def build_app(target: Path, draft: Path, bin_path: Path, budget: int, max_ctx: int, - tokenizer: AutoTokenizer, stop_ids: set[int], - model_name: str = "") -> FastAPI: - import asyncio - app = FastAPI(title="Luce DFlash OpenAI server") - MODEL_NAME = model_name - daemon_lock = asyncio.Lock() - - r_pipe, w_pipe = os.pipe() - if sys.platform == "win32": - import msvcrt - os.set_inheritable(w_pipe, True) - stream_fd_val = int(msvcrt.get_osfhandle(w_pipe)) - else: - stream_fd_val = w_pipe - - bin_abs = str(Path(bin_path).resolve()) - dll_dir = str(Path(bin_abs).parent / "bin") - env = {**os.environ} - if sys.platform == "win32": - env["PATH"] = dll_dir + os.pathsep + str(Path(bin_abs).parent) + os.pathsep + env.get("PATH", "") - - cmd = [bin_abs, str(target), str(draft), "--daemon", - "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", - f"--max-ctx={max_ctx}", - f"--stream-fd={stream_fd_val}"] - if sys.platform == "win32": - daemon_proc = subprocess.Popen(cmd, close_fds=False, env=env, - stdin=subprocess.PIPE) - else: - daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), env=env, - stdin=subprocess.PIPE) - os.close(w_pipe) - - @app.get("/v1/models") - def list_models(): - return { - "object": "list", - "data": [{"id": MODEL_NAME, "object": "model", "owned_by": "luce"}], - } - - def _tokenize_prompt(req: ChatRequest) -> Path: - msgs = [{"role": m.role, "content": _anthropic_text_from_content(m.content) - if isinstance(m.content, list) else m.content} - for m in req.messages] - prompt = tokenizer.apply_chat_template( - msgs, tokenize=False, add_generation_prompt=True) - ids = tokenizer.encode(prompt, add_special_tokens=False) - # mkstemp returns (fd, path). The previous code kept only the - # path and discarded fd, leaking 1 file descriptor per request. - # os.fdopen() takes ownership of the fd and closes it on __exit__. - fd, path = tempfile.mkstemp(suffix=".bin") - tmp = Path(path) - with os.fdopen(fd, "wb") as f: - for t in ids: - f.write(struct.pack("= n_gen: - hit_stop = True - - @app.post("/v1/chat/completions") - async def chat_completions(req: ChatRequest): - prompt_bin = _tokenize_prompt(req) - - # Clamp max_tokens to available headroom - prompt_len = prompt_bin.stat().st_size // 4 - # Safety buffer for the dflash block_size (16) - available_gen = max_ctx - prompt_len - 20 - gen_len = min(req.max_tokens, available_gen) - if gen_len <= 0: - return JSONResponse({"detail": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}, status_code=400) - - completion_id = "chatcmpl-" + uuid.uuid4().hex[:24] - created = int(time.time()) - - if req.stream: - async def sse() -> AsyncIterator[str]: - async with daemon_lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - head = { - "id": completion_id, "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [{"index": 0, - "delta": {"role": "assistant"}, - "finish_reason": None}], - } - yield f"data: {json.dumps(head)}\n\n" - try: - # Offload blocking os.read in _token_stream to a thread so - # SSE chunks flush progressively instead of after generation ends. - async for tok_id in iterate_in_threadpool(_token_stream(r_pipe, gen_len)): - chunk = { - "id": completion_id, - "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [{"index": 0, - "delta": {"content": tokenizer.decode([tok_id])}, - "finish_reason": None}], - } - yield f"data: {json.dumps(chunk)}\n\n" - finally: - try: prompt_bin.unlink() - except Exception: pass - tail = { - "id": completion_id, "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [{"index": 0, "delta": {}, - "finish_reason": "stop"}], - } - yield f"data: {json.dumps(tail)}\n\n" - yield "data: [DONE]\n\n" - - return StreamingResponse(sse(), media_type="text/event-stream") - - # Non-streaming: collect all tokens, return one response - async with daemon_lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - tokens = list(_token_stream(r_pipe, gen_len)) - - try: prompt_bin.unlink() - except Exception: pass - text = tokenizer.decode(tokens, skip_special_tokens=True) - return JSONResponse({ - "id": completion_id, - "object": "chat.completion", - "created": created, - "model": MODEL_NAME, - "choices": [{ - "index": 0, - "message": {"role": "assistant", "content": text}, - "finish_reason": "stop", - }], - "usage": {"prompt_tokens": 0, # not tracked yet - "completion_tokens": len(tokens), - "total_tokens": len(tokens)}, - }) - - # ── Anthropic Messages API ────────────────────────────────────── - # Mirrors the OpenAI endpoint but formatted for the Anthropic SDK. - # `?beta=true` (or any other query params) are accepted and ignored. - - def _anthropic_text_from_content(content) -> str: - if isinstance(content, str): - return content - # list of blocks — concatenate the text blocks, ignore images/tools - parts = [] - for b in content: - if isinstance(b, dict) and b.get("type") == "text": - parts.append(b.get("text", "")) - return "".join(parts) - - def _tokenize_anthropic(req: AnthropicMessagesRequest) -> tuple[Path, int]: - msgs = [] - system_text = _anthropic_text_from_content(req.system) if req.system else None - if system_text: - msgs.append({"role": "system", "content": system_text}) - for m in req.messages: - msgs.append({"role": m.role, - "content": _anthropic_text_from_content(m.content)}) - prompt = tokenizer.apply_chat_template( - msgs, tokenize=False, add_generation_prompt=True) - ids = tokenizer.encode(prompt, add_special_tokens=False) - # mkstemp returns (fd, path); discarding fd leaks 1 per request (#15). - fd, path = tempfile.mkstemp(suffix=".bin") - tmp = Path(path) - with os.fdopen(fd, "wb") as f: - for t in ids: - f.write(struct.pack("= n_gen: - hit_stop = True - - @app.post("/v1/messages") - async def anthropic_messages(req: AnthropicMessagesRequest): - prompt_bin, prompt_len = _tokenize_anthropic(req) - - available_gen = max_ctx - prompt_len - 20 - gen_len = min(req.max_tokens, available_gen) - if gen_len <= 0: - try: prompt_bin.unlink() - except Exception: pass - return JSONResponse( - {"type": "error", - "error": {"type": "invalid_request_error", - "message": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}}, - status_code=400) - - msg_id = "msg_" + uuid.uuid4().hex[:24] - - if req.stream: - async def sse() -> AsyncIterator[str]: - # Hold the lock across the ENTIRE read cycle so concurrent - # requests don't interleave tokens through the shared pipe. - async with daemon_lock: - message_start = { - "type": "message_start", - "message": { - "id": msg_id, "type": "message", "role": "assistant", - "model": req.model or MODEL_NAME, - "content": [], "stop_reason": None, "stop_sequence": None, - "usage": {"input_tokens": prompt_len, "output_tokens": 0}, - }, - } - yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" - - cb_start = { - "type": "content_block_start", "index": 0, - "content_block": {"type": "text", "text": ""}, - } - yield f"event: content_block_start\ndata: {json.dumps(cb_start)}\n\n" - - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - - out_tokens = 0 - try: - async for tok_id in _astream_tokens(r_pipe, gen_len): - out_tokens += 1 - delta = { - "type": "content_block_delta", "index": 0, - "delta": {"type": "text_delta", - "text": tokenizer.decode([tok_id])}, - } - yield f"event: content_block_delta\ndata: {json.dumps(delta)}\n\n" - finally: - try: prompt_bin.unlink() - except Exception: pass - - yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" - - msg_delta = { - "type": "message_delta", - "delta": {"stop_reason": "end_turn", "stop_sequence": None}, - "usage": {"output_tokens": out_tokens}, - } - yield f"event: message_delta\ndata: {json.dumps(msg_delta)}\n\n" - yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" - - return StreamingResponse(sse(), media_type="text/event-stream") - - # Non-streaming - async with daemon_lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - tokens = [t async for t in _astream_tokens(r_pipe, gen_len)] - - try: prompt_bin.unlink() - except Exception: pass - text = tokenizer.decode(tokens, skip_special_tokens=True) - return JSONResponse({ - "id": msg_id, - "type": "message", - "role": "assistant", - "model": req.model or MODEL_NAME, - "content": [{"type": "text", "text": text}], - "stop_reason": "end_turn", - "stop_sequence": None, - "usage": {"input_tokens": prompt_len, - "output_tokens": len(tokens)}, - }) - - return app - - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--host", default="0.0.0.0") - ap.add_argument("--port", type=int, default=1236) - ap.add_argument("--target", type=Path, default=DEFAULT_TARGET) - ap.add_argument("--draft", type=Path, default=DEFAULT_DRAFT_ROOT) - ap.add_argument("--bin", type=Path, default=DEFAULT_BIN) - ap.add_argument("--budget", type=int, default=DEFAULT_BUDGET) - default_ctx = 131072 - ap.add_argument("--max-ctx", type=int, default=default_ctx, - help=f"Maximum context length (default: {default_ctx})") - ap.add_argument("--kv-f16", action="store_true", - help="Force F16 KV cache. When --max-ctx > 6144 the server " - "auto-enables TQ3_0 KV to fit; pass --kv-f16 to opt out.") - ap.add_argument("--fa-window", type=int, default=None, - help="Sliding window for FA layers (KV positions). 0 = full " - "attention. Default 2048 (set in C++); only kicks in " - "once kv_cache > window. Trades attention range for " - "long-context decode speed.") - ap.add_argument("--tokenizer", type=str, default=None, - help="HuggingFace tokenizer repo ID (default: auto-detect " - "from target GGUF basename; falls back to Qwen/Qwen3.5-27B)") - ap.add_argument("--daemon", action="store_true", help="Run with persistent model daemon (now default)") - args = ap.parse_args() - - # Auto-enable TQ3_0 KV cache when the requested context exceeds what F16 fits. - # Clients like Claude Code routinely send 10k+ token system prompts, so - # 6144 is too tight for real-world use. setdefault so an explicit user - # DFLASH27B_KV_TQ3=0 still wins. - if args.max_ctx > 6144 and not args.kv_f16: - os.environ.setdefault("DFLASH27B_KV_TQ3", "1") - - if args.fa_window is not None: - os.environ["DFLASH27B_FA_WINDOW"] = str(args.fa_window) - - if not args.bin.is_file(): - raise SystemExit(f"binary not found at {args.bin}") - if not args.target.is_file(): - raise SystemExit(f"target GGUF not found at {args.target}") - draft = resolve_draft(args.draft) if args.draft.is_dir() else args.draft - if not draft.is_file(): - raise SystemExit(f"draft safetensors not found at {args.draft}") - - tokenizer_id = args.tokenizer or _tokenizer_id_from_gguf(args.target) - tokenizer = AutoTokenizer.from_pretrained(tokenizer_id, trust_remote_code=True) - stop_ids = set() - for s in ("<|im_end|>", "<|endoftext|>"): - ids = tokenizer.encode(s, add_special_tokens=False) - if ids: stop_ids.add(ids[0]) - - model_name = args.target.stem - app = build_app(args.target, draft, args.bin, args.budget, args.max_ctx, - tokenizer, stop_ids, model_name=model_name) - - import uvicorn - print(f"Luce DFlash OpenAI server on http://{args.host}:{args.port}") - print(f" target = {args.target}") - print(f" draft = {draft}") - print(f" bin = {args.bin}") - print(f" budget = {args.budget}") - print(f" max_ctx = {args.max_ctx}") - print(f" tokenizer = {tokenizer_id}") - print(f" model = {model_name}") - uvicorn.run(app, host=args.host, port=args.port, log_level="info") - - -if __name__ == "__main__": - main() diff --git a/dflash/scripts/server_tools.py b/dflash/scripts/server_tools.py deleted file mode 100644 index 4c4aa4b11..000000000 --- a/dflash/scripts/server_tools.py +++ /dev/null @@ -1,856 +0,0 @@ -""" -OpenAI-compatible HTTP server on top of test_dflash, **with tool-calling support**. - -Patched fork of scripts/server.py that: - 1. Accepts the OpenAI `tools` array in ChatRequest. - 2. Renders tools into the prompt via Qwen's chat template (`tools=...`). - 3. Parses `` blocks out - of the model output and returns them as proper OpenAI `tool_calls`. - 4. Supports `role: "tool"` and assistant `tool_calls` in input messages so - multi-turn agent loops round-trip correctly. - -Streaming behavior: - - Content tokens are streamed as `delta.content` until a `` opener - is detected; the rest of the response is then buffered, parsed at the end - of generation, and emitted as a single final `delta.tool_calls` chunk with - `finish_reason: "tool_calls"`. - - If no tool call appears in the output, behavior is identical to the - upstream server. - -Greedy decoding still applies (verify path is greedy-only). `temperature` and -`top_p` are accepted but ignored, matching upstream. - -Run: - pip install fastapi uvicorn transformers - python3 scripts/server_tools.py --port 8000 -""" -import argparse -import json -import os -import re -import struct -import subprocess -import tempfile -import time -import uuid -from pathlib import Path -from typing import Any, AsyncIterator - -from fastapi import FastAPI -from fastapi.responses import JSONResponse, StreamingResponse -from pydantic import BaseModel, Field -from starlette.concurrency import iterate_in_threadpool -from transformers import AutoTokenizer - - -ROOT = Path(__file__).resolve().parent.parent -DEFAULT_TARGET = ROOT / "models" / "Qwen3.5-27B-Q4_K_M.gguf" -DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" -DEFAULT_BIN = ROOT / "build" / "test_dflash" -DEFAULT_BUDGET = 22 -MODEL_NAME = "luce-dflash" - - -def resolve_draft(root: Path) -> Path: - for st in root.rglob("model.safetensors"): - return st - raise FileNotFoundError(f"no model.safetensors under {root}") - - -# ─── pydantic schemas ────────────────────────────────────────────── - -class ToolCallFunction(BaseModel): - name: str - arguments: str # JSON string per OpenAI spec - - -class ToolCall(BaseModel): - id: str | None = None - type: str = "function" - function: ToolCallFunction - - -class ChatMessage(BaseModel): - role: str - content: Any | None = None # str, list, or null when tool_calls present - name: str | None = None - tool_call_id: str | None = None - tool_calls: list[ToolCall] | None = None - - -class ToolDef(BaseModel): - type: str = "function" - function: dict # {name, description, parameters: {...JSON schema...}} - - -class ChatRequest(BaseModel): - model: str = MODEL_NAME - messages: list[ChatMessage] - stream: bool = False - max_tokens: int = 512 - temperature: float | None = None - top_p: float | None = None - tools: list[ToolDef] | None = None - tool_choice: Any | None = None # "auto" | "none" | {"function": {...}} - chat_template_kwargs: dict | None = None # e.g. {"enable_thinking": false} - stop: Any | None = None # str or list[str] - stream_options: dict | None = None # e.g. {"include_usage": true} - - -class AnthropicMessage(BaseModel): - role: str - # Anthropic allows either a plain string or a list of content blocks. - content: str | list[dict] - - -class AnthropicMessagesRequest(BaseModel): - model: str = MODEL_NAME - max_tokens: int - messages: list[AnthropicMessage] - system: str | list[dict] | None = None - stream: bool = False - temperature: float | None = None - top_p: float | None = None - stop_sequences: list[str] | None = None - - -# ─── tool-call parser ────────────────────────────────────────────── - -# Qwen3.6 chat template emits: -# -# -# -# VALUE -# -# ... -# -# -# Parsers ported from vLLM (Apache-2.0) for behavioral parity with -# `--reasoning-parser qwen3` and `--tool-call-parser qwen3_coder`: -# vllm/reasoning/qwen3_reasoning_parser.py -# vllm/tool_parsers/qwen3coder_tool_parser.py -# Core algorithms reproduced without vLLM runtime dependencies. - -TOOL_CALL_COMPLETE_RE = re.compile(r"(.*?)", re.DOTALL) -TOOL_CALL_FUNCTION_RE = re.compile( - r"| by using -# next or end-of-string as a terminator. -TOOL_CALL_PARAMETER_RE = re.compile( - r"|(?=)|$)", - re.DOTALL, -) -TOOL_OPEN_TAG = "" - -# Qwen3.6 chat template wraps the model's CoT inside .... -# The template typically prefills `\n` into the prompt (headless mode) -# so only `` appears in generated output; older templates emit both. -THINK_OPEN_TAG = "" -THINK_CLOSE_TAG = "" - - -def normalize_stop(stop) -> list[str]: - """Coerce OpenAI's stop field (str | list[str] | None) to list[str].""" - if not stop: - return [] - if isinstance(stop, str): - return [stop] - return [s for s in stop if isinstance(s, str) and s] - - -def first_stop_match(text: str, stops: list[str]) -> int: - """Return the earliest index where any stop sequence appears, or -1.""" - best = -1 - for s in stops: - i = text.find(s) - if i != -1 and (best == -1 or i < best): - best = i - return best - - -def parse_reasoning(text: str, thinking_enabled: bool = True) -> tuple[str, str | None]: - """Port of vLLM's Qwen3ReasoningParser.extract_reasoning. - - Handles the three Qwen3.x thinking flavors: - 1. Paired: `...` both in generated output. - 2. Headless: template prefilled `\\n` into the prompt, model - only emits `......`. - 3. Disabled: user passed `chat_template_kwargs: {enable_thinking: false}`. - Template still emits `\\n\\n\\n\\n` but into the prompt; - the model output is pure content and contains no tags. - - If the output was truncated mid-thinking (no `` seen and - `thinking_enabled=True`), returns `("", full_output_as_reasoning)` — - matching vLLM's convention. - - Returns (cleaned_content, reasoning_content). - """ - # Strip if the model emitted it itself (older templates). - parts = text.partition(THINK_OPEN_TAG) - rest = parts[2] if parts[1] else parts[0] - if THINK_CLOSE_TAG not in rest: - if thinking_enabled: - # No close tag — assume truncated; everything is reasoning. - return "", (rest.strip() or None) - else: - # Thinking disabled — output is pure content. - return rest.strip(), None - reasoning, _, content = rest.partition(THINK_CLOSE_TAG) - return content.strip(), (reasoning.strip() or None) - - -def _find_tool_properties(tools, function_name): - """Helper matching vLLM's `find_tool_properties`: returns the parameters - dict for a given function name, or {} if not found. - Accepts pydantic ToolDef instances or plain dicts. - """ - for t in tools or []: - fn = t.function if hasattr(t, "function") else t.get("function", {}) - if hasattr(fn, "model_dump"): - fn = fn.model_dump() - if fn.get("name") == function_name: - params = fn.get("parameters", {}) - if isinstance(params, dict): - return params.get("properties", {}) - return {} - - -def _convert_param_value(param_value: str, param_name: str, param_config: dict, - func_name: str): - """Port of vLLM's _convert_param_value. Coerces stringified XML values - to their JSON-schema type (int/float/bool/object/array/string).""" - import ast - if param_value.lower() == "null": - return None - if param_name not in param_config: - return param_value - cfg = param_config[param_name] - if isinstance(cfg, dict) and "type" in cfg: - ptype = str(cfg["type"]).strip().lower() - elif isinstance(cfg, dict) and "anyOf" in cfg: - ptype = "object" - else: - ptype = "string" - if ptype in ("string", "str", "text", "varchar", "char", "enum"): - return param_value - if any(ptype.startswith(p) for p in ("int", "uint", "long", "short", "unsigned")): - try: return int(param_value) - except (ValueError, TypeError): return param_value - if ptype.startswith("num") or ptype.startswith("float"): - try: - f = float(param_value) - return f if f - int(f) != 0 else int(f) - except (ValueError, TypeError): - return param_value - if ptype in ("boolean", "bool", "binary"): - return param_value.lower() == "true" - # object / array / dict / list - if (ptype in ("object", "array", "arr") - or ptype.startswith("dict") or ptype.startswith("list")): - try: return json.loads(param_value) - except (json.JSONDecodeError, TypeError, ValueError): pass - try: return ast.literal_eval(param_value) - except (ValueError, SyntaxError, TypeError): return param_value - - -def parse_tool_calls(text: str, tools=None) -> tuple[str, list[dict]]: - """Port of Qwen3CoderToolParser._parse_xml_function_call (non-streaming). - - Handles Qwen3.x's `...VAL - ...` XML. Uses vLLM's improved - parameter regex that tolerates unclosed tags. When `tools` - is provided, each parameter value is coerced to its JSON-schema type. - - Returns (cleaned_content, tool_calls_list). - """ - tool_calls: list[dict] = [] - cleaned_parts: list[str] = [] - cursor = 0 - for m in TOOL_CALL_COMPLETE_RE.finditer(text): - cleaned_parts.append(text[cursor:m.start()]) - cursor = m.end() - body = m.group(1) - fn_match = TOOL_CALL_FUNCTION_RE.search(body) - if not fn_match: - continue - fn_text = fn_match.group(1) or fn_match.group(2) or "" - end_idx = fn_text.find(">") - if end_idx == -1: - continue - function_name = fn_text[:end_idx].strip() - params_region = fn_text[end_idx + 1:] - param_config = _find_tool_properties(tools, function_name) - args: dict = {} - for match_text in TOOL_CALL_PARAMETER_RE.findall(params_region): - eq_idx = match_text.find(">") - if eq_idx == -1: - continue - k = match_text[:eq_idx].strip() - v = match_text[eq_idx + 1:] - if v.startswith("\n"): v = v[1:] - if v.endswith("\n"): v = v[:-1] - args[k] = _convert_param_value(v, k, param_config, function_name) - tool_calls.append({ - "id": "call_" + uuid.uuid4().hex[:24], - "type": "function", - "function": { - "name": function_name, - "arguments": json.dumps(args, ensure_ascii=False), - }, - }) - cleaned_parts.append(text[cursor:]) - return "".join(cleaned_parts).strip(), tool_calls - - -# ─── app ─────────────────────────────────────────────────────────── - -def build_app(target: Path, draft: Path, bin_path: Path, budget: int, - max_ctx: int, tokenizer: AutoTokenizer, stop_ids: set[int]) -> FastAPI: - import asyncio - app = FastAPI(title="Luce DFlash OpenAI server (tool-aware)") - daemon_lock = asyncio.Lock() - - r_pipe, w_pipe = os.pipe() - cmd = [str(bin_path), str(target), str(draft), "--daemon", - "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", - f"--max-ctx={max_ctx}", - f"--stream-fd={w_pipe}"] - daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), stdin=subprocess.PIPE) - os.close(w_pipe) - - @app.get("/v1/models") - def list_models(): - return {"object": "list", - "data": [{"id": MODEL_NAME, "object": "model", "owned_by": "luce"}]} - - def _tokenize_prompt(req: ChatRequest) -> tuple[Path, bool]: - """Returns (prompt_bin_path, started_in_thinking). started_in_thinking - is True when the chat template prefilled \\n at the end of the - prompt — the model's first emitted tokens are reasoning content.""" - # Convert pydantic messages to dicts the chat template expects. - msgs: list[dict] = [] - for m in req.messages: - d: dict = {"role": m.role} - if m.content is not None: - d["content"] = m.content - if m.name is not None: - d["name"] = m.name - if m.tool_call_id is not None: - d["tool_call_id"] = m.tool_call_id - if m.tool_calls is not None: - # The Qwen template walks tool_calls[i].function.{name, arguments} - d["tool_calls"] = [] - for tc in m.tool_calls: - args = tc.function.arguments - # Template expects arguments as a dict, not a JSON string. - if isinstance(args, str): - try: - args_obj = json.loads(args) - except (json.JSONDecodeError, ValueError): - args_obj = {"_raw": args} - else: - args_obj = args - d["tool_calls"].append({ - "id": tc.id, - "type": tc.type, - "function": {"name": tc.function.name, "arguments": args_obj}, - }) - msgs.append(d) - - tools_arg = None - if req.tools: - tools_arg = [t.model_dump()["function"] | {"type": t.type} for t in req.tools] - # The Qwen template accepts the raw OpenAI tools array structure. - tools_arg = [t.model_dump() for t in req.tools] - - kwargs = dict(tokenize=False, add_generation_prompt=True) - if tools_arg: - kwargs["tools"] = tools_arg - # Per-request chat template knobs (e.g. enable_thinking, preserve_thinking). - if req.chat_template_kwargs: - kwargs.update(req.chat_template_kwargs) - prompt = tokenizer.apply_chat_template(msgs, **kwargs) - # Did the template prefill `\n` at the end? Then streaming should - # start in reasoning mode. - started_in_thinking = bool(re.search(r"\s*$", prompt)) - ids = tokenizer.encode(prompt, add_special_tokens=False) - fd, path = tempfile.mkstemp(suffix=".bin") - tmp = Path(path) - with os.fdopen(fd, "wb") as f: - for t in ids: - f.write(struct.pack("= n_gen: - hit_stop = True - - @app.post("/v1/chat/completions") - async def chat_completions(req: ChatRequest): - prompt_bin, started_in_thinking = _tokenize_prompt(req) - prompt_len = prompt_bin.stat().st_size // 4 - available_gen = max_ctx - prompt_len - 20 - gen_len = min(req.max_tokens, available_gen) - if gen_len <= 0: - return JSONResponse( - {"detail": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}, - status_code=400) - - completion_id = "chatcmpl-" + uuid.uuid4().hex[:24] - created = int(time.time()) - - if req.stream: - return await _stream_response(req, prompt_bin, gen_len, - completion_id, created, - started_in_thinking, daemon_lock) - - # Non-streaming: collect, parse, return. - async with daemon_lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - tokens = list(_token_stream(r_pipe, gen_len)) - try: prompt_bin.unlink() - except Exception: pass - - text = tokenizer.decode(tokens, skip_special_tokens=True) - # User-supplied stop sequences: trim at first match. - stops = normalize_stop(req.stop) - if stops: - i = first_stop_match(text, stops) - if i != -1: - text = text[:i] - # Respect enable_thinking from chat_template_kwargs when deciding how - # to treat a ``-less response (see parse_reasoning docstring). - thinking_enabled = True - if req.chat_template_kwargs: - thinking_enabled = req.chat_template_kwargs.get("enable_thinking", True) - cleaned, tool_calls = parse_tool_calls(text, tools=req.tools) - cleaned, reasoning = parse_reasoning(cleaned, thinking_enabled=thinking_enabled) - - msg: dict = {"role": "assistant"} - finish_reason = "stop" - if reasoning: - msg["reasoning_content"] = reasoning - if tool_calls: - msg["content"] = cleaned if cleaned else None - msg["tool_calls"] = tool_calls - finish_reason = "tool_calls" - else: - msg["content"] = cleaned - - return JSONResponse({ - "id": completion_id, - "object": "chat.completion", - "created": created, - "model": MODEL_NAME, - "choices": [{ - "index": 0, - "message": msg, - "finish_reason": finish_reason, - }], - "usage": {"prompt_tokens": prompt_len, - "completion_tokens": len(tokens), - "total_tokens": prompt_len + len(tokens)}, - }) - - async def _stream_response(req, prompt_bin, gen_len, completion_id, created, - started_in_thinking, lock): - prompt_len = prompt_bin.stat().st_size // 4 - include_usage = bool(req.stream_options and req.stream_options.get("include_usage")) - def chunk(delta_obj, finish=None): - return {"id": completion_id, "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [{"index": 0, "delta": delta_obj, - "finish_reason": finish}]} - - async def sse() -> AsyncIterator[str]: - async with lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - - yield f"data: {json.dumps(chunk({'role': 'assistant'}))}\n\n" - - # State machine: mode ∈ {'reasoning', 'content', 'tool_buffer'} - mode = "reasoning" if started_in_thinking else "content" - window = "" # holdback buffer for tag detection - tool_buffer = "" - stops = normalize_stop(req.stop) - # Holdback must cover longest tag AND longest stop sequence. - tag_holdback = max(len(THINK_OPEN_TAG), len(THINK_CLOSE_TAG), len(TOOL_OPEN_TAG)) - stop_holdback = max((len(s) for s in stops), default=0) - HOLDBACK = max(tag_holdback, stop_holdback) - completion_tokens = 0 - stop_hit = False - - def emit_delta(text, kind): - """kind: 'content' or 'reasoning_content'""" - if not text: - return None - return f"data: {json.dumps(chunk({kind: text}))}\n\n" - - try: - async for tok_id in iterate_in_threadpool(_token_stream(r_pipe, gen_len)): - completion_tokens += 1 - piece = tokenizer.decode([tok_id]) - window += piece - - # Stop-sequence check on the visible (content/reasoning) stream. - if stops and mode != "tool_buffer": - si = first_stop_match(window, stops) - if si != -1: - window = window[:si] - stop_hit = True - # Flush truncated remainder per current mode. - kind = "reasoning_content" if mode == "reasoning" else "content" - out = emit_delta(window, kind) - if out: yield out - window = "" - break - - # Process state transitions until no more tags found in window. - while True: - if mode == "tool_buffer": - tool_buffer += window - window = "" - break - - # Look for the next tag of interest based on mode. - if mode == "reasoning": - idx = window.find(THINK_CLOSE_TAG) - if idx != -1: - pre = window[:idx] - out = emit_delta(pre, "reasoning_content") - if out: yield out - window = window[idx + len(THINK_CLOSE_TAG):] - mode = "content" - continue - # No close tag yet. Stream all but holdback. - if len(window) > HOLDBACK: - safe = window[:-HOLDBACK] - out = emit_delta(safe, "reasoning_content") - if out: yield out - window = window[-HOLDBACK:] - break # need more tokens - - else: # mode == "content" - think_idx = window.find(THINK_OPEN_TAG) - tool_idx = window.find(TOOL_OPEN_TAG) - # Pick the earliest tag that actually appears. - hits = [(i, t) for i, t in - ((think_idx, "think"), (tool_idx, "tool")) if i != -1] - if hits: - hits.sort() - idx, which = hits[0] - pre = window[:idx] - out = emit_delta(pre, "content") - if out: yield out - if which == "think": - window = window[idx + len(THINK_OPEN_TAG):] - mode = "reasoning" - else: # tool - tool_buffer = window[idx:] - window = "" - mode = "tool_buffer" - continue - if len(window) > HOLDBACK: - safe = window[:-HOLDBACK] - out = emit_delta(safe, "content") - if out: yield out - window = window[-HOLDBACK:] - break # need more tokens - - if stop_hit: - finish_reason = "stop" - yield f"data: {json.dumps(chunk({}, finish=finish_reason))}\n\n" - if include_usage: - usage_chunk = {"id": completion_id, "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, "choices": [], - "usage": {"prompt_tokens": prompt_len, - "completion_tokens": completion_tokens, - "total_tokens": prompt_len + completion_tokens}} - yield f"data: {json.dumps(usage_chunk)}\n\n" - yield "data: [DONE]\n\n" - try: prompt_bin.unlink() - except Exception: pass - return - - # Generation done. Flush remaining window per current mode. - if mode == "reasoning" and window: - out = emit_delta(window, "reasoning_content") - if out: yield out - elif mode == "content" and window: - out = emit_delta(window, "content") - if out: yield out - elif mode == "tool_buffer": - tool_buffer += window - window = "" - - finish_reason = "stop" - if mode == "tool_buffer": - cleaned_after, tool_calls = parse_tool_calls(tool_buffer, tools=req.tools) - if tool_calls: - if cleaned_after: - out = emit_delta(cleaned_after, "content") - if out: yield out - tc_delta_list = [{ - "index": i, "id": tc["id"], "type": "function", - "function": {"name": tc["function"]["name"], - "arguments": tc["function"]["arguments"]}, - } for i, tc in enumerate(tool_calls)] - yield f"data: {json.dumps(chunk({'tool_calls': tc_delta_list}))}\n\n" - finish_reason = "tool_calls" - else: - # Unclosed — emit raw as content fallback. - out = emit_delta(tool_buffer, "content") - if out: yield out - finally: - try: prompt_bin.unlink() - except Exception: pass - - yield f"data: {json.dumps(chunk({}, finish=finish_reason))}\n\n" - if include_usage: - usage_chunk = { - "id": completion_id, "object": "chat.completion.chunk", - "created": created, "model": MODEL_NAME, - "choices": [], - "usage": {"prompt_tokens": prompt_len, - "completion_tokens": completion_tokens, - "total_tokens": prompt_len + completion_tokens}, - } - yield f"data: {json.dumps(usage_chunk)}\n\n" - yield "data: [DONE]\n\n" - - return StreamingResponse(sse(), media_type="text/event-stream") - - # ── Anthropic Messages API ────────────────────────────────────── - # Mirrors the OpenAI endpoint but formatted for the Anthropic SDK - # (Claude Code, Anthropic clients). Tool calling NOT forwarded here - # yet — agent CLIs that want tools should use /v1/chat/completions. - - def _anthropic_text_from_content(content) -> str: - if isinstance(content, str): - return content - parts = [] - for b in content: - if isinstance(b, dict) and b.get("type") == "text": - parts.append(b.get("text", "")) - return "".join(parts) - - def _tokenize_anthropic(req: AnthropicMessagesRequest) -> tuple[Path, int]: - msgs = [] - system_text = _anthropic_text_from_content(req.system) if req.system else None - if system_text: - msgs.append({"role": "system", "content": system_text}) - for m in req.messages: - msgs.append({"role": m.role, - "content": _anthropic_text_from_content(m.content)}) - prompt = tokenizer.apply_chat_template( - msgs, tokenize=False, add_generation_prompt=True) - ids = tokenizer.encode(prompt, add_special_tokens=False) - # mkstemp returns (fd, path); discarding fd leaks 1 per request (#15). - fd, path = tempfile.mkstemp(suffix=".bin") - tmp = Path(path) - with os.fdopen(fd, "wb") as f: - for t in ids: - f.write(struct.pack("= n_gen: - hit_stop = True - - @app.post("/v1/messages") - async def anthropic_messages(req: AnthropicMessagesRequest): - prompt_bin, prompt_len = _tokenize_anthropic(req) - - available_gen = max_ctx - prompt_len - 20 - gen_len = min(req.max_tokens, available_gen) - if gen_len <= 0: - try: prompt_bin.unlink() - except Exception: pass - return JSONResponse( - {"type": "error", - "error": {"type": "invalid_request_error", - "message": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}}, - status_code=400) - - msg_id = "msg_" + uuid.uuid4().hex[:24] - - if req.stream: - async def sse() -> AsyncIterator[str]: - async with daemon_lock: - message_start = { - "type": "message_start", - "message": { - "id": msg_id, "type": "message", "role": "assistant", - "model": req.model or MODEL_NAME, - "content": [], "stop_reason": None, "stop_sequence": None, - "usage": {"input_tokens": prompt_len, "output_tokens": 0}, - }, - } - yield f"event: message_start\ndata: {json.dumps(message_start)}\n\n" - - cb_start = { - "type": "content_block_start", "index": 0, - "content_block": {"type": "text", "text": ""}, - } - yield f"event: content_block_start\ndata: {json.dumps(cb_start)}\n\n" - - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - - out_tokens = 0 - try: - async for tok_id in _astream_tokens(r_pipe, gen_len): - out_tokens += 1 - delta = { - "type": "content_block_delta", "index": 0, - "delta": {"type": "text_delta", - "text": tokenizer.decode([tok_id])}, - } - yield f"event: content_block_delta\ndata: {json.dumps(delta)}\n\n" - finally: - try: prompt_bin.unlink() - except Exception: pass - - yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" - - msg_delta = { - "type": "message_delta", - "delta": {"stop_reason": "end_turn", "stop_sequence": None}, - "usage": {"output_tokens": out_tokens}, - } - yield f"event: message_delta\ndata: {json.dumps(msg_delta)}\n\n" - yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" - - return StreamingResponse(sse(), media_type="text/event-stream") - - # Non-streaming - async with daemon_lock: - cmd_line = f"{prompt_bin} {gen_len}\n" - daemon_proc.stdin.write(cmd_line.encode("utf-8")) - daemon_proc.stdin.flush() - tokens = [t async for t in _astream_tokens(r_pipe, gen_len)] - - try: prompt_bin.unlink() - except Exception: pass - text = tokenizer.decode(tokens, skip_special_tokens=True) - return JSONResponse({ - "id": msg_id, - "type": "message", - "role": "assistant", - "model": req.model or MODEL_NAME, - "content": [{"type": "text", "text": text}], - "stop_reason": "end_turn", - "stop_sequence": None, - "usage": {"input_tokens": prompt_len, - "output_tokens": len(tokens)}, - }) - - return app - - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--host", default="0.0.0.0") - ap.add_argument("--port", type=int, default=8000) - ap.add_argument("--target", type=Path, default=DEFAULT_TARGET) - ap.add_argument("--draft", type=Path, default=DEFAULT_DRAFT_ROOT) - ap.add_argument("--bin", type=Path, default=DEFAULT_BIN) - ap.add_argument("--budget", type=int, default=DEFAULT_BUDGET) - # Attention compute currently scales with --max-ctx, not the actual - # prompt+gen length (see issue #10). Default 16384 fits most API - # workloads without the 20×+ slowdown users hit with --max-ctx=131072 - # on short requests. Bump via --max-ctx for long-context serving. - ap.add_argument("--max-ctx", type=int, default=16384, - help="Maximum context length (default: 16384; oversizing " - "this, e.g. 131072 on short prompts, can slow " - "attention 20×+ until issue #10 is fixed)") - ap.add_argument("--kv-f16", action="store_true", - help="Force F16 KV cache. When --max-ctx > 6144 the server " - "auto-enables TQ3_0 KV to fit; pass --kv-f16 to opt out.") - ap.add_argument("--fa-window", type=int, default=None, - help="Sliding window for FA layers (KV positions). 0 = full " - "attention. Default 2048 (set in C++); only kicks in " - "once kv_cache > window. Trades attention range for " - "long-context decode speed.") - ap.add_argument("--tokenizer", default="Qwen/Qwen3.5-27B", - help="HF tokenizer id; Qwen3.6 shares this tokenizer.") - args = ap.parse_args() - - # Auto-enable TQ3_0 KV cache when the requested context exceeds what F16 fits. - # setdefault so an explicit user DFLASH27B_KV_TQ3=0 still wins. - if args.max_ctx > 6144 and not args.kv_f16: - os.environ.setdefault("DFLASH27B_KV_TQ3", "1") - - if args.fa_window is not None: - os.environ["DFLASH27B_FA_WINDOW"] = str(args.fa_window) - - if not args.bin.is_file(): - raise SystemExit(f"binary not found at {args.bin}") - if not args.target.is_file(): - raise SystemExit(f"target GGUF not found at {args.target}") - draft = resolve_draft(args.draft) if args.draft.is_dir() else args.draft - if not draft.is_file(): - raise SystemExit(f"draft safetensors not found at {args.draft}") - - tokenizer = AutoTokenizer.from_pretrained(args.tokenizer, trust_remote_code=True) - stop_ids = set() - for s in ("<|im_end|>", "<|endoftext|>"): - ids = tokenizer.encode(s, add_special_tokens=False) - if ids: stop_ids.add(ids[0]) - - app = build_app(args.target, draft, args.bin, args.budget, args.max_ctx, - tokenizer, stop_ids) - - import uvicorn - print(f"Luce DFlash OpenAI server (tool-aware) on http://{args.host}:{args.port}") - print(f" target = {args.target}") - print(f" draft = {draft}") - print(f" bin = {args.bin}") - print(f" budget = {args.budget}") - print(f" max_ctx= {args.max_ctx}") - print(f" tokenizer = {args.tokenizer}") - uvicorn.run(app, host=args.host, port=args.port, log_level="info") - - -if __name__ == "__main__": - main() diff --git a/dflash/scripts/test_server.py b/dflash/scripts/test_server.py deleted file mode 100644 index a9926806d..000000000 --- a/dflash/scripts/test_server.py +++ /dev/null @@ -1,105 +0,0 @@ -import os -import struct -import json -import asyncio -from pathlib import Path -from unittest.mock import patch, MagicMock - -import pytest -from fastapi.testclient import TestClient - -from server import build_app, MODEL_NAME - - -@pytest.fixture -def mock_tokenizer(): - tokenizer = MagicMock() - tokenizer.encode.return_value = [1] - tokenizer.decode.return_value = "hello" - return tokenizer - -@patch("server.subprocess.Popen") -def test_models_endpoint(mock_popen, mock_tokenizer): - app = build_app( - target=Path("target.gguf"), - draft=Path("draft.safetensors"), - bin_path=Path("test_dflash"), - budget=22, - max_ctx=131072, - tokenizer=mock_tokenizer, - stop_ids={2} - ) - client = TestClient(app) - response = client.get("/v1/models") - assert response.status_code == 200 - data = response.json() - assert data["object"] == "list" - assert len(data["data"]) == 1 - assert data["data"][0]["id"] == MODEL_NAME - -@patch("server.os.pipe") -@patch("server.subprocess.Popen") -@patch("server.os.read") -def test_chat_completions_non_streaming(mock_os_read, mock_popen, mock_pipe, mock_tokenizer): - mock_pipe.return_value = (1, 2) - - app = build_app( - target=Path("target.gguf"), - draft=Path("draft.safetensors"), - bin_path=Path("test_dflash"), - budget=22, - max_ctx=131072, - tokenizer=mock_tokenizer, - stop_ids={2} - ) - - # Mock os.read to return a single token (e.g. 10) and then -1 - mock_os_read.side_effect = [ - struct.pack("= 3 - assert lines[-1] == "data: [DONE]" \ No newline at end of file diff --git a/dflash/src/dflash/__init__.py b/dflash/src/dflash/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dflash/src/dflash/server/__init__.py b/dflash/src/dflash/server/__init__.py new file mode 100644 index 000000000..a96476bf3 --- /dev/null +++ b/dflash/src/dflash/server/__init__.py @@ -0,0 +1,540 @@ +""" +OpenAI-compatible HTTP server on top of test_dflash, with tool-calling support. + + uv run dflash-server # serves on :1236 + + curl http://localhost:1236/v1/chat/completions \ + -H 'Content-Type: application/json' \ + -d '{"model":"Qwen3.5-27B-Q4_K_M","messages":[{"role":"user","content":"hi"}],"stream":true}' + +Drop-in for Open WebUI / LM Studio / Cline by setting + OPENAI_API_BASE=http://localhost:1236/v1 OPENAI_API_KEY=sk-any + +Supports OpenAI /v1/chat/completions with tools and Anthropic /v1/messages. +Model stays resident in daemon mode (default); context default is 128K. +""" +import argparse +import json +import os +import struct +import subprocess +import sys +import tempfile +import time +import uuid +from pathlib import Path +from typing import AsyncIterator + +from fastapi import FastAPI +from fastapi.responses import JSONResponse, StreamingResponse +from starlette.concurrency import iterate_in_threadpool +from transformers import AutoTokenizer + +from .parsing import ( + TOOL_OPEN_TAG, THINK_OPEN_TAG, THINK_CLOSE_TAG, + normalize_stop, first_stop_match, + parse_tool_calls, parse_reasoning, +) +from .schemas import ChatRequest, AnthropicMessagesRequest + + +ROOT = Path(__file__).resolve().parent.parent.parent.parent +DEFAULT_TARGET = ROOT / "models" / "Qwen3.5-27B-Q4_K_M.gguf" +DEFAULT_DRAFT_ROOT = ROOT / "models" / "draft" +DEFAULT_BIN = ROOT / "build" / ("test_dflash" + (".exe" if sys.platform == "win32" else "")) +DEFAULT_BUDGET = 22 + +_QWEN35_FAMILY_TOKENIZERS = { + "Qwen3.5-27B": "Qwen/Qwen3.5-27B", + "Qwen3.6-27B": "Qwen/Qwen3.6-27B", +} + + +def resolve_draft(root: Path) -> Path: + for st in root.rglob("model.safetensors"): + return st + raise FileNotFoundError(f"no model.safetensors under {root}") + + +def _tokenizer_id_from_gguf(gguf_path: Path) -> str: + """Infer the HuggingFace tokenizer repo from a GGUF target file.""" + default = "Qwen/Qwen3.5-27B" + try: + from gguf import GGUFReader # type: ignore + r = GGUFReader(str(gguf_path)) + for key in ("general.basename", "general.name"): + f = r.fields.get(key) + if f is None or not f.data: + continue + import numpy as np + p = f.parts[f.data[0]] + if not isinstance(p, np.ndarray): + continue + try: + val = bytes(p).decode("utf-8", errors="replace") + except Exception: + continue + for known, repo in _QWEN35_FAMILY_TOKENIZERS.items(): + if known.lower() in val.lower(): + return repo + except Exception: + pass + return default + + +def build_app(target: Path, draft: Path, bin_path: Path, budget: int, + max_ctx: int, tokenizer: AutoTokenizer, stop_ids: set[int], + model_name: str = "") -> FastAPI: + import asyncio + app = FastAPI(title="Luce DFlash OpenAI server") + daemon_lock = asyncio.Lock() + + r_pipe, w_pipe = os.pipe() + if sys.platform == "win32": + import msvcrt + os.set_inheritable(w_pipe, True) + stream_fd_val = int(msvcrt.get_osfhandle(w_pipe)) + else: + stream_fd_val = w_pipe + + bin_abs = str(Path(bin_path).resolve()) + env = {**os.environ} + if sys.platform == "win32": + dll_dir = str(Path(bin_abs).parent / "bin") + env["PATH"] = dll_dir + os.pathsep + str(Path(bin_abs).parent) + os.pathsep + env.get("PATH", "") + + cmd = [bin_abs, str(target), str(draft), "--daemon", + "--fast-rollback", "--ddtree", f"--ddtree-budget={budget}", + f"--max-ctx={max_ctx}", f"--stream-fd={stream_fd_val}"] + if sys.platform == "win32": + daemon_proc = subprocess.Popen(cmd, close_fds=False, env=env, stdin=subprocess.PIPE) + else: + daemon_proc = subprocess.Popen(cmd, pass_fds=(w_pipe,), env=env, stdin=subprocess.PIPE) + os.close(w_pipe) + + @app.get("/v1/models") + def list_models(): + return {"object": "list", + "data": [{"id": model_name, "object": "model", "owned_by": "luce"}]} + + def _tokenize_prompt(req: ChatRequest) -> tuple[Path, bool]: + """Tokenize a ChatRequest to a prompt .bin file. + + Returns (path, started_in_thinking) where started_in_thinking is True + when the chat template prefilled — streaming begins in reasoning mode. + """ + import re + msgs: list[dict] = [] + for m in req.messages: + d: dict = {"role": m.role} + if m.content is not None: + d["content"] = m.content + if m.name is not None: + d["name"] = m.name + if m.tool_call_id is not None: + d["tool_call_id"] = m.tool_call_id + if m.tool_calls is not None: + d["tool_calls"] = [] + for tc in m.tool_calls: + args = tc.function.arguments + if isinstance(args, str): + try: + args_obj = json.loads(args) + except (json.JSONDecodeError, ValueError): + args_obj = {"_raw": args} + else: + args_obj = args + d["tool_calls"].append({ + "id": tc.id, "type": tc.type, + "function": {"name": tc.function.name, "arguments": args_obj}, + }) + msgs.append(d) + + kwargs: dict = dict(tokenize=False, add_generation_prompt=True) + if req.tools: + kwargs["tools"] = [t.model_dump() for t in req.tools] + if req.chat_template_kwargs: + kwargs.update(req.chat_template_kwargs) + prompt = tokenizer.apply_chat_template(msgs, **kwargs) + started_in_thinking = bool(re.search(r"\s*$", prompt)) + ids = tokenizer.encode(prompt, add_special_tokens=False) + fd, path = tempfile.mkstemp(suffix=".bin") + tmp = Path(path) + with os.fdopen(fd, "wb") as f: + for t in ids: + f.write(struct.pack("= n_gen: + hit_stop = True + + async def _astream_tokens(r, n_gen): + generated = 0 + hit_stop = False + while True: + b = await asyncio.to_thread(os.read, r, 4) + if not b or len(b) < 4: + break + tok_id = struct.unpack("= n_gen: + hit_stop = True + + # ── OpenAI /v1/chat/completions ──────────────────────────────── + + @app.post("/v1/chat/completions") + async def chat_completions(req: ChatRequest): + prompt_bin, started_in_thinking = _tokenize_prompt(req) + prompt_len = prompt_bin.stat().st_size // 4 + available_gen = max_ctx - prompt_len - 20 + gen_len = min(req.max_tokens, available_gen) + if gen_len <= 0: + try: prompt_bin.unlink() + except Exception: pass + return JSONResponse( + {"detail": f"Prompt length ({prompt_len}) exceeds max_ctx ({max_ctx})"}, + status_code=400) + + completion_id = "chatcmpl-" + uuid.uuid4().hex[:24] + created = int(time.time()) + + if req.stream: + return await _stream_response(req, prompt_bin, gen_len, + completion_id, created, + started_in_thinking, daemon_lock) + + async with daemon_lock: + daemon_proc.stdin.write(f"{prompt_bin} {gen_len}\n".encode()) + daemon_proc.stdin.flush() + tokens = list(_token_stream(r_pipe, gen_len)) + try: prompt_bin.unlink() + except Exception: pass + + text = tokenizer.decode(tokens, skip_special_tokens=True) + stops = normalize_stop(req.stop) + if stops: + i = first_stop_match(text, stops) + if i != -1: + text = text[:i] + thinking_enabled = True + if req.chat_template_kwargs: + thinking_enabled = req.chat_template_kwargs.get("enable_thinking", True) + cleaned, tool_calls = parse_tool_calls(text, tools=req.tools) + cleaned, reasoning = parse_reasoning(cleaned, thinking_enabled=thinking_enabled) + + msg: dict = {"role": "assistant"} + finish_reason = "stop" + if reasoning: + msg["reasoning_content"] = reasoning + if tool_calls: + msg["content"] = cleaned if cleaned else None + msg["tool_calls"] = tool_calls + finish_reason = "tool_calls" + else: + msg["content"] = cleaned + + return JSONResponse({ + "id": completion_id, "object": "chat.completion", + "created": created, "model": model_name, + "choices": [{"index": 0, "message": msg, "finish_reason": finish_reason}], + "usage": {"prompt_tokens": prompt_len, + "completion_tokens": len(tokens), + "total_tokens": prompt_len + len(tokens)}, + }) + + async def _stream_response(req, prompt_bin, gen_len, completion_id, created, + started_in_thinking, lock): + prompt_len = prompt_bin.stat().st_size // 4 + include_usage = bool(req.stream_options and req.stream_options.get("include_usage")) + + def chunk(delta_obj, finish=None): + return {"id": completion_id, "object": "chat.completion.chunk", + "created": created, "model": model_name, + "choices": [{"index": 0, "delta": delta_obj, "finish_reason": finish}]} + + def usage_chunk(completion_tokens): + return {"id": completion_id, "object": "chat.completion.chunk", + "created": created, "model": model_name, "choices": [], + "usage": {"prompt_tokens": prompt_len, + "completion_tokens": completion_tokens, + "total_tokens": prompt_len + completion_tokens}} + + async def sse() -> AsyncIterator[str]: + async with lock: + daemon_proc.stdin.write(f"{prompt_bin} {gen_len}\n".encode()) + daemon_proc.stdin.flush() + yield f"data: {json.dumps(chunk({'role': 'assistant'}))}\n\n" + + # mode ∈ {'reasoning', 'content', 'tool_buffer'} + mode = "reasoning" if started_in_thinking else "content" + window = "" + tool_buffer = "" + stops = normalize_stop(req.stop) + HOLDBACK = max( + len(THINK_OPEN_TAG), len(THINK_CLOSE_TAG), len(TOOL_OPEN_TAG), + *(len(s) for s in stops), + 0, + ) + completion_tokens = 0 + stop_hit = False + + def emit(text, kind): + if not text: + return None + return f"data: {json.dumps(chunk({kind: text}))}\n\n" + + try: + async for tok_id in iterate_in_threadpool(_token_stream(r_pipe, gen_len)): + completion_tokens += 1 + window += tokenizer.decode([tok_id]) + + if stops and mode != "tool_buffer": + si = first_stop_match(window, stops) + if si != -1: + out = emit(window[:si], "reasoning_content" if mode == "reasoning" else "content") + if out: yield out + window = "" + stop_hit = True + break + + while True: + if mode == "tool_buffer": + tool_buffer += window; window = ""; break + + if mode == "reasoning": + idx = window.find(THINK_CLOSE_TAG) + if idx != -1: + out = emit(window[:idx], "reasoning_content") + if out: yield out + window = window[idx + len(THINK_CLOSE_TAG):] + mode = "content"; continue + if len(window) > HOLDBACK: + out = emit(window[:-HOLDBACK], "reasoning_content") + if out: yield out + window = window[-HOLDBACK:] + break + + else: # content + think_idx = window.find(THINK_OPEN_TAG) + tool_idx = window.find(TOOL_OPEN_TAG) + hits = sorted((i, t) for i, t in + ((think_idx, "think"), (tool_idx, "tool")) if i != -1) + if hits: + idx, which = hits[0] + out = emit(window[:idx], "content") + if out: yield out + if which == "think": + window = window[idx + len(THINK_OPEN_TAG):] + mode = "reasoning" + else: + tool_buffer = window[idx:]; window = ""; mode = "tool_buffer" + continue + if len(window) > HOLDBACK: + out = emit(window[:-HOLDBACK], "content") + if out: yield out + window = window[-HOLDBACK:] + break + + finish_reason = "stop" + if stop_hit: + yield f"data: {json.dumps(chunk({}, finish='stop'))}\n\n" + if include_usage: yield f"data: {json.dumps(usage_chunk(completion_tokens))}\n\n" + yield "data: [DONE]\n\n" + try: prompt_bin.unlink() + except Exception: pass + return + + if mode == "reasoning" and window: + out = emit(window, "reasoning_content") + if out: yield out + elif mode == "content" and window: + out = emit(window, "content") + if out: yield out + elif mode == "tool_buffer": + tool_buffer += window + + if mode == "tool_buffer": + cleaned_after, tool_calls = parse_tool_calls(tool_buffer, tools=req.tools) + if tool_calls: + if cleaned_after: + out = emit(cleaned_after, "content") + if out: yield out + tc_delta = [ + {"index": i, "id": tc["id"], "type": "function", + "function": {"name": tc["function"]["name"], + "arguments": tc["function"]["arguments"]}} + for i, tc in enumerate(tool_calls) + ] + yield f"data: {json.dumps(chunk({'tool_calls': tc_delta}))}\n\n" + finish_reason = "tool_calls" + else: + out = emit(tool_buffer, "content") + if out: yield out + + finally: + try: prompt_bin.unlink() + except Exception: pass + + yield f"data: {json.dumps(chunk({}, finish=finish_reason))}\n\n" + if include_usage: yield f"data: {json.dumps(usage_chunk(completion_tokens))}\n\n" + yield "data: [DONE]\n\n" + + return StreamingResponse(sse(), media_type="text/event-stream") + + # ── Anthropic /v1/messages ───────────────────────────────────── + + def _anthropic_text(content) -> str: + if isinstance(content, str): + return content + return "".join(b.get("text", "") for b in content + if isinstance(b, dict) and b.get("type") == "text") + + def _tokenize_anthropic(req: AnthropicMessagesRequest) -> tuple[Path, int]: + msgs = [] + if req.system: + system_text = _anthropic_text(req.system) + if system_text: + msgs.append({"role": "system", "content": system_text}) + for m in req.messages: + msgs.append({"role": m.role, "content": _anthropic_text(m.content)}) + prompt = tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True) + ids = tokenizer.encode(prompt, add_special_tokens=False) + fd, path = tempfile.mkstemp(suffix=".bin") + tmp = Path(path) + with os.fdopen(fd, "wb") as f: + for t in ids: + f.write(struct.pack(" AsyncIterator[str]: + async with daemon_lock: + yield f"event: message_start\ndata: {json.dumps({'type': 'message_start', 'message': {'id': msg_id, 'type': 'message', 'role': 'assistant', 'model': req.model or model_name, 'content': [], 'stop_reason': None, 'stop_sequence': None, 'usage': {'input_tokens': prompt_len, 'output_tokens': 0}}})}\n\n" + yield f"event: content_block_start\ndata: {json.dumps({'type': 'content_block_start', 'index': 0, 'content_block': {'type': 'text', 'text': ''}})}\n\n" + daemon_proc.stdin.write(f"{prompt_bin} {gen_len}\n".encode()) + daemon_proc.stdin.flush() + out_tokens = 0 + try: + async for tok_id in _astream_tokens(r_pipe, gen_len): + out_tokens += 1 + yield f"event: content_block_delta\ndata: {json.dumps({'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': tokenizer.decode([tok_id])}})}\n\n" + finally: + try: prompt_bin.unlink() + except Exception: pass + yield f"event: content_block_stop\ndata: {json.dumps({'type': 'content_block_stop', 'index': 0})}\n\n" + yield f"event: message_delta\ndata: {json.dumps({'type': 'message_delta', 'delta': {'stop_reason': 'end_turn', 'stop_sequence': None}, 'usage': {'output_tokens': out_tokens}})}\n\n" + yield f"event: message_stop\ndata: {json.dumps({'type': 'message_stop'})}\n\n" + return StreamingResponse(sse(), media_type="text/event-stream") + + async with daemon_lock: + daemon_proc.stdin.write(f"{prompt_bin} {gen_len}\n".encode()) + daemon_proc.stdin.flush() + tokens = [t async for t in _astream_tokens(r_pipe, gen_len)] + try: prompt_bin.unlink() + except Exception: pass + text = tokenizer.decode(tokens, skip_special_tokens=True) + return JSONResponse({ + "id": msg_id, "type": "message", "role": "assistant", + "model": req.model or model_name, + "content": [{"type": "text", "text": text}], + "stop_reason": "end_turn", "stop_sequence": None, + "usage": {"input_tokens": prompt_len, "output_tokens": len(tokens)}, + }) + + return app + + +def main(): + ap = argparse.ArgumentParser(description="Luce DFlash OpenAI-compatible server") + ap.add_argument("--host", default="0.0.0.0") + ap.add_argument("--port", type=int, default=1236) + ap.add_argument("--target", type=Path, default=DEFAULT_TARGET) + ap.add_argument("--draft", type=Path, default=DEFAULT_DRAFT_ROOT) + ap.add_argument("--bin", type=Path, default=DEFAULT_BIN) + ap.add_argument("--budget", type=int, default=DEFAULT_BUDGET) + ap.add_argument("--max-ctx", type=int, default=131072, + help="Maximum context length (default: 131072)") + ap.add_argument("--kv-f16", action="store_true", + help="Force F16 KV cache (default: TQ3_0 when --max-ctx > 6144)") + ap.add_argument("--fa-window", type=int, default=None, + help="Sliding window for FA layers; 0 = full attention") + ap.add_argument("--tokenizer", type=str, default=None, + help="HuggingFace tokenizer repo (default: auto-detect from GGUF)") + ap.add_argument("--daemon", action="store_true", + help="(accepted, ignored — daemon is always on)") + args = ap.parse_args() + + if args.max_ctx > 6144 and not args.kv_f16: + os.environ.setdefault("DFLASH27B_KV_TQ3", "1") + if args.fa_window is not None: + os.environ["DFLASH27B_FA_WINDOW"] = str(args.fa_window) + + if not args.bin.is_file(): + raise SystemExit(f"binary not found at {args.bin}") + if not args.target.is_file(): + raise SystemExit(f"target GGUF not found at {args.target}") + draft = resolve_draft(args.draft) if args.draft.is_dir() else args.draft + if not draft.is_file(): + raise SystemExit(f"draft safetensors not found at {args.draft}") + + tokenizer_id = args.tokenizer or _tokenizer_id_from_gguf(args.target) + tokenizer = AutoTokenizer.from_pretrained(tokenizer_id, trust_remote_code=True) + stop_ids = set() + for s in ("<|im_end|>", "<|endoftext|>"): + ids = tokenizer.encode(s, add_special_tokens=False) + if ids: stop_ids.add(ids[0]) + + model_name = args.target.stem + app = build_app(args.target, draft, args.bin, args.budget, args.max_ctx, + tokenizer, stop_ids, model_name=model_name) + + import uvicorn + print(f"Luce DFlash OpenAI server on http://{args.host}:{args.port}") + print(f" target = {args.target}") + print(f" draft = {draft}") + print(f" bin = {args.bin}") + print(f" budget = {args.budget}") + print(f" max_ctx = {args.max_ctx}") + print(f" tokenizer = {tokenizer_id}") + print(f" model = {model_name}") + uvicorn.run(app, host=args.host, port=args.port, log_level="info") diff --git a/dflash/src/dflash/server/parsing.py b/dflash/src/dflash/server/parsing.py new file mode 100644 index 000000000..7c7d92673 --- /dev/null +++ b/dflash/src/dflash/server/parsing.py @@ -0,0 +1,158 @@ +""" +Qwen3.x tool-call and reasoning parsers. + +Ported from vLLM (Apache-2.0) for behavioral parity with +--tool-call-parser qwen3_coder and --reasoning-parser qwen3. +Pure functions — no FastAPI or subprocess dependencies. +""" +import json +import re +import uuid + + +TOOL_OPEN_TAG = "" +THINK_OPEN_TAG = "" +THINK_CLOSE_TAG = "" + +TOOL_CALL_COMPLETE_RE = re.compile(r"(.*?)", re.DOTALL) +TOOL_CALL_FUNCTION_RE = re.compile( + r"| by using the next tag or end-of-string as terminator. +TOOL_CALL_PARAMETER_RE = re.compile( + r"|(?=)|$)", + re.DOTALL, +) + + +def normalize_stop(stop) -> list[str]: + """Coerce OpenAI stop field (str | list[str] | None) to list[str].""" + if not stop: + return [] + if isinstance(stop, str): + return [stop] + return [s for s in stop if isinstance(s, str) and s] + + +def first_stop_match(text: str, stops: list[str]) -> int: + """Return the earliest index where any stop sequence appears, or -1.""" + best = -1 + for s in stops: + i = text.find(s) + if i != -1 and (best == -1 or i < best): + best = i + return best + + +def parse_reasoning(text: str, thinking_enabled: bool = True) -> tuple[str, str | None]: + """Extract ... reasoning from model output. + + Handles three Qwen3.x flavors: + - Paired: both and in generated output. + - Headless: template prefilled , model only emits . + - Disabled: thinking disabled; output is pure content. + + Returns (content, reasoning_content). + """ + parts = text.partition(THINK_OPEN_TAG) + rest = parts[2] if parts[1] else parts[0] + if THINK_CLOSE_TAG not in rest: + if thinking_enabled: + # Truncated mid-think — treat everything as reasoning. + return "", (rest.strip() or None) + else: + return rest.strip(), None + reasoning, _, content = rest.partition(THINK_CLOSE_TAG) + return content.strip(), (reasoning.strip() or None) + + +def _find_tool_properties(tools, function_name: str) -> dict: + for t in tools or []: + fn = t.function if hasattr(t, "function") else t.get("function", {}) + if hasattr(fn, "model_dump"): + fn = fn.model_dump() + if fn.get("name") == function_name: + params = fn.get("parameters", {}) + if isinstance(params, dict): + return params.get("properties", {}) + return {} + + +def _convert_param_value(param_value: str, param_name: str, param_config: dict, + func_name: str): + """Coerce a stringified XML parameter value to its JSON-schema type.""" + import ast + if param_value.lower() == "null": + return None + if param_name not in param_config: + return param_value + cfg = param_config[param_name] + if isinstance(cfg, dict) and "type" in cfg: + ptype = str(cfg["type"]).strip().lower() + elif isinstance(cfg, dict) and "anyOf" in cfg: + ptype = "object" + else: + ptype = "string" + if ptype in ("string", "str", "text", "varchar", "char", "enum"): + return param_value + if any(ptype.startswith(p) for p in ("int", "uint", "long", "short", "unsigned")): + try: return int(param_value) + except (ValueError, TypeError): return param_value + if ptype.startswith("num") or ptype.startswith("float"): + try: + f = float(param_value) + return f if f - int(f) != 0 else int(f) + except (ValueError, TypeError): + return param_value + if ptype in ("boolean", "bool", "binary"): + return param_value.lower() == "true" + if (ptype in ("object", "array", "arr") + or ptype.startswith("dict") or ptype.startswith("list")): + try: return json.loads(param_value) + except (json.JSONDecodeError, TypeError, ValueError): pass + try: return ast.literal_eval(param_value) + except (ValueError, SyntaxError, TypeError): return param_value + + +def parse_tool_calls(text: str, tools=None) -> tuple[str, list[dict]]: + """Parse Qwen3.x XML tool calls out of model output. + + Returns (cleaned_content, tool_calls_list). + """ + tool_calls: list[dict] = [] + cleaned_parts: list[str] = [] + cursor = 0 + for m in TOOL_CALL_COMPLETE_RE.finditer(text): + cleaned_parts.append(text[cursor:m.start()]) + cursor = m.end() + body = m.group(1) + fn_match = TOOL_CALL_FUNCTION_RE.search(body) + if not fn_match: + continue + fn_text = fn_match.group(1) or fn_match.group(2) or "" + end_idx = fn_text.find(">") + if end_idx == -1: + continue + function_name = fn_text[:end_idx].strip() + params_region = fn_text[end_idx + 1:] + param_config = _find_tool_properties(tools, function_name) + args: dict = {} + for match_text in TOOL_CALL_PARAMETER_RE.findall(params_region): + eq_idx = match_text.find(">") + if eq_idx == -1: + continue + k = match_text[:eq_idx].strip() + v = match_text[eq_idx + 1:] + if v.startswith("\n"): v = v[1:] + if v.endswith("\n"): v = v[:-1] + args[k] = _convert_param_value(v, k, param_config, function_name) + tool_calls.append({ + "id": "call_" + uuid.uuid4().hex[:24], + "type": "function", + "function": { + "name": function_name, + "arguments": json.dumps(args, ensure_ascii=False), + }, + }) + cleaned_parts.append(text[cursor:]) + return "".join(cleaned_parts).strip(), tool_calls diff --git a/dflash/src/dflash/server/schemas.py b/dflash/src/dflash/server/schemas.py new file mode 100644 index 000000000..1165053de --- /dev/null +++ b/dflash/src/dflash/server/schemas.py @@ -0,0 +1,56 @@ +from typing import Any +from pydantic import BaseModel + + +class ToolCallFunction(BaseModel): + name: str + arguments: str # JSON string per OpenAI spec + + +class ToolCall(BaseModel): + id: str | None = None + type: str = "function" + function: ToolCallFunction + + +class ChatMessage(BaseModel): + role: str + content: Any | None = None # str, list, or null when tool_calls present + name: str | None = None + tool_call_id: str | None = None + tool_calls: list[ToolCall] | None = None + + +class ToolDef(BaseModel): + type: str = "function" + function: dict # {name, description, parameters: {...JSON schema...}} + + +class ChatRequest(BaseModel): + model: str = "" + messages: list[ChatMessage] + stream: bool = False + max_tokens: int = 512 + temperature: float | None = None # accepted, ignored (greedy-only) + top_p: float | None = None + tools: list[ToolDef] | None = None + tool_choice: Any | None = None + chat_template_kwargs: dict | None = None # e.g. {"enable_thinking": false} + stop: Any | None = None # str or list[str] + stream_options: dict | None = None # e.g. {"include_usage": true} + + +class AnthropicMessage(BaseModel): + role: str + content: str | list[dict] + + +class AnthropicMessagesRequest(BaseModel): + model: str = "" + max_tokens: int + messages: list[AnthropicMessage] + system: str | list[dict] | None = None + stream: bool = False + temperature: float | None = None + top_p: float | None = None + stop_sequences: list[str] | None = None diff --git a/dflash/tests/test_parsing.py b/dflash/tests/test_parsing.py new file mode 100644 index 000000000..f6dbf72f1 --- /dev/null +++ b/dflash/tests/test_parsing.py @@ -0,0 +1,152 @@ +import json +import pytest +from dflash.server.parsing import ( + normalize_stop, + first_stop_match, + parse_reasoning, + parse_tool_calls, +) + + +# ── normalize_stop ───────────────────────────────────────────────── + +def test_normalize_stop_none(): + assert normalize_stop(None) == [] + +def test_normalize_stop_empty_list(): + assert normalize_stop([]) == [] + +def test_normalize_stop_string(): + assert normalize_stop("STOP") == ["STOP"] + +def test_normalize_stop_list(): + assert normalize_stop(["a", "b"]) == ["a", "b"] + +def test_normalize_stop_filters_non_strings(): + assert normalize_stop(["a", None, 42, "b"]) == ["a", "b"] + + +# ── first_stop_match ─────────────────────────────────────────────── + +def test_first_stop_match_not_found(): + assert first_stop_match("hello world", ["STOP"]) == -1 + +def test_first_stop_match_found(): + assert first_stop_match("hello STOP world", ["STOP"]) == 6 + +def test_first_stop_match_earliest(): + assert first_stop_match("aXbYc", ["Y", "X"]) == 1 + +def test_first_stop_match_empty_stops(): + assert first_stop_match("hello", []) == -1 + + +# ── parse_reasoning ──────────────────────────────────────────────── + +def test_parse_reasoning_paired_tags(): + content, reasoning = parse_reasoning("deep thoughtthe answer") + assert content == "the answer" + assert reasoning == "deep thought" + +def test_parse_reasoning_headless(): + # Template prefilled ; model only emits the close tag. + content, reasoning = parse_reasoning("deep thoughtthe answer") + assert content == "the answer" + assert reasoning == "deep thought" + +def test_parse_reasoning_disabled(): + content, reasoning = parse_reasoning("plain content", thinking_enabled=False) + assert content == "plain content" + assert reasoning is None + +def test_parse_reasoning_truncated(): + # No and thinking enabled — everything is reasoning. + content, reasoning = parse_reasoning("incomplete thought", thinking_enabled=True) + assert content == "" + assert reasoning == "incomplete thought" + +def test_parse_reasoning_no_tags(): + content, reasoning = parse_reasoning("plain content", thinking_enabled=True) + # No → treated as truncated reasoning. + assert content == "" + assert reasoning == "plain content" + +def test_parse_reasoning_empty_think(): + content, reasoning = parse_reasoning("answer") + assert content == "answer" + assert reasoning is None # empty string stripped to None + + +# ── parse_tool_calls ─────────────────────────────────────────────── + +def _make_tool_xml(name, params: dict) -> str: + param_str = "".join( + f"\n{v}\n\n" for k, v in params.items() + ) + return f"\n\n{param_str}\n" + + +def test_parse_tool_calls_no_calls(): + content, calls = parse_tool_calls("just some text") + assert content == "just some text" + assert calls == [] + +def test_parse_tool_calls_basic(): + xml = _make_tool_xml("get_weather", {"location": "London"}) + content, calls = parse_tool_calls(xml) + assert len(calls) == 1 + assert calls[0]["function"]["name"] == "get_weather" + args = json.loads(calls[0]["function"]["arguments"]) + assert args["location"] == "London" + +def test_parse_tool_calls_multiple(): + xml = (_make_tool_xml("fn_a", {"x": "1"}) + "\n" + + _make_tool_xml("fn_b", {"y": "2"})) + _, calls = parse_tool_calls(xml) + assert len(calls) == 2 + assert calls[0]["function"]["name"] == "fn_a" + assert calls[1]["function"]["name"] == "fn_b" + +def test_parse_tool_calls_cleans_surrounding_text(): + xml = "Before. " + _make_tool_xml("fn", {"k": "v"}) + " After." + content, calls = parse_tool_calls(xml) + assert "Before." in content + assert "After." in content + assert len(calls) == 1 + +def test_parse_tool_calls_type_coercion_int(): + tools = [{"function": {"name": "fn", "parameters": { + "properties": {"count": {"type": "integer"}}}}}] + xml = _make_tool_xml("fn", {"count": "42"}) + _, calls = parse_tool_calls(xml, tools=tools) + args = json.loads(calls[0]["function"]["arguments"]) + assert args["count"] == 42 + assert isinstance(args["count"], int) + +def test_parse_tool_calls_type_coercion_bool(): + tools = [{"function": {"name": "fn", "parameters": { + "properties": {"flag": {"type": "boolean"}}}}}] + xml = _make_tool_xml("fn", {"flag": "true"}) + _, calls = parse_tool_calls(xml, tools=tools) + args = json.loads(calls[0]["function"]["arguments"]) + assert args["flag"] is True + +def test_parse_tool_calls_type_coercion_object(): + tools = [{"function": {"name": "fn", "parameters": { + "properties": {"data": {"type": "object"}}}}}] + xml = _make_tool_xml("fn", {"data": '{"key": "val"}'}) + _, calls = parse_tool_calls(xml, tools=tools) + args = json.loads(calls[0]["function"]["arguments"]) + assert args["data"] == {"key": "val"} + +def test_parse_tool_calls_unclosed_tag(): + # Malformed — no ; should return as plain content. + text = "\n\n\nhello\n\n\n" + content, calls = parse_tool_calls(text) + assert calls == [] + +def test_parse_tool_calls_ids_are_unique(): + xml = (_make_tool_xml("fn_a", {"x": "1"}) + _make_tool_xml("fn_b", {"y": "2"})) + _, calls = parse_tool_calls(xml) + ids = [c["id"] for c in calls] + assert len(set(ids)) == len(ids) diff --git a/dflash/tests/test_server.py b/dflash/tests/test_server.py new file mode 100644 index 000000000..1bb68ac98 --- /dev/null +++ b/dflash/tests/test_server.py @@ -0,0 +1,196 @@ +import json +import struct +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest +from fastapi.testclient import TestClient + +from dflash.server import build_app + + +MODEL = "test-model" + + +@pytest.fixture +def mock_tokenizer(): + tok = MagicMock() + tok.encode.return_value = [1, 2, 3] + tok.decode.return_value = "hello" + tok.apply_chat_template.return_value = "" + return tok + + +@pytest.fixture +def app(mock_tokenizer): + with patch("dflash.server.subprocess.Popen"), \ + patch("dflash.server.os.pipe", return_value=(10, 11)), \ + patch("dflash.server.os.close"): + return build_app( + target=Path("target.gguf"), + draft=Path("draft.safetensors"), + bin_path=Path("test_dflash"), + budget=22, + max_ctx=131072, + tokenizer=mock_tokenizer, + stop_ids={2}, + model_name=MODEL, + ) + + +@pytest.fixture +def client(app): + return TestClient(app) + + +# ── /v1/models ───────────────────────────────────────────────────── + +def test_models_endpoint(client): + r = client.get("/v1/models") + assert r.status_code == 200 + data = r.json() + assert data["object"] == "list" + assert data["data"][0]["id"] == MODEL + + +# ── /v1/chat/completions — non-streaming ────────────────────────── + +def _token_bytes(*ids): + return [struct.pack("\n\n" + "\nLondon\n\n" + "\n" + ) + mock_tokenizer.decode.return_value = tool_xml + mock_read.side_effect = _token_bytes(10, -1) + r = client.post("/v1/chat/completions", json={ + "messages": [{"role": "user", "content": "weather?"}], + "tools": [{"type": "function", "function": { + "name": "get_weather", + "description": "Get weather", + "parameters": {"type": "object", "properties": {"location": {"type": "string"}}}, + }}], + "stream": False, + }) + assert r.status_code == 200 + data = r.json() + assert data["choices"][0]["finish_reason"] == "tool_calls" + tc = data["choices"][0]["message"]["tool_calls"] + assert len(tc) == 1 + assert tc[0]["function"]["name"] == "get_weather" + args = json.loads(tc[0]["function"]["arguments"]) + assert args["location"] == "London" + + +@patch("dflash.server.os.read") +def test_chat_non_streaming_stop_sequence(mock_read, client, mock_tokenizer): + mock_tokenizer.decode.return_value = "hello STOP world" + mock_read.side_effect = _token_bytes(10, -1) + r = client.post("/v1/chat/completions", json={ + "messages": [{"role": "user", "content": "hi"}], + "stop": "STOP", + "stream": False, + # Disable thinking so parse_reasoning doesn't treat plain text as + # truncated reasoning (which would clear content). + "chat_template_kwargs": {"enable_thinking": False}, + }) + assert r.status_code == 200 + assert r.json()["choices"][0]["message"]["content"] == "hello" + + +def test_chat_context_exceeded(client, mock_tokenizer): + # encode returns a list long enough to exceed max_ctx + mock_tokenizer.encode.return_value = list(range(131072)) + r = client.post("/v1/chat/completions", json={ + "messages": [{"role": "user", "content": "hi"}], + "stream": False, + }) + assert r.status_code == 400 + + +# ── /v1/chat/completions — streaming ───────────────────────────── + +@patch("dflash.server.os.read") +def test_chat_streaming(mock_read, client): + mock_read.side_effect = _token_bytes(10, -1) + r = client.post("/v1/chat/completions", json={ + "messages": [{"role": "user", "content": "hi"}], + "stream": True, + }) + assert r.status_code == 200 + events = [l for l in r.text.split("\n\n") if l.startswith("data:")] + assert events[-1] == "data: [DONE]" + # First event is role header, middle events are content deltas, last is finish. + payloads = [json.loads(e[len("data: "):]) for e in events[:-1]] + finish_events = [p for p in payloads if p["choices"][0].get("finish_reason")] + assert finish_events[-1]["choices"][0]["finish_reason"] == "stop" + + +# ── /v1/messages (Anthropic) ─────────────────────────────────────── + +@patch("dflash.server.os.read") +def test_anthropic_non_streaming(mock_read, client): + mock_read.side_effect = _token_bytes(10, -1) + r = client.post("/v1/messages", json={ + "model": MODEL, + "max_tokens": 512, + "messages": [{"role": "user", "content": "hi"}], + "stream": False, + }) + assert r.status_code == 200 + data = r.json() + assert data["type"] == "message" + assert data["role"] == "assistant" + assert data["stop_reason"] == "end_turn" + assert data["content"][0]["type"] == "text" + + +@patch("dflash.server.os.read") +def test_anthropic_streaming(mock_read, client): + mock_read.side_effect = _token_bytes(10, -1) + r = client.post("/v1/messages", json={ + "model": MODEL, + "max_tokens": 512, + "messages": [{"role": "user", "content": "hi"}], + "stream": True, + }) + assert r.status_code == 200 + event_types = [ + json.loads(l.split("data: ", 1)[1])["type"] + for l in r.text.split("\n") + if l.startswith("data:") + ] + assert event_types[0] == "message_start" + assert "content_block_start" in event_types + assert "message_stop" in event_types + + +def test_anthropic_context_exceeded(client, mock_tokenizer): + mock_tokenizer.encode.return_value = list(range(131072)) + r = client.post("/v1/messages", json={ + "model": MODEL, + "max_tokens": 512, + "messages": [{"role": "user", "content": "hi"}], + }) + assert r.status_code == 400 + assert r.json()["type"] == "error"