diff --git a/README.md b/README.md
index 216a601..79d6cfe 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,9 @@ truffile deploy --dry-run [app_dir]
truffile list apps
truffile delete
truffile models
-truffile chat "hello"
+truffile chat
+truffile chat --no-repl "hello"
+truffile chat --no-default-tools
truffile proxy --host 127.0.0.1 --port 8080
```
diff --git a/truffile/_version.py b/truffile/_version.py
index a621a39..a4168bf 100644
--- a/truffile/_version.py
+++ b/truffile/_version.py
@@ -28,7 +28,7 @@
commit_id: COMMIT_ID
__commit_id__: COMMIT_ID
-__version__ = version = '0.1.15.dev3'
-__version_tuple__ = version_tuple = (0, 1, 15, 'dev3')
+__version__ = version = '0.1.15.dev11'
+__version_tuple__ = version_tuple = (0, 1, 15, 'dev11')
-__commit_id__ = commit_id = 'g0212865ef'
+__commit_id__ = commit_id = 'g708f695c8'
diff --git a/truffile/cli.py b/truffile/cli.py
index ca489aa..c55d288 100644
--- a/truffile/cli.py
+++ b/truffile/cli.py
@@ -1,7 +1,9 @@
import argparse
import asyncio
import json
+import os
import re
+import select
import signal
import socket
import sys
@@ -9,6 +11,7 @@
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
+from typing import Any, Callable
import httpx
from truffile.storage import StorageService
@@ -16,6 +19,18 @@
from truffile.schema import validate_app_dir
from truffile.deploy import build_deploy_plan, deploy_with_builder
+try:
+ import readline
+except Exception:
+ readline = None # type: ignore[assignment]
+
+try:
+ import termios
+ import tty
+except Exception:
+ termios = None # type: ignore[assignment]
+ tty = None # type: ignore[assignment]
+
# ANSI colors
class C:
@@ -25,6 +40,7 @@ class C:
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
+ GRAY = "\033[90m"
DIM = "\033[2m"
BOLD = "\033[1m"
RESET = "\033[0m"
@@ -37,8 +53,10 @@ class C:
ARROW = "→"
DOT = "•"
WARN = "⚠"
+HAMMER = "🔨"
TOOL_TAGS = ("<tool_call>", "</tool_call>")
TOOL_TAG_PATTERN = re.compile(r"<tool_call>\s*(.*?)\s*</tool_call>", re.DOTALL)
+REPL_COMMANDS = ["/help", "/", "/history", "/reset", "/models", "/exit", "/quit"]
class Spinner:
@@ -80,6 +98,42 @@ def fail(self, message: str | None = None):
sys.stdout.flush()
+class MushroomPulse:
+ FRAMES = ["(🍄 )", "(🍄. )", "(🍄.. )", "(🍄...)", "(🍄 ..)", "(🍄 .)"]
+
+ def __init__(self, message: str = "thinking", interval: float = 0.09):
+ self.message = message
+ self.interval = interval
+ self.running = False
+ self.thread: threading.Thread | None = None
+ self.frame_idx = 0
+ self.enabled = bool(sys.stdout.isatty())
+
+ def _spin(self) -> None:
+ while self.running:
+ frame = self.FRAMES[self.frame_idx % len(self.FRAMES)]
+ sys.stdout.write(f"\r{C.MAGENTA}{frame}{C.RESET} {C.DIM}{self.message}{C.RESET}")
+ sys.stdout.flush()
+ self.frame_idx += 1
+ time.sleep(self.interval)
+
+ def start(self) -> None:
+ if not self.enabled or self.running:
+ return
+ self.running = True
+ self.thread = threading.Thread(target=self._spin, daemon=True)
+ self.thread.start()
+
+ def stop(self) -> None:
+ if not self.running:
+ return
+ self.running = False
+ if self.thread:
+ self.thread.join(timeout=0.2)
+ sys.stdout.write("\r\033[K")
+ sys.stdout.flush()
+
+
class ScrollingLog:
#felt a little fancy lol
"""A scrolling log window that shows the last N lines in place."""
@@ -793,66 +847,461 @@ async def _default_model(ip: str) -> str | None:
return None
-async def cmd_chat(args, storage: StorageService) -> int:
- device, ip = await _resolve_connected_device(storage)
- if not device or not ip:
- return 1
+def _model_display_name(model: dict[str, Any]) -> str:
+ model_id = str(model.get("id") or "")
+ name = str(model.get("name") or model_id)
+ if name == model_id:
+ return name
+ return f"{name} ({model_id})"
- prompt = args.prompt
- if not prompt and args.prompt_words:
- prompt = " ".join(args.prompt_words).strip()
- if not prompt:
- error("Missing prompt")
- print(f" {C.DIM}Usage: truffile chat --prompt \"hello\"{C.RESET}")
- print(f" {C.DIM}Or: truffile chat \"hello\"{C.RESET}")
- return 1
- model = args.model
- if not model:
- spinner = Spinner("Resolving default model")
- spinner.start()
- model = await _default_model(ip)
- if not model:
- spinner.fail("Failed to resolve default model from IF2")
- return 1
- spinner.stop(success=True)
+def _model_value(model: dict[str, Any]) -> str:
+ return str(model.get("uuid") or model.get("id") or "")
- stream = not args.no_stream and not args.json
- messages: list[dict[str, str]] = []
- if args.system:
- messages.append({"role": "system", "content": args.system})
- messages.append({"role": "user", "content": prompt})
- payload: dict = {
+def _model_matches_current(model: dict[str, Any], current_model: str) -> bool:
+ if not current_model:
+ return False
+ mv = _model_value(model)
+ mid = str(model.get("id") or "")
+ return current_model in {mv, mid}
+
+
+def _pick_model_with_numbers(models: list[dict[str, Any]], current_model: str) -> str | None:
+ if not models:
+ return None
+ print(f"{C.BLUE}models:{C.RESET}")
+ default_idx = 0
+ for i, m in enumerate(models, start=1):
+ active = f" {C.DIM}[active]{C.RESET}" if _model_matches_current(m, current_model) else ""
+ if active:
+ default_idx = i - 1
+ print(f"{C.BLUE}{i}.{C.RESET} {_model_display_name(m)}{active}")
+ choice = input(f"{C.CYAN}?{C.RESET} select model [1-{len(models)}] (Enter to keep): ").strip()
+ if not choice:
+ return _model_value(models[default_idx])
+ try:
+ idx = int(choice) - 1
+ except ValueError:
+ warn("invalid model selection")
+ return None
+ if idx < 0 or idx >= len(models):
+ warn("invalid model selection")
+ return None
+ return _model_value(models[idx])
+
+
+def _pick_model_interactive(models: list[dict[str, Any]], current_model: str) -> str | None:
+ if not models:
+ return None
+ if not sys.stdin.isatty() or not sys.stdout.isatty() or termios is None or tty is None:
+ return _pick_model_with_numbers(models, current_model)
+
+ selected = 0
+ for i, m in enumerate(models):
+ if _model_matches_current(m, current_model):
+ selected = i
+ break
+
+ lines_rendered = 0
+
+ def _render() -> None:
+ nonlocal lines_rendered
+ lines: list[str] = []
+ lines.append(f"{C.BLUE}select model (↑/↓, Enter=select, q=cancel){C.RESET}")
+ for i, m in enumerate(models):
+ pointer = "›" if i == selected else " "
+ active = f" {C.DIM}[active]{C.RESET}" if _model_matches_current(m, current_model) else ""
+ line = f" {C.CYAN}{pointer}{C.RESET} {_model_display_name(m)}{active}"
+ lines.append(line)
+
+ if lines_rendered > 0:
+ sys.stdout.write(f"\033[{lines_rendered}A")
+ for line in lines:
+ sys.stdout.write(f"\r\033[K{line}\n")
+ sys.stdout.flush()
+ lines_rendered = len(lines)
+
+ fd = sys.stdin.fileno()
+ old_attrs = termios.tcgetattr(fd)
+ try:
+ tty.setraw(fd)
+ _render()
+ while True:
+ ch = sys.stdin.read(1)
+ if ch in ("\r", "\n"):
+ sys.stdout.write("\r\033[K")
+ return _model_value(models[selected])
+            if ch in ("q", "Q", "\x03"):
+ sys.stdout.write("\r\033[K")
+ return None
+ if ch == "\x1b":
+ seq1 = sys.stdin.read(1)
+ if seq1 == "[":
+ seq2 = sys.stdin.read(1)
+ if seq2 == "A":
+ selected = (selected - 1) % len(models)
+ _render()
+ continue
+ if seq2 == "B":
+ selected = (selected + 1) % len(models)
+ _render()
+ continue
+ return None
+ finally:
+ termios.tcsetattr(fd, termios.TCSADRAIN, old_attrs)
+ if lines_rendered > 0:
+ sys.stdout.write(f"\033[{lines_rendered}A")
+ for _ in range(lines_rendered):
+ sys.stdout.write("\r\033[K\n")
+ sys.stdout.write(f"\033[{lines_rendered}A")
+ sys.stdout.flush()
+
+
+def _fetch_models_payload(client: httpx.Client, ip: str) -> list[dict[str, Any]]:
+ resp = client.get(f"http://{ip}/if2/v1/models", timeout=15.0)
+ resp.raise_for_status()
+ payload = resp.json()
+ raw = payload.get("data", [])
+ if not isinstance(raw, list):
+ raise RuntimeError("invalid models payload")
+ out: list[dict[str, Any]] = []
+ for m in raw:
+ if isinstance(m, dict):
+ out.append(m)
+ return out
+
+
+def _build_default_tools() -> list[dict[str, Any]]:
+ return [
+ {
+ "type": "function",
+ "function": {
+ "name": "web_search",
+ "description": "Search the web for a query and return top results.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "query": {"type": "string", "description": "Search query."},
+ "max_results": {
+ "type": "integer",
+ "description": "Number of results to return (1-10).",
+ "default": 5,
+ },
+ },
+ "required": ["query"],
+ },
+ },
+ },
+ {
+ "type": "function",
+ "function": {
+ "name": "web_fetch",
+ "description": "Fetch and extract readable text from a URL.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "url": {"type": "string", "description": "Absolute http/https URL."},
+ "max_chars": {
+ "type": "integer",
+ "description": "Max number of characters to return (500-20000).",
+ "default": 8000,
+ },
+ },
+ "required": ["url"],
+ },
+ },
+ },
+ ]
+
+
+def _tool_web_search(arguments: dict[str, Any]) -> dict[str, Any]:
+ query = str(arguments.get("query", "")).strip()
+ if not query:
+ return {"error": "query is required"}
+ max_results = arguments.get("max_results", 5)
+ try:
+ max_results = int(max_results)
+ except (TypeError, ValueError):
+ max_results = 5
+ max_results = max(1, min(max_results, 10))
+ try:
+ from ddgs import DDGS
+ except Exception as exc:
+ return {
+ "error": "ddgs is not installed or failed to import",
+ "detail": str(exc),
+ "hint": "pip install ddgs",
+ }
+ rows: list[dict[str, Any]] = []
+ try:
+ with DDGS() as ddgs:
+ for r in ddgs.text(query, max_results=max_results):
+ if len(rows) >= max_results:
+ break
+ rows.append(
+ {
+ "title": r.get("title"),
+ "url": r.get("href") or r.get("url"),
+ "snippet": r.get("body") or r.get("snippet"),
+ }
+ )
+ except Exception as exc:
+ return {"error": "web_search failed", "detail": str(exc)}
+ return {"query": query, "count": len(rows), "results": rows}
+
+
+def _tool_web_fetch(arguments: dict[str, Any]) -> dict[str, Any]:
+ url = str(arguments.get("url", "")).strip()
+ if not url:
+ return {"error": "url is required"}
+ max_chars = arguments.get("max_chars", 8000)
+ try:
+ max_chars = int(max_chars)
+ except (TypeError, ValueError):
+ max_chars = 8000
+ max_chars = max(500, min(max_chars, 20000))
+ try:
+ import trafilatura
+ except Exception as exc:
+ return {
+ "error": "trafilatura is not installed or failed to import",
+ "detail": str(exc),
+ "hint": "pip install trafilatura",
+ }
+ try:
+ downloaded = trafilatura.fetch_url(url)
+ if not downloaded:
+ return {"error": "failed to download url", "url": url}
+ text = trafilatura.extract(downloaded, include_links=False, include_images=False)
+ if not text:
+ return {"error": "failed to extract readable text", "url": url}
+ text = text.strip()
+ truncated = len(text) > max_chars
+ return {
+ "url": url,
+ "content": text[:max_chars],
+ "truncated": truncated,
+ "content_chars": min(len(text), max_chars),
+ }
+ except Exception as exc:
+ return {"error": "web_fetch failed", "url": url, "detail": str(exc)}
+
+
+def _execute_default_tool(name: str, arguments: dict[str, Any]) -> dict[str, Any]:
+ if name == "web_search":
+ return _tool_web_search(arguments)
+ if name == "web_fetch":
+ return _tool_web_fetch(arguments)
+ return {"error": f"unknown tool '{name}'"}
+
+
+def _print_history(messages: list[dict[str, Any]]) -> None:
+ for idx, msg in enumerate(messages):
+ role = str(msg.get("role", "unknown"))
+ if role == "assistant" and msg.get("tool_calls"):
+ text = f"[tool_calls={len(msg.get('tool_calls') or [])}]"
+ else:
+ content = msg.get("content", "")
+ if isinstance(content, list):
+ text = json.dumps(content, ensure_ascii=True)
+ else:
+ text = str(content)
+ text = text.replace("\n", " ")
+ if len(text) > 160:
+ text = text[:157] + "..."
+ print(f"{idx:03d} {role:9s} {text}")
+
+
+def _build_chat_payload(
+ *,
+ model: str,
+ messages: list[dict[str, Any]],
+ args: argparse.Namespace,
+ stream: bool,
+ tools: list[dict[str, Any]] | None,
+) -> dict[str, Any]:
+ body: dict[str, Any] = {
"model": model,
"messages": messages,
"stream": stream,
"reasoning": {"enabled": bool(args.reasoning)},
+ "max_tokens": int(args.max_tokens) if args.max_tokens is not None else 512,
}
- if args.max_tokens is not None:
- payload["max_tokens"] = args.max_tokens
- else:
- payload["max_tokens"] = 512
if args.temperature is not None:
- payload["temperature"] = args.temperature
+ body["temperature"] = args.temperature
if args.top_p is not None:
- payload["top_p"] = args.top_p
+ body["top_p"] = args.top_p
if stream:
- payload["stream_options"] = {"include_usage": True}
+ body["stream_options"] = {"include_usage": True}
+ if tools:
+ body["tools"] = tools
+ body["tool_choice"] = "auto"
+ return body
- url = f"http://{ip}/if2/v1/chat/completions"
- headers = {"Content-Type": "application/json"}
- spinner = Spinner(f"Connecting to {device}")
- spinner.start()
+def _print_reasoning_and_response(reasoning_text: str, response_text: str, show_reasoning: bool) -> None:
+ if show_reasoning and reasoning_text:
+ print(f"{C.GRAY}thinking:{C.RESET}")
+ print(f"{C.GRAY}{reasoning_text}{C.RESET}")
+ if response_text:
+ print()
+ if response_text:
+ print(response_text)
+
+
+def _print_repl_commands(prefix: str | None = None) -> None:
+ if prefix is None:
+ matches = REPL_COMMANDS
+ else:
+ matches = [cmd for cmd in REPL_COMMANDS if cmd.startswith(prefix)]
+ if not matches:
+ print(f"{C.YELLOW}no command matches: {prefix}{C.RESET}")
+ return
+ print(f"{C.BLUE}commands: {', '.join(matches)}{C.RESET}")
+
+
+def _install_repl_completer(commands: list[str]) -> Callable[[], None] | None:
+ if readline is None:
+ return None
try:
- with httpx.Client(timeout=None) as client:
- if stream:
+ prev_completer = readline.get_completer()
+ prev_delims = readline.get_completer_delims()
+ prev_display_hook = getattr(readline, "get_completion_display_matches_hook", lambda: None)()
+ readline.parse_and_bind("tab: complete")
+ readline.parse_and_bind("set show-all-if-ambiguous on")
+ readline.parse_and_bind("set completion-ignore-case on")
+ readline.set_completer_delims(" \t\n")
+ matches: list[str] = []
+
+ def _complete(text: str, state: int) -> str | None:
+ nonlocal matches
+ if state == 0:
+ buffer = readline.get_line_buffer().lstrip()
+ if buffer.startswith("/"):
+ prefix = buffer.split()[0]
+ matches = [cmd for cmd in commands if cmd.startswith(prefix)]
+ else:
+ matches = []
+ if state < len(matches):
+ return matches[state]
+ return None
+
+ readline.set_completer(_complete)
+ if hasattr(readline, "set_completion_display_matches_hook"):
+ def _display_matches(substitution: str, display_matches: list[str], longest_match_length: int) -> None:
+ del substitution, longest_match_length
+ if not display_matches:
+ return
+ print()
+ print(f"{C.BLUE}commands: {', '.join(display_matches)}{C.RESET}")
+ try:
+ readline.redisplay()
+ except Exception:
+ pass
+ readline.set_completion_display_matches_hook(_display_matches)
+
+ def _cleanup() -> None:
+ try:
+ readline.set_completer(prev_completer)
+ readline.set_completer_delims(prev_delims)
+ if hasattr(readline, "set_completion_display_matches_hook"):
+ readline.set_completion_display_matches_hook(prev_display_hook)
+ except Exception:
+ pass
+
+ return _cleanup
+ except Exception:
+ return None
+
+
+class StreamAbortWatcher:
+ def __init__(self) -> None:
+ self.enabled = bool(sys.stdin.isatty() and termios is not None and tty is not None)
+ self._fd: int | None = None
+ self._old_attrs: Any = None
+ self._thread: threading.Thread | None = None
+ self._stop = threading.Event()
+ self._abort_reason: str | None = None
+
+ def __enter__(self) -> "StreamAbortWatcher":
+ if not self.enabled:
+ return self
+ try:
+ self._fd = sys.stdin.fileno()
+ self._old_attrs = termios.tcgetattr(self._fd)
+ tty.setcbreak(self._fd)
+ except Exception:
+ self.enabled = False
+ return self
+ self._thread = threading.Thread(target=self._watch, daemon=True)
+ self._thread.start()
+ return self
+
+ def _watch(self) -> None:
+ if self._fd is None:
+ return
+ while not self._stop.is_set():
+ try:
+ ready, _, _ = select.select([self._fd], [], [], 0.1)
+ except Exception:
+ return
+ if not ready:
+ continue
+ try:
+ ch = os.read(self._fd, 1)
+ except Exception:
+ continue
+ if not ch:
+ continue
+ if ch == b"\x1b":
+ self._abort_reason = "esc"
+ self._stop.set()
+ return
+
+ def aborted(self) -> bool:
+ return self._abort_reason is not None
+
+ def __exit__(self, exc_type, exc, tb) -> bool:
+ self._stop.set()
+ if self._thread:
+ self._thread.join(timeout=0.2)
+ if self.enabled and self._fd is not None and self._old_attrs is not None:
+ try:
+ termios.tcsetattr(self._fd, termios.TCSADRAIN, self._old_attrs)
+ except Exception:
+ pass
+ return False
+
+
+def _run_single_chat_request(
+ *,
+ client: httpx.Client,
+ url: str,
+ headers: dict[str, str],
+ payload: dict[str, Any],
+ args: argparse.Namespace,
+ stream: bool,
+) -> tuple[dict[str, Any], dict[str, Any] | None, bool]:
+ wait_anim = MushroomPulse("thinking")
+ wait_anim.start()
+ if stream:
+ content_parts: list[str] = []
+ reasoning_parts: list[str] = []
+ usage: dict[str, Any] | None = None
+ tool_calls_by_index: dict[int, dict[str, Any]] = {}
+ reasoning_stream_started = False
+ interrupted = False
+ first_event_seen = False
+
+ try:
+ with StreamAbortWatcher() as abort_watcher:
with client.stream("POST", url, headers=headers, json=payload) as resp:
resp.raise_for_status()
- spinner.stop(success=True)
- usage_printed = False
for raw in resp.iter_lines():
+ if abort_watcher.aborted():
+ interrupted = True
+ break
if not raw:
continue
line = raw.strip()
@@ -865,46 +1314,315 @@ async def cmd_chat(args, storage: StorageService) -> int:
evt = json.loads(data)
except Exception:
continue
+ if not first_event_seen:
+ wait_anim.stop()
+ first_event_seen = True
+
+ if isinstance(evt.get("usage"), dict):
+ usage = evt.get("usage")
choices = evt.get("choices")
- if isinstance(choices, list) and choices:
- c0 = choices[0]
- if isinstance(c0, dict):
- delta = c0.get("delta", {})
- if isinstance(delta, dict):
- txt = delta.get("content")
- if isinstance(txt, str) and txt:
- print(txt, end="", flush=True)
- reasoning = delta.get("reasoning")
- if args.reasoning and isinstance(reasoning, str) and reasoning:
- print(reasoning, end="", flush=True)
-
- usage = evt.get("usage")
- if isinstance(usage, dict) and not usage_printed:
- usage_printed = True
- print(f"\n{C.DIM}[usage] {usage}{C.RESET}", flush=True)
- print()
- else:
- resp = client.post(url, headers=headers, json=payload, timeout=120.0)
- resp.raise_for_status()
- spinner.stop(success=True)
- body = resp.json()
- if args.json:
- print(json.dumps(body, indent=2))
- else:
- content = ""
+ if not isinstance(choices, list) or not choices:
+ continue
+ c0 = choices[0]
+ if not isinstance(c0, dict):
+ continue
+ delta = c0.get("delta", {})
+ if not isinstance(delta, dict):
+ continue
+
+ reasoning_chunk = delta.get("reasoning")
+ if isinstance(reasoning_chunk, str) and reasoning_chunk:
+ reasoning_parts.append(reasoning_chunk)
+ if args.reasoning:
+ if not reasoning_stream_started:
+ print(f"{C.GRAY}thinking:{C.RESET}")
+ reasoning_stream_started = True
+ print(f"{C.GRAY}{reasoning_chunk}{C.RESET}", end="", flush=True)
+
+ content_chunk = delta.get("content")
+ if isinstance(content_chunk, str) and content_chunk:
+ content_parts.append(content_chunk)
+ if not args.reasoning:
+ print(content_chunk, end="", flush=True)
+
+ for tc in delta.get("tool_calls") or []:
+ if not isinstance(tc, dict):
+ continue
+ idx = tc.get("index")
+ if not isinstance(idx, int):
+ idx = len(tool_calls_by_index)
+ entry = tool_calls_by_index.setdefault(
+ idx,
+ {
+ "id": tc.get("id", ""),
+ "type": tc.get("type", "function"),
+ "function": {"name": "", "arguments": ""},
+ },
+ )
+ if tc.get("id"):
+ entry["id"] = tc["id"]
+ if tc.get("type"):
+ entry["type"] = tc["type"]
+ fn = tc.get("function") or {}
+ if isinstance(fn, dict):
+ if fn.get("name"):
+ entry["function"]["name"] += str(fn["name"])
+ if fn.get("arguments"):
+ entry["function"]["arguments"] += str(fn["arguments"])
+ except KeyboardInterrupt:
+ interrupted = True
+ finally:
+ wait_anim.stop()
+
+ msg: dict[str, Any] = {"role": "assistant", "content": "".join(content_parts).strip()}
+ reasoning_text = "".join(reasoning_parts).strip()
+ if reasoning_text:
+ msg["reasoning_content"] = reasoning_text
+ if tool_calls_by_index:
+ msg["tool_calls"] = [tool_calls_by_index[i] for i in sorted(tool_calls_by_index)]
+ if args.reasoning:
+ if reasoning_stream_started:
+ print()
+ response_text = str(msg.get("content") or "")
+ if response_text:
+ print()
+ print(response_text)
+ elif content_parts:
+ print()
+ if interrupted:
+ print(f"{C.YELLOW}response interrupted{C.RESET}")
+ return msg, usage, interrupted
+
+ try:
+ resp = client.post(url, headers=headers, json=payload, timeout=120.0)
+ resp.raise_for_status()
+ body = resp.json()
+ finally:
+ wait_anim.stop()
+ if args.json:
+ print(json.dumps(body, indent=2))
+
+ choices = body.get("choices", [])
+ c0 = choices[0] if isinstance(choices, list) and choices else {}
+ msg = c0.get("message", {}) if isinstance(c0, dict) else {}
+ if not isinstance(msg, dict):
+ msg = {}
+ out: dict[str, Any] = {"role": "assistant", "content": str(msg.get("content", "") or "")}
+ if isinstance(msg.get("reasoning"), str) and msg.get("reasoning"):
+ out["reasoning_content"] = msg["reasoning"]
+ if isinstance(msg.get("tool_calls"), list):
+ out["tool_calls"] = msg.get("tool_calls")
+
+    if not args.json:
+        _print_reasoning_and_response(
+            str(out.get("reasoning_content") or ""),
+            str(out.get("content") or ""), bool(args.reasoning),
+        )
+ return out, body.get("usage") if isinstance(body.get("usage"), dict) else None, False
+
+
+def _run_chat_turn(
+ *,
+ client: httpx.Client,
+ url: str,
+ headers: dict[str, str],
+ model: str,
+ args: argparse.Namespace,
+ stream: bool,
+ tools: list[dict[str, Any]] | None,
+ messages: list[dict[str, Any]],
+ user_text: str,
+) -> int:
+ messages.append({"role": "user", "content": user_text})
+
+ max_rounds = max(1, int(getattr(args, "max_tool_rounds", 8)))
+ for _ in range(max_rounds):
+ payload = _build_chat_payload(model=model, messages=messages, args=args, stream=stream, tools=tools)
+ assistant_msg, usage, interrupted = _run_single_chat_request(
+ client=client, url=url, headers=headers, payload=payload, args=args, stream=stream
+ )
+ messages.append(assistant_msg)
+ if isinstance(usage, dict):
+ print(f"{C.DIM}[usage] {usage}{C.RESET}")
+ if interrupted:
+ return 130
+
+ tool_calls = assistant_msg.get("tool_calls") if isinstance(assistant_msg, dict) else None
+ if not tools or not isinstance(tool_calls, list) or not tool_calls:
+ return 0
+
+ for tool_call in tool_calls:
+ if not isinstance(tool_call, dict):
+ continue
+ fn = tool_call.get("function") or {}
+ if not isinstance(fn, dict):
+ continue
+ name = str(fn.get("name") or "")
+ raw_args = str(fn.get("arguments") or "{}")
+ try:
+ parsed_args = json.loads(raw_args)
+ except json.JSONDecodeError:
+ parsed_args = {"_raw": raw_args}
+ print(f"{C.CYAN}{HAMMER} tool{C.RESET} {name}")
+ tool_result = _execute_default_tool(name, parsed_args)
+ messages.append(
+ {
+ "role": "tool",
+ "tool_call_id": tool_call.get("id", ""),
+ "content": json.dumps(tool_result, ensure_ascii=False),
+ }
+ )
+
+ warn("Reached max tool rounds without a final assistant response")
+ return 1
+
+
+async def cmd_chat(args, storage: StorageService) -> int:
+ prompt = args.prompt
+ if not prompt and args.prompt_words:
+ prompt = " ".join(args.prompt_words).strip()
+ if args.no_repl and not prompt:
+ error("Missing prompt")
+ print(f" {C.DIM}Usage: truffile chat --no-repl \"hello\"{C.RESET}")
+ return 1
+
+ device, ip = await _resolve_connected_device(storage)
+ if not device or not ip:
+ return 1
+
+ model = args.model
+ if not model:
+ spinner = Spinner("Resolving default model")
+ spinner.start()
+ model = await _default_model(ip)
+ if not model:
+ spinner.fail("Failed to resolve default model from IF2")
+ return 1
+ spinner.stop(success=True)
+
+ stream = not args.no_stream and not args.json
+ tools = _build_default_tools() if getattr(args, "default_tools", True) else None
+ messages: list[dict[str, Any]] = []
+ if args.system:
+ messages.append({"role": "system", "content": args.system})
+
+ url = f"http://{ip}/if2/v1/chat/completions"
+ headers = {"Content-Type": "application/json"}
+
+ try:
+ spinner = Spinner(f"Connecting to {device}")
+ spinner.start()
+ with httpx.Client(timeout=None) as client:
+ spinner.stop(success=True)
+
+ # Single-turn mode.
+ if args.no_repl:
+ return _run_chat_turn(
+ client=client,
+ url=url,
+ headers=headers,
+ model=model,
+ args=args,
+ stream=stream,
+ tools=tools,
+ messages=messages,
+ user_text=prompt or "",
+ )
+
+ # REPL mode (default).
+ print(f"{C.DIM}model: {model}{C.RESET}")
+ print(
+ f"{C.DIM}commands: /help, /history, /reset, /models, /exit{C.RESET}"
+ )
+
+ cleanup_repl = _install_repl_completer(REPL_COMMANDS)
+ try:
+ if prompt:
+ print(f"{C.CYAN}> {prompt}{C.RESET}")
+ rc = _run_chat_turn(
+ client=client,
+ url=url,
+ headers=headers,
+ model=model,
+ args=args,
+ stream=stream,
+ tools=tools,
+ messages=messages,
+ user_text=prompt,
+ )
+ if rc != 0:
+ return rc
+
+ while True:
try:
- choices = body.get("choices", [])
- if isinstance(choices, list) and choices:
- msg = choices[0].get("message", {})
- if isinstance(msg, dict):
- content = str(msg.get("content", ""))
- except Exception:
- content = ""
- print(content)
+ line = input(f"{C.CYAN}> {C.RESET}").strip()
+ except EOFError:
+ print()
+ return 0
+ except KeyboardInterrupt:
+ print()
+ continue
+
+ if not line:
+ continue
+ if line in {"/", "/help"}:
+ _print_repl_commands()
+ continue
+ if line in {"/exit", "/quit"}:
+ return 0
+ if line == "/history":
+ _print_history(messages)
+ continue
+ if line == "/reset":
+ messages = []
+ if args.system:
+ messages.append({"role": "system", "content": args.system})
+ print(f"{C.YELLOW}history reset{C.RESET}")
+ continue
+ if line == "/models":
+ try:
+ models = _fetch_models_payload(client, ip)
+ selected_model = _pick_model_interactive(models, model)
+ if selected_model and selected_model != model:
+ model = selected_model
+ print(f"{C.GREEN}{CHECK}{C.RESET} model switched: {model}")
+ except Exception as exc:
+ error(f"failed to list models: {exc}")
+ continue
+ if line.startswith("/"):
+ matches = [cmd for cmd in REPL_COMMANDS if cmd.startswith(line)]
+ if matches:
+ _print_repl_commands(line)
+ else:
+ warn(f"unknown command: {line}")
+ _print_repl_commands()
+ continue
+
+ rc = _run_chat_turn(
+ client=client,
+ url=url,
+ headers=headers,
+ model=model,
+ args=args,
+ stream=stream,
+ tools=tools,
+ messages=messages,
+ user_text=line,
+ )
+ if rc != 0:
+ if rc == 130:
+ continue
+ return rc
+ finally:
+ if cleanup_repl:
+ cleanup_repl()
return 0
except Exception as e:
- spinner.fail(f"Chat request failed: {e}")
+ try:
+ spinner.fail(f"Chat request failed: {e}") # type: ignore[name-defined]
+ except Exception:
+ error(f"Chat request failed: {e}")
return 1
@@ -1655,6 +2373,11 @@ def cmd_validate(args) -> int:
def print_help():
+ if sys.stdout.isatty():
+ intro = MushroomPulse("truffile", interval=0.08)
+ intro.start()
+ time.sleep(0.65)
+ intro.stop()
print(f"{MUSHROOM} {C.BOLD}truffile{C.RESET} - TruffleOS SDK")
print()
print(f"{C.BOLD}Usage:{C.RESET} truffile [options]")
@@ -1668,7 +2391,7 @@ def print_help():
print(f" {C.BLUE}delete{C.RESET} Delete installed apps from device")
print(f" {C.BLUE}list{C.RESET} List installed apps or devices")
print(f" {C.BLUE}models{C.RESET} List models on your Truffle")
- print(f" {C.BLUE}chat{C.RESET} [prompt] Chat with any model on your Truffle")
+ print(f" {C.BLUE}chat{C.RESET} [prompt] Chat on your Truffle (REPL by default)")
print(f" {C.BLUE}proxy{C.RESET} Run OpenAI-compatible proxy")
print()
print(f"{C.BOLD}Examples:{C.RESET}")
@@ -1680,7 +2403,9 @@ def print_help():
print(f" {C.DIM}truffile validate ./my-app{C.RESET}")
print(f" {C.DIM}truffile list apps{C.RESET}")
print(f" {C.DIM}truffile models{C.RESET} {C.DIM}# show models on your Truffle{C.RESET}")
- print(f" {C.DIM}truffile chat \"hello\"{C.RESET} {C.DIM}# run chat completion on your Truffle{C.RESET}")
+ print(f" {C.DIM}truffile chat{C.RESET} {C.DIM}# open interactive REPL chat{C.RESET}")
+ print(f" {C.DIM}truffile chat --no-repl \"hello\"{C.RESET} {C.DIM}# one-shot chat{C.RESET}")
+ print(f" {C.DIM}truffile chat --no-reasoning{C.RESET} {C.DIM}# disable reasoning output{C.RESET}")
print(f" {C.DIM}truffile proxy{C.RESET} {C.DIM}# run local /v1 proxy{C.RESET}")
print()
@@ -1726,12 +2451,34 @@ def main() -> int:
p_chat.add_argument("-p", "--prompt", help="Prompt text")
p_chat.add_argument("-m", "--model", help="Model id/uuid (default: first model from IF2 list)")
p_chat.add_argument("--system", help="System prompt")
- p_chat.add_argument("--reasoning", action="store_true", help="Enable reasoning mode")
+ p_chat.add_argument(
+ "--reasoning",
+ action=argparse.BooleanOptionalAction,
+ default=True,
+ help="Enable/disable reasoning output (default: enabled)",
+ )
p_chat.add_argument("--max-tokens", type=int, help="Max response tokens")
p_chat.add_argument("--temperature", type=float, help="Sampling temperature")
p_chat.add_argument("--top-p", type=float, help="Nucleus sampling top-p")
p_chat.add_argument("--no-stream", action="store_true", help="Disable streaming output")
p_chat.add_argument("--json", action="store_true", help="Print full JSON response (non-stream)")
+ p_chat.add_argument(
+ "--default-tools",
+ action=argparse.BooleanOptionalAction,
+ default=True,
+ help="Enable built-in web_search/web_fetch tools",
+ )
+ p_chat.add_argument(
+ "--max-tool-rounds",
+ type=int,
+ default=8,
+ help="Max assistant/tool loop rounds per user turn",
+ )
+ p_chat.add_argument(
+ "--no-repl",
+ action="store_true",
+ help="Run single-turn chat and exit",
+ )
p_proxy = subparsers.add_parser("proxy", add_help=False)
p_proxy.add_argument("--device", "-d", help="Device name (default: last connected)")