diff --git a/faigate/adaptation.py b/faigate/adaptation.py index ad49080..1f7e605 100644 --- a/faigate/adaptation.py +++ b/faigate/adaptation.py @@ -206,16 +206,12 @@ def to_dict(self) -> dict[str, Any]: "cooldown_window_s": cooldown_window_s, "cooldown_remaining_s": cooldown_remaining_s, "cooldown_until": ( - round(self.last_issue_at + cooldown_window_s, 3) - if cooldown_window_s and self.last_issue_at - else 0.0 + round(self.last_issue_at + cooldown_window_s, 3) if cooldown_window_s and self.last_issue_at else 0.0 ), "degraded_window_s": degraded_window_s, "degraded_remaining_s": degraded_remaining_s, "degraded_until": ( - round(self.last_issue_at + degraded_window_s, 3) - if degraded_window_s and self.last_issue_at - else 0.0 + round(self.last_issue_at + degraded_window_s, 3) if degraded_window_s and self.last_issue_at else 0.0 ), "window_state": self.window_state(), "request_blocked": self.cooldown_active(), diff --git a/faigate/api/anthropic/models.py b/faigate/api/anthropic/models.py index da5dc1c..2fa346a 100644 --- a/faigate/api/anthropic/models.py +++ b/faigate/api/anthropic/models.py @@ -154,9 +154,7 @@ def _parse_system_prompt(raw: Any) -> str | list[str] | None: if isinstance(raw, str): return raw if not isinstance(raw, list): - raise AnthropicBridgeError( - "'system' must be a string, a list of strings, a list of text blocks, or null" - ) + raise AnthropicBridgeError("'system' must be a string, a list of strings, a list of text blocks, or null") normalized: list[str] = [] for item in raw: diff --git a/faigate/bridges/anthropic/adapter.py b/faigate/bridges/anthropic/adapter.py index 1129959..75928ac 100644 --- a/faigate/bridges/anthropic/adapter.py +++ b/faigate/bridges/anthropic/adapter.py @@ -53,11 +53,7 @@ def anthropic_request_to_canonical( """Map an Anthropic messages request to the internal gateway model.""" normalized_headers = {str(key): str(value) for key, value in (headers or {}).items()} - source = ( - normalized_headers.get("x-faigate-client") - or normalized_headers.get("anthropic-client") - or "claude-code" - ) + source = normalized_headers.get("x-faigate-client") or normalized_headers.get("anthropic-client") or "claude-code" client = source metadata = dict(request.metadata) metadata.setdefault("source", source) @@ -204,9 +200,7 @@ def approximate_anthropic_input_tokens( total += 12 total += _estimate_text_tokens(tool.name) total += _estimate_text_tokens(tool.description) - total += _estimate_text_tokens( - json.dumps(tool.input_schema, sort_keys=True, separators=(",", ":")) - ) + total += _estimate_text_tokens(json.dumps(tool.input_schema, sort_keys=True, separators=(",", ":"))) return max(total, 1), "estimated-char-v1" @@ -264,9 +258,7 @@ def _user_message_to_canonical(message: AnthropicMessage) -> list[CanonicalMessa pending_text.append(block) continue if block.type != "tool_result": - raise AnthropicBridgeError( - "Anthropic bridge v1 supports only text and tool_result blocks in user messages" - ) + raise AnthropicBridgeError("Anthropic bridge v1 supports only text and tool_result blocks in user messages") if not block.tool_use_id: # Claude-native clients can emit tool_result-like user blocks without a # stable tool_use_id. Falling back to user text keeps the session @@ -289,9 +281,7 @@ def _user_message_to_canonical(message: AnthropicMessage) -> list[CanonicalMessa # OpenAI-style tool continuity requires tool messages to follow the # assistant tool_calls immediately. Preserve any surrounding user text # as a trailing user turn once all tool_result blocks are emitted. - canonical_messages.append( - CanonicalMessage(role="user", content=_text_blocks_to_string(pending_text)) - ) + canonical_messages.append(CanonicalMessage(role="user", content=_text_blocks_to_string(pending_text))) return canonical_messages @@ -402,11 +392,7 @@ def _canonical_content_to_anthropic_blocks( content = message.content blocks: list[AnthropicContentBlock] if isinstance(content, str): - blocks = ( - [] - if (not content and message.tool_calls) - else [AnthropicContentBlock(type="text", text=content)] - ) + blocks = [] if (not content and message.tool_calls) else [AnthropicContentBlock(type="text", text=content)] elif isinstance(content, list): blocks = [] for item in content: @@ -457,9 +443,7 @@ def _canonical_content_to_anthropic_blocks( return blocks -def map_stop_reason_to_anthropic( - stop_reason: str | None, *, has_tool_calls: bool = False -) -> str | None: +def map_stop_reason_to_anthropic(stop_reason: str | None, *, has_tool_calls: bool = False) -> str | None: """Translate OpenAI-style finish reasons into Anthropic stop reasons.""" normalized = str(stop_reason or "").strip().lower() @@ -567,8 +551,7 @@ async def openai_sse_to_anthropic( "error", { "type": "error", - "error": payload.get("error") - or {"type": "api_error", "message": "Upstream error"}, + "error": payload.get("error") or {"type": "api_error", "message": "Upstream error"}, }, ) return @@ -639,9 +622,7 @@ async def openai_sse_to_anthropic( if not isinstance(tool_delta, dict): continue raw_index = int(tool_delta.get("index") or 0) - state = tool_states.setdefault( - raw_index, _AnthropicStreamToolState(index=raw_index) - ) + state = tool_states.setdefault(raw_index, _AnthropicStreamToolState(index=raw_index)) function = tool_delta.get("function") or {} if tool_delta.get("id"): state.tool_use_id = str(tool_delta["id"]) diff --git a/faigate/cli.py b/faigate/cli.py index 940992f..a5d451e 100644 --- a/faigate/cli.py +++ b/faigate/cli.py @@ -83,10 +83,7 @@ def _bar(ratio: float, width: int = 20, char: str = "█") -> str: def _table(headers: list[str], rows: list[list[str]], col_widths: list[int] | None = None): """Print a simple aligned table.""" if not col_widths: - col_widths = [ - max(len(h), max((len(str(r[i])) for r in rows), default=0)) + 2 - for i, h in enumerate(headers) - ] + col_widths = [max(len(h), max((len(str(r[i])) for r in rows), default=0)) + 2 for i, h in enumerate(headers)] # Header hdr = "" diff --git a/faigate/config.py b/faigate/config.py index 2cb129e..b68a468 100644 --- a/faigate/config.py +++ b/faigate/config.py @@ -260,21 +260,13 @@ def _validate_provider_base_url(name: str, base_url: str) -> str: parsed = urlparse(base_url) scheme = (parsed.scheme or "").strip().lower() if scheme not in {"http", "https"}: - raise ConfigError( - "Provider " - f"'{name}' base_url must use http or https " - f"(got '{parsed.scheme or 'missing'}')" - ) + raise ConfigError(f"Provider '{name}' base_url must use http or https (got '{parsed.scheme or 'missing'}')") if not parsed.netloc: raise ConfigError(f"Provider '{name}' base_url must include a host") if scheme == "http" and not _looks_local_base_url(base_url): - raise ConfigError( - "Provider " - f"'{name}' base_url must use https unless it points " - "to local/private network space" - ) + raise ConfigError(f"Provider '{name}' base_url must use https unless it points to local/private network space") return base_url @@ -350,9 +342,7 @@ def _normalize_positive_int(value: Any, *, field_name: str, provider_name: str) if value in (None, ""): return None if isinstance(value, bool) or not isinstance(value, int) or value <= 0: - raise ConfigError( - f"Provider '{provider_name}' field '{field_name}' must be a positive integer" - ) + raise ConfigError(f"Provider '{provider_name}' field '{field_name}' must be a positive integer") return value @@ -361,9 +351,7 @@ def _normalize_nonneg_int(value: Any, *, field_name: str, provider_name: str) -> if value in (None, ""): return None if isinstance(value, bool) or not isinstance(value, int) or value < 0: - raise ConfigError( - f"Provider '{provider_name}' field '{field_name}' must be a non-negative integer" - ) + raise ConfigError(f"Provider '{provider_name}' field '{field_name}' must be a non-negative integer") return value @@ -404,8 +392,7 @@ def _normalize_provider_cache(name: str, cfg: dict[str, Any]) -> dict[str, Any]: if mode not in _SUPPORTED_CACHE_MODES: supported = ", ".join(sorted(_SUPPORTED_CACHE_MODES)) raise ConfigError( - f"Provider '{name}' field 'cache.mode' uses unsupported value '{mode}'" - f" (supported: {supported})" + f"Provider '{name}' field 'cache.mode' uses unsupported value '{mode}' (supported: {supported})" ) read_discount = raw.get("read_discount") @@ -443,13 +430,9 @@ def _normalize_provider_cache(name: str, cfg: dict[str, Any]) -> dict[str, Any]: try: cache_read_discount = float(cache_read_discount_raw) except (TypeError, ValueError): - raise ConfigError( - f"Provider '{name}' field 'cache.cache_read_discount' must be a float" - ) + raise ConfigError(f"Provider '{name}' field 'cache.cache_read_discount' must be a float") if not (0.0 <= cache_read_discount <= 1.0): - raise ConfigError( - f"Provider '{name}' field 'cache.cache_read_discount' must be between 0.0 and 1.0" - ) + raise ConfigError(f"Provider '{name}' field 'cache.cache_read_discount' must be between 0.0 and 1.0") else: # Derive from pricing if available input_price = float(pricing.get("input", 0) or 0) @@ -464,13 +447,9 @@ def _normalize_provider_cache(name: str, cfg: dict[str, Any]) -> dict[str, Any]: try: cache_write_surcharge = float(cache_write_surcharge_raw) except (TypeError, ValueError): - raise ConfigError( - f"Provider '{name}' field 'cache.cache_write_surcharge' must be a float" - ) + raise ConfigError(f"Provider '{name}' field 'cache.cache_write_surcharge' must be a float") if cache_write_surcharge < 1.0: - raise ConfigError( - f"Provider '{name}' field 'cache.cache_write_surcharge' must be >= 1.0" - ) + raise ConfigError(f"Provider '{name}' field 'cache.cache_write_surcharge' must be >= 1.0") else: cache_write_surcharge = 1.0 @@ -519,9 +498,7 @@ def _normalize_provider_image(name: str, cfg: dict[str, Any]) -> dict[str, Any]: normalized_sizes = [] for value in supported_sizes: if not isinstance(value, str) or not value.strip(): - raise ConfigError( - f"Provider '{name}' field 'image.supported_sizes' must contain non-empty strings" - ) + raise ConfigError(f"Provider '{name}' field 'image.supported_sizes' must contain non-empty strings") normalized_sizes.append(value.strip()) if normalized_sizes: image["supported_sizes"] = normalized_sizes @@ -537,9 +514,7 @@ def _normalize_provider_image(name: str, cfg: dict[str, Any]) -> dict[str, Any]: normalized_tags = [] for value in policy_tags: if not isinstance(value, str) or not value.strip(): - raise ConfigError( - f"Provider '{name}' field 'image.policy_tags' must contain non-empty strings" - ) + raise ConfigError(f"Provider '{name}' field 'image.policy_tags' must contain non-empty strings") normalized_tags.append(value.strip().lower()) if normalized_tags: image["policy_tags"] = normalized_tags @@ -678,8 +653,7 @@ def _normalize_provider_transport(name: str, cfg: dict[str, Any]) -> dict[str, A if auth_mode not in _SUPPORTED_PROVIDER_TRANSPORT_AUTH_MODES: supported = ", ".join(sorted(_SUPPORTED_PROVIDER_TRANSPORT_AUTH_MODES)) raise ConfigError( - f"Provider '{name}' transport.auth_mode uses unsupported value " - f"'{auth_mode}' (supported: {supported})" + f"Provider '{name}' transport.auth_mode uses unsupported value '{auth_mode}' (supported: {supported})" ) normalized["auth_mode"] = auth_mode @@ -696,9 +670,7 @@ def _normalize_provider_transport(name: str, cfg: dict[str, Any]) -> dict[str, A ) normalized["probe_strategy"] = probe_strategy - probe_payload_kind = ( - str(transport.get("probe_payload_kind", "default") or "default").strip().lower() - ) + probe_payload_kind = str(transport.get("probe_payload_kind", "default") or "default").strip().lower() if not probe_payload_kind: raise ConfigError(f"Provider '{name}' transport.probe_payload_kind must be non-empty") normalized["probe_payload_kind"] = probe_payload_kind @@ -710,9 +682,7 @@ def _normalize_provider_transport(name: str, cfg: dict[str, Any]) -> dict[str, A probe_payload_max_tokens = transport.get("probe_payload_max_tokens", 1) if not isinstance(probe_payload_max_tokens, int) or probe_payload_max_tokens < 1: - raise ConfigError( - f"Provider '{name}' transport.probe_payload_max_tokens must be an integer >= 1" - ) + raise ConfigError(f"Provider '{name}' transport.probe_payload_max_tokens must be an integer >= 1") normalized["probe_payload_max_tokens"] = probe_payload_max_tokens for field_name in ( @@ -729,9 +699,7 @@ def _normalize_provider_transport(name: str, cfg: dict[str, Any]) -> dict[str, A raise ConfigError(f"Provider '{name}' transport.{field_name} must be a string") cleaned = value.strip() if not cleaned.startswith("/"): - raise ConfigError( - f"Provider '{name}' transport.{field_name} must start with '/' when set" - ) + raise ConfigError(f"Provider '{name}' transport.{field_name} must start with '/' when set") normalized[field_name] = cleaned for field_name in ("requires_api_key", "supports_models_probe"): @@ -762,9 +730,7 @@ def _normalize_provider_transport(name: str, cfg: dict[str, Any]) -> dict[str, A normalized["notes"].append(item.strip()) if normalized["probe_strategy"] == "models" and not normalized["models_path"]: - raise ConfigError( - f"Provider '{name}' transport.probe_strategy=models requires transport.models_path" - ) + raise ConfigError(f"Provider '{name}' transport.probe_strategy=models requires transport.models_path") return normalized @@ -778,9 +744,7 @@ def _normalize_provider(name: str, cfg: Any) -> dict[str, Any]: backend = normalized.get("backend", "openai-compat") if backend not in _SUPPORTED_BACKENDS: supported = ", ".join(sorted(_SUPPORTED_BACKENDS)) - raise ConfigError( - f"Provider '{name}' uses unsupported backend '{backend}' (supported: {supported})" - ) + raise ConfigError(f"Provider '{name}' uses unsupported backend '{backend}' (supported: {supported})") for field in ("base_url", "model"): value = normalized.get(field, "") @@ -802,20 +766,14 @@ def _normalize_provider(name: str, cfg: Any) -> dict[str, Any]: contract = contract.strip() if contract not in _SUPPORTED_PROVIDER_CONTRACTS: supported = ", ".join(sorted(_SUPPORTED_PROVIDER_CONTRACTS)) - raise ConfigError( - f"Provider '{name}' uses unsupported contract '{contract}' (supported: {supported})" - ) + raise ConfigError(f"Provider '{name}' uses unsupported contract '{contract}' (supported: {supported})") normalized["contract"] = contract if contract == "local-worker": if backend != "openai-compat": - raise ConfigError( - f"Provider '{name}' contract 'local-worker' requires backend 'openai-compat'" - ) + raise ConfigError(f"Provider '{name}' contract 'local-worker' requires backend 'openai-compat'") if not _looks_local_base_url(str(normalized.get("base_url", ""))): - raise ConfigError( - f"Provider '{name}' contract 'local-worker' requires a local/private base_url" - ) + raise ConfigError(f"Provider '{name}' contract 'local-worker' requires a local/private base_url") normalized.setdefault("tier", "local") raw_capabilities = normalized.get("capabilities") @@ -831,9 +789,7 @@ def _normalize_provider(name: str, cfg: Any) -> dict[str, Any]: } elif contract == "image-provider": if backend != "openai-compat": - raise ConfigError( - f"Provider '{name}' contract 'image-provider' requires backend 'openai-compat'" - ) + raise ConfigError(f"Provider '{name}' contract 'image-provider' requires backend 'openai-compat'") raw_capabilities = normalized.get("capabilities") if raw_capabilities is None: raw_capabilities = {} @@ -870,15 +826,11 @@ def _normalize_providers(data: dict[str, Any]) -> dict[str, Any]: raise ConfigError("'providers' must be a mapping") normalized = dict(data) - normalized["providers"] = { - name: _normalize_provider(name, cfg) for name, cfg in providers.items() - } + normalized["providers"] = {name: _normalize_provider(name, cfg) for name, cfg in providers.items()} return normalized -def _normalize_string_list( - value: Any, *, field_name: str, rule_name: str, allow_empty: bool = False -) -> list[str]: +def _normalize_string_list(value: Any, *, field_name: str, rule_name: str, allow_empty: bool = False) -> list[str]: """Normalize a config field to a list of non-empty strings.""" if value is None: return [] @@ -892,9 +844,7 @@ def _normalize_string_list( normalized = [] for item in items: if not isinstance(item, str) or not item.strip(): - raise ConfigError( - f"Policy '{rule_name}' field '{field_name}' must contain non-empty strings" - ) + raise ConfigError(f"Policy '{rule_name}' field '{field_name}' must contain non-empty strings") normalized.append(item.strip()) if not allow_empty and not normalized: @@ -940,10 +890,7 @@ def _normalize_provider_reference_list( unknown = sorted({item for item in normalized if item not in provider_names}) if unknown: unknown_list = ", ".join(unknown) - raise ConfigError( - "Policy " - f"'{rule_name}' field '{field_name}' references unknown providers: {unknown_list}" - ) + raise ConfigError(f"Policy '{rule_name}' field '{field_name}' references unknown providers: {unknown_list}") return normalized @@ -996,9 +943,7 @@ def _normalize_policy_select( unknown_caps = sorted(cap for cap in required_caps if cap not in _ALL_CAPABILITY_FIELDS) if unknown_caps: unknown_list = ", ".join(unknown_caps) - raise ConfigError( - f"Policy '{name}' require_capabilities has unknown capability names: {unknown_list}" - ) + raise ConfigError(f"Policy '{name}' require_capabilities has unknown capability names: {unknown_list}") normalized["require_capabilities"] = required_caps cap_values = normalized.get("capability_values", {}) @@ -1010,25 +955,17 @@ def _normalize_policy_select( normalized_cap_values: dict[str, list[Any]] = {} for cap_name, raw_values in cap_values.items(): if cap_name not in _ALL_CAPABILITY_FIELDS: - raise ConfigError( - f"Policy '{name}' capability_values references unknown capability '{cap_name}'" - ) + raise ConfigError(f"Policy '{name}' capability_values references unknown capability '{cap_name}'") values = raw_values if isinstance(raw_values, list) else [raw_values] if not values: - raise ConfigError( - f"Policy '{name}' capability_values '{cap_name}' must not be an empty list" - ) + raise ConfigError(f"Policy '{name}' capability_values '{cap_name}' must not be an empty list") normalized_values = [] for value in values: if cap_name in _BOOL_CAPABILITY_FIELDS and not isinstance(value, bool): - raise ConfigError( - f"Policy '{name}' capability_values '{cap_name}' must use boolean values" - ) + raise ConfigError(f"Policy '{name}' capability_values '{cap_name}' must use boolean values") if cap_name in _STRING_CAPABILITY_FIELDS: if not isinstance(value, str) or not value.strip(): - raise ConfigError( - f"Policy '{name}' capability_values '{cap_name}' must use non-empty strings" - ) + raise ConfigError(f"Policy '{name}' capability_values '{cap_name}' must use non-empty strings") value = value.strip() normalized_values.append(value) normalized_cap_values[cap_name] = normalized_values @@ -1038,9 +975,7 @@ def _normalize_policy_select( overlap = sorted(set(normalized["allow_providers"]) & set(normalized["deny_providers"])) if overlap: overlap_list = ", ".join(overlap) - raise ConfigError( - f"Policy '{name}' cannot allow and deny the same provider(s): {overlap_list}" - ) + raise ConfigError(f"Policy '{name}' cannot allow and deny the same provider(s): {overlap_list}") if extra_keys and "routing_mode" in extra_keys: routing_mode = normalized.get("routing_mode", "") @@ -1110,15 +1045,12 @@ def _normalize_client_profile_match(name: str, match: Any) -> dict[str, Any]: if "header_contains" in match: if not isinstance(match["header_contains"], dict): - raise ConfigError( - f"Client profile rule '{name}' field 'header_contains' must be a mapping" - ) + raise ConfigError(f"Client profile rule '{name}' field 'header_contains' must be a mapping") normalized_header_contains = {} for header_name, values in match["header_contains"].items(): if not isinstance(header_name, str) or not header_name.strip(): raise ConfigError( - f"Client profile rule '{name}' field 'header_contains' " - "must use non-empty header names" + f"Client profile rule '{name}' field 'header_contains' must use non-empty header names" ) normalized_header_contains[header_name.strip().lower()] = _normalize_string_list( values, @@ -1132,9 +1064,7 @@ def _normalize_client_profile_match(name: str, match: Any) -> dict[str, Any]: if compound in match: values = match[compound] if not isinstance(values, list) or not values: - raise ConfigError( - f"Client profile rule '{name}' field '{compound}' must be a non-empty list" - ) + raise ConfigError(f"Client profile rule '{name}' field '{compound}' must be a non-empty list") match[compound] = [_normalize_client_profile_match(name, item) for item in values] return match @@ -1203,9 +1133,7 @@ def _normalize_client_profiles(data: dict[str, Any]) -> dict[str, Any]: if default_profile not in normalized_profiles: normalized_profiles.setdefault( default_profile, - _normalize_policy_select( - f"client profile '{default_profile}'", {}, data.get("providers", {}) - ), + _normalize_policy_select(f"client profile '{default_profile}'", {}, data.get("providers", {})), ) rules = raw.get("rules", []) @@ -1234,9 +1162,7 @@ def _normalize_client_profiles(data: dict[str, Any]) -> dict[str, Any]: raise ConfigError(f"Client profile rule #{idx} must define a non-empty 'profile'") profile_name = profile_name.strip() if profile_name not in normalized_profiles: - raise ConfigError( - f"Client profile rule #{idx} references unknown profile '{profile_name}'" - ) + raise ConfigError(f"Client profile rule #{idx} references unknown profile '{profile_name}'") if profile_name in seen_rule_profiles: normalized_rules = [r for r in normalized_rules if r["profile"] != profile_name] normalized_rules.append( @@ -1306,9 +1232,7 @@ def _normalize_routing_modes(data: dict[str, Any]) -> dict[str, Any]: for alias in [normalized_name, *aliases]: owner = seen_aliases.get(alias) if owner and owner != normalized_name: - raise ConfigError( - f"Routing mode alias '{alias}' is already used by routing mode '{owner}'" - ) + raise ConfigError(f"Routing mode alias '{alias}' is already used by routing mode '{owner}'") seen_aliases[alias] = normalized_name normalized_modes[normalized_name] = { @@ -1369,23 +1293,16 @@ def _normalize_model_shortcuts(data: dict[str, Any]) -> dict[str, Any]: unknown = sorted(set(spec) - _SUPPORTED_MODEL_SHORTCUT_KEYS) if unknown: unknown_list = ", ".join(unknown) - raise ConfigError( - f"Model shortcut '{normalized_name}' has unknown keys: {unknown_list}" - ) + raise ConfigError(f"Model shortcut '{normalized_name}' has unknown keys: {unknown_list}") target = spec.get("target", "") if not isinstance(target, str) or not target.strip(): raise ConfigError(f"Model shortcut '{normalized_name}' must define a non-empty target") target = _resolve_provider_reference(target.strip(), provider_names) if target not in provider_names: - raise ConfigError( - f"Model shortcut '{normalized_name}' references unknown provider '{target}'" - ) + raise ConfigError(f"Model shortcut '{normalized_name}' references unknown provider '{target}'") if normalized_name in mode_names: - raise ConfigError( - "Model shortcut " - f"'{normalized_name}' conflicts with routing mode '{normalized_name}'" - ) + raise ConfigError(f"Model shortcut '{normalized_name}' conflicts with routing mode '{normalized_name}'") aliases = _normalize_string_list( spec.get("aliases", []), @@ -1396,13 +1313,9 @@ def _normalize_model_shortcuts(data: dict[str, Any]) -> dict[str, Any]: for alias in [normalized_name, *aliases]: owner = seen_aliases.get(alias) if owner and owner != normalized_name: - raise ConfigError( - f"Model shortcut alias '{alias}' is already used by model shortcut '{owner}'" - ) + raise ConfigError(f"Model shortcut alias '{alias}' is already used by model shortcut '{owner}'") if alias in mode_names: - raise ConfigError( - f"Model shortcut alias '{alias}' conflicts with routing mode '{alias}'" - ) + raise ConfigError(f"Model shortcut alias '{alias}' conflicts with routing mode '{alias}'") seen_aliases[alias] = normalized_name normalized_shortcuts[normalized_name] = { @@ -1425,9 +1338,7 @@ def _validate_routing_mode_references(data: dict[str, Any]) -> dict[str, Any]: for profile_name, hints in (data.get("client_profiles") or {}).get("profiles", {}).items(): routing_mode = str((hints or {}).get("routing_mode", "") or "").strip() if routing_mode and routing_mode not in mode_names: - raise ConfigError( - f"Client profile '{profile_name}' references unknown routing_mode '{routing_mode}'" - ) + raise ConfigError(f"Client profile '{profile_name}' references unknown routing_mode '{routing_mode}'") return data @@ -1607,8 +1518,7 @@ def _normalize_auto_update(data: dict[str, Any]) -> dict[str, Any]: overlap = sorted(set(allow_providers) & set(deny_providers)) if overlap: raise ConfigError( - "'auto_update.provider_scope' cannot allow and deny the same providers: " - + ", ".join(overlap) + "'auto_update.provider_scope' cannot allow and deny the same providers: " + ", ".join(overlap) ) verification = raw.get("verification", {}) @@ -1626,9 +1536,7 @@ def _normalize_auto_update(data: dict[str, Any]) -> dict[str, Any]: raise ConfigError("'auto_update.verification.command' must be a non-empty string") verification_timeout_seconds = verification.get("timeout_seconds", 30) - if isinstance(verification_timeout_seconds, bool) or not isinstance( - verification_timeout_seconds, int - ): + if isinstance(verification_timeout_seconds, bool) or not isinstance(verification_timeout_seconds, int): raise ConfigError("'auto_update.verification.timeout_seconds' must be an integer") if verification_timeout_seconds <= 0: raise ConfigError("'auto_update.verification.timeout_seconds' must be positive") @@ -1660,8 +1568,7 @@ def _normalize_auto_update(data: dict[str, Any]) -> dict[str, Any]: unknown_days = sorted(set(days) - _SUPPORTED_WINDOW_DAYS) if unknown_days: raise ConfigError( - "'auto_update.maintenance_window.days' has unknown weekday values: " - + ", ".join(unknown_days) + "'auto_update.maintenance_window.days' has unknown weekday values: " + ", ".join(unknown_days) ) start_hour = maintenance_window.get("start_hour", 0) @@ -1793,9 +1700,7 @@ def _normalize_provider_source_refresh(data: dict[str, Any]) -> dict[str, Any]: providers = raw.get("providers", ["blackbox", "kilo", "openai"]) if providers in (None, ""): providers = ["blackbox", "kilo", "openai"] - if not isinstance(providers, list) or any( - not isinstance(item, str) or not item.strip() for item in providers - ): + if not isinstance(providers, list) or any(not isinstance(item, str) or not item.strip() for item in providers): raise ConfigError("'provider_source_refresh.providers' must be a list of names") normalized = dict(data) @@ -1877,8 +1782,7 @@ def _normalize_anthropic_bridge(data: dict[str, Any]) -> dict[str, Any]: target = _resolve_provider_reference(target, provider_names) if target not in valid_targets: raise ConfigError( - "'anthropic_bridge.model_aliases' references unknown target " - f"'{target}' for alias '{alias}'" + f"'anthropic_bridge.model_aliases' references unknown target '{target}' for alias '{alias}'" ) normalized_aliases[alias] = target diff --git a/faigate/dashboard.py b/faigate/dashboard.py index ed75abc..e96a19f 100644 --- a/faigate/dashboard.py +++ b/faigate/dashboard.py @@ -112,9 +112,7 @@ def _client_highlights(client_totals: list[dict[str, Any]]) -> dict[str, dict[st rows, key=lambda row: (_safe_int(row.get("total_tokens")), _safe_int(row.get("requests"))), ), - "top_cost": max( - rows, key=lambda row: (_safe_float(row.get("cost_usd")), _safe_int(row.get("requests"))) - ), + "top_cost": max(rows, key=lambda row: (_safe_float(row.get("cost_usd")), _safe_int(row.get("requests")))), "highest_failure_rate": ( max( failure_rows, @@ -216,9 +214,7 @@ def _lane_family_summary( provider_map: dict[str, dict[str, Any]], ) -> list[dict[str, Any]]: metric_rows_by_provider = { - str(row.get("provider") or ""): row - for row in provider_rows - if str(row.get("provider") or "") + str(row.get("provider") or ""): row for row in provider_rows if str(row.get("provider") or "") } source_rows: list[dict[str, Any]] = [] if provider_map: @@ -231,9 +227,7 @@ def _lane_family_summary( "cost_usd": _safe_float(metrics_row.get("cost_usd")), "lane": dict((inventory_row or {}).get("lane") or {}), "request_readiness": dict((inventory_row or {}).get("request_readiness") or {}), - "route_runtime_state": dict( - (inventory_row or {}).get("route_runtime_state") or {} - ), + "route_runtime_state": dict((inventory_row or {}).get("route_runtime_state") or {}), } ) else: @@ -391,9 +385,7 @@ def _enrich_provider_rows_with_lane( "lane_cluster": str(lane.get("cluster") or ""), "benchmark_cluster": str(lane.get("benchmark_cluster") or ""), "cost_tier": str( - ((provider_inventory.get("capabilities") or {}).get("cost_tier")) - or lane.get("quality_tier") - or "" + ((provider_inventory.get("capabilities") or {}).get("cost_tier")) or lane.get("quality_tier") or "" ), "freshness_status": str(lane.get("freshness_status") or ""), "review_age_days": int(lane.get("review_age_days") or -1), @@ -403,9 +395,7 @@ def _enrich_provider_rows_with_lane( "route_runtime_state": dict(provider_inventory.get("route_runtime_state") or {}), "route_add_recommendations": add_recommendations, "recommended_add_provider": ( - str(add_recommendations[0].get("provider_name") or "") - if add_recommendations - else "" + str(add_recommendations[0].get("provider_name") or "") if add_recommendations else "" ), "recommended_add_strategy": ( str(add_recommendations[0].get("strategy") or "") if add_recommendations else "" @@ -479,10 +469,7 @@ def _render_refresh_guidance_block(report: dict[str, Any], *, limit: int = 3) -> freshness_status = str(item.get("freshness_status") or "unknown") review_age_days = int(item.get("review_age_days") or -1) age_suffix = f", {review_age_days}d" if review_age_days >= 0 else "" - line = ( - f"- {provider}: {item.get('action_label') or item.get('action')}" - f" ({freshness_status}{age_suffix})" - ) + line = f"- {provider}: {item.get('action_label') or item.get('action')} ({freshness_status}{age_suffix})" if item.get("refresh_url"): line += f" -> {item['refresh_url']}" lines.append(line) @@ -509,9 +496,7 @@ def _render_provider_catalog_block(report: dict[str, Any], *, limit: int = 3) -> f"pricing={_safe_int(item.get('pricing_count'))}" ) for alert in list(summary.get("alerts") or [])[:limit]: - lines.append( - f"- [{alert.get('severity')}] {alert.get('provider_id')}: {alert.get('headline')}" - ) + lines.append(f"- [{alert.get('severity')}] {alert.get('provider_id')}: {alert.get('headline')}") alert_summary = dict(summary.get("alert_summary") or {}) if alert_summary: lines.append( @@ -631,9 +616,7 @@ def build_dashboard_report( totals = stats.get("totals") or {} inventory_provider_map = _inventory_provider_map(inventory_payload) - providers = _enrich_provider_rows_with_lane( - stats.get("providers") or [], inventory_provider_map - ) + providers = _enrich_provider_rows_with_lane(stats.get("providers") or [], inventory_provider_map) route_additions = _route_add_summary(providers) lane_families = _lane_family_summary_from_stats(stats.get("lane_families") or []) if not lane_families: @@ -669,18 +652,14 @@ def build_dashboard_report( total_requests = _safe_int(totals.get("total_requests")) total_failures = _safe_int(totals.get("total_failures")) - success_pct = ( - ((total_requests - total_failures) * 100.0 / total_requests) if total_requests else 100.0 - ) + success_pct = ((total_requests - total_failures) * 100.0 / total_requests) if total_requests else 100.0 total_prompt_tokens = _safe_int(totals.get("total_prompt_tokens")) total_completion_tokens = _safe_int(totals.get("total_compl_tokens")) total_cost = _safe_float(totals.get("total_cost_usd")) avg_latency_ms = _safe_float(totals.get("avg_latency_ms")) fallback_requests = sum( - _safe_int(item.get("requests")) - for item in routing - if str(item.get("layer") or "") == "fallback" + _safe_int(item.get("requests")) for item in routing if str(item.get("layer") or "") == "fallback" ) fallback_pct = (fallback_requests * 100.0 / total_requests) if total_requests else 0.0 @@ -724,8 +703,7 @@ def build_dashboard_report( { "provider": provider_name, "category": _health_issue_category(str(payload.get("last_error") or "")), - "detail": str(payload.get("last_error") or "").strip() - or "No error detail provided", + "detail": str(payload.get("last_error") or "").strip() or "No error detail provided", } ) @@ -829,9 +807,7 @@ def build_dashboard_report( if top_provider_cost and total_cost > 0: dominant_cost_share = ( - (_safe_float(top_provider_cost.get("cost_usd")) * 100.0 / total_cost) - if total_cost - else 0.0 + (_safe_float(top_provider_cost.get("cost_usd")) * 100.0 / total_cost) if total_cost else 0.0 ) if dominant_cost_share >= 60.0: hints.append( @@ -840,8 +816,7 @@ def build_dashboard_report( cheaper_healthy = [ name for name, tier in healthy_provider_tiers.items() - if tier in {"cheap", "default", "fallback"} - and name != str(top_provider_cost.get("provider") or "") + if tier in {"cheap", "default", "fallback"} and name != str(top_provider_cost.get("provider") or "") ] if cheaper_healthy: decision_support.append( @@ -852,18 +827,10 @@ def build_dashboard_report( top_lane = str(top_provider_cost.get("canonical_model") or "") top_route = str(top_provider_cost.get("route_type") or "") if top_lane: - hints.append( - f"Top-cost lane right now: {top_lane}" - + (f" via {top_route}" if top_route else "") - + "." - ) + hints.append(f"Top-cost lane right now: {top_lane}" + (f" via {top_route}" if top_route else "") + ".") - rate_limited = [ - item["provider"] for item in unhealthy_providers if item["category"] == "rate-limited" - ] - quota_exhausted = [ - item["provider"] for item in unhealthy_providers if item["category"] == "quota-exhausted" - ] + rate_limited = [item["provider"] for item in unhealthy_providers if item["category"] == "rate-limited"] + quota_exhausted = [item["provider"] for item in unhealthy_providers if item["category"] == "quota-exhausted"] if rate_limited: decision_support.append( "Rate-limit pressure: " @@ -880,11 +847,7 @@ def build_dashboard_report( if healthy_provider_names: hints.append( f"Healthy providers right now: {', '.join(healthy_provider_names[:4])}" - + ( - "" - if len(healthy_provider_names) <= 4 - else f" +{len(healthy_provider_names) - 4} more" - ) + + ("" if len(healthy_provider_names) <= 4 else f" +{len(healthy_provider_names) - 4} more") ) elif health_payload: hints.append("No healthy providers are currently reported by /health.") @@ -932,9 +895,7 @@ def build_dashboard_report( f"{freshness['stale']} route assumption(s) are stale. Review benchmark or pricing guidance before leaning too hard on those lanes." ) elif freshness.get("aging"): - hints.append( - f"{freshness['aging']} route assumption(s) are aging and worth rechecking soon." - ) + hints.append(f"{freshness['aging']} route assumption(s) are aging and worth rechecking soon.") if refresh_guidance: top_refresh = refresh_guidance[0] decision_support.append( @@ -947,11 +908,8 @@ def build_dashboard_report( { "level": str(catalog_alert.get("severity") or "notice"), "headline": str(catalog_alert.get("headline") or "Provider catalog alert"), - "detail": str(catalog_alert.get("detail") or "").strip() - or "Provider source catalog requires review.", - "suggestion": str( - catalog_alert.get("suggestion") or "Review provider catalog state." - ), + "detail": str(catalog_alert.get("detail") or "").strip() or "Provider source catalog requires review.", + "suggestion": str(catalog_alert.get("suggestion") or "Review provider catalog state."), } ) if not provider_catalog_alerts and _safe_int(provider_catalog.get("due_sources")) > 0: @@ -965,8 +923,7 @@ def build_dashboard_report( if provider_catalog_alerts: top_catalog_alert = provider_catalog_alerts[0] decision_support.append( - f"Catalog alert: {top_catalog_alert.get('headline')} " - f"Follow up via {top_catalog_alert.get('suggestion')}" + f"Catalog alert: {top_catalog_alert.get('headline')} Follow up via {top_catalog_alert.get('suggestion')}" ) if provider_catalog_alert_summary.get("status") == "intervention-needed": hints.append( @@ -1030,9 +987,7 @@ def build_dashboard_report( "success_pct": round(success_pct, 1), "avg_latency_ms": round(avg_latency_ms, 1), "last_request_ago": _format_ago(float(last_request)) if last_request else "never", - "first_request_ago": _format_ago(float(first_request)) - if first_request - else "never", + "first_request_ago": _format_ago(float(first_request)) if first_request else "never", }, "spend": { "total_cost_usd": round(total_cost, 6), @@ -1041,21 +996,14 @@ def build_dashboard_report( "completion_tokens": total_completion_tokens, }, "health": { - "status": (health_payload or {}).get("status") - or ("live" if health_payload else "unknown"), - "providers_healthy": _safe_int( - ((health_payload or {}).get("summary") or {}).get("providers_healthy") - ), - "providers_total": _safe_int( - ((health_payload or {}).get("summary") or {}).get("providers_total") - ), + "status": (health_payload or {}).get("status") or ("live" if health_payload else "unknown"), + "providers_healthy": _safe_int(((health_payload or {}).get("summary") or {}).get("providers_healthy")), + "providers_total": _safe_int(((health_payload or {}).get("summary") or {}).get("providers_total")), "providers_request_ready": _safe_int( ((health_payload or {}).get("request_readiness") or {}).get("providers_ready") ), "providers_request_not_ready": _safe_int( - ((health_payload or {}).get("request_readiness") or {}).get( - "providers_not_ready" - ) + ((health_payload or {}).get("request_readiness") or {}).get("providers_not_ready") ), "providers_request_ready_compat": readiness_breakdown.get("compat", 0), "unhealthy": unhealthy_providers, @@ -1166,9 +1114,7 @@ def _render_overview(report: dict[str, Any]) -> str: route_additions = report.get("route_additions") or [] if route_additions: top_addition = route_additions[0] - lines.append( - f" Next add {top_addition.get('add_provider')} ({top_addition.get('strategy')})" - ) + lines.append(f" Next add {top_addition.get('add_provider')} ({top_addition.get('strategy')})") lines.extend( [ "", @@ -1438,10 +1384,7 @@ def _render_provider_detail(report: dict[str, Any], provider_name: str) -> str: None, ) if not row: - return ( - "fusionAIze Gate Dashboard\n\n" - f"Provider detail\n\nNo provider row found for '{provider_name}'.\n" - ) + return f"fusionAIze Gate Dashboard\n\nProvider detail\n\nNo provider row found for '{provider_name}'.\n" provider = str(row.get("provider") or provider_name) status = "live-healthy" @@ -1454,11 +1397,7 @@ def _render_provider_detail(report: dict[str, Any], provider_name: str) -> str: request_readiness = row.get("request_readiness") or {} transport = row.get("transport") or {} provider_routing_paths = _routing_path_summary( - [ - item - for item in report.get("routing") or [] - if str(item.get("provider") or "").strip().lower() == target - ] + [item for item in report.get("routing") or [] if str(item.get("provider") or "").strip().lower() == target] ) lines = [ "fusionAIze Gate Dashboard", @@ -1487,15 +1426,10 @@ def _render_provider_detail(report: dict[str, Any], provider_name: str) -> str: lines.append(f"Review age {_safe_int(row.get('review_age_days'))}d") if row.get("freshness_hint"): lines.append(f"Freshness hint {row.get('freshness_hint')}") - refresh_lookup = { - str(item.get("provider") or "").lower(): item - for item in report.get("refresh_guidance") or [] - } + refresh_lookup = {str(item.get("provider") or "").lower(): item for item in report.get("refresh_guidance") or []} refresh_item = refresh_lookup.get(target) if refresh_item: - lines.append( - f"Refresh action {refresh_item.get('action_label') or refresh_item.get('action')}" - ) + lines.append(f"Refresh action {refresh_item.get('action_label') or refresh_item.get('action')}") if refresh_item.get("refresh_url"): lines.append(f"Refresh source {refresh_item.get('refresh_url')}") if refresh_item.get("reason"): @@ -1509,9 +1443,7 @@ def _render_provider_detail(report: dict[str, Any], provider_name: str) -> str: if request_readiness.get("operator_hint"): lines.append(f"Operator hint {request_readiness.get('operator_hint')}") if row.get("recommended_add_provider"): - lines.append( - f"Add route {row.get('recommended_add_provider')} ({row.get('recommended_add_strategy')})" - ) + lines.append(f"Add route {row.get('recommended_add_provider')} ({row.get('recommended_add_strategy')})") if transport: lines.extend( [ @@ -1535,20 +1467,12 @@ def _render_provider_detail(report: dict[str, Any], provider_name: str) -> str: if runtime_state.get("window_state") and runtime_state.get("window_state") != "clear": lines.append(f"Runtime window {runtime_state.get('window_state')}") if runtime_state.get("cooldown_remaining_s"): - lines.append( - f"Cooldown left {_safe_int(runtime_state.get('cooldown_remaining_s'))}s" - ) + lines.append(f"Cooldown left {_safe_int(runtime_state.get('cooldown_remaining_s'))}s") if runtime_state.get("degraded_remaining_s"): - lines.append( - f"Degraded left {_safe_int(runtime_state.get('degraded_remaining_s'))}s" - ) + lines.append(f"Degraded left {_safe_int(runtime_state.get('degraded_remaining_s'))}s") if runtime_state.get("recovered_recently"): - lines.append( - f"Recovered from {runtime_state.get('last_recovered_issue_type') or 'n/a'}" - ) - lines.append( - f"Recovery watch {_safe_int(runtime_state.get('recovery_remaining_s'))}s" - ) + lines.append(f"Recovered from {runtime_state.get('last_recovered_issue_type') or 'n/a'}") + lines.append(f"Recovery watch {_safe_int(runtime_state.get('recovery_remaining_s'))}s") if provider in unhealthy: lines.append(f"Live issue {unhealthy[provider]['detail']}") if provider_routing_paths: @@ -1580,15 +1504,10 @@ def _render_client_detail(report: dict[str, Any], client_name: str) -> str: None, ) if not row: - return ( - "fusionAIze Gate Dashboard\n\n" - f"Client detail\n\nNo client row found for '{client_name}'.\n" - ) + return f"fusionAIze Gate Dashboard\n\nClient detail\n\nNo client row found for '{client_name}'.\n" name = str(row.get("client_tag") or row.get("client_profile") or client_name) - expensive = ( - _safe_float(row.get("cost_usd")) > 0.5 or _safe_float(row.get("avg_latency_ms")) > 4000 - ) + expensive = _safe_float(row.get("cost_usd")) > 0.5 or _safe_float(row.get("avg_latency_ms")) > 4000 suggested_scenario = _recommended_scenario_for_client( str(row.get("client_profile") or name), expensive=expensive, diff --git a/faigate/dashboard_web.py b/faigate/dashboard_web.py index 5c19798..a5eef1c 100644 --- a/faigate/dashboard_web.py +++ b/faigate/dashboard_web.py @@ -2121,6 +2121,24 @@ def _inline_svg(name: str) -> str:
+Catalog health areas needing attention.
+Actionable guidance based on catalog health.
+