From c625783ab580a4e68b6268cba678bc12f97dce31 Mon Sep 17 00:00:00 2001 From: "mirrobot-agent[bot]" <2140342+mirrobot-agent@users.noreply.github.com> Date: Sun, 19 Apr 2026 18:34:32 +0000 Subject: [PATCH 1/2] feat(providers): add Freebuff as a provider Implements Freebuff (freebuff.com) as a custom provider with session/run lifecycle management, model-to-agent mapping, and multi-token rotation. Closes #157 Adds Freebuff provider following the architecture of the freebuff2api reference implementation (quorinex/freebuff2api), adapted to the proxy's provider interface pattern. Key components: - freebuff_auth_base.py: Session/run lifecycle, model-agent mapping, token pools - freebuff_provider.py: Custom completion handler with metadata injection - Registered in provider_factory.py and provider_config.py Supported models (from Codebuff free-agents): - z-ai/glm-5.1, minimax/minimax-m2.7 - google/gemini-2.5-flash-lite, google/gemini-3.1-flash-lite-preview --- .env.example | 8 + src/rotator_library/provider_config.py | 7 + src/rotator_library/provider_factory.py | 2 + .../providers/freebuff_auth_base.py | 562 ++++++++++++++++++ .../providers/freebuff_provider.py | 499 ++++++++++++++++ 5 files changed, 1078 insertions(+) create mode 100644 src/rotator_library/providers/freebuff_auth_base.py create mode 100644 src/rotator_library/providers/freebuff_provider.py diff --git a/.env.example b/.env.example index 72351421..6877fa9d 100644 --- a/.env.example +++ b/.env.example @@ -85,6 +85,14 @@ # Path to your iFlow credential file (e.g., ~/.iflow/oauth_creds.json). #IFLOW_OAUTH_1="" +# --- Freebuff --- +# Freebuff auth tokens (comma-separated for multi-token rotation). 
+# Obtain from ~/.config/manicode/credentials.json (authToken field) or https://freebuff.llm.pm +#FREEBUFF_API_KEY_1="" +#FREEBUFF_API_KEY_2="" +#FREEBUFF_API_BASE="https://codebuff.com" +#FREEBUFF_MODELS='["z-ai/glm-5.1","minimax/minimax-m2.7","google/gemini-2.5-flash-lite","google/gemini-3.1-flash-lite-preview"]' + # ------------------------------------------------------------------------------ # | [ADVANCED] Provider-Specific Settings | diff --git a/src/rotator_library/provider_config.py b/src/rotator_library/provider_config.py index 51d40043..a2fbbad6 100644 --- a/src/rotator_library/provider_config.py +++ b/src/rotator_library/provider_config.py @@ -106,6 +106,13 @@ "synthetic": { "category": "popular", }, + "freebuff": { + "category": "popular", + "note": "Free AI model hosting. Auth token from Freebuff CLI (~/.config/manicode/credentials.json).", + "extra_vars": [ + ("FREEBUFF_API_BASE", "API Base URL (optional)", "https://codebuff.com"), + ], + }, # ========================================================================= # CLOUD PLATFORMS - Aggregators & cloud inference platforms # ========================================================================= diff --git a/src/rotator_library/provider_factory.py b/src/rotator_library/provider_factory.py index dcc40bc9..622fcffd 100644 --- a/src/rotator_library/provider_factory.py +++ b/src/rotator_library/provider_factory.py @@ -7,12 +7,14 @@ from .providers.qwen_auth_base import QwenAuthBase from .providers.iflow_auth_base import IFlowAuthBase from .providers.antigravity_auth_base import AntigravityAuthBase +from .providers.freebuff_auth_base import FreebuffAuthBase PROVIDER_MAP = { "gemini_cli": GeminiAuthBase, "qwen_code": QwenAuthBase, "iflow": IFlowAuthBase, "antigravity": AntigravityAuthBase, + "freebuff": FreebuffAuthBase, } def get_provider_auth_class(provider_name: str): diff --git a/src/rotator_library/providers/freebuff_auth_base.py b/src/rotator_library/providers/freebuff_auth_base.py new file mode 
# SPDX-License-Identifier: LGPL-3.0-only
# Copyright (c) 2026 Mirrowel

"""
Freebuff Authentication Base

Manages Freebuff session and run lifecycle including:
- Free session management (create, poll, refresh, end)
- Agent run lifecycle (start, finish, rotate)
- Model-to-agent mapping from Codebuff free-agents source
- Per-token state tracking (sessions, runs)
"""

import asyncio
import json
import logging
import os
import random
import re
import string
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

import httpx

lib_logger = logging.getLogger("rotator_library")
lib_logger.propagate = False
if not lib_logger.handlers:
    lib_logger.addHandler(logging.NullHandler())

FREEBUFF_DEFAULT_BASE_URL = "https://codebuff.com"
FREEBUFF_USER_AGENT = "ai-sdk/openai-compatible/1.0.25/codebuff"
FREE_AGENTS_SOURCE_URL = (
    "https://raw.githubusercontent.com/CodebuffAI/codebuff/main/common/src/constants/free-agents.ts"
)
# Seconds between refreshes of the free-agents model mapping.
MODEL_REFRESH_INTERVAL = 6 * 3600
# Maximum delay between polls while a session sits in the "queued" state.
SESSION_POLL_INTERVAL = 5.0
# Hard cap on state transitions while waiting for a session to settle,
# so _refresh_session cannot loop forever on an uncooperative server.
SESSION_MAX_POLL_ATTEMPTS = 120
# Agent runs are rotated (new run started, old one drained) after this age.
RUN_ROTATION_INTERVAL = 6 * 3600

# Fallback agent -> models mapping used until (or if) the live free-agents
# source can be fetched. First agent listed for a model wins the mapping.
HARDCODED_AGENT_MODELS: Dict[str, List[str]] = {
    "base2-free": ["minimax/minimax-m2.7", "z-ai/glm-5.1"],
    "file-picker": ["google/gemini-2.5-flash-lite"],
    "file-picker-max": ["google/gemini-3.1-flash-lite-preview"],
    "file-lister": ["google/gemini-3.1-flash-lite-preview"],
    "researcher-web": ["google/gemini-3.1-flash-lite-preview"],
    "researcher-docs": ["google/gemini-3.1-flash-lite-preview"],
    "basher": ["google/gemini-3.1-flash-lite-preview"],
    "editor-lite": ["minimax/minimax-m2.7", "z-ai/glm-5.1"],
    "code-reviewer-lite": ["minimax/minimax-m2.7", "z-ai/glm-5.1"],
}


def _generate_client_session_id() -> str:
    """Return a 13-char lowercase alphanumeric client session id."""
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=13))


def _parse_optional_time(value: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp (Z-suffix tolerated); None on blank/invalid."""
    value = value.strip()
    if not value:
        return None
    try:
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return None


class CachedSession:
    """Last-known state of a token's free session."""

    __slots__ = ("status", "instance_id", "expires_at")

    def __init__(
        self,
        status: str,
        instance_id: str = "",
        expires_at: Optional[datetime] = None,
    ):
        self.status = status            # "active" | "queued" | "disabled"
        self.instance_id = instance_id  # server-issued instance id ("" when disabled)
        self.expires_at = expires_at    # None means no known expiry


class ManagedRun:
    """One live agent run plus its bookkeeping counters."""

    __slots__ = ("run_id", "agent_id", "started_at", "inflight", "request_count", "finishing")

    def __init__(self, run_id: str, agent_id: str):
        self.run_id = run_id
        self.agent_id = agent_id
        self.started_at = time.monotonic()
        self.inflight = 0       # requests currently using this run
        self.request_count = 0  # lifetime requests, reported as totalSteps on FINISH
        self.finishing = False  # guards against double-FINISH


class TokenPoolState:
    """Per-auth-token state: session cache, active/draining runs, cooldown."""

    def __init__(self, token: str, name: str):
        self.token = token
        self.name = name
        self.session: Optional[CachedSession] = None
        self.session_refresh_lock = asyncio.Lock()
        self.runs: Dict[str, ManagedRun] = {}   # agent_id -> current run
        self.draining: List[ManagedRun] = []    # superseded runs awaiting FINISH
        self.cooldown_until: float = 0.0        # monotonic deadline (e.g. after 401)
        self.last_error: str = ""

    def is_cooling_down(self) -> bool:
        return time.monotonic() < self.cooldown_until

    def ready_session_instance(self) -> Optional[str]:
        """Return a usable instance id, "" if the session is disabled, or
        None when a (re)refresh is required."""
        if self.session is None:
            return None
        if self.session.status == "disabled":
            return ""
        if self.session.status == "active" and self.session.instance_id:
            expires = self.session.expires_at
            if expires is not None and expires.tzinfo is None:
                # Server timestamps are UTC; normalize naive values.
                expires = expires.replace(tzinfo=timezone.utc)
            # 5s safety margin so we never hand out a nearly-expired session.
            if expires is None or datetime.now(timezone.utc) < expires - timedelta(seconds=5):
                return self.session.instance_id
        return None


class FreebuffAuthBase:
    """
    Authentication and session management base for Freebuff provider.

    Handles:
    - Model-agent mapping (fetched from Codebuff repo with hardcoded fallback)
    - Free session lifecycle per auth token
    - Agent run lifecycle per auth token
    - Token pool round-robin selection
    """

    def __init__(self):
        self.base_url = os.getenv("FREEBUFF_API_BASE", FREEBUFF_DEFAULT_BASE_URL).rstrip("/")
        self._agent_models: Dict[str, List[str]] = {}
        self._model_to_agent: Dict[str, str] = {}
        self._all_models: List[str] = []
        self._token_pools: Dict[str, TokenPoolState] = {}
        self._next_pool_index = 0
        self._model_refresh_lock = asyncio.Lock()
        self._last_model_refresh: float = 0.0
        self._initialized = False
        self._load_model_mapping_fallback()

    async def initialize_credentials(self, credential_paths: List[str]) -> None:
        """
        Initialize token pool states from credential paths.

        Credential paths are either file paths or env:// references.
        The actual token value is stored in the pool state. Idempotent.
        """
        if self._initialized:
            return
        self._initialized = True

        self._load_model_mapping_fallback()

        for i, cred_path in enumerate(credential_paths):
            name = f"token-{i + 1}"
            token = self._resolve_token(cred_path)
            if token:
                self._token_pools[cred_path] = TokenPoolState(token, name)
                lib_logger.info(f"Freebuff: initialized token pool {name}")

        lib_logger.info(
            f"Freebuff: initialized {len(self._token_pools)} token pools, "
            f"{len(self._all_models)} models available"
        )

    def _resolve_token(self, credential_path: str) -> Optional[str]:
        """Resolve a credential reference to a raw auth token.

        Accepts env://VAR references, JSON credential files (looking for
        "authToken"/"token", optionally under a "default" key), or a literal
        token string.
        """
        if credential_path.startswith("env://"):
            return os.getenv(credential_path[6:])
        if os.path.isfile(credential_path):
            try:
                with open(credential_path) as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    default = data.get("default", data)
                    if isinstance(default, dict):
                        return default.get("authToken", default.get("token"))
                    return str(default)
                return str(data)
            except (json.JSONDecodeError, OSError) as e:
                lib_logger.warning(f"Freebuff: failed to read credential file {credential_path}: {e}")
                return None
        return credential_path

    def get_available_models(self) -> List[str]:
        return list(self._all_models)

    def get_agent_for_model(self, model: str) -> Optional[str]:
        return self._model_to_agent.get(model)

    def _load_model_mapping_fallback(self) -> None:
        """Populate the model mapping from the hardcoded agent table."""
        model_to_agent: Dict[str, str] = {}
        all_models: List[str] = []
        for agent_id, models in HARDCODED_AGENT_MODELS.items():
            for model in models:
                if model not in model_to_agent:
                    model_to_agent[model] = agent_id
                    all_models.append(model)
        all_models.sort()
        self._agent_models = dict(HARDCODED_AGENT_MODELS)
        self._model_to_agent = model_to_agent
        self._all_models = all_models

    async def refresh_model_mapping(self, client: httpx.AsyncClient) -> None:
        """Best-effort refresh of agent/model mapping from the live source.

        No-op within MODEL_REFRESH_INTERVAL of the last success; failures are
        logged at debug level and the previous mapping is retained.
        """
        async with self._model_refresh_lock:
            now = time.monotonic()
            if now - self._last_model_refresh < MODEL_REFRESH_INTERVAL:
                return
            try:
                resp = await client.get(
                    FREE_AGENTS_SOURCE_URL,
                    headers={"Accept": "text/plain"},
                    timeout=30.0,
                )
                resp.raise_for_status()
                parsed = self._parse_free_agents_source(resp.text)
                if parsed:
                    model_to_agent, all_models = self._build_model_mapping(parsed)
                    self._agent_models = parsed
                    self._model_to_agent = model_to_agent
                    self._all_models = all_models
                    self._last_model_refresh = now
                    lib_logger.info(
                        f"Freebuff: refreshed model mapping: {len(parsed)} agents, {len(all_models)} models"
                    )
            except Exception as e:
                lib_logger.debug(f"Freebuff: model mapping refresh failed: {e}")

    def _parse_free_agents_source(self, source: str) -> Dict[str, List[str]]:
        """Extract ``'agent': new Set([...])`` entries from the TS source."""
        block_re = re.compile(r"'([^']+)':\s*new\s+Set\(\[([^\]]*)\]\)")
        model_re = re.compile(r"'([^']+)'")
        result: Dict[str, List[str]] = {}
        for match in block_re.finditer(source):
            agent_id = match.group(1)
            models = [m.strip() for m in model_re.findall(match.group(2)) if m.strip()]
            if models:
                result[agent_id] = models
        return result

    def _build_model_mapping(
        self, agent_models: Dict[str, List[str]]
    ) -> Tuple[Dict[str, str], List[str]]:
        """Invert agent->models into model->agent (random pick on ties)."""
        model_agents: Dict[str, List[str]] = {}
        for agent_id, models in agent_models.items():
            for model in models:
                model_agents.setdefault(model, []).append(agent_id)
        model_to_agent = {m: random.choice(a) for m, a in model_agents.items()}
        return model_to_agent, sorted(model_to_agent.keys())

    def _get_pool(self, credential_path: str) -> Optional[TokenPoolState]:
        return self._token_pools.get(credential_path)

    def _select_pool(self) -> Optional[Tuple[str, TokenPoolState]]:
        """Round-robin pool pick, preferring pools with a ready session and
        skipping pools in cooldown. Returns None when none are usable."""
        pools = list(self._token_pools.items())
        if not pools:
            return None
        start = self._next_pool_index % len(pools)
        ready = []
        not_ready = []
        for offset in range(len(pools)):
            path, pool = pools[(start + offset) % len(pools)]
            if pool.ready_session_instance() is not None:
                ready.append((path, pool))
            else:
                not_ready.append((path, pool))
        for path, pool in ready + not_ready:
            if not pool.is_cooling_down():
                self._next_pool_index += 1
                return path, pool
        return None

    async def ensure_session(
        self, client: httpx.AsyncClient, pool: TokenPoolState
    ) -> Optional[str]:
        """Return a usable session instance id, refreshing under the pool lock
        if needed. Returns None on refresh failure, "" when disabled."""
        instance = pool.ready_session_instance()
        if instance is not None:
            return instance

        async with pool.session_refresh_lock:
            # Re-check: another coroutine may have refreshed while we waited.
            instance = pool.ready_session_instance()
            if instance is not None:
                return instance
            try:
                session, instance_id = await self._refresh_session(client, pool)
                pool.session = session
                pool.last_error = ""
                if session and session.status == "active" and session.expires_at:
                    # Background task clears the cache right after expiry.
                    asyncio.ensure_future(
                        self._watch_session_expiry(client, pool, session)
                    )
                return instance_id
            except Exception as e:
                pool.session = None
                pool.last_error = str(e)
                lib_logger.warning(f"Freebuff [{pool.name}]: session refresh failed: {e}")
                return None

    async def _refresh_session(
        self, client: httpx.AsyncClient, pool: TokenPoolState
    ) -> Tuple[Optional[CachedSession], Optional[str]]:
        """Drive the session state machine until active or disabled.

        Raises:
            ValueError: server returned active/queued without an instanceId.
            TimeoutError: the session did not settle within
                SESSION_MAX_POLL_ATTEMPTS transitions (the loop was
                previously unbounded).
        """
        state = await self._create_or_refresh_session(client, pool.token)
        for _ in range(SESSION_MAX_POLL_ATTEMPTS):
            status = state.get("status", "").strip()
            if status == "disabled":
                return CachedSession("disabled"), ""
            if status == "active":
                instance_id = state.get("instanceId", "").strip()
                if not instance_id:
                    raise ValueError("active session missing instanceId")
                expires_at = _parse_optional_time(state.get("expiresAt", ""))
                return CachedSession("active", instance_id, expires_at), instance_id
            if status == "queued":
                instance_id = state.get("instanceId", "").strip()
                if not instance_id:
                    raise ValueError("queued session missing instanceId")
                wait_ms = state.get("estimatedWaitMs", 0)
                delay = max(1.0, min(wait_ms / 1000.0, SESSION_POLL_INTERVAL))
                await asyncio.sleep(delay)
                state = await self._get_session(client, pool.token, instance_id)
            else:
                # Unknown status: try to (re)create the session.
                state = await self._create_or_refresh_session(client, pool.token)
        raise TimeoutError(
            f"Freebuff session for {pool.name} did not become active within "
            f"{SESSION_MAX_POLL_ATTEMPTS} attempts"
        )

    async def _create_or_refresh_session(
        self, client: httpx.AsyncClient, token: str
    ) -> Dict[str, Any]:
        """POST the session endpoint; a 404 means free sessions are disabled."""
        url = f"{self.base_url}/api/v1/freebuff/session"
        resp = await client.post(
            url,
            json={},
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
                "Content-Type": "application/json",
                "User-Agent": FREEBUFF_USER_AGENT,
            },
            timeout=30.0,
        )
        if resp.status_code == 404:
            return {"status": "disabled"}
        resp.raise_for_status()
        data = resp.json()
        if not data.get("status", "").strip():
            raise ValueError("session response missing status")
        return data

    async def _get_session(
        self, client: httpx.AsyncClient, token: str, instance_id: str
    ) -> Dict[str, Any]:
        """GET the current session state for a known instance id."""
        url = f"{self.base_url}/api/v1/freebuff/session"
        resp = await client.get(
            url,
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/json",
                "User-Agent": FREEBUFF_USER_AGENT,
                "x-freebuff-instance-id": instance_id,
            },
            timeout=30.0,
        )
        if resp.status_code == 404:
            return {"status": "disabled"}
        resp.raise_for_status()
        return resp.json()

    async def _end_session(
        self, client: httpx.AsyncClient, token: str
    ) -> None:
        """Best-effort DELETE of the current session; failures are only logged."""
        url = f"{self.base_url}/api/v1/freebuff/session"
        try:
            await client.delete(
                url,
                headers={
                    "Authorization": f"Bearer {token}",
                    "Accept": "application/json",
                    "User-Agent": FREEBUFF_USER_AGENT,
                },
                timeout=15.0,
            )
        except Exception as e:
            lib_logger.debug(f"Freebuff: end session failed: {e}")

    async def _watch_session_expiry(
        self, client: httpx.AsyncClient, pool: TokenPoolState, session: CachedSession
    ) -> None:
        """Sleep until just past expiry, then drop the cached session so the
        next request triggers a refresh. Only acts if the cached session is
        still the one we were watching."""
        if not session.expires_at:
            return
        now = datetime.now(timezone.utc)
        expires = session.expires_at
        if expires.tzinfo is None:
            expires = expires.replace(tzinfo=timezone.utc)
        delay = max(0, (expires - now).total_seconds() + 1)
        await asyncio.sleep(delay)
        if pool.session is session and session.status == "active":
            pool.session = None
            lib_logger.info(f"Freebuff [{pool.name}]: session expired, will refresh on next request")

    def invalidate_session(self, pool: TokenPoolState, reason: str = "") -> None:
        pool.session = None
        if reason:
            pool.last_error = reason

    async def start_run(
        self, client: httpx.AsyncClient, pool: TokenPoolState, agent_id: str
    ) -> str:
        """START a new agent run; any superseded run is drained asynchronously."""
        url = f"{self.base_url}/api/v1/agent-runs"
        payload = {"action": "START", "agentId": agent_id}
        resp = await client.post(
            url,
            json=payload,
            headers={
                "Authorization": f"Bearer {pool.token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
                "User-Agent": FREEBUFF_USER_AGENT,
            },
            timeout=30.0,
        )
        resp.raise_for_status()
        data = resp.json()
        run_id = data.get("runId", "").strip()
        if not run_id:
            raise ValueError(f"start run response missing runId: {data}")
        old_run = pool.runs.get(agent_id)
        run = ManagedRun(run_id, agent_id)
        pool.runs[agent_id] = run
        if old_run:
            pool.draining.append(old_run)
            asyncio.ensure_future(self._finish_draining_run(client, pool, old_run))
        lib_logger.debug(f"Freebuff [{pool.name}]: started run {run_id} for agent {agent_id}")
        return run_id

    async def ensure_run(
        self, client: httpx.AsyncClient, pool: TokenPoolState, agent_id: str
    ) -> ManagedRun:
        """Return the current run for agent_id, rotating it when missing or
        older than RUN_ROTATION_INTERVAL."""
        run = pool.runs.get(agent_id)
        needs_rotate = run is None or (time.monotonic() - run.started_at) >= RUN_ROTATION_INTERVAL
        if needs_rotate:
            await self.start_run(client, pool, agent_id)
            run = pool.runs.get(agent_id)
        if run is None:
            raise RuntimeError(f"run missing for agent {agent_id} after rotation")
        return run

    async def finish_run(
        self, client: httpx.AsyncClient, pool: TokenPoolState, run: ManagedRun
    ) -> None:
        """FINISH a run (idempotent via the ``finishing`` flag). On transport
        failure the flag is reset so a later attempt can retry."""
        if run.finishing:
            return
        run.finishing = True
        url = f"{self.base_url}/api/v1/agent-runs"
        payload = {
            "action": "FINISH",
            "runId": run.run_id,
            "status": "completed",
            "totalSteps": run.request_count,
            "directCredits": 0,
            "totalCredits": 0,
        }
        try:
            await client.post(
                url,
                json=payload,
                headers={
                    "Authorization": f"Bearer {pool.token}",
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                    "User-Agent": FREEBUFF_USER_AGENT,
                },
                timeout=15.0,
            )
        except Exception as e:
            run.finishing = False
            pool.last_error = str(e)
            lib_logger.debug(f"Freebuff [{pool.name}]: finish run {run.run_id} failed: {e}")
            return
        pool.draining = [r for r in pool.draining if r is not run]

    def invalidate_run(self, pool: TokenPoolState, run: ManagedRun, reason: str = "") -> None:
        """Forget a run the server no longer recognizes."""
        if pool.runs.get(run.agent_id) is run:
            del pool.runs[run.agent_id]
        pool.draining = [r for r in pool.draining if r is not run]
        if reason:
            pool.last_error = reason

    async def _finish_draining_run(
        self, client: httpx.AsyncClient, pool: TokenPoolState, run: ManagedRun
    ) -> None:
        """FINISH a draining run only once no requests are using it; otherwise
        release_run will trigger the finish when the last request completes."""
        if run.inflight > 0:
            return
        await self.finish_run(client, pool, run)

    def acquire_run(self, run: ManagedRun) -> None:
        run.inflight += 1
        run.request_count += 1

    def release_run(self, pool: TokenPoolState, run: ManagedRun) -> None:
        """Release one inflight slot; if the run is superseded and now idle,
        finish it in the background."""
        if run.inflight > 0:
            run.inflight -= 1
        if run.inflight == 0 and pool.runs.get(run.agent_id) is not run:
            asyncio.ensure_future(self._finish_drained_run_later(pool, run))

    async def _finish_drained_run_later(self, pool: TokenPoolState, run: ManagedRun) -> None:
        """Finish a drained run with a short-lived dedicated client (the
        original request's client may already be closed)."""
        async with httpx.AsyncClient(timeout=15.0) as client:
            await self._finish_draining_run(client, pool, run)

    def is_session_invalid_error(self, status_code: int, error_body: str) -> bool:
        """True when the error body names a known session-invalidation code."""
        if status_code < 400:
            return False
        session_errors = {
            "freebuff_update_required",
            "waiting_room_required",
            "waiting_room_queued",
            "session_superseded",
            "session_expired",
        }
        try:
            data = json.loads(error_body)
            return data.get("error", "") in session_errors
        except (json.JSONDecodeError, TypeError):
            return False

    def is_run_invalid_error(self, status_code: int, error_body: str) -> bool:
        """True when a 400 response indicates the runId is unknown/not running."""
        if status_code != 400:
            return False
        msg = error_body.lower()
        return "runid not found" in msg or "runid not running" in msg

    def get_auth_header(self, credential_identifier: str) -> Dict[str, str]:
        pool = self._get_pool(credential_identifier)
        token = pool.token if pool else credential_identifier
        return {"Authorization": f"Bearer {token}"}

    async def prewarm(
        self, client: httpx.AsyncClient, credential_paths: List[str]
    ) -> None:
        """Best-effort warm-up: establish sessions and start a run per agent
        for each pool so first requests avoid the setup latency."""
        agent_ids = list(self._agent_models.keys())
        for path in credential_paths:
            pool = self._get_pool(path)
            if not pool:
                continue
            try:
                await self.ensure_session(client, pool)
            except Exception as e:
                lib_logger.debug(f"Freebuff [{pool.name}]: session prewarm failed: {e}")
            for agent_id in agent_ids:
                try:
                    await self.start_run(client, pool, agent_id)
                except Exception as e:
                    lib_logger.debug(f"Freebuff [{pool.name}]: run prewarm for {agent_id} failed: {e}")
        lib_logger.info("Freebuff: prewarm complete")
# SPDX-License-Identifier: LGPL-3.0-only
# Copyright (c) 2026 Mirrowel

"""
Freebuff Provider

Provider for Freebuff (https://freebuff.com) - a free AI model hosting platform.
Implements custom request handling due to Freebuff's unique session/run lifecycle.

Features:
- Free session management (create, poll for active, refresh)
- Agent run lifecycle (start, finish, rotate)
- Model-to-agent mapping (dynamic refresh from Codebuff source)
- Multi-token rotation with round-robin selection
- OpenAI-compatible streaming and non-streaming responses
- Automatic retry on session/run invalidation

Environment Variables:
- FREEBUFF_API_BASE: Override base URL (default: https://codebuff.com)
- FREEBUFF_MODELS: Custom model list (JSON array or dict)
"""

import copy
import json
import logging
import time
from typing import Any, AsyncGenerator, Dict, List, Optional, Union

import httpx
import litellm
from litellm.exceptions import RateLimitError

from ..model_definitions import ModelDefinitions
from ..timeout_config import TimeoutConfig
from ..transaction_logger import ProviderLogger
from .freebuff_auth_base import FreebuffAuthBase, _generate_client_session_id
from .provider_interface import ProviderInterface

lib_logger = logging.getLogger("rotator_library")
lib_logger.propagate = False
if not lib_logger.handlers:
    lib_logger.addHandler(logging.NullHandler())

# OpenAI-compatible parameters forwarded to the upstream endpoint; anything
# else in kwargs is dropped from the payload.
SUPPORTED_PARAMS = {
    "model",
    "messages",
    "temperature",
    "top_p",
    "max_tokens",
    "stream",
    "tools",
    "tool_choice",
    "presence_penalty",
    "frequency_penalty",
    "n",
    "stop",
    "seed",
    "response_format",
}


class FreebuffProvider(FreebuffAuthBase, ProviderInterface):
    """
    Freebuff provider with custom session/run management.

    Uses Freebuff's free-tier API which requires:
    1. An active free session (may involve queuing)
    2. An active agent run for the target model
    3. codebuff_metadata injection into every request

    Supports multi-token rotation for higher throughput.
    """

    skip_cost_calculation = True
    provider_env_name = "freebuff"

    # Lower number = preferred tier when selecting credentials.
    tier_priorities = {
        "active": 1,
        "queued": 2,
        "no-session": 3,
    }
    default_tier_priority = 3

    def __init__(self):
        super().__init__()
        self.model_definitions = ModelDefinitions()

    def has_custom_logic(self) -> bool:
        return True

    async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[str]:
        """Return the union of statically-defined and dynamically-mapped models.

        Static definitions win; dynamic models not already present are added
        with the ``freebuff/`` prefix.
        """
        models: List[str] = []
        seen_ids: set = set()

        static_models = self.model_definitions.get_all_provider_models("freebuff")
        if static_models:
            for model in static_models:
                model_id = model.split("/")[-1] if "/" in model else model
                models.append(model)
                seen_ids.add(model_id)
            lib_logger.debug(f"Freebuff: loaded {len(static_models)} static models")

        await self.refresh_model_mapping(client)

        for model_id in self.get_available_models():
            if model_id not in seen_ids:
                models.append(f"freebuff/{model_id}")
                seen_ids.add(model_id)

        # NOTE: a former "if not models" fallback repeating the loop above was
        # dead code (seen_ids already contained every available model) and was
        # removed.
        return models

    def _clean_tool_schemas(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Deep-copy tool definitions and strip fields the upstream rejects
        (``strict``, ``additionalProperties``), resolving local $refs."""
        cleaned = []
        for tool in tools:
            cleaned_tool = copy.deepcopy(tool)
            if "function" in cleaned_tool:
                func = cleaned_tool["function"]
                func.pop("strict", None)
                if "parameters" in func and isinstance(func["parameters"], dict):
                    params = func["parameters"]
                    params.pop("additionalProperties", None)
                    if "properties" in params:
                        self._clean_schema_properties(params["properties"])
                    self._resolve_refs(params)
            # Fixed: previously appended to an undefined name (cleaned_tools),
            # raising NameError on any request that carried tools.
            cleaned.append(cleaned_tool)
        return cleaned

    def _clean_schema_properties(self, properties: Dict[str, Any]) -> None:
        """Recursively strip unsupported keywords from property schemas."""
        for prop_name, prop_schema in properties.items():
            if isinstance(prop_schema, dict):
                prop_schema.pop("strict", None)
                prop_schema.pop("additionalProperties", None)
                if "properties" in prop_schema:
                    self._clean_schema_properties(prop_schema["properties"])
                if "items" in prop_schema and isinstance(prop_schema["items"], dict):
                    self._clean_schema_properties({"item": prop_schema["items"]})

    def _resolve_refs(self, schema: Dict[str, Any]) -> None:
        """Inline ``#/definitions/...`` and ``#/$defs/...`` references found in
        top-level properties (depth-capped to guard against cycles), then drop
        the definition tables."""
        defs = {}
        for key in ("definitions", "$defs"):
            if key in schema and isinstance(schema[key], dict):
                defs.update(schema[key])
        if not defs:
            return

        def _resolve(node: Any, depth: int = 0) -> Any:
            if depth > 12 or not isinstance(node, dict):
                return node
            ref = node.get("$ref", "")
            if ref and len(node) == 1:
                if ref.startswith("#/definitions/"):
                    name = ref[len("#/definitions/"):]
                elif ref.startswith("#/$defs/"):
                    name = ref[len("#/$defs/"):]
                else:
                    return node
                if name in defs:
                    return _resolve(copy.deepcopy(defs[name]), depth + 1)
            return node

        for prop_schema in schema.get("properties", {}).values():
            if isinstance(prop_schema, dict):
                resolved = _resolve(prop_schema)
                if isinstance(resolved, dict):
                    prop_schema.update(resolved)
                    prop_schema.pop("$ref", None)

        schema.pop("definitions", None)
        schema.pop("$defs", None)

    def _build_request_payload(
        self,
        model: str,
        run_id: str,
        session_instance_id: Optional[str],
        **kwargs,
    ) -> Dict[str, Any]:
        """Build the upstream request body: filtered OpenAI params plus the
        codebuff_metadata block Freebuff requires. Streaming is always forced
        upstream; non-stream callers get a reassembled response."""
        payload = {k: v for k, v in kwargs.items() if k in SUPPORTED_PARAMS}
        payload["model"] = model
        payload["stream"] = True

        if payload.get("tools") and isinstance(payload["tools"], list) and payload["tools"]:
            payload["tools"] = self._clean_tool_schemas(payload["tools"])

        metadata = kwargs.get("codebuff_metadata", {})
        if not isinstance(metadata, dict):
            metadata = {}
        metadata["run_id"] = run_id
        metadata["cost_mode"] = "free"
        metadata["client_id"] = _generate_client_session_id()
        if session_instance_id:
            metadata["freebuff_instance_id"] = session_instance_id
        payload["codebuff_metadata"] = metadata

        return payload

    def _convert_chunk_to_openai(self, chunk: Dict[str, Any], model_id: str):
        """Yield OpenAI-shaped chunk dicts for one upstream SSE chunk.

        Usage is always emitted as its own trailing chunk (choices kept
        separate) so downstream accounting sees a standard final-usage chunk.
        """
        if not isinstance(chunk, dict):
            return
        choices = chunk.get("choices", [])
        usage_data = chunk.get("usage")

        # Shared envelope fields; computed once so paired chunks match.
        base = {
            "model": model_id,
            "object": "chat.completion.chunk",
            "id": chunk.get("id", f"chatcmpl-freebuff-{time.time()}"),
            "created": chunk.get("created", int(time.time())),
        }

        if choices:
            yield {"choices": choices, **base}
        if usage_data:
            yield {
                "choices": [],
                **base,
                "usage": {
                    "prompt_tokens": usage_data.get("prompt_tokens", 0),
                    "completion_tokens": usage_data.get("completion_tokens", 0),
                    "total_tokens": usage_data.get("total_tokens", 0),
                },
            }

    def _stream_to_completion_response(
        self, chunks: List[litellm.ModelResponse]
    ) -> litellm.ModelResponse:
        """Reassemble streamed chunks into a single chat.completion response.

        Aggregates content, reasoning_content, tool_calls, and function_call
        deltas; takes usage from the last chunk carrying it.

        Raises:
            ValueError: when no chunks were collected.
        """
        if not chunks:
            raise ValueError("No chunks provided for reassembly")

        final_message: Dict[str, Any] = {"role": "assistant"}
        aggregated_tool_calls: Dict[int, Dict[str, Any]] = {}
        usage_data = None
        chunk_finish_reason = None
        first_chunk = chunks[0]

        for chunk in chunks:
            if not hasattr(chunk, "choices") or not chunk.choices:
                continue
            choice = chunk.choices[0]
            delta = choice.get("delta", {}) if hasattr(choice, "get") else {}

            if "content" in delta and delta["content"] is not None:
                final_message.setdefault("content", "")
                final_message["content"] += delta["content"]

            if "reasoning_content" in delta and delta["reasoning_content"] is not None:
                final_message.setdefault("reasoning_content", "")
                final_message["reasoning_content"] += delta["reasoning_content"]

            if "tool_calls" in delta and delta["tool_calls"]:
                for tc_chunk in delta["tool_calls"]:
                    index = tc_chunk.get("index", 0)
                    if index not in aggregated_tool_calls:
                        aggregated_tool_calls[index] = {
                            "type": "function",
                            "function": {"name": "", "arguments": ""},
                        }
                    if "id" in tc_chunk:
                        aggregated_tool_calls[index]["id"] = tc_chunk["id"]
                    if "type" in tc_chunk:
                        aggregated_tool_calls[index]["type"] = tc_chunk["type"]
                    if "function" in tc_chunk:
                        fn = tc_chunk["function"]
                        if fn.get("name"):
                            aggregated_tool_calls[index]["function"]["name"] += fn["name"]
                        if fn.get("arguments"):
                            aggregated_tool_calls[index]["function"]["arguments"] += fn["arguments"]

            if "function_call" in delta and delta["function_call"] is not None:
                if "function_call" not in final_message:
                    final_message["function_call"] = {"name": "", "arguments": ""}
                if delta["function_call"].get("name"):
                    final_message["function_call"]["name"] += delta["function_call"]["name"]
                if delta["function_call"].get("arguments"):
                    final_message["function_call"]["arguments"] += delta["function_call"]["arguments"]

            if hasattr(choice, "get") and choice.get("finish_reason"):
                chunk_finish_reason = choice["finish_reason"]

        # Usage rides on a trailing chunk; scan from the end.
        for chunk in reversed(chunks):
            if hasattr(chunk, "usage") and chunk.usage:
                usage_data = chunk.usage
                break

        if aggregated_tool_calls:
            final_message["tool_calls"] = list(aggregated_tool_calls.values())

        for field in ("content", "tool_calls", "function_call"):
            final_message.setdefault(field, None)

        if aggregated_tool_calls:
            finish_reason = "tool_calls"
        elif chunk_finish_reason:
            finish_reason = chunk_finish_reason
        else:
            finish_reason = "stop"

        final_response_data = {
            "id": first_chunk.id,
            "object": "chat.completion",
            "created": first_chunk.created,
            "model": first_chunk.model,
            "choices": [
                {
                    "index": 0,
                    "message": final_message,
                    "finish_reason": finish_reason,
                }
            ],
            "usage": usage_data,
        }
        return litellm.ModelResponse(**final_response_data)

    async def acompletion(
        self, client: httpx.AsyncClient, **kwargs
    ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
        """Execute a completion against Freebuff.

        Resolves the model's agent, ensures a session and run on the selected
        token pool, streams the upstream response, and retries once on
        session/run invalidation. Returns an async generator when the caller
        requested streaming, otherwise a reassembled ModelResponse.

        Raises:
            ValueError: unknown model or missing token pool.
            RateLimitError: upstream 401 (with pool cooldown) or 429.
            httpx.HTTPStatusError: other upstream HTTP errors.
        """
        credential_path = kwargs.pop("credential_identifier")
        transaction_context = kwargs.pop("transaction_context", None)
        model = kwargs["model"]

        file_logger = ProviderLogger(transaction_context)

        model_name = model.split("/")[-1] if "/" in model else model
        agent_id = self.get_agent_for_model(model_name)
        if not agent_id:
            raise ValueError(f"Freebuff: unsupported model '{model_name}'")

        await self.refresh_model_mapping(client)

        async def make_request():
            # (Re)acquire pool, session, and run, then open the upstream stream.
            pool = self._get_pool(credential_path)
            if not pool:
                raise ValueError(f"Freebuff: no token pool for credential {credential_path}")

            session_instance = await self.ensure_session(client, pool)
            run = await self.ensure_run(client, pool, agent_id)
            self.acquire_run(run)

            payload = self._build_request_payload(
                model_name,
                run.run_id,
                session_instance,
                **kwargs,
            )

            headers = {
                "Authorization": f"Bearer {pool.token}",
                "Content-Type": "application/json",
                "Accept": "text/event-stream",
                "User-Agent": "ai-sdk/openai-compatible/1.0.25/codebuff",
            }

            url = f"{self.base_url}/api/v1/chat/completions"
            file_logger.log_request(payload)
            lib_logger.debug(f"Freebuff request: model={model_name}, agent={agent_id}, run={run.run_id}")

            return client.stream(
                "POST",
                url,
                headers=headers,
                json=payload,
                timeout=TimeoutConfig.streaming(),
            ), pool, run

        async def stream_handler(response_stream, pool, run, attempt=1):
            try:
                async with response_stream as response:
                    if response.status_code >= 400:
                        error_bytes = await response.aread()
                        error_text = error_bytes.decode("utf-8") if isinstance(error_bytes, bytes) else error_bytes

                        if self.is_session_invalid_error(response.status_code, error_text):
                            lib_logger.info(f"Freebuff [{pool.name}]: session invalid, refreshing and retrying")
                            # Release before retry: the failed attempt must not
                            # pin the run's inflight counter (previously leaked).
                            self.release_run(pool, run)
                            self.invalidate_session(pool, error_text)
                            if attempt < 2:
                                retry_stream, retry_pool, retry_run = await make_request()
                                async for chunk in stream_handler(
                                    retry_stream, retry_pool, retry_run, attempt + 1
                                ):
                                    yield chunk
                            return

                        if self.is_run_invalid_error(response.status_code, error_text):
                            lib_logger.info(f"Freebuff [{pool.name}]: run {run.run_id} invalid, rotating")
                            self.release_run(pool, run)
                            self.invalidate_run(pool, run, error_text)
                            if attempt < 2:
                                retry_stream, retry_pool, retry_run = await make_request()
                                async for chunk in stream_handler(
                                    retry_stream, retry_pool, retry_run, attempt + 1
                                ):
                                    yield chunk
                            return

                        if response.status_code == 401:
                            # Auth rejection: cool this pool down for 30 minutes.
                            pool.cooldown_until = time.monotonic() + 1800
                            self.invalidate_session(pool, "auth rejected")
                            self.release_run(pool, run)
                            raise RateLimitError(
                                f"Freebuff auth rejected: {error_text}",
                                llm_provider="freebuff",
                                model=model,
                                response=response,
                            )

                        if response.status_code == 429:
                            self.release_run(pool, run)
                            raise RateLimitError(
                                f"Freebuff rate limit: {error_text}",
                                llm_provider="freebuff",
                                model=model,
                                response=response,
                            )

                        self.release_run(pool, run)
                        raise httpx.HTTPStatusError(
                            f"HTTP {response.status_code}: {error_text}",
                            request=response.request,
                            response=response,
                        )

                    async for line in response.aiter_lines():
                        file_logger.log_response_chunk(line)
                        if line.startswith("data:"):
                            # Tolerate both "data: x" and "data:x" framing.
                            data_str = line[6:] if line.startswith("data: ") else line[5:]
                            if data_str.strip() == "[DONE]":
                                break
                            try:
                                chunk = json.loads(data_str)
                                for openai_chunk in self._convert_chunk_to_openai(chunk, model):
                                    yield litellm.ModelResponse(**openai_chunk)
                            except json.JSONDecodeError:
                                lib_logger.warning(f"Freebuff: could not decode JSON: {line}")

                    self.release_run(pool, run)

            except httpx.HTTPStatusError:
                raise
            except RateLimitError:
                raise
            except Exception as e:
                file_logger.log_error(f"Freebuff stream error: {e}")
                lib_logger.error(f"Freebuff stream error: {e}", exc_info=True)
                raise

        async def logging_stream_wrapper():
            # Tee chunks so the final reassembled response can be logged even
            # when the caller consumes the stream incrementally.
            openai_chunks = []
            try:
                stream, pool, run = await make_request()
                async for chunk in stream_handler(stream, pool, run):
                    openai_chunks.append(chunk)
                    yield chunk
            finally:
                if openai_chunks:
                    final_response = self._stream_to_completion_response(openai_chunks)
                    file_logger.log_final_response(final_response.dict())

        if kwargs.get("stream"):
            return logging_stream_wrapper()
        else:
            async def non_stream_wrapper():
                chunks = [chunk async for chunk in logging_stream_wrapper()]
                return self._stream_to_completion_response(chunks)

            return await non_stream_wrapper()
fallback block P2: Renamed _finish_draining_run_run -> _finish_draining_run_background --- .../providers/freebuff_auth_base.py | 19 +++++++++++++------ .../providers/freebuff_provider.py | 12 +----------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/rotator_library/providers/freebuff_auth_base.py b/src/rotator_library/providers/freebuff_auth_base.py index a1aab230..4e73165a 100644 --- a/src/rotator_library/providers/freebuff_auth_base.py +++ b/src/rotator_library/providers/freebuff_auth_base.py @@ -19,7 +19,7 @@ import re import string import time -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple import httpx @@ -36,9 +36,8 @@ ) MODEL_REFRESH_INTERVAL = 6 * 3600 SESSION_POLL_INTERVAL = 5.0 -SESSION_RETRY_DELAY = 10.0 RUN_ROTATION_INTERVAL = 6 * 3600 -REQUEST_TIMEOUT = 900.0 +SESSION_MAX_RETRIES = 20 HARDCODED_AGENT_MODELS: Dict[str, List[str]] = { "base2-free": ["minimax/minimax-m2.7", "z-ai/glm-5.1"], @@ -115,7 +114,7 @@ def ready_session_instance(self) -> Optional[str]: if self.session.status == "active" and self.session.instance_id: if self.session.expires_at is None or datetime.now(timezone.utc) < self.session.expires_at.replace( tzinfo=timezone.utc - ) - __import__("datetime").timedelta(seconds=5): + ) - timedelta(seconds=5): return self.session.instance_id return None @@ -307,6 +306,7 @@ async def _refresh_session( self, client: httpx.AsyncClient, pool: TokenPoolState ) -> Tuple[Optional[CachedSession], Optional[str]]: state = await self._create_or_refresh_session(client, pool.token) + retries = 0 while True: status = state.get("status", "").strip() if status == "disabled": @@ -329,6 +329,13 @@ async def _refresh_session( await asyncio.sleep(delay) state = await self._get_session(client, pool.token, instance_id) else: + retries += 1 + if retries >= SESSION_MAX_RETRIES: + raise RuntimeError( + f"Freebuff: session refresh exceeded max retries " 
+ f"({SESSION_MAX_RETRIES}) with unexpected status: {status!r}" + ) + await asyncio.sleep(SESSION_POLL_INTERVAL) state = await self._create_or_refresh_session(client, pool.token) async def _create_or_refresh_session( @@ -508,9 +515,9 @@ def release_run(self, pool: TokenPoolState, run: ManagedRun) -> None: if run.inflight > 0: run.inflight -= 1 if run.inflight == 0 and pool.runs.get(run.agent_id) is not run: - asyncio.ensure_future(self._finish_draining_run_run(pool, run)) + asyncio.ensure_future(self._finish_draining_run_background(pool, run)) - async def _finish_draining_run_run(self, pool: TokenPoolState, run: ManagedRun) -> None: + async def _finish_draining_run_background(self, pool: TokenPoolState, run: ManagedRun) -> None: async with httpx.AsyncClient(timeout=15.0) as client: await self._finish_draining_run(client, pool, run) diff --git a/src/rotator_library/providers/freebuff_provider.py b/src/rotator_library/providers/freebuff_provider.py index 1bb64703..94ad0830 100644 --- a/src/rotator_library/providers/freebuff_provider.py +++ b/src/rotator_library/providers/freebuff_provider.py @@ -20,12 +20,9 @@ - FREEBUFF_MODELS: Custom model list (JSON array or dict) """ -import asyncio import copy import json import logging -import os -import re import time from typing import Any, AsyncGenerator, Dict, List, Optional, Union @@ -110,12 +107,6 @@ async def get_models(self, credential: str, client: httpx.AsyncClient) -> List[s models.append(f"freebuff/{model_id}") seen_ids.add(model_id) - if not models: - for model_id in self.get_available_models(): - if model_id not in seen_ids: - models.append(f"freebuff/{model_id}") - seen_ids.add(model_id) - return models def _clean_tool_schemas(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: @@ -131,7 +122,7 @@ def _clean_tool_schemas(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any if "properties" in params: self._clean_schema_properties(params["properties"]) self._resolve_refs(params) - 
cleaned_tools.append(cleaned_tool) + cleaned.append(cleaned_tool) return cleaned def _clean_schema_properties(self, properties: Dict[str, Any]) -> None: @@ -426,7 +417,6 @@ async def stream_handler(response_stream, pool, run, attempt=1): return if response.status_code == 401: - from datetime import timedelta pool.cooldown_until = time.monotonic() + 1800 self.invalidate_session(pool, "auth rejected") self.release_run(pool, run)