From d9da963099629e64fd086c4bfbad1ec205b6a776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tr=E1=BA=A7n=20Quang=20=C4=90=C3=A3ng?= Date: Mon, 11 May 2026 13:55:29 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(cli):=20M3=20resource=20management=20?= =?UTF-8?q?=E2=80=94=20combo,=20provider-node,=20provider-models,=20provid?= =?UTF-8?q?er/key/pool=20extensions,=20models=20registry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 26 ++ Cargo.toml | 1 + src/cli/apply.rs | 178 +++++++++++ src/cli/combo.rs | 524 ++++++++++++++++++++++++++++++++ src/cli/key_ext.rs | 291 ++++++++++++++++++ src/cli/mod.rs | 262 ++++++++++++++++ src/cli/models.rs | 238 +++++++++++++++ src/cli/pool_ext.rs | 425 ++++++++++++++++++++++++++ src/cli/provider_ext.rs | 580 ++++++++++++++++++++++++++++++++++++ src/cli/provider_models.rs | 520 ++++++++++++++++++++++++++++++++ src/cli/provider_node.rs | 414 +++++++++++++++++++++++++ src/cli/schema.rs | 44 +++ src/main.rs | 12 + src/server/api/providers.rs | 7 +- 14 files changed, 3521 insertions(+), 1 deletion(-) create mode 100644 src/cli/apply.rs create mode 100644 src/cli/combo.rs create mode 100644 src/cli/key_ext.rs create mode 100644 src/cli/models.rs create mode 100644 src/cli/pool_ext.rs create mode 100644 src/cli/provider_ext.rs create mode 100644 src/cli/provider_models.rs create mode 100644 src/cli/provider_node.rs diff --git a/Cargo.lock b/Cargo.lock index f289112f..d6292eb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1744,6 +1744,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libyml" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3302702afa434ffa30847a83305f0a69d6abd74293b6554c18ec85c7ef30c980" +dependencies = [ + "anyhow", + "version_check", +] + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -2015,6 +2025,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "serde_yml", "sha1", "sha2 0.10.9", "sha256", @@ -2668,6 +2679,21 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yml" +version = "0.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59e2dd588bf1597a252c3b920e0143eb99b0f76e4e082f4c92ce34fbc9e71ddd" +dependencies = [ + "indexmap", + "itoa", + "libyml", + "memchr", + "ryu", + "serde", + "version_check", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/Cargo.toml b/Cargo.toml index ceee308a..a0cbc493 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ clap_complete = "4" simd-json = "0.14" serde = { version = "1", features = ["derive"] } serde_json = "1" +serde_yml = "0.0.12" toml = "0.8" # JWT auth diff --git a/src/cli/apply.rs b/src/cli/apply.rs new file mode 100644 index 00000000..10513932 --- /dev/null +++ b/src/cli/apply.rs @@ -0,0 +1,178 @@ +//! Shared idempotent `apply --from-file` plumbing for resource commands. +//! +//! All `*_apply` subcommands accept a YAML or JSON document on stdin (`-`) +//! or a path, parse it into a list of payloads, then diff against the +//! current DB snapshot and produce a single mutation. The output envelope +//! is the same across resources: +//! +//! ```text +//! { "created": [...], "updated": [...], "unchanged": [...], "deleted": [...] } +//! ``` +//! +//! `--prune` opts in to deleting resources that are present in DB but not +//! in the input document (kubectl-style). Default behaviour is upsert only. + +use std::fs; +use std::io::{self, Read}; +use std::path::Path; + +use serde::Deserialize; +use serde_json::Value; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InputFormat { + Json, + Yaml, +} + +/// Load a YAML or JSON document from a path or from stdin (`-`). +/// +/// Returns the parsed `serde_json::Value` plus the detected format. +pub fn load_document(source: &str) -> anyhow::Result<(Value, InputFormat)> { + let raw = read_source(source)?; + let trimmed = raw.trim_start(); + let (value, fmt) = if trimmed.starts_with('{') || trimmed.starts_with('[') { + ( + serde_json::from_str::(&raw) + .map_err(|e| anyhow::anyhow!("failed to parse JSON: {e}"))?, + InputFormat::Json, + ) + } else { + let yaml = serde_yml::from_str::(&raw) + .map_err(|e| anyhow::anyhow!("failed to parse YAML: {e}"))?; + let bytes = serde_json::to_vec(&yaml) + .map_err(|e| anyhow::anyhow!("failed to convert YAML to JSON: {e}"))?; + ( + serde_json::from_slice::(&bytes) + .map_err(|e| anyhow::anyhow!("failed to normalise YAML payload: {e}"))?, + InputFormat::Yaml, + ) + }; + Ok((value, fmt)) +} + +fn read_source(source: &str) -> anyhow::Result { + if source == "-" { + let mut buf = String::new(); + io::stdin().read_to_string(&mut buf)?; + Ok(buf) + } else { + let path = Path::new(source); + Ok( + fs::read_to_string(path) + .map_err(|e| anyhow::anyhow!("failed to read {source}: {e}"))?, + ) + } +} + +/// Parse a `Value` as either a single object or an array of objects. +/// +/// Returns the items, plus an error if the shape is wrong. Used by every +/// `apply` command so a user can pipe in either form. +pub fn into_items Deserialize<'de>>(value: Value) -> anyhow::Result> { + match value { + Value::Array(items) => items + .into_iter() + .enumerate() + .map(|(idx, item)| { + serde_json::from_value::(item) + .map_err(|e| anyhow::anyhow!("invalid item at index {idx}: {e}")) + }) + .collect(), + Value::Object(_) => { + Ok(vec![serde_json::from_value::(value) + .map_err(|e| anyhow::anyhow!("invalid item: {e}"))?]) + } + other => Err(anyhow::anyhow!( + "expected an object or array, got {}", + type_name(&other) + )), + } +} + +fn type_name(value: &Value) -> &'static str { + match value { + Value::Null => "null", + Value::Bool(_) => "bool", + Value::Number(_) => "number", + Value::String(_) => "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} + +/// Aggregated diff returned by every `*_apply` command. +#[derive(Debug, Default, Clone, serde::Serialize)] +pub struct ApplyDiff { + pub created: Vec, + pub updated: Vec, + pub unchanged: Vec, + #[serde(skip_serializing_if = "Vec::is_empty")] + pub deleted: Vec, +} + +impl ApplyDiff { + pub fn summary(&self) -> String { + let mut parts = Vec::new(); + if !self.created.is_empty() { + parts.push(format!("{} created", self.created.len())); + } + if !self.updated.is_empty() { + parts.push(format!("{} updated", self.updated.len())); + } + if !self.unchanged.is_empty() { + parts.push(format!("{} unchanged", self.unchanged.len())); + } + if !self.deleted.is_empty() { + parts.push(format!("{} deleted", self.deleted.len())); + } + if parts.is_empty() { + "nothing to do".into() + } else { + parts.join(", ") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Deserialize, Debug, PartialEq)] + struct Item { + name: String, + } + + #[test] + fn into_items_accepts_single_object() { + let value: Value = serde_json::from_str(r#"{"name":"foo"}"#).unwrap(); + let items: Vec = into_items(value).unwrap(); + assert_eq!(items, vec![Item { name: "foo".into() }]); + } + + #[test] + fn into_items_accepts_array() { + let value: Value = serde_json::from_str(r#"[{"name":"a"},{"name":"b"}]"#).unwrap(); + let items: Vec = into_items(value).unwrap(); + assert_eq!(items.len(), 2); + } + + #[test] + fn into_items_rejects_string() { + let value: Value = Value::String("nope".into()); + assert!(into_items::(value).is_err()); + } + + #[test] + fn apply_diff_summary() { + let mut diff = ApplyDiff::default(); + diff.created = vec!["a".into()]; + diff.unchanged = vec!["b".into(), "c".into()]; + assert_eq!(diff.summary(), "1 created, 2 unchanged"); + } + + #[test] + fn apply_diff_empty_summary() { + assert_eq!(ApplyDiff::default().summary(), "nothing to do"); + } +} diff --git a/src/cli/combo.rs b/src/cli/combo.rs new file mode 100644 index 00000000..1759d2a4 --- /dev/null +++ b/src/cli/combo.rs @@ -0,0 +1,524 @@ +//! `openproxy combo *` — fallback chains and round-robin combos. +//! +//! Combos are one of OpenProxy's two core concepts: a named list of models +//! the router walks through on failure (or rotates across, depending on +//! strategy). They are stored in `db.json` as the `combos` Vec. + +use std::collections::BTreeMap; +use std::collections::HashSet; + +use clap::Subcommand; +use serde::Deserialize; +use serde_json::{json, Value}; + +use crate::cli::apply::{into_items, load_document, ApplyDiff}; +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::Db; +use crate::types::Combo; + +#[derive(Debug, Clone, Subcommand)] +pub enum ComboCmd { + /// List all combos. + List, + /// Show one combo by name. + Get { name: String }, + /// Create a new combo. + Create { + /// Combo name (letters, digits, `_`, `-`, `.`). + #[arg(long)] + name: String, + /// Models in priority order, comma-separated (e.g. `openai/gpt-4o,anthropic/claude-3-5-sonnet`). + #[arg(long, value_delimiter = ',')] + models: Vec, + /// Strategy: `fallback` (default), `round-robin`, or `sticky-round-robin`. + #[arg(long, default_value = "fallback")] + strategy: String, + }, + /// Edit an existing combo. Any flag omitted is left unchanged. + Edit { + name: String, + #[arg(long, value_delimiter = ',')] + models: Option>, + #[arg(long)] + strategy: Option, + }, + /// Delete a combo. Exit 0 if it does not exist (use `--strict` to fail). + Delete { + name: String, + #[arg(long)] + strict: bool, + }, + /// Mark combo active (sets `isActive=true` in extras). + Enable { name: String }, + /// Mark combo inactive. + Disable { name: String }, + /// Dry-run a combo's expansion to verify membership and reachability. + Test { + name: String, + /// Optional prompt; ignored unless `--live`. + #[arg(long)] + prompt: Option, + }, + /// Idempotent upsert from a YAML/JSON file or stdin (`-`). + Apply { + /// Path or `-` for stdin. + #[arg(long = "from-file", default_value = "-")] + from_file: String, + /// Delete combos that exist in DB but are missing from the input. + #[arg(long)] + prune: bool, + }, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ComboInput { + name: String, + #[serde(default)] + models: Vec, + #[serde(default)] + strategy: Option, + #[serde(default)] + kind: Option, + #[serde(default)] + is_active: Option, +} + +pub async fn run(cmd: ComboCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + ComboCmd::List => run_list(db, ctx).await, + ComboCmd::Get { name } => run_get(db, ctx, &name).await, + ComboCmd::Create { + name, + models, + strategy, + } => run_create(db, ctx, name, models, strategy).await, + ComboCmd::Edit { + name, + models, + strategy, + } => run_edit(db, ctx, &name, models, strategy).await, + ComboCmd::Delete { name, strict } => run_delete(db, ctx, &name, strict).await, + ComboCmd::Enable { name } => run_set_active(db, ctx, &name, true).await, + ComboCmd::Disable { name } => run_set_active(db, ctx, &name, false).await, + ComboCmd::Test { name, prompt } => run_test(db, ctx, &name, prompt).await, + ComboCmd::Apply { from_file, prune } => run_apply(db, ctx, &from_file, prune).await, + } +} + +async fn run_list(db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let combos = snapshot.combos.clone(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.combo.list", + json!({ "combos": combos, "count": combos.len() }), + )?; + } else { + humanln(ctx, format!("Combos ({}):", combos.len())); + for combo in &combos { + humanln( + ctx, + format!( + " {} [{}] {} model(s)", + combo.name, + combo.kind.as_deref().unwrap_or("fallback"), + combo.models.len() + ), + ); + } + } + Ok(()) +} + +async fn run_get(db: &Db, ctx: OutputCtx, name: &str) -> anyhow::Result<()> { + let Some(combo) = db.combo_by_name(name) else { + let exit = emit_error(ctx, "not_found", &format!("combo '{name}' not found"))?; + std::process::exit(exit); + }; + if ctx.is_robot() { + emit_robot("openproxy.v1.combo.get", serde_json::to_value(&combo)?)?; + } else { + humanln(ctx, format!("Combo: {}", combo.name)); + humanln( + ctx, + format!(" kind: {}", combo.kind.as_deref().unwrap_or("fallback")), + ); + humanln(ctx, format!(" models ({}):", combo.models.len())); + for m in &combo.models { + humanln(ctx, format!(" - {m}")); + } + } + Ok(()) +} + +async fn run_create( + db: &Db, + ctx: OutputCtx, + name: String, + models: Vec, + strategy: String, +) -> anyhow::Result<()> { + if !is_valid_combo_name(&name) { + let exit = emit_error( + ctx, + "validation", + &format!("invalid combo name '{name}'. Use letters, digits, '_', '-', or '.'."), + )?; + std::process::exit(exit); + } + if models.is_empty() { + let exit = emit_error(ctx, "validation", "--models requires at least one model id")?; + std::process::exit(exit); + } + if db.combo_by_name(&name).is_some() { + let exit = emit_error( + ctx, + "conflict", + &format!("combo '{name}' already exists. Use `combo edit` or `combo apply`."), + )?; + std::process::exit(exit); + } + + let now = chrono::Utc::now().to_rfc3339(); + let combo = Combo { + id: uuid::Uuid::new_v4().to_string(), + name: name.clone(), + models, + kind: normalize_strategy(&strategy), + created_at: Some(now.clone()), + updated_at: Some(now), + extra: BTreeMap::new(), + }; + + db.update(|db| db.combos.push(combo.clone())).await?; + + if ctx.is_robot() { + emit_robot("openproxy.v1.combo.create", serde_json::to_value(&combo)?)?; + } else { + humanln(ctx, format!("created combo '{}'", combo.name)); + } + Ok(()) +} + +async fn run_edit( + db: &Db, + ctx: OutputCtx, + name: &str, + models: Option>, + strategy: Option, +) -> anyhow::Result<()> { + if db.combo_by_name(name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("combo '{name}' not found"))?; + std::process::exit(exit); + } + let mut updated: Option = None; + db.update(|db| { + if let Some(combo) = db.combos.iter_mut().find(|c| c.name == name) { + if let Some(m) = &models { + combo.models = m.clone(); + } + if let Some(s) = &strategy { + combo.kind = normalize_strategy(s); + } + combo.updated_at = Some(chrono::Utc::now().to_rfc3339()); + updated = Some(combo.clone()); + } + }) + .await?; + + let combo = updated.expect("combo existed before update"); + if ctx.is_robot() { + emit_robot("openproxy.v1.combo.edit", serde_json::to_value(&combo)?)?; + } else { + humanln(ctx, format!("updated combo '{}'", combo.name)); + } + Ok(()) +} + +async fn run_delete(db: &Db, ctx: OutputCtx, name: &str, strict: bool) -> anyhow::Result<()> { + if db.combo_by_name(name).is_none() { + if strict { + let exit = emit_error(ctx, "not_found", &format!("combo '{name}' not found"))?; + std::process::exit(exit); + } + if ctx.is_robot() { + emit_robot( + "openproxy.v1.combo.delete", + json!({ "name": name, "deleted": false }), + )?; + } else { + humanln(ctx, format!("combo '{name}' not found (no-op)")); + } + return Ok(()); + } + + db.update(|db| db.combos.retain(|c| c.name != name)).await?; + + if ctx.is_robot() { + emit_robot( + "openproxy.v1.combo.delete", + json!({ "name": name, "deleted": true }), + )?; + } else { + humanln(ctx, format!("deleted combo '{name}'")); + } + Ok(()) +} + +async fn run_set_active(db: &Db, ctx: OutputCtx, name: &str, active: bool) -> anyhow::Result<()> { + if db.combo_by_name(name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("combo '{name}' not found"))?; + std::process::exit(exit); + } + let mut updated: Option = None; + db.update(|db| { + if let Some(combo) = db.combos.iter_mut().find(|c| c.name == name) { + combo.extra.insert("isActive".into(), Value::Bool(active)); + combo.updated_at = Some(chrono::Utc::now().to_rfc3339()); + updated = Some(combo.clone()); + } + }) + .await?; + + let combo = updated.expect("combo existed"); + let schema = if active { + "openproxy.v1.combo.enable" + } else { + "openproxy.v1.combo.disable" + }; + if ctx.is_robot() { + emit_robot(schema, serde_json::to_value(&combo)?)?; + } else { + humanln( + ctx, + format!( + "{} combo '{}'", + if active { "enabled" } else { "disabled" }, + combo.name + ), + ); + } + Ok(()) +} + +async fn run_test( + db: &Db, + ctx: OutputCtx, + name: &str, + _prompt: Option, +) -> anyhow::Result<()> { + let Some(combo) = db.combo_by_name(name) else { + let exit = emit_error(ctx, "not_found", &format!("combo '{name}' not found"))?; + std::process::exit(exit); + }; + + let snapshot = db.snapshot(); + let known_providers: HashSet = snapshot + .provider_connections + .iter() + .map(|c| c.provider.clone()) + .collect(); + let known_node_ids: HashSet = snapshot + .provider_nodes + .iter() + .map(|n| n.id.clone()) + .collect(); + + let mut members = Vec::with_capacity(combo.models.len()); + for model in &combo.models { + let provider_part = model.split('/').next().unwrap_or(""); + let resolved = !provider_part.is_empty() + && (known_providers.contains(provider_part) || known_node_ids.contains(provider_part)); + members.push(json!({ + "model": model, + "provider": provider_part, + "resolved": resolved, + })); + } + + let payload = json!({ + "name": combo.name, + "kind": combo.kind.clone().unwrap_or_else(|| "fallback".into()), + "members": members, + "reachable": members.iter().all(|m| m.get("resolved") == Some(&Value::Bool(true))), + }); + + if ctx.is_robot() { + emit_robot("openproxy.v1.combo.test", payload)?; + } else { + humanln(ctx, format!("combo '{}' resolution:", combo.name)); + for member in &members { + let resolved = member.get("resolved") == Some(&Value::Bool(true)); + humanln( + ctx, + format!( + " {} {} ({})", + if resolved { "OK " } else { "MISS" }, + member.get("model").and_then(Value::as_str).unwrap_or(""), + member.get("provider").and_then(Value::as_str).unwrap_or(""), + ), + ); + } + } + Ok(()) +} + +async fn run_apply(db: &Db, ctx: OutputCtx, from_file: &str, prune: bool) -> anyhow::Result<()> { + let (doc, _) = match load_document(from_file) { + Ok(d) => d, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + let items: Vec = match into_items(doc) { + Ok(items) => items, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + + for item in &items { + if !is_valid_combo_name(&item.name) { + let exit = emit_error( + ctx, + "validation", + &format!("invalid combo name '{}'", item.name), + )?; + std::process::exit(exit); + } + } + + let mut diff = ApplyDiff::default(); + let names_in_doc: HashSet = items.iter().map(|i| i.name.clone()).collect(); + let now = chrono::Utc::now().to_rfc3339(); + + db.update(|app| { + for item in &items { + let target_kind = item + .kind + .clone() + .or_else(|| item.strategy.as_ref().and_then(|s| normalize_strategy(s))); + if let Some(existing) = app.combos.iter_mut().find(|c| c.name == item.name) { + let mut changed = false; + if existing.models != item.models { + existing.models = item.models.clone(); + changed = true; + } + if existing.kind != target_kind { + existing.kind = target_kind.clone(); + changed = true; + } + if let Some(active) = item.is_active { + let prev = existing.extra.get("isActive").and_then(Value::as_bool); + if prev != Some(active) { + existing + .extra + .insert("isActive".into(), Value::Bool(active)); + changed = true; + } + } + if changed { + existing.updated_at = Some(now.clone()); + diff.updated.push(item.name.clone()); + } else { + diff.unchanged.push(item.name.clone()); + } + } else { + let mut extra = BTreeMap::new(); + if let Some(active) = item.is_active { + extra.insert("isActive".into(), Value::Bool(active)); + } + app.combos.push(Combo { + id: uuid::Uuid::new_v4().to_string(), + name: item.name.clone(), + models: item.models.clone(), + kind: target_kind, + created_at: Some(now.clone()), + updated_at: Some(now.clone()), + extra, + }); + diff.created.push(item.name.clone()); + } + } + if prune { + let to_remove: Vec = app + .combos + .iter() + .filter(|c| !names_in_doc.contains(&c.name)) + .map(|c| c.name.clone()) + .collect(); + for name in &to_remove { + diff.deleted.push(name.clone()); + } + app.combos.retain(|c| names_in_doc.contains(&c.name)); + } + }) + .await?; + + let summary = diff.summary(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.combo.apply", + json!({ + "diff": diff, + "summary": summary, + "prune": prune, + }), + )?; + } else { + humanln(ctx, format!("combo apply: {summary}")); + } + Ok(()) +} + +fn is_valid_combo_name(name: &str) -> bool { + !name.is_empty() + && name.len() <= 100 + && name + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.') +} + +fn normalize_strategy(s: &str) -> Option { + let trimmed = s.trim().to_ascii_lowercase(); + if trimmed.is_empty() { + return None; + } + let canonical = match trimmed.as_str() { + "fallback" | "fallback-chain" => "fallback", + "rr" | "round-robin" | "round_robin" | "roundrobin" => "round-robin", + "sticky-rr" | "sticky-round-robin" | "sticky_round_robin" => "sticky-round-robin", + other => return Some(other.to_string()), + }; + Some(canonical.to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validates_combo_name() { + assert!(is_valid_combo_name("combo-1")); + assert!(is_valid_combo_name("combo_1.v2")); + assert!(!is_valid_combo_name("")); + assert!(!is_valid_combo_name("combo name")); + assert!(!is_valid_combo_name("café")); + } + + #[test] + fn normalizes_strategy_aliases() { + assert_eq!(normalize_strategy("FALLBACK").as_deref(), Some("fallback")); + assert_eq!(normalize_strategy("rr").as_deref(), Some("round-robin")); + assert_eq!( + normalize_strategy("sticky-rr").as_deref(), + Some("sticky-round-robin") + ); + assert_eq!(normalize_strategy("").as_deref(), None); + // Unknown values pass through (lowercased). + assert_eq!(normalize_strategy("custom").as_deref(), Some("custom")); + } +} diff --git a/src/cli/key_ext.rs b/src/cli/key_ext.rs new file mode 100644 index 00000000..49dd63d0 --- /dev/null +++ b/src/cli/key_ext.rs @@ -0,0 +1,291 @@ +//! `openproxy key *` — extended API key lifecycle commands (M3). +//! +//! Complements the existing `key list` / `key add` (which now live as +//! variants on `KeyCmd` in `cli::mod`). These commands implement the +//! full CRUD + rotate + idempotent apply. + +use std::collections::HashSet; + +use clap::Subcommand; +use serde::Deserialize; +use serde_json::{json, Value}; + +use crate::cli::apply::{into_items, load_document, ApplyDiff}; +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::Db; +use crate::types::ApiKey; + +#[derive(Debug, Clone, Subcommand)] +pub enum KeyExtCmd { + /// Show one key by id or name. + Get { id_or_name: String }, + /// Generate a fresh secret for an existing key. Returns the new key. + Rotate { id_or_name: String }, + /// Delete a key. + Delete { + id_or_name: String, + #[arg(long)] + strict: bool, + }, + /// Mark a key active. + Enable { id_or_name: String }, + /// Mark a key inactive (kept in DB, ignored by auth). + Disable { id_or_name: String }, + /// Idempotent upsert from a YAML/JSON file or stdin. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct KeyInput { + name: String, + #[serde(default)] + key: Option, + #[serde(default)] + is_active: Option, +} + +pub async fn run(cmd: KeyExtCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + KeyExtCmd::Get { id_or_name } => run_get(db, ctx, &id_or_name).await, + KeyExtCmd::Rotate { id_or_name } => run_rotate(db, ctx, &id_or_name).await, + KeyExtCmd::Delete { id_or_name, strict } => run_delete(db, ctx, &id_or_name, strict).await, + KeyExtCmd::Enable { id_or_name } => run_set_active(db, ctx, &id_or_name, true).await, + KeyExtCmd::Disable { id_or_name } => run_set_active(db, ctx, &id_or_name, false).await, + KeyExtCmd::Apply { from_file, prune } => run_apply(db, ctx, &from_file, prune).await, + } +} + +fn find_key(db: &Db, id_or_name: &str) -> Option { + db.snapshot() + .api_keys + .iter() + .find(|k| k.id == id_or_name || k.name == id_or_name) + .cloned() +} + +fn mask(secret: &str) -> String { + crate::cli::output::mask_secret(secret) +} + +async fn run_get(db: &Db, ctx: OutputCtx, id_or_name: &str) -> anyhow::Result<()> { + let Some(key) = find_key(db, id_or_name) else { + let exit = emit_error(ctx, "not_found", &format!("key '{id_or_name}' not found"))?; + std::process::exit(exit); + }; + let payload = json!({ + "id": key.id, + "name": key.name, + "keyMasked": mask(&key.key), + "isActive": key.is_active(), + "createdAt": key.created_at, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.key.get", payload)?; + } else { + humanln(ctx, format!("Key: {} ({})", key.name, key.id)); + humanln(ctx, format!(" key: {}", mask(&key.key))); + humanln(ctx, format!(" active: {}", key.is_active())); + } + Ok(()) +} + +async fn run_rotate(db: &Db, ctx: OutputCtx, id_or_name: &str) -> anyhow::Result<()> { + if find_key(db, id_or_name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("key '{id_or_name}' not found"))?; + std::process::exit(exit); + } + let machine_id = uuid::Uuid::new_v4().simple().to_string(); + let new_secret = crate::core::auth::generate_api_key_with_machine(&machine_id); + let new_secret_clone = new_secret.clone(); + db.update(|app| { + if let Some(key) = app + .api_keys + .iter_mut() + .find(|k| k.id == id_or_name || k.name == id_or_name) + { + key.key = new_secret_clone.clone(); + key.machine_id = Some(machine_id.clone()); + } + }) + .await?; + + let payload = json!({ + "name": id_or_name, + "newKey": new_secret, + "newKeyMasked": mask(&new_secret), + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.key.rotate", payload)?; + } else { + humanln(ctx, format!("rotated key '{id_or_name}'")); + humanln(ctx, format!(" new key: {new_secret}")); + } + Ok(()) +} + +async fn run_delete(db: &Db, ctx: OutputCtx, id_or_name: &str, strict: bool) -> anyhow::Result<()> { + if find_key(db, id_or_name).is_none() { + if strict { + let exit = emit_error(ctx, "not_found", &format!("key '{id_or_name}' not found"))?; + std::process::exit(exit); + } + if ctx.is_robot() { + emit_robot( + "openproxy.v1.key.delete", + json!({ "key": id_or_name, "deleted": false }), + )?; + } else { + humanln(ctx, format!("key '{id_or_name}' not found (no-op)")); + } + return Ok(()); + } + db.update(|app| { + app.api_keys + .retain(|k| k.id != id_or_name && k.name != id_or_name); + }) + .await?; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.key.delete", + json!({ "key": id_or_name, "deleted": true }), + )?; + } else { + humanln(ctx, format!("deleted key '{id_or_name}'")); + } + Ok(()) +} + +async fn run_set_active( + db: &Db, + ctx: OutputCtx, + id_or_name: &str, + active: bool, +) -> anyhow::Result<()> { + if find_key(db, id_or_name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("key '{id_or_name}' not found"))?; + std::process::exit(exit); + } + db.update(|app| { + if let Some(key) = app + .api_keys + .iter_mut() + .find(|k| k.id == id_or_name || k.name == id_or_name) + { + key.is_active = Some(active); + } + }) + .await?; + let schema = if active { + "openproxy.v1.key.enable" + } else { + "openproxy.v1.key.disable" + }; + if ctx.is_robot() { + emit_robot(schema, json!({ "key": id_or_name, "active": active }))?; + } else { + humanln( + ctx, + format!( + "{} key '{id_or_name}'", + if active { "enabled" } else { "disabled" } + ), + ); + } + Ok(()) +} + +async fn run_apply(db: &Db, ctx: OutputCtx, from_file: &str, prune: bool) -> anyhow::Result<()> { + let (doc, _) = match load_document(from_file) { + Ok(d) => d, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + let items: Vec = match into_items(doc) { + Ok(items) => items, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + + let names_in_doc: HashSet = items.iter().map(|i| i.name.clone()).collect(); + let mut diff = ApplyDiff::default(); + let now = chrono::Utc::now().to_rfc3339(); + + db.update(|app| { + for item in &items { + if let Some(existing) = app.api_keys.iter_mut().find(|k| k.name == item.name) { + let mut changed = false; + if let Some(active) = item.is_active { + if existing.is_active() != active { + existing.is_active = Some(active); + changed = true; + } + } + if let Some(key) = item.key.as_deref().filter(|k| !k.is_empty()) { + if existing.key != key { + existing.key = key.to_string(); + changed = true; + } + } + if changed { + diff.updated.push(item.name.clone()); + } else { + diff.unchanged.push(item.name.clone()); + } + } else { + let machine_id = uuid::Uuid::new_v4().simple().to_string(); + let key_secret = item + .key + .clone() + .filter(|k| !k.is_empty()) + .unwrap_or_else(|| { + crate::core::auth::generate_api_key_with_machine(&machine_id) + }); + app.api_keys.push(ApiKey { + id: uuid::Uuid::new_v4().to_string(), + name: item.name.clone(), + key: key_secret, + machine_id: Some(machine_id), + is_active: Some(item.is_active.unwrap_or(true)), + created_at: Some(now.clone()), + extra: std::collections::BTreeMap::new(), + }); + diff.created.push(item.name.clone()); + } + } + if prune { + let to_delete: Vec = app + .api_keys + .iter() + .filter(|k| !names_in_doc.contains(&k.name)) + .map(|k| k.name.clone()) + .collect(); + for name in &to_delete { + diff.deleted.push(name.clone()); + } + app.api_keys.retain(|k| names_in_doc.contains(&k.name)); + } + }) + .await?; + + let summary = diff.summary(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.key.apply", + json!({ "diff": diff, "summary": summary, "prune": prune }), + )?; + } else { + humanln(ctx, format!("key apply: {summary}")); + } + let _ = Value::Null; + Ok(()) +} diff --git a/src/cli/mod.rs b/src/cli/mod.rs index d0d61389..7b630307 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -19,10 +19,18 @@ use crate::types::{ApiKey, AppDb, ProviderConnection, ProxyPool}; use crate::core::tunnel::{TunnelManager, TunnelProvider}; +pub mod apply; pub mod auth; +pub mod combo; pub mod config; pub mod doctor; +pub mod key_ext; +pub mod models; pub mod output; +pub mod pool_ext; +pub mod provider_ext; +pub mod provider_models; +pub mod provider_node; pub mod schema; pub mod server; @@ -131,6 +139,16 @@ pub enum Command { #[command(subcommand)] cmd: PoolCmd, }, + /// Combo (fallback / round-robin chain) management. + Combo { + #[command(subcommand)] + cmd: combo::ComboCmd, + }, + /// Top-level model registry (built-in + custom). + Models { + #[command(subcommand)] + cmd: models::ModelsCmd, + }, Tunnel { #[command(subcommand)] cmd: TunnelCmd, @@ -269,6 +287,60 @@ pub enum ProviderCmd { #[arg(long)] json: bool, }, + /// Custom provider node (instance) management. + Node { + #[command(subcommand)] + cmd: provider_node::NodeCmd, + }, + /// Models, aliases, and disabled-model list for a provider. + Models { + #[command(subcommand)] + cmd: provider_models::ModelsCmd, + }, + /// Show one provider connection by id or name. + Get { id_or_name: String }, + /// Edit a provider connection's fields. + Edit { + id_or_name: String, + #[arg(long)] + api_key: Option, + #[arg(long)] + base_url: Option, + #[arg(long)] + priority: Option, + #[arg(long)] + default_model: Option, + }, + /// Delete a provider connection. + Delete { + id_or_name: String, + #[arg(long)] + strict: bool, + }, + /// Mark provider active. + Enable { id_or_name: String }, + /// Mark provider inactive. + Disable { id_or_name: String }, + /// Run a real connectivity probe. + Test { id_or_name: String }, + /// Validate raw credentials (no DB write). + Validate { + #[arg(long)] + provider: String, + #[arg(long)] + api_key: Option, + #[arg(long)] + base_url: Option, + }, + /// Report this CLI's client identity. + ClientInfo, + /// Idempotent upsert from a YAML/JSON document. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, } #[derive(Debug, Clone, Subcommand)] @@ -283,6 +355,27 @@ pub enum KeyCmd { #[arg(long)] json: bool, }, + /// Show one key by id or name (secret is masked). + Get { id_or_name: String }, + /// Generate a fresh secret for an existing key. + Rotate { id_or_name: String }, + /// Delete a key. + Delete { + id_or_name: String, + #[arg(long)] + strict: bool, + }, + /// Mark key active. + Enable { id_or_name: String }, + /// Mark key inactive. + Disable { id_or_name: String }, + /// Idempotent upsert from YAML/JSON. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, } #[derive(Debug, Clone, Subcommand)] @@ -307,6 +400,37 @@ pub enum PoolCmd { #[arg(long)] json: bool, }, + /// Show a single pool by name or id. + Get { name: String }, + /// Edit a pool's URL/type/strict flag. + Edit { + name: String, + #[arg(long)] + proxy_url: Option, + #[arg(long)] + r#type: Option, + #[arg(long)] + strict: Option, + }, + /// Mark pool active. + Enable { name: String }, + /// Mark pool inactive. + Disable { name: String }, + /// Probe an HTTP target through the pool. + Test { + name: String, + #[arg(long, default_value = "https://httpbin.org/get")] + target: String, + }, + /// Show recorded success/rtt stats. + Stats { name: String }, + /// Idempotent upsert from YAML/JSON. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, } #[derive(Debug, Clone, Subcommand)] @@ -346,6 +470,18 @@ impl Cli { let rt = tokio::runtime::Runtime::new()?; rt.block_on(run_pool(cmd, &db, ctx)) } + Command::Combo { cmd } => { + let db = rt.block_on(Db::load())?; + let db = std::sync::Arc::new(db); + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(combo::run(cmd, &db, ctx)) + } + Command::Models { cmd } => { + let db = rt.block_on(Db::load())?; + let db = std::sync::Arc::new(db); + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(models::run(cmd, &db, ctx)) + } Command::Tunnel { cmd } => { let db = rt.block_on(Db::load())?; let db = std::sync::Arc::new(db); @@ -544,6 +680,80 @@ pub async fn run_provider(cmd: ProviderCmd, db: &Db, ctx: output::OutputCtx) -> ); } } + ProviderCmd::Node { cmd } => provider_node::run(cmd, db, ctx).await?, + ProviderCmd::Models { cmd } => provider_models::run(cmd, db, ctx).await?, + ProviderCmd::Get { id_or_name } => { + provider_ext::run(provider_ext::ProviderExtCmd::Get { id_or_name }, db, ctx).await? + } + ProviderCmd::Edit { + id_or_name, + api_key, + base_url, + priority, + default_model, + } => { + provider_ext::run( + provider_ext::ProviderExtCmd::Edit { + id_or_name, + api_key, + base_url, + priority, + default_model, + }, + db, + ctx, + ) + .await? + } + ProviderCmd::Delete { id_or_name, strict } => { + provider_ext::run( + provider_ext::ProviderExtCmd::Delete { id_or_name, strict }, + db, + ctx, + ) + .await? + } + ProviderCmd::Enable { id_or_name } => { + provider_ext::run(provider_ext::ProviderExtCmd::Enable { id_or_name }, db, ctx).await? + } + ProviderCmd::Disable { id_or_name } => { + provider_ext::run( + provider_ext::ProviderExtCmd::Disable { id_or_name }, + db, + ctx, + ) + .await? + } + ProviderCmd::Test { id_or_name } => { + provider_ext::run(provider_ext::ProviderExtCmd::Test { id_or_name }, db, ctx).await? + } + ProviderCmd::Validate { + provider, + api_key, + base_url, + } => { + provider_ext::run( + provider_ext::ProviderExtCmd::Validate { + provider, + api_key, + base_url, + }, + db, + ctx, + ) + .await? + } + ProviderCmd::ClientInfo => { + provider_ext::run(provider_ext::ProviderExtCmd::ClientInfo, db, ctx).await? + } + ProviderCmd::Apply { from_file, prune } => { + provider_ext::run( + provider_ext::ProviderExtCmd::Apply { from_file, prune }, + db, + ctx, + ) + .await? + } } Ok(()) } @@ -692,6 +902,24 @@ pub async fn run_key(cmd: KeyCmd, db: &Db, ctx: output::OutputCtx) -> anyhow::Re output::humanln(ctx, "API key added successfully"); } } + KeyCmd::Get { id_or_name } => { + key_ext::run(key_ext::KeyExtCmd::Get { id_or_name }, db, ctx).await? + } + KeyCmd::Rotate { id_or_name } => { + key_ext::run(key_ext::KeyExtCmd::Rotate { id_or_name }, db, ctx).await? + } + KeyCmd::Delete { id_or_name, strict } => { + key_ext::run(key_ext::KeyExtCmd::Delete { id_or_name, strict }, db, ctx).await? + } + KeyCmd::Enable { id_or_name } => { + key_ext::run(key_ext::KeyExtCmd::Enable { id_or_name }, db, ctx).await? + } + KeyCmd::Disable { id_or_name } => { + key_ext::run(key_ext::KeyExtCmd::Disable { id_or_name }, db, ctx).await? + } + KeyCmd::Apply { from_file, prune } => { + key_ext::run(key_ext::KeyExtCmd::Apply { from_file, prune }, db, ctx).await? + } } Ok(()) } @@ -826,6 +1054,40 @@ pub async fn run_pool(cmd: PoolCmd, db: &Db, ctx: output::OutputCtx) -> anyhow:: output::humanln(ctx, format!("Pool '{}' deleted successfully", name)); } } + PoolCmd::Get { name } => pool_ext::run(pool_ext::PoolExtCmd::Get { name }, db, ctx).await?, + PoolCmd::Edit { + name, + proxy_url, + r#type, + strict, + } => { + pool_ext::run( + pool_ext::PoolExtCmd::Edit { + name, + proxy_url, + r#type, + strict, + }, + db, + ctx, + ) + .await? + } + PoolCmd::Enable { name } => { + pool_ext::run(pool_ext::PoolExtCmd::Enable { name }, db, ctx).await? + } + PoolCmd::Disable { name } => { + pool_ext::run(pool_ext::PoolExtCmd::Disable { name }, db, ctx).await? + } + PoolCmd::Test { name, target } => { + pool_ext::run(pool_ext::PoolExtCmd::Test { name, target }, db, ctx).await? + } + PoolCmd::Stats { name } => { + pool_ext::run(pool_ext::PoolExtCmd::Stats { name }, db, ctx).await? + } + PoolCmd::Apply { from_file, prune } => { + pool_ext::run(pool_ext::PoolExtCmd::Apply { from_file, prune }, db, ctx).await? + } } Ok(()) } diff --git a/src/cli/models.rs b/src/cli/models.rs new file mode 100644 index 00000000..e044d8cf --- /dev/null +++ b/src/cli/models.rs @@ -0,0 +1,238 @@ +//! `openproxy models *` — top-level model registry view. +//! +//! Reads the built-in provider catalog merged with the user's custom +//! models, aliases, and disabled lists from `db.json`. Exposes a +//! pricing view that mirrors the dashboard's pricing editor. + +use std::collections::BTreeMap; + +use clap::Subcommand; +use serde_json::{json, Value}; + +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::core::model::catalog::provider_catalog; +use crate::db::Db; +use crate::types::ModelAliasTarget; + +#[derive(Debug, Clone, Subcommand)] +pub enum ModelsCmd { + /// List models across all providers (built-in + custom). + List { + /// Filter to a single provider alias. + #[arg(long)] + provider: Option, + }, + /// Show info for a single model id (`/` or alias). + Info { model: String }, + /// Probe the underlying provider connection for the model. + Test { model: String }, + /// Show the pricing table for a model (or all models if not specified). + Pricing { + #[arg(long)] + model: Option, + }, +} + +pub async fn run(cmd: ModelsCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + ModelsCmd::List { provider } => run_list(db, ctx, provider.as_deref()).await, + ModelsCmd::Info { model } => run_info(db, ctx, &model).await, + ModelsCmd::Test { model } => run_test(db, ctx, &model).await, + ModelsCmd::Pricing { model } => run_pricing(db, ctx, model.as_deref()).await, + } +} + +async fn run_list(db: &Db, ctx: OutputCtx, provider: Option<&str>) -> anyhow::Result<()> { + let catalog = provider_catalog(); + let snapshot = db.snapshot(); + + let mut groups: Vec = Vec::new(); + for entry in catalog.iter_provider_models() { + if let Some(p) = provider { + if entry.alias != p { + continue; + } + } + let custom_for_provider: Vec<&str> = snapshot + .custom_models + .iter() + .filter(|m| m.provider_alias == entry.alias) + .map(|m| m.id.as_str()) + .collect(); + let mut models: Vec = entry + .models + .iter() + .map(|m| { + json!({ + "id": m.id, + "name": m.name, + "kind": m.kind, + "source": "builtin", + }) + }) + .collect(); + for id in &custom_for_provider { + models.push(json!({ "id": id, "source": "custom" })); + } + groups.push(json!({ + "provider": entry.alias, + "count": models.len(), + "models": models, + })); + } + + if ctx.is_robot() { + emit_robot( + "openproxy.v1.models.list", + json!({ "providers": groups, "count": groups.len() }), + )?; + } else { + for group in &groups { + humanln( + ctx, + format!( + "{} ({})", + group.get("provider").and_then(Value::as_str).unwrap_or(""), + group.get("count").and_then(Value::as_u64).unwrap_or(0) + ), + ); + if let Some(models) = group.get("models").and_then(Value::as_array) { + for m in models { + humanln( + ctx, + format!( + " {} {} [{}]", + m.get("id").and_then(Value::as_str).unwrap_or(""), + m.get("kind").and_then(Value::as_str).unwrap_or(""), + m.get("source").and_then(Value::as_str).unwrap_or(""), + ), + ); + } + } + } + } + Ok(()) +} + +async fn run_info(db: &Db, ctx: OutputCtx, model: &str) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let resolved = crate::core::model::get_model_info(model, &snapshot); + let alias_target = snapshot.model_aliases.get(model).cloned(); + let provider = resolved.provider.as_deref().unwrap_or(""); + let pricing = snapshot.pricing.get(provider).cloned(); + let route_kind = match resolved.route_kind { + crate::core::model::ModelRouteKind::Direct => "direct", + crate::core::model::ModelRouteKind::Combo => "combo", + }; + let payload = json!({ + "input": model, + "provider": resolved.provider, + "modelId": resolved.model, + "routeKind": route_kind, + "alias": alias_target.map(|t| match t { + ModelAliasTarget::Path(s) => json!({"kind":"path","value":s}), + ModelAliasTarget::Mapping(r) => json!({"kind":"mapping","provider":r.provider,"model":r.model}), + }), + "pricing": pricing, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.models.info", payload)?; + } else { + humanln(ctx, format!("Model: {model}")); + humanln( + ctx, + format!( + " provider: {}", + resolved.provider.as_deref().unwrap_or("?"), + ), + ); + humanln(ctx, format!(" modelId: {}", resolved.model)); + humanln(ctx, format!(" route: {route_kind}")); + } + Ok(()) +} + +async fn run_test(db: &Db, ctx: OutputCtx, model: &str) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let resolved = crate::core::model::get_model_info(model, &snapshot); + let Some(provider_alias) = resolved.provider.as_deref() else { + let exit = emit_error( + ctx, + "not_found", + &format!("cannot resolve provider for '{model}'"), + )?; + std::process::exit(exit); + }; + let conn = snapshot + .provider_connections + .iter() + .find(|c| c.provider == provider_alias || c.name.as_deref() == Some(provider_alias)) + .cloned(); + let Some(conn) = conn else { + let exit = emit_error( + ctx, + "not_found", + &format!("no provider connection configured for '{provider_alias}'"), + )?; + std::process::exit(exit); + }; + let base_url = conn + .provider_specific_data + .get("baseUrl") + .and_then(Value::as_str) + .map(String::from); + let start = std::time::Instant::now(); + let (valid, error, latency_ms) = crate::server::api::providers::test_provider_api( + provider_alias, + conn.api_key.as_deref(), + base_url.as_deref(), + ) + .await; + let latency_ms = latency_ms.unwrap_or(start.elapsed().as_millis() as u64); + let payload = json!({ + "model": model, + "providerAlias": provider_alias, + "valid": valid, + "latencyMs": latency_ms, + "error": error, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.models.test", payload)?; + } else if valid { + humanln( + ctx, + format!("OK {model} via {provider_alias} ({latency_ms}ms)"), + ); + } else { + humanln( + ctx, + format!( + "FAIL {model} via {provider_alias} ({latency_ms}ms) — {}", + error.as_deref().unwrap_or("unknown") + ), + ); + } + Ok(()) +} + +async fn run_pricing(db: &Db, ctx: OutputCtx, model: Option<&str>) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let pricing = if let Some(m) = model { + let mut bucket: BTreeMap = BTreeMap::new(); + for (provider, rows) in &snapshot.pricing { + if let Some(entry) = rows.get(m) { + bucket.insert(provider.clone(), entry.clone()); + } + } + json!({ "model": m, "entries": bucket }) + } else { + serde_json::to_value(&snapshot.pricing)? + }; + if ctx.is_robot() { + emit_robot("openproxy.v1.models.pricing", pricing)?; + } else { + let pretty = serde_json::to_string_pretty(&pricing).unwrap_or_default(); + println!("{pretty}"); + } + Ok(()) +} diff --git a/src/cli/pool_ext.rs b/src/cli/pool_ext.rs new file mode 100644 index 00000000..c1f4aae8 --- /dev/null +++ b/src/cli/pool_ext.rs @@ -0,0 +1,425 @@ +//! `openproxy pool *` — extended proxy pool commands (M3). +//! +//! Complements the existing `pool list/status/create/delete` with +//! get/edit/enable/disable/test/stats/apply. + +use std::collections::{BTreeMap, HashSet}; +use std::time::{Duration, Instant}; + +use clap::Subcommand; +use serde::Deserialize; +use serde_json::{json, Value}; + +use crate::cli::apply::{into_items, load_document, ApplyDiff}; +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::Db; +use crate::types::ProxyPool; + +#[derive(Debug, Clone, Subcommand)] +pub enum PoolExtCmd { + /// Show one pool by name. + Get { name: String }, + /// Edit a pool's properties (any flag omitted = unchanged). + Edit { + name: String, + #[arg(long)] + proxy_url: Option, + #[arg(long)] + r#type: Option, + #[arg(long)] + strict: Option, + }, + /// Mark pool active. + Enable { name: String }, + /// Mark pool inactive. + Disable { name: String }, + /// Probe an HTTP endpoint through the pool to confirm it works. + Test { + name: String, + /// URL to probe. Defaults to https://httpbin.org/get. + #[arg(long, default_value = "https://httpbin.org/get")] + target: String, + }, + /// Show recorded counters (success_rate, rtt_ms, totals). + Stats { name: String }, + /// Idempotent upsert from a YAML/JSON file or stdin. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PoolInput { + name: String, + proxy_url: String, + #[serde(default)] + r#type: Option, + #[serde(default)] + is_active: Option, + #[serde(default)] + strict_proxy: Option, + #[serde(default)] + no_proxy: Option, +} + +pub async fn run(cmd: PoolExtCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + PoolExtCmd::Get { name } => run_get(db, ctx, &name).await, + PoolExtCmd::Edit { + name, + proxy_url, + r#type, + strict, + } => run_edit(db, ctx, &name, proxy_url, r#type, strict).await, + PoolExtCmd::Enable { name } => run_set_active(db, ctx, &name, true).await, + PoolExtCmd::Disable { name } => run_set_active(db, ctx, &name, false).await, + PoolExtCmd::Test { name, target } => run_test(db, ctx, &name, &target).await, + PoolExtCmd::Stats { name } => run_stats(db, ctx, &name).await, + PoolExtCmd::Apply { from_file, prune } => run_apply(db, ctx, &from_file, prune).await, + } +} + +fn find_pool(db: &Db, name: &str) -> Option { + db.snapshot() + .proxy_pools + .iter() + .find(|p| p.name == name || p.id == name) + .cloned() +} + +async fn run_get(db: &Db, ctx: OutputCtx, name: &str) -> anyhow::Result<()> { + let Some(pool) = find_pool(db, name) else { + let exit = emit_error(ctx, "not_found", &format!("pool '{name}' not found"))?; + std::process::exit(exit); + }; + if ctx.is_robot() { + emit_robot("openproxy.v1.pool.get", serde_json::to_value(&pool)?)?; + } else { + humanln(ctx, format!("Pool: {} ({})", pool.name, pool.id)); + humanln(ctx, format!(" proxyUrl: {}", pool.proxy_url)); + humanln(ctx, format!(" type: {}", pool.r#type)); + humanln( + ctx, + format!(" active: {}", pool.is_active.unwrap_or(true)), + ); + } + Ok(()) +} + +async fn run_edit( + db: &Db, + ctx: OutputCtx, + name: &str, + proxy_url: Option, + pool_type: Option, + strict: Option, +) -> anyhow::Result<()> { + if find_pool(db, name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("pool '{name}' not found"))?; + std::process::exit(exit); + } + let mut updated: Option = None; + db.update(|app| { + if let Some(pool) = app + .proxy_pools + .iter_mut() + .find(|p| p.name == name || p.id == name) + { + if let Some(url) = &proxy_url { + pool.proxy_url = url.clone(); + } + if let Some(kind) = &pool_type { + pool.r#type = kind.clone(); + } + if let Some(s) = strict { + pool.strict_proxy = Some(s); + } + pool.updated_at = Some(chrono::Utc::now().to_rfc3339()); + updated = Some(pool.clone()); + } + }) + .await?; + let pool = updated.expect("pool existed"); + if ctx.is_robot() { + emit_robot("openproxy.v1.pool.edit", serde_json::to_value(&pool)?)?; + } else { + humanln(ctx, format!("updated pool '{}'", pool.name)); + } + Ok(()) +} + +async fn run_set_active(db: &Db, ctx: OutputCtx, name: &str, active: bool) -> anyhow::Result<()> { + if find_pool(db, name).is_none() { + let exit = emit_error(ctx, "not_found", &format!("pool '{name}' not found"))?; + std::process::exit(exit); + } + db.update(|app| { + if let Some(pool) = app + .proxy_pools + .iter_mut() + .find(|p| p.name == name || p.id == name) + { + pool.is_active = Some(active); + pool.updated_at = Some(chrono::Utc::now().to_rfc3339()); + } + }) + .await?; + let schema = if active { + "openproxy.v1.pool.enable" + } else { + "openproxy.v1.pool.disable" + }; + if ctx.is_robot() { + emit_robot(schema, json!({ "name": name, "active": active }))?; + } else { + humanln( + ctx, + format!( + "{} pool '{name}'", + if active { "enabled" } else { "disabled" } + ), + ); + } + Ok(()) +} + +async fn run_test(db: &Db, ctx: OutputCtx, name: &str, target: &str) -> anyhow::Result<()> { + let Some(pool) = find_pool(db, name) else { + let exit = emit_error(ctx, "not_found", &format!("pool '{name}' not found"))?; + std::process::exit(exit); + }; + + let proxy = match reqwest::Proxy::all(&pool.proxy_url) { + Ok(p) => p, + Err(e) => { + let exit = emit_error( + ctx, + "validation", + &format!("invalid proxy_url '{}': {e}", pool.proxy_url), + )?; + std::process::exit(exit); + } + }; + + let client = reqwest::Client::builder() + .proxy(proxy) + .timeout(Duration::from_secs(10)) + .build()?; + let start = Instant::now(); + let result = client.get(target).send().await; + let rtt_ms = start.elapsed().as_millis() as u64; + + let (valid, status, error) = match result { + Ok(resp) => ( + resp.status().is_success(), + Some(resp.status().as_u16()), + None, + ), + Err(e) => (false, None, Some(e.to_string())), + }; + + let last_tested_at = chrono::Utc::now().to_rfc3339(); + db.update(|app| { + if let Some(p) = app + .proxy_pools + .iter_mut() + .find(|p| p.name == name || p.id == name) + { + p.test_status = Some(if valid { + "ok".to_string() + } else { + "failed".to_string() + }); + p.last_tested_at = Some(last_tested_at.clone()); + p.last_error = error.clone(); + p.rtt_ms = Some(rtt_ms); + } + }) + .await?; + + let payload = json!({ + "name": pool.name, + "target": target, + "valid": valid, + "status": status, + "rttMs": rtt_ms, + "error": error, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.pool.test", payload)?; + } else if valid { + humanln(ctx, format!("OK {name} ({rtt_ms}ms) — {target}")); + } else { + humanln( + ctx, + format!( + "FAIL {name} ({rtt_ms}ms) — {} — {target}", + error.as_deref().unwrap_or("?") + ), + ); + } + Ok(()) +} + +async fn run_stats(db: &Db, ctx: OutputCtx, name: &str) -> anyhow::Result<()> { + let Some(pool) = find_pool(db, name) else { + let exit = emit_error(ctx, "not_found", &format!("pool '{name}' not found"))?; + std::process::exit(exit); + }; + let payload = json!({ + "name": pool.name, + "id": pool.id, + "successRate": pool.success_rate, + "rttMs": pool.rtt_ms, + "totalRequests": pool.total_requests, + "failedRequests": pool.failed_requests, + "testStatus": pool.test_status, + "lastTestedAt": pool.last_tested_at, + "lastError": pool.last_error, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.pool.stats", payload)?; + } else { + humanln(ctx, format!("Pool stats: {}", pool.name)); + humanln( + ctx, + format!( + " successRate: {}", + pool.success_rate + .map(|r| format!("{:.2}%", r * 100.0)) + .unwrap_or_else(|| "-".into()) + ), + ); + humanln( + ctx, + format!( + " rttMs: {}", + pool.rtt_ms + .map(|r| r.to_string()) + .unwrap_or_else(|| "-".into()) + ), + ); + humanln( + ctx, + format!(" totalRequests: {}", pool.total_requests.unwrap_or(0)), + ); + humanln( + ctx, + format!(" failedRequests:{}", pool.failed_requests.unwrap_or(0)), + ); + } + Ok(()) +} + +async fn run_apply(db: &Db, ctx: OutputCtx, from_file: &str, prune: bool) -> anyhow::Result<()> { + let (doc, _) = match load_document(from_file) { + Ok(d) => d, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + let items: Vec = match into_items(doc) { + Ok(items) => items, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + + let names_in_doc: HashSet = items.iter().map(|i| i.name.clone()).collect(); + let mut diff = ApplyDiff::default(); + let now = chrono::Utc::now().to_rfc3339(); + + db.update(|app| { + for item in &items { + if let Some(existing) = app.proxy_pools.iter_mut().find(|p| p.name == item.name) { + let mut changed = false; + if existing.proxy_url != item.proxy_url { + existing.proxy_url = item.proxy_url.clone(); + changed = true; + } + if let Some(kind) = item.r#type.as_deref() { + if existing.r#type != kind { + existing.r#type = kind.to_string(); + changed = true; + } + } + if let Some(active) = item.is_active { + if existing.is_active.unwrap_or(true) != active { + existing.is_active = Some(active); + changed = true; + } + } + if let Some(strict) = item.strict_proxy { + if existing.strict_proxy != Some(strict) { + existing.strict_proxy = Some(strict); + changed = true; + } + } + if let Some(no_proxy) = item.no_proxy.as_deref() { + if existing.no_proxy != no_proxy { + existing.no_proxy = no_proxy.to_string(); + changed = true; + } + } + if changed { + existing.updated_at = Some(now.clone()); + diff.updated.push(item.name.clone()); + } else { + diff.unchanged.push(item.name.clone()); + } + } else { + app.proxy_pools.push(ProxyPool { + id: uuid::Uuid::new_v4().to_string(), + name: item.name.clone(), + proxy_url: item.proxy_url.clone(), + no_proxy: item.no_proxy.clone().unwrap_or_default(), + r#type: item.r#type.clone().unwrap_or_else(|| "http".to_string()), + is_active: Some(item.is_active.unwrap_or(true)), + strict_proxy: item.strict_proxy, + test_status: None, + last_tested_at: None, + last_error: None, + success_rate: None, + rtt_ms: None, + total_requests: None, + failed_requests: None, + created_at: Some(now.clone()), + updated_at: Some(now.clone()), + extra: BTreeMap::new(), + }); + diff.created.push(item.name.clone()); + } + } + if prune { + let to_delete: Vec = app + .proxy_pools + .iter() + .filter(|p| !names_in_doc.contains(&p.name)) + .map(|p| p.name.clone()) + .collect(); + for name in &to_delete { + diff.deleted.push(name.clone()); + } + app.proxy_pools.retain(|p| names_in_doc.contains(&p.name)); + } + }) + .await?; + + let summary = diff.summary(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.pool.apply", + json!({ "diff": diff, "summary": summary, "prune": prune }), + )?; + } else { + humanln(ctx, format!("pool apply: {summary}")); + } + let _ = Value::Null; + Ok(()) +} diff --git a/src/cli/provider_ext.rs b/src/cli/provider_ext.rs new file mode 100644 index 00000000..d1ee964f --- /dev/null +++ b/src/cli/provider_ext.rs @@ -0,0 +1,580 @@ +//! `openproxy provider *` — extended provider-connection commands (M3). +//! +//! Complements the existing `provider list/add`. Adds the full CRUD +//! lifecycle, connection test/validate, client-info, and idempotent +//! `apply` with diff output. + +use std::collections::{BTreeMap, HashSet}; +use std::time::Duration; + +use clap::Subcommand; +use serde::Deserialize; +use serde_json::{json, Value}; + +use crate::cli::apply::{into_items, load_document, ApplyDiff}; +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::{Db, ProviderConnectionFilter}; +use crate::types::ProviderConnection; + +#[derive(Debug, Clone, Subcommand)] +pub enum ProviderExtCmd { + /// Show one provider by id or name. + Get { id_or_name: String }, + /// Edit a provider connection. Any flag omitted = unchanged. + Edit { + id_or_name: String, + #[arg(long)] + api_key: Option, + #[arg(long)] + base_url: Option, + #[arg(long)] + priority: Option, + #[arg(long)] + default_model: Option, + }, + /// Delete a provider. + Delete { + id_or_name: String, + #[arg(long)] + strict: bool, + }, + /// Mark provider active. + Enable { id_or_name: String }, + /// Mark provider inactive. + Disable { id_or_name: String }, + /// Run a real connectivity probe against the provider's `/v1/models`. + Test { id_or_name: String }, + /// Validate raw credentials (does not require saved connection). + Validate { + /// Provider alias (openai, anthropic, ...). + #[arg(long)] + provider: String, + #[arg(long)] + api_key: Option, + #[arg(long)] + base_url: Option, + }, + /// Report the hostname/version/client identity (mirrors /api/providers/client). + ClientInfo, + /// Idempotent upsert from YAML/JSON. + Apply { + #[arg(long = "from-file", default_value = "-")] + from_file: String, + #[arg(long)] + prune: bool, + }, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ProviderInput { + name: String, + provider: String, + #[serde(default)] + api_key: Option, + #[serde(default)] + base_url: Option, + #[serde(default)] + priority: Option, + #[serde(default)] + is_active: Option, + #[serde(default)] + default_model: Option, + #[serde(default)] + auth_type: Option, +} + +pub async fn run(cmd: ProviderExtCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + ProviderExtCmd::Get { id_or_name } => run_get(db, ctx, &id_or_name).await, + ProviderExtCmd::Edit { + id_or_name, + api_key, + base_url, + priority, + default_model, + } => { + run_edit( + db, + ctx, + &id_or_name, + api_key, + base_url, + priority, + default_model, + ) + .await + } + ProviderExtCmd::Delete { id_or_name, strict } => { + run_delete(db, ctx, &id_or_name, strict).await + } + ProviderExtCmd::Enable { id_or_name } => run_set_active(db, ctx, &id_or_name, true).await, + ProviderExtCmd::Disable { id_or_name } => run_set_active(db, ctx, &id_or_name, false).await, + ProviderExtCmd::Test { id_or_name } => run_test(db, ctx, &id_or_name).await, + ProviderExtCmd::Validate { + provider, + api_key, + base_url, + } => run_validate(ctx, &provider, api_key, base_url).await, + ProviderExtCmd::ClientInfo => run_client_info(ctx).await, + ProviderExtCmd::Apply { from_file, prune } => run_apply(db, ctx, &from_file, prune).await, + } +} + +fn find_provider(db: &Db, id_or_name: &str) -> Option { + db.snapshot() + .provider_connections + .iter() + .find(|c| { + c.id == id_or_name || c.name.as_deref() == Some(id_or_name) || c.provider == id_or_name + }) + .cloned() +} + +async fn run_get(db: &Db, ctx: OutputCtx, id_or_name: &str) -> anyhow::Result<()> { + let Some(conn) = find_provider(db, id_or_name) else { + let exit = emit_error( + ctx, + "not_found", + &format!("provider '{id_or_name}' not found"), + )?; + std::process::exit(exit); + }; + let mut conn_masked = conn.clone(); + if let Some(key) = conn_masked.api_key.as_deref() { + conn_masked.api_key = Some(crate::cli::output::mask_secret(key)); + } + if let Some(token) = conn_masked.access_token.as_deref() { + conn_masked.access_token = Some(crate::cli::output::mask_secret(token)); + } + if let Some(token) = conn_masked.refresh_token.as_deref() { + conn_masked.refresh_token = Some(crate::cli::output::mask_secret(token)); + } + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider.get", + serde_json::to_value(&conn_masked)?, + )?; + } else { + humanln( + ctx, + format!( + "Provider: {} ({}) type={}", + conn.name.as_deref().unwrap_or("-"), + conn.id, + conn.provider + ), + ); + humanln( + ctx, + format!( + " apiKey: {}", + conn.api_key + .as_deref() + .map(crate::cli::output::mask_secret) + .unwrap_or_else(|| "-".into()) + ), + ); + humanln( + ctx, + format!(" active: {}", conn.is_active.unwrap_or(true)), + ); + } + Ok(()) +} + +async fn run_edit( + db: &Db, + ctx: OutputCtx, + id_or_name: &str, + api_key: Option, + base_url: Option, + priority: Option, + default_model: Option, +) -> anyhow::Result<()> { + if find_provider(db, id_or_name).is_none() { + let exit = emit_error( + ctx, + "not_found", + &format!("provider '{id_or_name}' not found"), + )?; + std::process::exit(exit); + } + let mut updated: Option = None; + db.update(|app| { + if let Some(conn) = app.provider_connections.iter_mut().find(|c| { + c.id == id_or_name || c.name.as_deref() == Some(id_or_name) || c.provider == id_or_name + }) { + if let Some(key) = &api_key { + conn.api_key = Some(key.clone()); + } + if let Some(url) = &base_url { + conn.provider_specific_data + .insert("baseUrl".into(), Value::String(url.clone())); + } + if let Some(p) = priority { + conn.priority = Some(p); + } + if let Some(model) = &default_model { + conn.default_model = Some(model.clone()); + } + conn.updated_at = Some(chrono::Utc::now().to_rfc3339()); + updated = Some(conn.clone()); + } + }) + .await?; + + let mut conn = updated.expect("provider existed"); + if let Some(key) = conn.api_key.as_deref() { + conn.api_key = Some(crate::cli::output::mask_secret(key)); + } + if ctx.is_robot() { + emit_robot("openproxy.v1.provider.edit", serde_json::to_value(&conn)?)?; + } else { + humanln(ctx, format!("updated provider '{id_or_name}'")); + } + Ok(()) +} + +async fn run_delete(db: &Db, ctx: OutputCtx, id_or_name: &str, strict: bool) -> anyhow::Result<()> { + if find_provider(db, id_or_name).is_none() { + if strict { + let exit = emit_error( + ctx, + "not_found", + &format!("provider '{id_or_name}' not found"), + )?; + std::process::exit(exit); + } + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider.delete", + json!({ "key": id_or_name, "deleted": false }), + )?; + } else { + humanln(ctx, format!("provider '{id_or_name}' not found (no-op)")); + } + return Ok(()); + } + db.update(|app| { + app.provider_connections.retain(|c| { + c.id != id_or_name && c.name.as_deref() != Some(id_or_name) && c.provider != id_or_name + }); + }) + .await?; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider.delete", + json!({ "key": id_or_name, "deleted": true }), + )?; + } else { + humanln(ctx, format!("deleted provider '{id_or_name}'")); + } + Ok(()) +} + +async fn run_set_active( + db: &Db, + ctx: OutputCtx, + id_or_name: &str, + active: bool, +) -> anyhow::Result<()> { + if find_provider(db, id_or_name).is_none() { + let exit = emit_error( + ctx, + "not_found", + &format!("provider '{id_or_name}' not found"), + )?; + std::process::exit(exit); + } + db.update(|app| { + if let Some(conn) = app.provider_connections.iter_mut().find(|c| { + c.id == id_or_name || c.name.as_deref() == Some(id_or_name) || c.provider == id_or_name + }) { + conn.is_active = Some(active); + conn.updated_at = Some(chrono::Utc::now().to_rfc3339()); + } + }) + .await?; + let schema = if active { + "openproxy.v1.provider.enable" + } else { + "openproxy.v1.provider.disable" + }; + if ctx.is_robot() { + emit_robot(schema, json!({ "provider": id_or_name, "active": active }))?; + } else { + humanln( + ctx, + format!( + "{} provider '{id_or_name}'", + if active { "enabled" } else { "disabled" } + ), + ); + } + Ok(()) +} + +async fn run_test(db: &Db, ctx: OutputCtx, id_or_name: &str) -> anyhow::Result<()> { + let Some(conn) = find_provider(db, id_or_name) else { + let exit = emit_error( + ctx, + "not_found", + &format!("provider '{id_or_name}' not found"), + )?; + std::process::exit(exit); + }; + + let base_url = conn + .provider_specific_data + .get("baseUrl") + .and_then(Value::as_str) + .map(String::from); + let api_key = conn.api_key.as_deref(); + let start = std::time::Instant::now(); + let (valid, error, latency_ms) = crate::server::api::providers::test_provider_api( + conn.provider.as_str(), + api_key, + base_url.as_deref(), + ) + .await; + let measured_ms = start.elapsed().as_millis() as u64; + let latency_ms = latency_ms.unwrap_or(measured_ms); + + // Persist last-tested metadata + let now = chrono::Utc::now().to_rfc3339(); + let valid_clone = valid; + let error_clone = error.clone(); + let now_clone = now.clone(); + db.update(|app| { + if let Some(c) = app + .provider_connections + .iter_mut() + .find(|c| c.id == conn.id) + { + c.test_status = Some(if valid_clone { "ok" } else { "failed" }.to_string()); + c.last_tested = Some(now_clone.clone()); + c.last_error = error_clone.clone(); + } + }) + .await?; + + let payload = json!({ + "providerId": conn.id, + "provider": conn.provider, + "valid": valid, + "latencyMs": latency_ms, + "error": error, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.provider.test", payload)?; + } else if valid { + humanln(ctx, format!("OK {} ({}ms)", conn.provider, latency_ms)); + } else { + humanln( + ctx, + format!( + "FAIL {} ({}ms) — {}", + conn.provider, + latency_ms, + error.as_deref().unwrap_or("unknown") + ), + ); + } + Ok(()) +} + +async fn run_validate( + ctx: OutputCtx, + provider: &str, + api_key: Option, + base_url: Option, +) -> anyhow::Result<()> { + let start = std::time::Instant::now(); + let (valid, error, latency_ms) = crate::server::api::providers::test_provider_api( + provider, + api_key.as_deref(), + base_url.as_deref(), + ) + .await; + let measured_ms = start.elapsed().as_millis() as u64; + let latency_ms = latency_ms.unwrap_or(measured_ms); + + let payload = json!({ + "provider": provider, + "baseUrl": base_url, + "valid": valid, + "latencyMs": latency_ms, + "error": error, + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.provider.validate", payload)?; + } else if valid { + humanln(ctx, format!("OK {provider} ({latency_ms}ms)")); + } else { + humanln( + ctx, + format!( + "FAIL {provider} ({latency_ms}ms) — {}", + error.as_deref().unwrap_or("unknown") + ), + ); + } + Ok(()) +} + +async fn run_client_info(ctx: OutputCtx) -> anyhow::Result<()> { + let client_id = whoami::hostname(); + let client_name = whoami::username(); + let payload = json!({ + "clientId": client_id, + "clientName": client_name, + "version": env!("CARGO_PKG_VERSION"), + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.provider.client-info", payload)?; + } else { + humanln(ctx, format!("Client ID: {client_id}")); + humanln(ctx, format!("Client name: {client_name}")); + humanln(ctx, format!("Version: {}", env!("CARGO_PKG_VERSION"))); + } + Ok(()) +} + +async fn run_apply(db: &Db, ctx: OutputCtx, from_file: &str, prune: bool) -> anyhow::Result<()> { + let (doc, _) = match load_document(from_file) { + Ok(d) => d, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + let items: Vec = match into_items(doc) { + Ok(items) => items, + Err(e) => { + let exit = emit_error(ctx, "validation", &e.to_string())?; + std::process::exit(exit); + } + }; + + let names_in_doc: HashSet = items.iter().map(|i| i.name.clone()).collect(); + let mut diff = ApplyDiff::default(); + let now = chrono::Utc::now().to_rfc3339(); + + db.update(|app| { + for item in &items { + if let Some(existing) = app + .provider_connections + .iter_mut() + .find(|c| c.name.as_deref() == Some(item.name.as_str())) + { + let mut changed = false; + if existing.provider != item.provider { + existing.provider = item.provider.clone(); + changed = true; + } + if let Some(ak) = item.api_key.as_deref().filter(|k| !k.is_empty()) { + if existing.api_key.as_deref() != Some(ak) { + existing.api_key = Some(ak.to_string()); + changed = true; + } + } + if let Some(url) = item.base_url.as_deref().filter(|u| !u.is_empty()) { + let prev = existing + .provider_specific_data + .get("baseUrl") + .and_then(Value::as_str); + if prev != Some(url) { + existing + .provider_specific_data + .insert("baseUrl".into(), Value::String(url.to_string())); + changed = true; + } + } + if let Some(p) = item.priority { + if existing.priority != Some(p) { + existing.priority = Some(p); + changed = true; + } + } + if let Some(active) = item.is_active { + if existing.is_active.unwrap_or(true) != active { + existing.is_active = Some(active); + changed = true; + } + } + if let Some(model) = item.default_model.as_deref() { + if existing.default_model.as_deref() != Some(model) { + existing.default_model = Some(model.to_string()); + changed = true; + } + } + if changed { + existing.updated_at = Some(now.clone()); + diff.updated.push(item.name.clone()); + } else { + diff.unchanged.push(item.name.clone()); + } + } else { + let mut psd = BTreeMap::new(); + if let Some(url) = item.base_url.as_deref().filter(|u| !u.is_empty()) { + psd.insert("baseUrl".to_string(), Value::String(url.to_string())); + } + app.provider_connections.push(ProviderConnection { + id: uuid::Uuid::new_v4().to_string(), + provider: item.provider.clone(), + auth_type: item + .auth_type + .clone() + .unwrap_or_else(|| "apiKey".to_string()), + name: Some(item.name.clone()), + priority: item.priority, + is_active: Some(item.is_active.unwrap_or(true)), + created_at: Some(now.clone()), + updated_at: Some(now.clone()), + api_key: item.api_key.clone(), + default_model: item.default_model.clone(), + provider_specific_data: psd, + ..Default::default() + }); + diff.created.push(item.name.clone()); + } + } + if prune { + let to_delete: Vec = app + .provider_connections + .iter() + .filter(|c| { + c.name + .as_deref() + .map(|n| !names_in_doc.contains(n)) + .unwrap_or(true) + }) + .map(|c| c.name.clone().unwrap_or_else(|| c.id.clone())) + .collect(); + for name in &to_delete { + diff.deleted.push(name.clone()); + } + app.provider_connections.retain(|c| { + c.name + .as_deref() + .map(|n| names_in_doc.contains(n)) + .unwrap_or(false) + }); + } + }) + .await?; + + let summary = diff.summary(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider.apply", + json!({ "diff": diff, "summary": summary, "prune": prune }), + )?; + } else { + humanln(ctx, format!("provider apply: {summary}")); + } + let _ = ProviderConnectionFilter::default(); + let _ = Duration::from_secs(0); + Ok(()) +} diff --git a/src/cli/provider_models.rs b/src/cli/provider_models.rs new file mode 100644 index 00000000..45a8cb45 --- /dev/null +++ b/src/cli/provider_models.rs @@ -0,0 +1,520 @@ +//! `openproxy provider models *` — model registry per provider. +//! +//! Wraps the in-DB model alias map (`modelAliases`), custom models +//! (`customModels`), and disabled-model list (in `extra.disabledModels`). +//! All writes go through `Db::update`; the server file watcher picks up +//! the change. + +use std::collections::{BTreeMap, BTreeSet}; + +use clap::Subcommand; +use serde_json::{json, Value}; + +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::Db; +use crate::types::{CustomModel, ModelAliasTarget, ProviderConnection, ProviderModelRef}; + +#[derive(Debug, Clone, Subcommand)] +pub enum ModelsCmd { + /// List models registered against a provider (built-in + custom + aliases). + List { provider: String }, + /// Run a real `/v1/models` probe against the provider connection. + Test { + provider: String, + /// Specific model to ping (defaults to first available). + #[arg(long)] + model: Option, + }, + /// Alias management subgroup. + #[command(subcommand)] + Alias(AliasCmd), + /// Disable a model for a provider (hidden from `/v1/models`). + Disable { + provider: String, + #[arg(long)] + model: String, + }, + /// Re-enable a previously disabled model. + Enable { + provider: String, + #[arg(long)] + model: String, + }, + /// Custom model management subgroup. + #[command(subcommand)] + Custom(CustomCmd), +} + +#[derive(Debug, Clone, Subcommand)] +pub enum AliasCmd { + /// List all aliases. + List, + /// Set an alias mapping ` -> /`. + Set { + provider: String, + model: String, + alias: String, + }, + /// Remove an alias by name. + Unset { alias: String }, +} + +#[derive(Debug, Clone, Subcommand)] +pub enum CustomCmd { + /// Register a custom model under a provider alias. + Add { + provider: String, + model: String, + /// Model kind, e.g. `chat`, `embedding`, `image`. + #[arg(long, default_value = "chat")] + r#type: String, + /// Optional human-readable name. + #[arg(long)] + name: Option, + }, + /// Remove a custom model. + Remove { provider: String, model: String }, + /// List custom models. + List, +} + +pub async fn run(cmd: ModelsCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + ModelsCmd::List { provider } => run_list(db, ctx, &provider).await, + ModelsCmd::Test { provider, model } => run_test(db, ctx, &provider, model).await, + ModelsCmd::Alias(cmd) => run_alias(db, ctx, cmd).await, + ModelsCmd::Disable { provider, model } => { + run_set_disabled(db, ctx, &provider, &model, true).await + } + ModelsCmd::Enable { provider, model } => { + run_set_disabled(db, ctx, &provider, &model, false).await + } + ModelsCmd::Custom(cmd) => run_custom(db, ctx, cmd).await, + } +} + +async fn run_list(db: &Db, ctx: OutputCtx, provider: &str) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let custom: Vec<&CustomModel> = snapshot + .custom_models + .iter() + .filter(|m| m.provider_alias == provider) + .collect(); + let disabled = disabled_models_for(&snapshot.extra, provider); + let aliases: BTreeMap = snapshot + .model_aliases + .iter() + .filter( + |(_, target)| matches!(target, ModelAliasTarget::Mapping(r) if r.provider == provider), + ) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + let payload = json!({ + "provider": provider, + "custom": custom, + "disabled": disabled, + "aliases": aliases, + }); + + if ctx.is_robot() { + emit_robot("openproxy.v1.provider-models.list", payload)?; + } else { + humanln(ctx, format!("Provider models — {provider}")); + humanln(ctx, format!(" custom ({}):", custom.len())); + for m in &custom { + humanln( + ctx, + format!( + " {} ({}) {}", + m.id, + m.r#type, + m.name.as_deref().unwrap_or("") + ), + ); + } + humanln(ctx, format!(" disabled ({}):", disabled.len())); + for id in &disabled { + humanln(ctx, format!(" {id}")); + } + humanln(ctx, format!(" aliases ({}):", aliases.len())); + for (alias, target) in &aliases { + if let ModelAliasTarget::Mapping(r) = target { + humanln(ctx, format!(" {alias} -> {}/{}", r.provider, r.model)); + } + } + } + Ok(()) +} + +async fn run_test( + db: &Db, + ctx: OutputCtx, + provider: &str, + _model: Option, +) -> anyhow::Result<()> { + let snapshot = db.snapshot(); + let conn = snapshot + .provider_connections + .iter() + .find(|c| c.provider == provider || c.name.as_deref() == Some(provider)) + .cloned(); + let Some(conn) = conn else { + let exit = emit_error( + ctx, + "not_found", + &format!("no provider connection for '{provider}'"), + )?; + std::process::exit(exit); + }; + + let base_url = conn + .provider_specific_data + .get("baseUrl") + .and_then(Value::as_str) + .map(String::from); + let api_key = conn.api_key.as_deref(); + let provider_alias = conn.provider.as_str(); + + let start = std::time::Instant::now(); + let (valid, error, latency_ms) = crate::server::api::providers::test_provider_api( + provider_alias, + api_key, + base_url.as_deref(), + ) + .await; + let measured_ms = start.elapsed().as_millis() as u64; + let latency_ms = latency_ms.unwrap_or(measured_ms); + + let payload = json!({ + "provider": provider_alias, + "connectionId": conn.id, + "valid": valid, + "latencyMs": latency_ms, + "error": error, + }); + + if ctx.is_robot() { + emit_robot("openproxy.v1.provider-models.test", payload)?; + } else if valid { + humanln(ctx, format!("OK {provider_alias} ({latency_ms}ms)")); + } else { + humanln( + ctx, + format!( + "FAIL {provider_alias} ({latency_ms}ms) — {}", + error.as_deref().unwrap_or("unknown") + ), + ); + } + + let _ = ProviderConnection::default(); + Ok(()) +} + +async fn run_alias(db: &Db, ctx: OutputCtx, cmd: AliasCmd) -> anyhow::Result<()> { + match cmd { + AliasCmd::List => { + let snapshot = db.snapshot(); + let aliases = snapshot.model_aliases.clone(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.alias.list", + json!({ "aliases": aliases, "count": aliases.len() }), + )?; + } else { + humanln(ctx, format!("Model aliases ({}):", aliases.len())); + for (alias, target) in &aliases { + let target = match target { + ModelAliasTarget::Path(s) => s.clone(), + ModelAliasTarget::Mapping(r) => format!("{}/{}", r.provider, r.model), + }; + humanln(ctx, format!(" {alias} -> {target}")); + } + } + Ok(()) + } + AliasCmd::Set { + provider, + model, + alias, + } => { + if alias.trim().is_empty() { + let exit = emit_error(ctx, "validation", "alias cannot be empty")?; + std::process::exit(exit); + } + let target = ModelAliasTarget::Mapping(ProviderModelRef { + provider: provider.clone(), + model: model.clone(), + extra: BTreeMap::new(), + }); + db.update(|app| { + app.model_aliases.insert(alias.clone(), target.clone()); + }) + .await?; + + let payload = json!({ + "alias": alias, + "target": { "provider": provider, "model": model } + }); + if ctx.is_robot() { + emit_robot("openproxy.v1.provider-models.alias.set", payload)?; + } else { + humanln(ctx, format!("set alias {alias} -> {provider}/{model}")); + } + Ok(()) + } + AliasCmd::Unset { alias } => { + let existed = db.snapshot().model_aliases.contains_key(&alias); + if !existed { + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.alias.unset", + json!({ "alias": alias, "removed": false }), + )?; + } else { + humanln(ctx, format!("alias '{alias}' not found (no-op)")); + } + return Ok(()); + } + db.update(|app| { + app.model_aliases.remove(&alias); + }) + .await?; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.alias.unset", + json!({ "alias": alias, "removed": true }), + )?; + } else { + humanln(ctx, format!("removed alias '{alias}'")); + } + Ok(()) + } + } +} + +async fn run_set_disabled( + db: &Db, + ctx: OutputCtx, + provider: &str, + model: &str, + disable: bool, +) -> anyhow::Result<()> { + let mut result_disabled: Vec = Vec::new(); + db.update(|app| { + let mut map = disabled_map(&app.extra); + let entry = map.entry(provider.to_string()).or_default(); + let mut set: BTreeSet = entry.iter().cloned().collect(); + if disable { + set.insert(model.to_string()); + } else { + set.remove(model); + } + *entry = set.into_iter().collect(); + if entry.is_empty() { + map.remove(provider); + } + result_disabled = map.get(provider).cloned().unwrap_or_default(); + write_disabled_map(&mut app.extra, &map); + }) + .await?; + + let schema = if disable { + "openproxy.v1.provider-models.disable" + } else { + "openproxy.v1.provider-models.enable" + }; + if ctx.is_robot() { + emit_robot( + schema, + json!({ + "provider": provider, + "model": model, + "disabled": result_disabled, + }), + )?; + } else { + humanln( + ctx, + format!( + "{} {provider}/{model}", + if disable { "disabled" } else { "enabled" } + ), + ); + } + Ok(()) +} + +async fn run_custom(db: &Db, ctx: OutputCtx, cmd: CustomCmd) -> anyhow::Result<()> { + match cmd { + CustomCmd::List => { + let snapshot = db.snapshot(); + let models = snapshot.custom_models.clone(); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.custom.list", + json!({ "customModels": models, "count": models.len() }), + )?; + } else { + humanln(ctx, format!("Custom models ({}):", models.len())); + for m in &models { + humanln( + ctx, + format!( + " {}/{} type={} {}", + m.provider_alias, + m.id, + m.r#type, + m.name.as_deref().unwrap_or(""), + ), + ); + } + } + Ok(()) + } + CustomCmd::Add { + provider, + model, + r#type, + name, + } => { + let snapshot = db.snapshot(); + if snapshot + .custom_models + .iter() + .any(|m| m.provider_alias == provider && m.id == model) + { + let exit = emit_error( + ctx, + "conflict", + &format!("custom model '{provider}/{model}' already exists"), + )?; + std::process::exit(exit); + } + let entry = CustomModel { + provider_alias: provider.clone(), + id: model.clone(), + r#type, + name, + extra: BTreeMap::new(), + }; + db.update(|app| app.custom_models.push(entry.clone())) + .await?; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.custom.add", + serde_json::to_value(&entry)?, + )?; + } else { + humanln(ctx, format!("added custom model {}/{}", provider, model)); + } + Ok(()) + } + CustomCmd::Remove { provider, model } => { + let existed = db + .snapshot() + .custom_models + .iter() + .any(|m| m.provider_alias == provider && m.id == model); + if !existed { + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.custom.remove", + json!({ + "provider": provider, + "model": model, + "removed": false, + }), + )?; + } else { + humanln(ctx, format!("{provider}/{model} not found (no-op)")); + } + return Ok(()); + } + db.update(|app| { + app.custom_models + .retain(|m| !(m.provider_alias == provider && m.id == model)); + }) + .await?; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-models.custom.remove", + json!({ + "provider": provider, + "model": model, + "removed": true, + }), + )?; + } else { + humanln(ctx, format!("removed custom model {provider}/{model}")); + } + Ok(()) + } + } +} + +fn disabled_map(extra: &BTreeMap) -> BTreeMap> { + extra + .get("disabledModels") + .cloned() + .and_then(|v| serde_json::from_value::>>(v).ok()) + .unwrap_or_default() +} + +fn write_disabled_map(extra: &mut BTreeMap, map: &BTreeMap>) { + if map.is_empty() { + extra.remove("disabledModels"); + } else { + extra.insert( + "disabledModels".to_string(), + serde_json::to_value(map).unwrap_or_else(|_| Value::Object(Default::default())), + ); + } +} + +fn disabled_models_for(extra: &BTreeMap, provider: &str) -> Vec { + disabled_map(extra).remove(provider).unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn disabled_map_handles_missing_key() { + let extra = BTreeMap::new(); + assert!(disabled_map(&extra).is_empty()); + } + + #[test] + fn disabled_map_parses_existing_entries() { + let mut extra = BTreeMap::new(); + extra.insert( + "disabledModels".to_string(), + json!({"openai": ["gpt-4o", "gpt-4o-mini"]}), + ); + let map = disabled_map(&extra); + assert_eq!(map.get("openai").unwrap(), &vec!["gpt-4o", "gpt-4o-mini"]); + } + + #[test] + fn write_disabled_map_removes_empty() { + let mut extra = BTreeMap::new(); + extra.insert("disabledModels".to_string(), json!({"x": ["y"]})); + write_disabled_map(&mut extra, &BTreeMap::new()); + assert!(!extra.contains_key("disabledModels")); + } + + #[test] + fn disabled_models_for_returns_provider_subset() { + let mut extra = BTreeMap::new(); + extra.insert( + "disabledModels".to_string(), + json!({"openai": ["a"], "anthropic": ["b"]}), + ); + assert_eq!(disabled_models_for(&extra, "openai"), vec!["a"]); + assert!(disabled_models_for(&extra, "missing").is_empty()); + } +} diff --git a/src/cli/provider_node.rs b/src/cli/provider_node.rs new file mode 100644 index 00000000..67dbe43c --- /dev/null +++ b/src/cli/provider_node.rs @@ -0,0 +1,414 @@ +//! `openproxy provider node *` — register custom provider instances. +//! +//! A `ProviderNode` is the "what is this server's API shape?" half. It +//! pairs with one or more `ProviderConnection` entries that hold actual +//! credentials. Combo entries that reference a custom node use the node's +//! UUID as the provider prefix (`/gpt-4o`). + +use std::collections::BTreeMap; +use std::time::Duration; + +use clap::Subcommand; +use serde_json::{json, Value}; + +use crate::cli::output::{emit_error, emit_robot, humanln, OutputCtx}; +use crate::db::Db; +use crate::types::ProviderNode; + +#[derive(Debug, Clone, Subcommand)] +pub enum NodeCmd { + /// List registered provider nodes. + List { + /// Optional filter by node type (e.g. `openai-compatible`). + #[arg(long)] + r#type: Option, + }, + /// Show one node by id or name. + Get { id_or_name: String }, + /// Register a new provider node. + Add { + #[arg(long)] + name: String, + /// Node type, e.g. `openai-compatible`, `anthropic-compatible`. + #[arg(long, default_value = "openai-compatible")] + r#type: String, + /// Base URL of the provider's API. + #[arg(long)] + base_url: String, + /// Optional prefix used to namespace this node's model ids. + #[arg(long)] + prefix: Option, + /// Optional API type override (e.g. `openai`, `anthropic`). + #[arg(long)] + api_type: Option, + }, + /// Edit an existing node. Any flag omitted is left unchanged. + Edit { + id_or_name: String, + #[arg(long)] + name: Option, + #[arg(long)] + base_url: Option, + #[arg(long)] + prefix: Option, + #[arg(long)] + api_type: Option, + }, + /// Delete a node. + Delete { + id_or_name: String, + /// Fail with exit code 3 if the node does not exist. + #[arg(long)] + strict: bool, + }, + /// Probe `/models` (or `/embeddings` for embedding nodes) to + /// confirm the node is reachable. Does NOT touch DB. + Validate { + id_or_name: String, + /// Bearer token to send (defaults to "test" for unauthenticated probes). + #[arg(long)] + api_key: Option, + /// Model id for embedding-style probes. + #[arg(long)] + model_id: Option, + }, +} + +pub async fn run(cmd: NodeCmd, db: &Db, ctx: OutputCtx) -> anyhow::Result<()> { + match cmd { + NodeCmd::List { r#type } => run_list(db, ctx, r#type.as_deref()).await, + NodeCmd::Get { id_or_name } => run_get(db, ctx, &id_or_name).await, + NodeCmd::Add { + name, + r#type, + base_url, + prefix, + api_type, + } => run_add(db, ctx, name, r#type, base_url, prefix, api_type).await, + NodeCmd::Edit { + id_or_name, + name, + base_url, + prefix, + api_type, + } => run_edit(db, ctx, &id_or_name, name, base_url, prefix, api_type).await, + NodeCmd::Delete { id_or_name, strict } => run_delete(db, ctx, &id_or_name, strict).await, + NodeCmd::Validate { + id_or_name, + api_key, + model_id, + } => run_validate(db, ctx, &id_or_name, api_key, model_id).await, + } +} + +fn find_node(db: &Db, id_or_name: &str) -> Option { + db.snapshot() + .provider_nodes + .iter() + .find(|n| n.id == id_or_name || n.name == id_or_name) + .cloned() +} + +async fn run_list(db: &Db, ctx: OutputCtx, node_type: Option<&str>) -> anyhow::Result<()> { + let nodes = db.provider_nodes(node_type); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.list", + json!({ "nodes": nodes, "count": nodes.len() }), + )?; + } else { + humanln(ctx, format!("Provider nodes ({}):", nodes.len())); + for node in &nodes { + humanln( + ctx, + format!( + " {} ({}) type={} baseUrl={}", + node.name, + node.id, + node.r#type, + node.base_url.as_deref().unwrap_or("-"), + ), + ); + } + } + Ok(()) +} + +async fn run_get(db: &Db, ctx: OutputCtx, id_or_name: &str) -> anyhow::Result<()> { + let Some(node) = find_node(db, id_or_name) else { + let exit = emit_error( + ctx, + "not_found", + &format!("provider node '{id_or_name}' not found"), + )?; + std::process::exit(exit); + }; + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.get", + serde_json::to_value(&node)?, + )?; + } else { + humanln(ctx, format!("Node: {} ({})", node.name, node.id)); + humanln(ctx, format!(" type: {}", node.r#type)); + humanln( + ctx, + format!(" baseUrl: {}", node.base_url.as_deref().unwrap_or("-")), + ); + if let Some(prefix) = &node.prefix { + humanln(ctx, format!(" prefix: {prefix}")); + } + if let Some(api_type) = &node.api_type { + humanln(ctx, format!(" apiType: {api_type}")); + } + } + Ok(()) +} + +async fn run_add( + db: &Db, + ctx: OutputCtx, + name: String, + r#type: String, + base_url: String, + prefix: Option, + api_type: Option, +) -> anyhow::Result<()> { + if name.trim().is_empty() { + let exit = emit_error(ctx, "validation", "--name is required")?; + std::process::exit(exit); + } + if base_url.trim().is_empty() { + let exit = emit_error(ctx, "validation", "--base-url is required")?; + std::process::exit(exit); + } + if db.snapshot().provider_nodes.iter().any(|n| n.name == name) { + let exit = emit_error( + ctx, + "conflict", + &format!("provider node '{name}' already exists"), + )?; + std::process::exit(exit); + } + + let now = chrono::Utc::now().to_rfc3339(); + let node = ProviderNode { + id: uuid::Uuid::new_v4().to_string(), + r#type, + name, + prefix, + api_type, + base_url: Some(base_url), + created_at: Some(now.clone()), + updated_at: Some(now), + extra: BTreeMap::new(), + }; + + db.update(|db| db.provider_nodes.push(node.clone())).await?; + + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.add", + serde_json::to_value(&node)?, + )?; + } else { + humanln( + ctx, + format!("created provider node '{}' ({})", node.name, node.id), + ); + } + Ok(()) +} + +async fn run_edit( + db: &Db, + ctx: OutputCtx, + id_or_name: &str, + name: Option, + base_url: Option, + prefix: Option, + api_type: Option, +) -> anyhow::Result<()> { + if find_node(db, id_or_name).is_none() { + let exit = emit_error( + ctx, + "not_found", + &format!("provider node '{id_or_name}' not found"), + )?; + std::process::exit(exit); + } + let mut updated: Option = None; + db.update(|app| { + if let Some(node) = app + .provider_nodes + .iter_mut() + .find(|n| n.id == id_or_name || n.name == id_or_name) + { + if let Some(v) = &name { + node.name = v.clone(); + } + if let Some(v) = &base_url { + node.base_url = Some(v.clone()); + } + if let Some(v) = &prefix { + node.prefix = Some(v.clone()); + } + if let Some(v) = &api_type { + node.api_type = Some(v.clone()); + } + node.updated_at = Some(chrono::Utc::now().to_rfc3339()); + updated = Some(node.clone()); + } + }) + .await?; + + let node = updated.expect("node existed"); + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.edit", + serde_json::to_value(&node)?, + )?; + } else { + humanln(ctx, format!("updated provider node '{}'", node.name)); + } + Ok(()) +} + +async fn run_delete(db: &Db, ctx: OutputCtx, id_or_name: &str, strict: bool) -> anyhow::Result<()> { + if find_node(db, id_or_name).is_none() { + if strict { + let exit = emit_error( + ctx, + "not_found", + &format!("provider node '{id_or_name}' not found"), + )?; + std::process::exit(exit); + } + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.delete", + json!({ "key": id_or_name, "deleted": false }), + )?; + } else { + humanln( + ctx, + format!("provider node '{id_or_name}' not found (no-op)"), + ); + } + return Ok(()); + } + + db.update(|app| { + app.provider_nodes + .retain(|n| n.id != id_or_name && n.name != id_or_name); + }) + .await?; + + if ctx.is_robot() { + emit_robot( + "openproxy.v1.provider-node.delete", + json!({ "key": id_or_name, "deleted": true }), + )?; + } else { + humanln(ctx, format!("deleted provider node '{id_or_name}'")); + } + Ok(()) +} + +async fn run_validate( + db: &Db, + ctx: OutputCtx, + id_or_name: &str, + api_key: Option, + model_id: Option, +) -> anyhow::Result<()> { + let Some(node) = find_node(db, id_or_name) else { + let exit = emit_error( + ctx, + "not_found", + &format!("provider node '{id_or_name}' not found"), + )?; + std::process::exit(exit); + }; + let Some(base_url) = node.base_url.as_deref() else { + let exit = emit_error( + ctx, + "validation", + &format!("node '{id_or_name}' has no baseUrl"), + )?; + std::process::exit(exit); + }; + + let probe_url = if node.r#type == "custom-embedding" { + format!("{}/embeddings", base_url.trim_end_matches('/')) + } else { + format!("{}/models", base_url.trim_end_matches('/')) + }; + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .build()?; + let mut req = client.get(&probe_url); + if let Some(key) = api_key.as_deref().filter(|k| !k.is_empty()) { + req = req.header("Authorization", format!("Bearer {key}")); + } + if let Some(model) = model_id.as_deref().filter(|m| !m.is_empty()) { + req = req.query(&[("model", model)]); + } + + let start = std::time::Instant::now(); + let result = req.send().await; + let latency_ms = start.elapsed().as_millis() as u64; + + let (valid, error, status) = match result { + Ok(resp) => { + let status = resp.status().as_u16(); + let ok = resp.status().is_success(); + let err = if ok { + None + } else { + Some(format!("HTTP {status}")) + }; + (ok, err, Some(status)) + } + Err(e) => (false, Some(e.to_string()), None), + }; + + let payload = json!({ + "id": node.id, + "name": node.name, + "type": node.r#type, + "baseUrl": base_url, + "probeUrl": probe_url, + "valid": valid, + "status": status, + "latencyMs": latency_ms, + "error": error, + }); + + if ctx.is_robot() { + emit_robot("openproxy.v1.provider-node.validate", payload)?; + } else if valid { + humanln( + ctx, + format!("OK {} ({}ms) — {probe_url}", node.name, latency_ms), + ); + } else { + humanln( + ctx, + format!( + "FAIL {} ({}ms) — {} — {}", + node.name, + latency_ms, + probe_url, + error.as_deref().unwrap_or("unknown error"), + ), + ); + } + + // We intentionally always return Ok here — the failure is encoded in + // the envelope. Use `--robot` and parse `valid` to fail in scripts. + let _ = ctx; + let _ = Value::Null; + Ok(()) +} diff --git a/src/cli/schema.rs b/src/cli/schema.rs index e76fac47..5db7f1f8 100644 --- a/src/cli/schema.rs +++ b/src/cli/schema.rs @@ -20,6 +20,8 @@ const RESOURCES: &[&str] = &[ "key", "pool", "settings", + "custom-model", + "model-alias", ]; pub fn run_list(ctx: OutputCtx) -> anyhow::Result<()> { @@ -170,6 +172,35 @@ fn schema_for(resource: &str) -> Option { "tunnelProvider": {"type": "string"} } }), + "custom-model" => json!({ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "CustomModel", + "type": "object", + "required": ["providerAlias", "id"], + "properties": { + "providerAlias": {"type": "string"}, + "id": {"type": "string"}, + "type": {"type": "string", "default": "chat"}, + "name": {"type": ["string", "null"]} + } + }), + "model-alias" => json!({ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ModelAlias", + "type": "object", + "required": ["alias", "target"], + "properties": { + "alias": {"type": "string"}, + "target": { + "type": "object", + "required": ["provider", "model"], + "properties": { + "provider": {"type": "string"}, + "model": {"type": "string"} + } + } + } + }), _ => return None, }) } @@ -209,6 +240,19 @@ fn example_for(resource: &str) -> Option { "cavemanLevel": "medium", "requireLogin": true }), + "custom-model" => json!({ + "providerAlias": "openai", + "id": "gpt-4o-2025-stub", + "type": "chat", + "name": "GPT-4o stub" + }), + "model-alias" => json!({ + "alias": "fast", + "target": { + "provider": "openai", + "model": "gpt-4o-mini" + } + }), _ => return None, }) } diff --git a/src/main.rs b/src/main.rs index af55e1ca..6fee6cf5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,6 +43,18 @@ async fn main() -> anyhow::Result<()> { openproxy::cli::run_pool(cmd.clone(), &db, ctx).await?; return Ok(()); } + Command::Combo { cmd } => { + let db = Db::load().await?; + let db = Arc::new(db); + openproxy::cli::combo::run(cmd.clone(), &db, ctx).await?; + return Ok(()); + } + Command::Models { cmd } => { + let db = Db::load().await?; + let db = Arc::new(db); + openproxy::cli::models::run(cmd.clone(), &db, ctx).await?; + return Ok(()); + } Command::Tunnel { cmd } => { let db = Db::load().await?; let db = Arc::new(db); diff --git a/src/server/api/providers.rs b/src/server/api/providers.rs index f1e91d45..9497cb10 100644 --- a/src/server/api/providers.rs +++ b/src/server/api/providers.rs @@ -629,7 +629,12 @@ fn is_safe_outbound_url(url: &str) -> Result<(), String> { Ok(()) } -async fn test_provider_api( +/// Test connectivity to a provider's `/v1/models` (or equivalent) endpoint. +/// Returns `(success, optional error string, optional latency in ms)`. +/// +/// Exposed at `pub(crate)` so the CLI can run the same validation logic +/// directly without going through the HTTP API. +pub(crate) async fn test_provider_api( provider: &str, api_key: Option<&str>, base_url: Option<&str>, From a8568bdfc46dc0710d1589fb3a7ca167bd9a21e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tr=E1=BA=A7n=20Quang=20=C4=90=C3=A3ng?= Date: Mon, 11 May 2026 14:08:13 +0000 Subject: [PATCH 2/2] test(cli): share ENV_LOCK across config + auth tests to fix parallel env-var race --- src/cli/auth.rs | 1 + src/cli/config.rs | 4 +--- src/cli/mod.rs | 6 ++++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/cli/auth.rs b/src/cli/auth.rs index 71cb6dcd..2b97b296 100644 --- a/src/cli/auth.rs +++ b/src/cli/auth.rs @@ -381,6 +381,7 @@ mod tests { /// Round-trip: login then logout against a temporary config file. #[tokio::test] async fn login_then_logout_round_trip() { + let _g = crate::cli::test_lock::ENV_LOCK.lock().unwrap(); let tmp = tempfile::tempdir().unwrap(); let cfg_path = tmp.path().join("config.toml"); std::env::set_var("OPENPROXY_CONFIG", &cfg_path); diff --git a/src/cli/config.rs b/src/cli/config.rs index 8026ce93..d5742d60 100644 --- a/src/cli/config.rs +++ b/src/cli/config.rs @@ -169,9 +169,7 @@ pub fn save_config_file(file: &ConfigFile) -> anyhow::Result { #[cfg(test)] mod tests { use super::*; - use std::sync::Mutex; - - static ENV_LOCK: Mutex<()> = Mutex::new(()); + use crate::cli::test_lock::ENV_LOCK; fn clear_env() { std::env::remove_var("DATA_DIR"); diff --git a/src/cli/mod.rs b/src/cli/mod.rs index 7b630307..10d9009d 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -34,6 +34,12 @@ pub mod provider_node; pub mod schema; pub mod server; +#[cfg(test)] +pub(crate) mod test_lock { + use std::sync::Mutex; + pub static ENV_LOCK: Mutex<()> = Mutex::new(()); +} + #[derive(Debug, Clone, Parser)] #[command( name = "openproxy",