diff --git a/README.md b/README.md
index aa550a3..5199025 100644
--- a/README.md
+++ b/README.md
@@ -1,61 +1,54 @@
 # Gonka OpenAI Proxy
-OpenAI-compatible API proxy for Gonka that provides ChatGPT-like interface with API key authentication.
-
-๐ **Try it here:**
-๐ https://gonka-gateway.mingles.ai/
+OpenAI-compatible API proxy for Gonka that provides a ChatGPT-like interface with API key authentication. Self-hosted, no database required; configured entirely via environment variables.
 ## Features
-- **OpenAI-compatible API**: Compatible with OpenAI Python SDK and other OpenAI-compatible clients
-- **API Key Authentication**: Secure access using API keys (like ChatGPT API)
+- **OpenAI-compatible API**: Drop-in replacement for the OpenAI Python SDK and other OpenAI-compatible clients
+- **API Key Authentication**: Secure access using configurable API keys
 - **Streaming Support**: Supports both streaming and non-streaming responses
+- **Tool Emulation**: Automatic prompt-based tool call emulation for models that don't support native tool calling
+- **Circuit Breaker**: Prevents cascading failures when the Gonka backend is degraded
+- **Retry with Backoff**: Automatic retry with exponential backoff on transient errors
 - **Web Interface**: Built-in web chat interface for testing
-- **Automatic Model Loading**: Loads available models from Gonka API on startup
 - **Docker Support**: Ready-to-use Docker container
-## Configuration
+## Quick Start
-Copy `.env.example` to `.env` and configure the following variables:
+### Running Locally
+1. 
Clone the repository: ```bash -# Gonka API Configuration -GONKA_PRIVATE_KEY=your_hex_private_key_here -GONKA_ADDRESS=your_gonka_address_bech32 -GONKA_ENDPOINT=https://host:port/v1 -GONKA_PROVIDER_ADDRESS=provider_gonka_address_bech32 - -# API Key for external access (like ChatGPT API) -API_KEY=sk-your-secret-api-key-here - -# Server Configuration (optional) -HOST=0.0.0.0 -PORT=8000 +git clone https://github.com/MinglesAI/gonka-proxy.git +cd gonka-proxy ``` -### Configuration Details - -#### GONKA_PROVIDER_ADDRESS - -**What is it?** `GONKA_PROVIDER_ADDRESS` is the provider (host) address in the Gonka network in bech32 format. It is used to sign requests to the Gonka API. - -**Where to get it?** - -1. **From provider documentation**: If you are using a specific Gonka provider, their address should be specified in their documentation or provider page. - -2. **From endpoint metadata**: The provider address is usually associated with the endpoint (`GONKA_ENDPOINT`). The provider should specify their Gonka address in the documentation or during registration. +2. Install dependencies: +```bash +pip install -r requirements.txt +``` -3. **Via Gonka Dashboard**: If you have access to the Gonka Dashboard, the provider address can be found in your connection information or node settings. +3. Create a `.env` file (see [Environment Variables](#environment-variables)): +```bash +cp .env.example .env +# Edit .env with your values +``` -4. **Contact the provider**: If you are using a public Gonka endpoint, contact the endpoint owner or Gonka support to get the provider address. +4. Run the server: +```bash +python -m app.main +``` -**Example**: The address usually looks like `gonka1...` (bech32 format), e.g., `gonka1abc123def456...` +Or with uvicorn directly: +```bash +uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload +``` -**Important**: The provider address is used in the cryptographic signature of each request, so it must be correct for successful authentication. +5. 
Open `http://localhost:8000/` in your browser to use the web chat interface.
-## Running with Docker
+### Running with Docker
-1. Build the Docker image:
+1. Build the image:
```bash
docker build -t gonka-proxy .
```
@@ -65,36 +58,80 @@ docker build -t gonka-proxy .
 docker run -d \
   --name gonka-proxy \
   -p 8000:8000 \
-  --env-file .env \
+  -e GONKA_PRIVATE_KEY=your_hex_private_key \
+  -e GONKA_ADDRESS=your_gonka_address_bech32 \
+  -e GONKA_ENDPOINT=https://host:port/v1 \
+  -e GONKA_PROVIDER_ADDRESS=provider_gonka_address_bech32 \
+  -e API_KEY=sk-your-secret-api-key \
   gonka-proxy
 ```
-## Running Locally
-
-1. Install dependencies:
+Or with a `.env` file:
 ```bash
-pip install -r requirements.txt
+docker run -d \
+  --name gonka-proxy \
+  -p 8000:8000 \
+  --env-file .env \
+  gonka-proxy
 ```
-2. Set environment variables or create `.env` file
+## Environment Variables
-3. Run the server:
-```bash
-python -m app.main
+| Variable | Required | Default | Description |
+|---|---|---|---|
+| `GONKA_PRIVATE_KEY` | Yes | (none) | Your ECDSA private key in hex format (with or without `0x` prefix) |
+| `GONKA_ADDRESS` | Yes | (none) | Your Gonka address in bech32 format (e.g. `gonka1abc...`) |
+| `GONKA_ENDPOINT` | Yes | (none) | Gonka API base URL (e.g. `https://host:port/v1`) |
+| `GONKA_PROVIDER_ADDRESS` | Yes | (none) | Provider's Gonka address in bech32 format; used for request signing |
+| `API_KEY` | Yes | (none) | Secret key clients must send in the `Authorization` header |
+| `HOST` | No | `0.0.0.0` | Server bind address |
+| `PORT` | No | `8000` | Server port |
+| `GONKA_STREAM_READ_TIMEOUT` | No | `300.0` | Max seconds to wait for streaming data from backend |
+
+### Configuration Details
+
+#### GONKA_PRIVATE_KEY
+Your ECDSA private key in hex format. Used to sign every request to the Gonka backend.
+Example: `a1b2c3d4e5f6...` or `0xa1b2c3d4e5f6...`
+
+#### GONKA_ADDRESS
+Your address in the Gonka network (bech32 format). Sent as the `X-Requester-Address` header. 
+Example: `gonka1qyqszqgpqyqszqgpqyqszqgp...` + +#### GONKA_ENDPOINT +The Gonka inference API endpoint. Must include the `/v1` path segment. +Example: `https://my-gonka-node.example.com/v1` + +#### GONKA_PROVIDER_ADDRESS +The **provider's** Gonka address (bech32 format). This is included in the cryptographic signature of every request and must match what the provider expects. Obtain this from your Gonka provider's documentation or contact page. +Example: `gonka1provideraddress...` + +#### API_KEY +The bearer token clients must include in requests to the proxy. +Example: `sk-my-secret-key-123` + +Clients send it as: +``` +Authorization: Bearer sk-my-secret-key-123 ``` -Or with uvicorn directly: +### Example `.env` file + ```bash -uvicorn app.main:app --host 0.0.0.0 --port 8000 +GONKA_PRIVATE_KEY=0xaabbccddeeff... +GONKA_ADDRESS=gonka1youraddress... +GONKA_ENDPOINT=https://my-gonka-node.example.com/v1 +GONKA_PROVIDER_ADDRESS=gonka1provideraddress... +API_KEY=sk-my-secret-api-key ``` ## Usage ### Web Interface -Access the web interface at `http://localhost:8000/` to test the API interactively. +Open `http://localhost:8000/` to access the built-in chat interface. 
-### Using OpenAI Python SDK
+### OpenAI Python SDK

```python
from openai import OpenAI
@@ -114,20 +151,6 @@ response = client.chat.completions.create(
print(response.choices[0].message.content)
```

-### Using curl
-
-```bash
-curl http://localhost:8000/v1/chat/completions \
-  -H "Content-Type: application/json" \
-  -H "Authorization: Bearer sk-your-secret-key" \
-  -d '{
-    "model": "gonka-model",
-    "messages": [
-      {"role": "user", "content": "Hello!"}
-    ]
-  }'
-```
-
 ### Streaming

 ```python
@@ -140,9 +163,7 @@ client = OpenAI(
 stream = client.chat.completions.create(
     model="gonka-model",
-    messages=[
-        {"role": "user", "content": "Tell me a story"}
-    ],
+    messages=[{"role": "user", "content": "Tell me a story"}],
     stream=True
 )
@@ -151,27 +172,41 @@ for chunk in stream:
     print(chunk.choices[0].delta.content, end="")
 ```

+### curl
+
+```bash
+curl http://localhost:8000/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer sk-your-secret-key" \
+  -d '{
+    "model": "gonka-model",
+    "messages": [{"role": "user", "content": "Hello!"}]
+  }'
+```
+
 ## API Endpoints

-- `POST /v1/chat/completions` - Chat completions (OpenAI-compatible)
-- `GET /v1/models` - List available models
-- `GET /api/models` - Get available models (no auth, for web interface)
-- `GET /health` - Health check endpoint (no auth required)
-- `GET /` - Web chat interface
+| Endpoint | Auth | Description |
+|---|---|---|
+| `POST /v1/chat/completions` | Yes | Chat completions (OpenAI-compatible) |
+| `GET /v1/models` | Yes | List available models (OpenAI-compatible) |
+| `GET /api/models` | No | Get available models (for web interface) |
+| `GET /health` | No | Health check (includes circuit breaker state) |
+| `GET /` | No | Web chat interface |

-## Authentication
+## Architecture

-All endpoints except `/health`, `/api/models`, and `/` require authentication using the `Authorization` header:
+### Tool Emulation

-```
-Authorization: Bearer sk-your-secret-key
-```
+If the Gonka 
model doesn't support native tool calling (`tools` + `tool_choice`), the proxy automatically converts tool definitions into a system prompt and parses tool call JSON from the model's text response. This is transparent to the client: it still receives standard OpenAI-format `tool_calls` in the response.

-Or simply:
+### Circuit Breaker

-```
-Authorization: sk-your-secret-key
-```
+Wraps non-streaming Gonka backend calls. After 5 consecutive failures, the circuit opens and requests are rejected immediately with a `503` error for 60 seconds; the breaker then transitions to half-open to test recovery.
+
+### Retry
+
+Non-streaming requests are retried up to 2 times with exponential backoff on transient errors.

 ## License

diff --git a/app/circuit_breaker.py b/app/circuit_breaker.py
new file mode 100644
index 0000000..3bdb261
--- /dev/null
+++ b/app/circuit_breaker.py
@@ -0,0 +1,181 @@
+"""
+Circuit Breaker implementation for resilient backend communication
+"""
+import time
+import logging
+from enum import Enum
+from typing import Callable, Any, Optional
+from functools import wraps
+
+logger = logging.getLogger(__name__)
+
+
+class CircuitState(Enum):
+    """Circuit breaker states"""
+    CLOSED = "closed"        # Normal operation - requests pass through
+    OPEN = "open"            # Failing - reject requests immediately
+    HALF_OPEN = "half_open"  # Testing - allow limited requests to test recovery
+
+
+class CircuitBreakerOpenError(Exception):
+    """Raised when circuit breaker is OPEN"""
+    pass
+
+
+class CircuitBreaker:
+    """
+    Circuit breaker pattern implementation
+
+    Prevents cascading failures by stopping requests to failing services
+    and allowing them to recover. 
+ + States: + - CLOSED: Normal operation, requests pass through + - OPEN: Service is failing, reject requests immediately + - HALF_OPEN: Testing if service recovered, allow limited requests + """ + + def __init__( + self, + name: str, + failure_threshold: int = 5, + recovery_timeout: float = 60.0, + success_threshold: int = 2, + expected_exception: type = Exception + ): + """ + Initialize circuit breaker + + Args: + name: Circuit breaker name (for logging) + failure_threshold: Number of failures before opening circuit + recovery_timeout: Seconds to wait before trying half-open + success_threshold: Number of successes in half-open to close circuit + expected_exception: Exception type that triggers failure + """ + self.name = name + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.success_threshold = success_threshold + self.expected_exception = expected_exception + + self.failure_count = 0 + self.success_count = 0 + self.last_failure_time: Optional[float] = None + self.state = CircuitState.CLOSED + + async def call(self, func: Callable, *args, **kwargs) -> Any: + """ + Execute function with circuit breaker protection + + Args: + func: Async function to execute + *args, **kwargs: Arguments for function + + Returns: + Function result + + Raises: + CircuitBreakerOpenError: If circuit is OPEN + Exception: Original exception from function + """ + # Check if we should transition from OPEN to HALF_OPEN + if self.state == CircuitState.OPEN: + if self.last_failure_time and \ + time.time() - self.last_failure_time > self.recovery_timeout: + logger.info(f"Circuit breaker {self.name}: OPEN -> HALF_OPEN (testing recovery)") + self.state = CircuitState.HALF_OPEN + self.success_count = 0 + else: + raise CircuitBreakerOpenError( + f"Circuit breaker {self.name} is OPEN. 
" + f"Last failure: {self.last_failure_time}" + ) + + # Execute function + try: + result = await func(*args, **kwargs) + self._on_success() + return result + except self.expected_exception as e: + self._on_failure() + raise + + def _on_success(self): + """Handle successful request""" + if self.state == CircuitState.HALF_OPEN: + self.success_count += 1 + if self.success_count >= self.success_threshold: + logger.info(f"Circuit breaker {self.name}: HALF_OPEN -> CLOSED (recovered)") + self.state = CircuitState.CLOSED + self.failure_count = 0 + self.success_count = 0 + elif self.state == CircuitState.CLOSED: + # Reset failure count on success (gradual recovery) + if self.failure_count > 0: + self.failure_count = max(0, self.failure_count - 1) + + def _on_failure(self): + """Handle failed request""" + self.failure_count += 1 + self.last_failure_time = time.time() + + if self.state == CircuitState.HALF_OPEN: + # Failure in half-open -> back to OPEN + logger.warning(f"Circuit breaker {self.name}: HALF_OPEN -> OPEN (still failing)") + self.state = CircuitState.OPEN + self.success_count = 0 + elif self.state == CircuitState.CLOSED: + if self.failure_count >= self.failure_threshold: + logger.warning( + f"Circuit breaker {self.name}: CLOSED -> OPEN " + f"({self.failure_count} failures >= {self.failure_threshold})" + ) + self.state = CircuitState.OPEN + + def get_state(self) -> dict: + """Get current circuit breaker state""" + return { + "name": self.name, + "state": self.state.value, + "failure_count": self.failure_count, + "last_failure_time": self.last_failure_time, + "failure_threshold": self.failure_threshold, + "recovery_timeout": self.recovery_timeout + } + + def reset(self): + """Manually reset circuit breaker to CLOSED state""" + logger.info(f"Circuit breaker {self.name}: Manual reset") + self.state = CircuitState.CLOSED + self.failure_count = 0 + self.success_count = 0 + self.last_failure_time = None + + +def circuit_breaker_decorator( + name: str, + 
failure_threshold: int = 5, + recovery_timeout: float = 60.0 +): + """ + Decorator for circuit breaker pattern + + Usage: + @circuit_breaker_decorator("gonka_api", failure_threshold=5) + async def call_gonka_api(): + ... + """ + breaker = CircuitBreaker( + name=name, + failure_threshold=failure_threshold, + recovery_timeout=recovery_timeout + ) + + def decorator(func: Callable): + @wraps(func) + async def wrapper(*args, **kwargs): + return await breaker.call(func, *args, **kwargs) + wrapper.breaker = breaker # Attach breaker for access + return wrapper + return decorator diff --git a/app/gonka_client.py b/app/gonka_client.py index 212b3e7..49c525c 100644 --- a/app/gonka_client.py +++ b/app/gonka_client.py @@ -11,34 +11,49 @@ logger = logging.getLogger(__name__) +def encode_with_low_s(r: int, s: int, order: int) -> bytes: + """Encode ECDSA signature with low-S normalization""" + # Normalize s to low-S + if s > order // 2: + s = order - s + + # Convert to bytes (32 bytes each for r and s) + r_bytes = r.to_bytes(32, 'big') + s_bytes = s.to_bytes(32, 'big') + + return r_bytes + s_bytes + + class GonkaClient: """Client for making signed requests to Gonka API""" - + def __init__( self, private_key: str, address: str, endpoint: str, provider_address: str, - timeout: float = 60.0 + timeout: float = 60.0, + stream_read_timeout: float = 300.0, ): self.private_key = private_key self.address = address self.endpoint = endpoint.rstrip('/') self.provider_address = provider_address self.timeout = timeout - + self.stream_read_timeout = stream_read_timeout + # Initialize hybrid timestamp tracking self._wall_base = time.time_ns() self._perf_base = time.perf_counter_ns() - - # HTTP client + + # HTTP client: default timeout for non-streaming; streaming uses per-request timeout self.client = httpx.AsyncClient(timeout=timeout) - + def _hybrid_timestamp_ns(self) -> int: """Generate hybrid timestamp (monotonic + aligned to wall clock)""" return self._wall_base + (time.perf_counter_ns() - 
self._perf_base)
-    
+
     def _sign_payload(
         self,
         payload_bytes: bytes,
@@ -48,41 +63,45 @@ def _sign_payload(
         """Sign payload using ECDSA with SHA-256"""
         # Remove 0x prefix if present
         pk = self.private_key[2:] if self.private_key.startswith('0x') else self.private_key
-        sk = SigningKey.from_string(bytes.fromhex(pk), curve=SECP256k1)
-
-        # Message bytes: payload || timestamp || provider_address
-        msg = payload_bytes + str(timestamp_ns).encode('utf-8') + provider_address.encode('utf-8')
-
-        # Deterministic ECDSA over SHA-256 with low-S normalization
-        sig = sk.sign_deterministic(msg, hashfunc=hashlib.sha256)
-        r, s = sig[:32], sig[32:]
-
-        order = SECP256k1.order
-        s_int = int.from_bytes(s, 'big')
-        if s_int > order // 2:
-            s_int = order - s_int
-        s = s_int.to_bytes(32, 'big')
-
-        return base64.b64encode(r + s).decode('utf-8')
-
+        signing_key = SigningKey.from_string(bytes.fromhex(pk), curve=SECP256k1)
+
+        # Sign the SHA-256 hash of the payload instead of the raw payload
+        payload_hash = hashlib.sha256(payload_bytes).hexdigest()
+
+        # Build signature input: hash + timestamp + provider_address
+        signature_input = payload_hash
+        signature_input += str(timestamp_ns)
+        signature_input += provider_address
+
+        signature_bytes = signature_input.encode('utf-8')
+
+        # Sign the message with deterministic ECDSA using low-S normalization
+        signature = signing_key.sign_deterministic(
+            signature_bytes,
+            hashfunc=hashlib.sha256,
+            sigencode=lambda r, s, order: encode_with_low_s(r, s, order)
+        )
+
+        return base64.b64encode(signature).decode('utf-8')
+
     def _prepare_request(self, payload: Optional[dict]) -> Tuple[bytes, dict]:
         """Prepare request data (payload bytes, headers with signature)"""
         if payload is None:
             payload = {}
-
+
         payload_bytes = json.dumps(payload).encode('utf-8')
         timestamp_ns = self._hybrid_timestamp_ns()
         signature = self._sign_payload(payload_bytes, timestamp_ns, self.provider_address)
-
+
         headers = {
             "Content-Type": "application/json",
             "Authorization": signature,
            
"X-Requester-Address": self.address, "X-Timestamp": str(timestamp_ns), } - + return payload_bytes, headers - + async def get_models(self) -> list: """Get available models from Gonka API""" try: @@ -94,7 +113,7 @@ async def get_models(self) -> list: except Exception as e: logger.warning(f"Failed to load models from Gonka API: {e}") return [] - + async def request( self, method: str, @@ -104,15 +123,8 @@ async def request( """Make a signed request to Gonka API (non-streaming)""" url = f"{self.endpoint}{path}" payload_bytes, headers = self._prepare_request(payload) - - # Log request body before sending - try: - request_body = json.loads(payload_bytes.decode('utf-8')) - logger.info(f"Gonka API Request: {method} {url}") - logger.info(f"Request body: {json.dumps(request_body, indent=2, ensure_ascii=False)}") - except Exception as e: - logger.warning(f"Failed to log request body: {e}") - + + logger.info(f"Gonka API Request: {method} {path}") try: response = await self.client.request( method, @@ -123,18 +135,12 @@ async def request( response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: - # Log error response - try: - error_body = e.response.text - logger.error(f"Gonka API Error Response: {e.response.status_code}") - logger.error(f"Error response body: {error_body}") - except Exception: - logger.error(f"Gonka API Error Response: {e.response.status_code} (failed to read body)") + logger.error(f"Gonka API Error Response: {e.response.status_code}") raise except Exception as e: logger.error(f"Gonka API Request failed: {type(e).__name__}: {str(e)}") raise - + async def request_stream( self, method: str, @@ -144,49 +150,71 @@ async def request_stream( """Make a signed streaming request to Gonka API""" url = f"{self.endpoint}{path}" payload_bytes, headers = self._prepare_request(payload) - - # Log request body before sending - try: - request_body = json.loads(payload_bytes.decode('utf-8')) - logger.info(f"Gonka API Stream Request: {method} {url}") - 
logger.info(f"Request body: {json.dumps(request_body, indent=2, ensure_ascii=False)}")
-        except Exception as e:
-            logger.warning(f"Failed to log request body: {e}")
-
+
+        logger.info(f"Gonka API Stream Request: {method} {path}")
         try:
+            # Use longer read timeout for streaming so long generations don't get cut off
+            stream_timeout = httpx.Timeout(self.timeout, read=self.stream_read_timeout)
             async with self.client.stream(
                 method,
                 url,
                 headers=headers,
-                content=payload_bytes
+                content=payload_bytes,
+                timeout=stream_timeout,
             ) as response:
                 if response.status_code >= 400:
-                    # Read error response body
                     try:
                         error_body = await response.aread()
                         error_text = error_body.decode('utf-8', errors='replace')
                         logger.error(f"Gonka API Stream Error Response: {response.status_code}")
                         logger.error(f"Error response body: {error_text}")
                     except Exception as read_err:
-                        logger.error(f"Gonka API Stream Error Response: {response.status_code} (failed to read body: {read_err})")
+                        logger.error(
+                            f"Gonka API Stream Error Response: {response.status_code} "
+                            f"(failed to read body: {read_err})"
+                        )
                     response.raise_for_status()
-
-                async for chunk in response.aiter_bytes():
-                    yield chunk
+
+                total_bytes = 0
+                chunk_count = 0
+                completed_normally = False
+                try:
+                    async for chunk in response.aiter_bytes():
+                        total_bytes += len(chunk)
+                        chunk_count += 1
+                        yield chunk
+                    completed_normally = True
+                    logger.info(
+                        f"Gonka API Stream completed: {method} {path} "
+                        f"(chunks={chunk_count}, bytes={total_bytes})"
+                    )
+                finally:
+                    if not completed_normally:
+                        logger.info(
+                            f"Gonka API Stream ended without full completion: {method} {path} "
+                            f"(chunks={chunk_count}, bytes={total_bytes}) - client disconnect or stream closed"
+                        )
         except httpx.HTTPStatusError as e:
-            # Log error response (fallback for non-stream errors)
-            try:
-                error_body = e.response.text
-                logger.error(f"Gonka API Stream Error Response: {e.response.status_code}")
-                logger.error(f"Error response body: {error_body}")
-            except Exception:
-                
logger.error(f"Gonka API Stream Error Response: {e.response.status_code} (failed to read body)") + logger.error(f"Gonka API Stream Error Response: {e.response.status_code}") + raise + except httpx.ReadTimeout: + logger.error( + "Gonka API Stream read timeout: backend did not send data within " + "stream_read_timeout; stream ended abruptly" + ) + raise + except httpx.ConnectError as e: + logger.error(f"Gonka API Stream connection error: {e}") + raise + except httpx.WriteTimeout: + logger.error("Gonka API Stream write timeout: request body send timed out") raise except Exception as e: - logger.error(f"Gonka API Stream Request failed: {type(e).__name__}: {str(e)}") + logger.error( + f"Gonka API Stream failed unexpectedly: {type(e).__name__}: {str(e)}" + ) raise - + async def close(self): """Close the HTTP client""" await self.client.aclose() - diff --git a/app/main.py b/app/main.py index 4a55df2..0070d0c 100644 --- a/app/main.py +++ b/app/main.py @@ -10,6 +10,13 @@ from app.gonka_client import GonkaClient from app.auth import verify_api_key +from app.tool_emulation import ( + emulate_tool_choice_auto, + process_response_with_tool_emulation, + process_stream_with_tool_emulation, +) +from app.circuit_breaker import CircuitBreaker, CircuitBreakerOpenError +from app.retry import retry_with_backoff, BACKEND_RETRY_CONFIG # Configure logging @@ -28,14 +35,17 @@ class Settings(BaseSettings): gonka_address: str = "" gonka_endpoint: str = "" gonka_provider_address: str = "" - + # API Key for external access api_key: str = "" - + # Server settings host: str = "0.0.0.0" port: int = 8000 - + + # Streaming read timeout (seconds); increase for slow/long generations + gonka_stream_read_timeout: float = 300.0 + class Config: env_file = ".env" case_sensitive = False @@ -48,16 +58,21 @@ class Config: gonka_client: Optional[GonkaClient] = None available_models: List[Dict] = [] +# Circuit breaker for Gonka backend +gonka_circuit_breaker = CircuitBreaker( + name="gonka_backend", + 
failure_threshold=5, + recovery_timeout=60.0, + success_threshold=2, +) + @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events""" - # Startup global gonka_client, available_models - - # Initialize client and load models + try: - # Check if configuration is complete before loading models client = _create_gonka_client() if client: models = await client.get_models() @@ -69,10 +84,9 @@ async def lifespan(app: FastAPI): except Exception as e: logger.error(f"Failed to load models at startup: {e}") available_models = [] - + yield - - # Shutdown + if gonka_client: await gonka_client.close() @@ -92,7 +106,8 @@ def _create_gonka_client() -> Optional[GonkaClient]: private_key=settings.gonka_private_key, address=settings.gonka_address, endpoint=settings.gonka_endpoint, - provider_address=settings.gonka_provider_address + provider_address=settings.gonka_provider_address, + stream_read_timeout=settings.gonka_stream_read_timeout, ) return gonka_client @@ -109,8 +124,8 @@ def get_gonka_client() -> GonkaClient: if not settings.gonka_endpoint: missing.append("GONKA_ENDPOINT") if not settings.gonka_provider_address: - missing.append("GONKA_PROVIDER_ADDRESS (provider address in bech32 format, get it from the Gonka provider)") - + missing.append("GONKA_PROVIDER_ADDRESS") + raise HTTPException( status_code=500, detail=f"Gonka configuration incomplete. Missing: {', '.join(missing)}. 
" @@ -142,19 +157,17 @@ def get_gonka_client() -> GonkaClient: async def list_models(request: Request, api_key_valid: bool = Depends(verify_api_key)): """List available models (OpenAI-compatible endpoint)""" global available_models - - # Convert Gonka models format to OpenAI format + models_data = [] for model in available_models: model_id = model.get("id", "unknown") models_data.append({ "id": model_id, "object": "model", - "created": 1677610602, # Default timestamp + "created": 1677610602, "owned_by": "gonka" }) - - # If no models loaded, return default + if not models_data: models_data = [{ "id": "gonka-model", @@ -162,21 +175,19 @@ async def list_models(request: Request, api_key_valid: bool = Depends(verify_api "created": 1677610602, "owned_by": "gonka" }] - + return { "object": "list", "data": models_data } + # Models endpoint without auth (for web interface) @app.get("/api/models") async def get_models_no_auth(): """Get available models without authentication (for web interface)""" global available_models - - return { - "models": available_models - } + return {"models": available_models} # Chat completions endpoint @@ -187,34 +198,47 @@ async def chat_completions( ): """Chat completions endpoint (OpenAI-compatible)""" client = get_gonka_client() - + try: body = await request.json() - # Log incoming request body logger.info("Incoming chat completions request") - logger.info(f"Request body: {json.dumps(body, indent=2, ensure_ascii=False)}") except Exception as e: logger.error(f"Failed to parse request JSON: {e}") raise HTTPException(status_code=400, detail=f"Invalid JSON: {str(e)}") - + stream = body.get("stream", False) - + original_tools = body.get("tools") + + # Apply tool emulation if model doesn't support native tool calling + # (emulate_tool_choice_auto is a no-op when no tools are present) + emulated_body = emulate_tool_choice_auto(body) + tools_were_emulated = original_tools and "tools" not in emulated_body + try: if stream: - # Streaming response - 
proxy SSE from Gonka async def generate(): try: - async for chunk in client.request_stream( + raw_stream = client.request_stream( method="POST", path="/chat/completions", - payload=body - ): - # Yield chunk as-is (Gonka should return SSE format) - yield chunk + payload=emulated_body, + ) + if tools_were_emulated: + async for chunk in process_stream_with_tool_emulation( + raw_stream, original_tools + ): + yield chunk + else: + async for chunk in raw_stream: + yield chunk + except CircuitBreakerOpenError as e: + logger.error(f"Circuit breaker open: {e}") + error_payload = json.dumps({"error": {"message": str(e), "type": "service_unavailable"}}) + yield f"data: {error_payload}\n\ndata: [DONE]\n\n".encode() except Exception as e: logger.error(f"Streaming error: {type(e).__name__}: {str(e)}") raise - + return StreamingResponse( generate(), media_type="text/event-stream", @@ -225,13 +249,31 @@ async def generate(): } ) else: - # Non-streaming response - response = await client.request( - method="POST", - path="/chat/completions", - payload=body + # Non-streaming: wrap with circuit breaker + retry + async def do_request(): + return await gonka_circuit_breaker.call( + client.request, + method="POST", + path="/chat/completions", + payload=emulated_body, + ) + + response = await retry_with_backoff( + do_request, + max_retries=BACKEND_RETRY_CONFIG.max_retries, + initial_delay=BACKEND_RETRY_CONFIG.initial_delay, + max_delay=BACKEND_RETRY_CONFIG.max_delay, + exceptions=BACKEND_RETRY_CONFIG.exceptions, ) + + if tools_were_emulated: + response = process_response_with_tool_emulation(response, original_tools) + return response + + except CircuitBreakerOpenError as e: + logger.error(f"Circuit breaker open: {e}") + raise HTTPException(status_code=503, detail=f"Backend temporarily unavailable: {str(e)}") except Exception as e: logger.error(f"Chat completions error: {type(e).__name__}: {str(e)}") raise HTTPException( @@ -246,11 +288,16 @@ async def web_interface(): """Serve web chat 
interface""" return FileResponse("app/static/index.html") + # Health check endpoint (no auth required) @app.get("/health") async def health(): """Health check endpoint""" - return {"status": "ok"} + return { + "status": "ok", + "circuit_breaker": gonka_circuit_breaker.get_state(), + } + # Serve static files (must be last) app.mount("/static", StaticFiles(directory="app/static"), name="static") @@ -264,4 +311,3 @@ async def health(): port=settings.port, reload=False ) - diff --git a/app/retry.py b/app/retry.py new file mode 100644 index 0000000..ed5934a --- /dev/null +++ b/app/retry.py @@ -0,0 +1,173 @@ +""" +Retry utilities with exponential backoff +""" +import asyncio +import random +import logging +from typing import Callable, TypeVar, Optional, Tuple, Any + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + + +async def retry_with_backoff( + func: Callable[[], T], + max_retries: int = 3, + initial_delay: float = 1.0, + max_delay: float = 60.0, + exponential_base: float = 2.0, + jitter: bool = True, + exceptions: Tuple[type, ...] = (Exception,), + on_retry: Optional[Callable[[int, Exception], None]] = None +) -> T: + """ + Retry function with exponential backoff + + Args: + func: Async function to retry (no arguments) + max_retries: Maximum number of retry attempts + initial_delay: Initial delay in seconds + max_delay: Maximum delay in seconds + exponential_base: Base for exponential backoff + jitter: Add random jitter to prevent thundering herd + exceptions: Tuple of exceptions to catch and retry + on_retry: Optional callback called on each retry (attempt, exception) + + Returns: + Function result + + Raises: + Last exception if all retries fail + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_retries + 1): + try: + return await func() + except exceptions as e: + last_exception = e + + if attempt == max_retries: + logger.error( + f"Retry exhausted after {max_retries} attempts. 
" + f"Last error: {type(e).__name__}: {e}" + ) + raise + + # Calculate delay with exponential backoff + if jitter: + # Add random jitter: delay * base + random(0, 1) + delay = min( + delay * exponential_base + random.uniform(0, 1), + max_delay + ) + else: + delay = min(delay * exponential_base, max_delay) + + logger.warning( + f"Attempt {attempt + 1}/{max_retries} failed: {type(e).__name__}: {e}. " + f"Retrying in {delay:.2f}s" + ) + + if on_retry: + try: + on_retry(attempt + 1, e) + except Exception as callback_error: + logger.warning(f"Retry callback error: {callback_error}") + + await asyncio.sleep(delay) + + # Should never reach here, but just in case + if last_exception: + raise last_exception + raise RuntimeError("Retry failed without exception") + + +async def retry_with_backoff_args( + func: Callable[..., T], + *args, + max_retries: int = 3, + initial_delay: float = 1.0, + max_delay: float = 60.0, + exponential_base: float = 2.0, + jitter: bool = True, + exceptions: Tuple[type, ...] 
= (Exception,), + **kwargs +) -> T: + """ + Retry function with exponential backoff (supports arguments) + + Args: + func: Async function to retry + *args: Positional arguments for function + max_retries: Maximum number of retry attempts + initial_delay: Initial delay in seconds + max_delay: Maximum delay in seconds + exponential_base: Base for exponential backoff + jitter: Add random jitter + exceptions: Tuple of exceptions to catch and retry + **kwargs: Keyword arguments for function + + Returns: + Function result + + Raises: + Last exception if all retries fail + """ + async def wrapper(): + return await func(*args, **kwargs) + + return await retry_with_backoff( + wrapper, + max_retries=max_retries, + initial_delay=initial_delay, + max_delay=max_delay, + exponential_base=exponential_base, + jitter=jitter, + exceptions=exceptions + ) + + +class RetryConfig: + """Configuration for retry behavior""" + + def __init__( + self, + max_retries: int = 3, + initial_delay: float = 1.0, + max_delay: float = 60.0, + exponential_base: float = 2.0, + jitter: bool = True, + exceptions: Tuple[type, ...] = (Exception,) + ): + self.max_retries = max_retries + self.initial_delay = initial_delay + self.max_delay = max_delay + self.exponential_base = exponential_base + self.jitter = jitter + self.exceptions = exceptions + + +# Predefined retry configurations +HTTP_RETRY_CONFIG = RetryConfig( + max_retries=3, + initial_delay=1.0, + max_delay=30.0, + exceptions=(Exception,) # Catch all for HTTP errors +) + +DATABASE_RETRY_CONFIG = RetryConfig( + max_retries=3, + initial_delay=0.5, + max_delay=10.0, + exceptions=(Exception,) +) + +BACKEND_RETRY_CONFIG = RetryConfig( + max_retries=2, # Fewer retries for backend calls (circuit breaker handles failures) + initial_delay=1.0, + max_delay=20.0, + exceptions=(Exception,) +) diff --git a/app/static/index.html b/app/static/index.html index 78cc77f..f7c7c9d 100644 --- a/app/static/index.html +++ b/app/static/index.html @@ -3,7 +3,15 @@
-OpenAI-compatible API proxy testing
+OpenAI-compatible LLM proxy
+Start a conversation by sending a message
-Complete guide to using Gonka AI Gateway API
+ +Gonka AI Gateway provides an OpenAI-compatible API. Configure the API_KEY environment variable on the proxy server; clients authenticate using that key.
All API requests require the API key in the Authorization header:
+Authorization: Bearer sk-your-api-key-here
+ In production the gateway is served over HTTPS (proxied via Traefik). When running locally, make API requests to the base URL:
+http://localhost:8000/v1
+ Create a chat completion (OpenAI-compatible)
+POST /chat/completions
+Content-Type: application/json
+Authorization: Bearer sk-your-api-key
+{
+ "model": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "messages": [
+ {"role": "user", "content": "Hello!"}
+ ],
+ "stream": false
+}
+ {
+ "id": "chatcmpl-123",
+ "object": "chat.completion",
+ "created": 1677652288,
+ "choices": [{
+ "index": 0,
+ "message": {
+ "role": "assistant",
+ "content": "Hello! How can I help you?"
+ },
+ "finish_reason": "stop"
+ }],
+ "usage": {
+ "prompt_tokens": 9,
+ "completion_tokens": 12,
+ "total_tokens": 21
+ }
+}
+ To enable streaming, set "stream": true in the request:
{
+ "model": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "messages": [
+ {"role": "user", "content": "Tell me a story"}
+ ],
+ "stream": true
+}
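When streaming is enabled, the response arrives as server-sent events: `data:` lines each carrying a JSON chunk, terminated by `data: [DONE]`, following OpenAI conventions. A minimal consumer sketch using only the Python standard library (the URL, key, and `stream_chat` helper name are placeholders, not part of the proxy):

```python
import json
import urllib.request

def parse_sse_line(line: str):
    """Return the decoded JSON chunk from one SSE 'data:' line, or None."""
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return None
    return json.loads(payload)

def stream_chat(prompt: str,
                base_url: str = "http://localhost:8000/v1",
                api_key: str = "sk-your-api-key-here"):
    """Yield content deltas from a streaming chat completion."""
    body = {
        "model": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }
    req = urllib.request.Request(
        f"{base_url}/chat/completions",
        data=json.dumps(body).encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req) as resp:
        for raw in resp:
            chunk = parse_sse_line(raw.decode("utf-8").strip())
            if chunk:
                delta = chunk["choices"][0]["delta"].get("content")
                if delta:
                    yield delta
```

Usage: `for piece in stream_chat("Tell me a story"): print(piece, end="", flush=True)`.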
+ List available models
+GET /models
+Authorization: Bearer sk-your-api-key
+ {
+ "object": "list",
+ "data": [
+ {
+ "id": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "object": "model",
+ "created": 1677610602,
+ "owned_by": "gonka"
+ }
+ ]
+}
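To use this response programmatically, here is a short sketch (stdlib only; the URL, key, and helper names are placeholders) that fetches the list and pulls out the model IDs:

```python
import json
import urllib.request

def extract_model_ids(payload: dict) -> list:
    """Pull the model IDs out of an OpenAI-style model list response."""
    return [m["id"] for m in payload.get("data", [])]

def list_models(base_url: str = "http://localhost:8000/v1",
                api_key: str = "sk-your-api-key-here") -> list:
    """GET /models from the gateway and return the available model IDs."""
    req = urllib.request.Request(
        f"{base_url}/models",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    with urllib.request.urlopen(req) as resp:
        return extract_model_ids(json.load(resp))
```

Given the example response above, `extract_model_ids` would return `["Qwen/Qwen3-235B-A22B-Instruct-2507-FP8"]`.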
+ The API is fully compatible with the OpenAI Python SDK:
+from openai import OpenAI
+client = OpenAI(
+ api_key="sk-your-api-key-here",
+ base_url="http://localhost:8000/v1"
+)
+response = client.chat.completions.create(
+ model="Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ messages=[
+ {"role": "user", "content": "Hello!"}
+ ]
+)
+print(response.choices[0].message.content)
+ Here's a simple example using Python's requests library:
import requests
+url = "http://localhost:8000/v1/chat/completions"
+headers = {
+ "Authorization": "Bearer sk-your-api-key-here",
+ "Content-Type": "application/json"
+}
+data = {
+ "model": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "messages": [
+ {"role": "user", "content": "Hello! How are you?"}
+ ]
+}
+response = requests.post(url, headers=headers, json=data)
+result = response.json()
+print(result["choices"][0]["message"]["content"])
+ curl http://localhost:8000/v1/chat/completions \
+ -H "Content-Type: application/json" \
+ -H "Authorization: Bearer sk-your-api-key" \
+ -d '{
+ "model": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "messages": [
+ {"role": "user", "content": "Hello!"}
+ ]
+ }'
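When a request fails, it helps to surface the HTTP status rather than swallow the exception. A hedged sketch (the status-code meanings below are assumptions based on typical OpenAI-compatible gateways, not confirmed behavior of this proxy; `safe_chat` is a hypothetical helper):

```python
import json
import urllib.error
import urllib.request

def describe_api_error(status: int, body: str = "") -> str:
    """Map an HTTP status to a readable message (assumed, not proxy-specific)."""
    messages = {
        400: "malformed request body",
        401: "invalid or missing API key",
        404: "unknown endpoint or model",
    }
    reason = messages.get(status, "unexpected error")
    return f"HTTP {status}: {reason}" + (f" ({body})" if body else "")

def safe_chat(url: str, api_key: str, payload: dict) -> dict:
    """POST a chat completion and raise a readable error on failure."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)
    except urllib.error.HTTPError as e:
        raise RuntimeError(
            describe_api_error(e.code, e.read().decode(errors="replace"))
        ) from e
```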
+ Currently, there are no rate limits. However, please use the API responsibly.
+Use this guide to connect OpenClaw to the Gonka gateway: configure the provider on your OpenClaw node, then use the OpenClaw Telegram bot commands to switch to the Gonka model and check status.
+ +Configure the Gonka provider on your OpenClaw node/gateway so the agent can use Gonka models. Edit openclaw.json or your models.json (wherever OpenClaw reads models.providers).
Replace:
+http://localhost:8000/v1 → with your gateway base URL, if different.
+sk-.......... → with your API key (from registration or dashboard).
+{
+ "models": {
+ "providers": {
+ "gonka": {
+ "baseUrl": "http://localhost:8000/v1",
+ "apiKey": "sk-..........",
+ "auth": "api-key",
+ "api": "openai-completions",
+ "authHeader": true,
+ "models": [
+ {
+ "id": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "name": "Qwen/Qwen3-235B-A22B-Instruct-2507-FP8",
+ "api": "openai-completions",
+ "reasoning": false,
+ "input": ["text"],
+ "cost": { "input": 0, "output": 0, "cacheRead": 0, "cacheWrite": 0 },
+ "contextWindow": 200000,
+ "maxTokens": 8192
+ }
+ ]
+ }
+ }
+ }
+}
+ After editing: save the file and restart the OpenClaw gateway/node if it does not reload config automatically.
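Before restarting, it can be handy to sanity-check the edited config. A quick sketch (`check_gonka_provider` is a hypothetical helper, not part of OpenClaw; the key names match the provider block above):

```python
import json

# Keys the provider block above relies on.
REQUIRED_KEYS = {"baseUrl", "apiKey", "api", "models"}

def check_gonka_provider(config_text: str) -> list:
    """Return a list of problems found in the gonka provider block (empty = OK)."""
    cfg = json.loads(config_text)
    provider = cfg.get("models", {}).get("providers", {}).get("gonka")
    if provider is None:
        return ["missing models.providers.gonka block"]
    problems = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - provider.keys())]
    if provider.get("apiKey") == "sk-..........":
        problems.append("apiKey still has the placeholder value")
    return problems
```

Run it on your `openclaw.json` (e.g. `check_gonka_provider(open("openclaw.json").read())`) and fix anything it reports before restarting the node.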
+These commands are used in the OpenClaw Telegram bot chat (not in the gateway config or API). They let you switch to the Gonka model and check that it is active.
+In the OpenClaw Telegram bot, send /status to see the current runtime state, including which model is in use.
🦞 OpenClaw 2026.2.15 (3fe22ea)
+🧠 Model: gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8 · api-key sk-dvg…6GIAPE (models.json)
+🧮 Tokens: 18k in / 137 out
+Context: 18k/200k (9%) · 🧹 Compactions: 0
+🧵 Session: agent:main:main • updated just now
+⚙️ Runtime: direct · Think: off · verbose
+🪢 Queue: collect (depth 0)
+ From this output you can confirm that the Model line shows gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8 when the Gonka model is active.
In the OpenClaw Telegram bot, send /model <provider/model-id> to switch the active model. To use the Gonka Qwen model, send:
/model gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8
+ Example: you send /model gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8, the agent replies Model set to gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8. Then send /status again to verify the Model line shows the Gonka model.
| What | Where | Action |
|---|---|---|
| Add Gonka provider | OpenClaw node → openclaw.json or models config | Put the models.providers.gonka block from section 1 into your config. Set baseUrl and apiKey. |
| Switch to Gonka model | OpenClaw Telegram bot | Send: /model gonka/Qwen/Qwen3-235B-A22B-Instruct-2507-FP8 |
| Check current model | OpenClaw Telegram bot | Send: /status → look at the Model line in the reply. |
So: config = on the node; /status and /model = in the OpenClaw Telegram bot chat.
+