Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion condor/acp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
Heartbeat,
ACPEvent,
)
from .pydantic_ai_client import PydanticAIClient, is_pydantic_ai_model
from .openai_compatible import OpenAICompatibleClient, is_openai_compatible_agent
233 changes: 233 additions & 0 deletions condor/acp/openai_compatible.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""OpenAI-compatible chat client (LM Studio, local OpenAI API, etc.).

Implements the same surface as :class:`ACPClient` (``start``, ``stop``, ``alive``,
``prompt``, ``prompt_stream``) so handlers can swap backends. Tool calls and MCP
are not wired through this client — use Claude or Gemini for full MCP support.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
from typing import Any, AsyncIterator

import aiohttp

from .client import (
ACPEvent,
Heartbeat,
PromptDone,
PermissionCallback,
TextChunk,
ThoughtChunk,
)

log = logging.getLogger(__name__)

OPENAI_COMPATIBLE_AGENT_KEYS = frozenset({"lm-studio"})


def _env(name: str, default: str) -> str:
v = os.environ.get(name)
return v.strip() if v else default


def lm_studio_base_url() -> str:
"""Base URL including ``/v1`` (e.g. ``http://127.0.0.1:1234/v1``)."""
return _env("LM_STUDIO_BASE_URL", "http://127.0.0.1:1234/v1").rstrip("/")


def lm_studio_api_key() -> str:
return _env("LM_STUDIO_API_KEY", "lm-studio")


def lm_studio_model() -> str | None:
m = os.environ.get("LM_STUDIO_MODEL")
return m.strip() if m and m.strip() else None


def lm_studio_temperature() -> float:
try:
return float(os.environ.get("LM_STUDIO_TEMPERATURE", "0.7"))
except ValueError:
return 0.7


async def _fetch_default_model(base: str, api_key: str) -> str | None:
"""GET /v1/models and return the first model id."""
url = f"{base}/models"
headers = {"Authorization": f"Bearer {api_key}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status != 200:
log.warning("LM Studio models list HTTP %s", resp.status)
return None
data = await resp.json()
except Exception:
log.exception("Failed to list LM Studio models")
return None
models = data.get("data") or []
if not models:
return None
first = models[0]
mid = first.get("id") if isinstance(first, dict) else None
return mid


class OpenAICompatibleClient:
"""Chat Completions API with streaming; conversation state held in memory."""

def __init__(
self,
*,
base_url: str | None = None,
model: str | None = None,
api_key: str | None = None,
temperature: float | None = None,
working_dir: str | None = None,
permission_callback: PermissionCallback | None = None,
mcp_servers: list[dict[str, Any]] | None = None,
extra_env: dict[str, str] | None = None,
):
self._base = (base_url or lm_studio_base_url()).rstrip("/")
self._model = model if model is not None else lm_studio_model()
self._api_key = api_key if api_key is not None else lm_studio_api_key()
self._temperature = temperature if temperature is not None else lm_studio_temperature()
self.working_dir = working_dir # unused; parity with ACPClient
self.permission_callback = permission_callback
self.mcp_servers = mcp_servers or []
self.extra_env = extra_env or {}
self._messages: list[dict[str, Any]] = []
self._started = False
self._http: aiohttp.ClientSession | None = None

@property
def command(self) -> str:
return "openai-compatible"

async def start(self) -> None:
if self._started:
return
timeout = aiohttp.ClientTimeout(total=None, sock_connect=10, sock_read=300)
self._http = aiohttp.ClientSession(timeout=timeout)
if not self._model:
self._model = await _fetch_default_model(self._base, self._api_key)
if not self._model:
await self.stop()
raise RuntimeError(
"No LM Studio model configured. Set LM_STUDIO_MODEL or load a model in LM Studio, "
f"then ensure the server is running at {self._base}."
)
self._started = True
log.info("OpenAI-compatible session started (model=%s, base=%s)", self._model, self._base)

async def stop(self) -> None:
self._started = False
if self._http:
await self._http.close()
self._http = None

@property
def alive(self) -> bool:
return self._started and self._http is not None and not self._http.closed

def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._api_key}",
"Content-Type": "application/json",
}

async def prompt(self, text: str) -> str:
chunks: list[str] = []
async for event in self.prompt_stream(text):
if isinstance(event, TextChunk):
chunks.append(event.text)
return "".join(chunks)

async def prompt_stream(self, text: str) -> AsyncIterator[ACPEvent]:
assert self._http is not None and self._model

self._messages.append({"role": "user", "content": text})
url = f"{self._base}/chat/completions"
body: dict[str, Any] = {
"model": self._model,
"messages": self._messages,
"stream": True,
"temperature": self._temperature,
}

loop = asyncio.get_event_loop()
start_time = loop.time()
max_duration = 1860.0

try:
async with self._http.post(url, headers=self._headers(), json=body) as resp:
if resp.status != 200:
err_text = await resp.text()
log.error("OpenAI-compatible chat/completions HTTP %s: %s", resp.status, err_text[:500])
yield PromptDone(stop_reason="error")
return

assistant_parts: list[str] = []
raw_buf = b""
done_seen = False

while not done_seen:
try:
chunk = await asyncio.wait_for(resp.content.read(65536), timeout=30.0)
except asyncio.TimeoutError:
elapsed = loop.time() - start_time
if not self.alive:
yield PromptDone(stop_reason="disconnected")
return
if elapsed > max_duration:
yield PromptDone(stop_reason="timeout")
return
yield Heartbeat(elapsed_seconds=elapsed)
continue

if not chunk:
break
raw_buf += chunk
while b"\n" in raw_buf:
line, raw_buf = raw_buf.split(b"\n", 1)
line = line.decode(errors="replace").strip()
if not line or line.startswith(":"):
continue
if line == "data: [DONE]":
done_seen = True
break
if not line.startswith("data:"):
continue
payload = line[5:].strip()
try:
data = json.loads(payload)
except json.JSONDecodeError:
continue
for choice in data.get("choices") or []:
delta = choice.get("delta") or {}
piece = delta.get("content")
if piece:
assistant_parts.append(piece)
yield TextChunk(text=piece)
for key in ("reasoning_content", "reasoning"):
r = delta.get(key)
if isinstance(r, str) and r:
yield ThoughtChunk(text=r)
if done_seen:
break

text_full = "".join(assistant_parts)
if text_full:
self._messages.append({"role": "assistant", "content": text_full})
yield PromptDone(stop_reason="end_turn")
except aiohttp.ClientError as e:
log.exception("OpenAI-compatible request failed: %s", e)
yield PromptDone(stop_reason="error")


def is_openai_compatible_agent(agent_key: str) -> bool:
return agent_key in OPENAI_COMPATIBLE_AGENT_KEYS
26 changes: 11 additions & 15 deletions condor/trading_agent/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
ToolCallEvent,
ToolCallUpdate,
)
from condor.acp.pydantic_ai_client import PydanticAIClient, is_pydantic_ai_model
from condor.acp.openai_compatible import OpenAICompatibleClient, is_openai_compatible_agent

from .journal import JournalManager, next_experiment_number, next_session_number
from .prompts import build_tick_prompt
Expand Down Expand Up @@ -282,7 +282,7 @@ async def _tick(self) -> None:
)
self._pending_directives.clear()

# 6. Create agent client (ACP for Claude/Gemini, PydanticAI for open-source models)
# 6. Create ACP session
from handlers.agents._shared import (
build_mcp_servers_for_agent,
build_mcp_servers_for_session,
Expand All @@ -305,20 +305,14 @@ async def _tick(self) -> None:
)
permission_cb = auto_approve_with_risk_check(self.risk, risk_state, execution_mode=mode)

# Session config overrides strategy default for agent_key
agent_key = self.config.get("agent_key") or self.strategy.agent_key
use_pydantic_ai = is_pydantic_ai_model(agent_key)

if use_pydantic_ai:
base_url = self.config.get("model_base_url") or None
acp_client = PydanticAIClient(
model=agent_key,
if is_openai_compatible_agent(self.strategy.agent_key):
acp_client = OpenAICompatibleClient(
working_dir=get_project_dir(),
mcp_servers=mcp_servers,
permission_callback=permission_cb,
base_url=base_url,
)
else:
agent_cmd = ACP_COMMANDS.get(agent_key, ACP_COMMANDS["claude-code"])
agent_cmd = ACP_COMMANDS.get(self.strategy.agent_key, ACP_COMMANDS["claude-code"])
acp_client = ACPClient(
command=agent_cmd,
working_dir=get_project_dir(),
Expand Down Expand Up @@ -389,7 +383,6 @@ async def _tick(self) -> None:
executors_data=executors_summary,
risk_state=risk_state.to_dict(),
duration=tick_duration,
agent_key=agent_key,
)
log.info(
"TickEngine %s experiment #%d complete (tools=%d, response=%d chars)",
Expand Down Expand Up @@ -437,7 +430,11 @@ async def _tick(self) -> None:
self.agent_id, tick_num, len(tool_calls), len(response_text),
)

async def _collect_stream(self, acp_client: ACPClient, prompt: str):
async def _collect_stream(
self,
acp_client: ACPClient | OpenAICompatibleClient,
prompt: str,
):
"""Wrapper to make prompt_stream compatible with wait_for."""
async for event in acp_client.prompt_stream(prompt):
yield event
Expand Down Expand Up @@ -517,7 +514,6 @@ def get_info(self) -> dict[str, Any]:
"total_amount_quote": self.config.get("total_amount_quote", 100),
"trading_context": self.config.get("trading_context", ""),
"risk_limits": risk_limits if isinstance(risk_limits, dict) else risk_limits.model_dump() if hasattr(risk_limits, "model_dump") else {},
"agent_key": self.config.get("agent_key") or self.strategy.agent_key,
"execution_mode": self.config.get("execution_mode", "loop"),
"max_ticks": self.config.get("max_ticks", 0),
"last_tick_at": self._last_tick_at,
Expand Down
2 changes: 1 addition & 1 deletion condor/trading_agent/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Strategy:
id: str
name: str
description: str
agent_key: str # "claude-code", "gemini", or pydantic-ai model like "ollama:llama3.1"
agent_key: str # "claude-code", "gemini", or "lm-studio"
instructions: str # The strategy logic text for the LLM
skills: list[str] = field(default_factory=list) # Optional skill names
default_config: dict[str, Any] = field(default_factory=dict)
Expand Down
Loading