diff --git a/apps/aether-gateway/src/admin_api.rs b/apps/aether-gateway/src/admin_api.rs index 547fbaef3..279da670d 100644 --- a/apps/aether-gateway/src/admin_api.rs +++ b/apps/aether-gateway/src/admin_api.rs @@ -1,16 +1,18 @@ pub(crate) use crate::handlers::admin::{ - admin_provider_ops_local_action_response, admin_provider_pool_config, - build_internal_control_error_response, create_provider_oauth_catalog_key, - find_duplicate_provider_oauth_key, maybe_build_local_admin_pool_response, - maybe_build_local_admin_response, persist_provider_quota_refresh_state, - provider_oauth_maintenance_endpoint_for_provider, provider_oauth_runtime_endpoint_for_provider, - provider_quota_refresh_endpoint_for_provider, provider_type_supports_quota_refresh, - reconcile_admin_fixed_provider_template_endpoints, + admin_pool_selection_snapshot_key, admin_provider_ops_local_action_response, + admin_provider_pool_config, build_internal_control_error_response, + create_provider_oauth_catalog_key, find_duplicate_provider_oauth_key, + maybe_build_local_admin_pool_response, maybe_build_local_admin_response, + persist_provider_quota_refresh_state, provider_oauth_maintenance_endpoint_for_provider, + provider_oauth_runtime_endpoint_for_provider, provider_quota_refresh_endpoint_for_provider, + provider_type_supports_quota_refresh, reconcile_admin_fixed_provider_template_endpoints, refresh_provider_oauth_account_state_after_update, refresh_provider_pool_quota_locally, store_admin_provider_ops_balance_cache, update_existing_provider_oauth_catalog_key, AdminAppState, AdminGatewayProviderTransportSnapshot, AdminLocalOAuthRefreshError, - AdminRequestContext, AdminRouteRequest, AdminRouteResponse, AdminRouteResult, - AdminStatsTimeRange, AdminStatsUsageFilter, OAUTH_ACCOUNT_BLOCK_PREFIX, + AdminPoolSelectionSnapshot, AdminPoolSelectionSnapshotItem, AdminRequestContext, + AdminRouteRequest, AdminRouteResponse, AdminRouteResult, AdminStatsTimeRange, + AdminStatsUsageFilter, ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, OAUTH_ACCOUNT_BLOCK_PREFIX, OAUTH_REQUEST_FAILED_PREFIX, }; diff --git a/apps/aether-gateway/src/control/route/admin/observability_families.rs b/apps/aether-gateway/src/control/route/admin/observability_families.rs index e04403cfe..ff1eb359f 100644 --- a/apps/aether-gateway/src/control/route/admin/observability_families.rs +++ b/apps/aether-gateway/src/control/route/admin/observability_families.rs @@ -288,6 +288,18 @@ pub(super) fn classify_admin_observability_family_route( "admin:pool", false, )) + } else if method == http::Method::POST + && normalized_path_no_trailing.starts_with("/api/admin/pool/") + && normalized_path_no_trailing.ends_with("/keys/selection-snapshot") + && normalized_path_no_trailing.matches('/').count() == 6 + { + Some(classified( + "admin_proxy", + "pool_manage", + "create_selection_snapshot", + "admin:pool", + false, + )) } else if method == http::Method::POST && normalized_path_no_trailing.starts_with("/api/admin/pool/") && normalized_path_no_trailing.ends_with("/keys/resolve-selection") diff --git a/apps/aether-gateway/src/control/tests/admin_pool.rs b/apps/aether-gateway/src/control/tests/admin_pool.rs index f747cc6cd..3c5ecf951 100644 --- a/apps/aether-gateway/src/control/tests/admin_pool.rs +++ b/apps/aether-gateway/src/control/tests/admin_pool.rs @@ -1,6 +1,8 @@ use http::Uri; -use super::{classify_control_route, headers}; +use crate::handlers::shared::local_proxy_route_requires_buffered_body; + +use super::{classify_control_route, headers, GatewayPublicRequestContext}; #[test] fn classifies_admin_pool_overview_as_admin_proxy_route() { @@ -82,6 +84,21 @@ fn classifies_admin_pool_provider_key_routes_as_admin_proxy_route() { Some("batch_action_keys") ); + let selection_snapshot_uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot" + .parse() + .expect("uri should parse"); + let selection_snapshot = + classify_control_route(&http::Method::POST, &selection_snapshot_uri, &headers) + .expect("route should classify"); + assert_eq!( + selection_snapshot.route_family.as_deref(), + Some("pool_manage") + ); + assert_eq!( + selection_snapshot.route_kind.as_deref(), + Some("create_selection_snapshot") + ); + let resolve_selection_uri: Uri = "/api/admin/pool/provider-1/keys/resolve-selection" .parse() .expect("uri should parse"); @@ -128,6 +145,25 @@ fn classifies_admin_pool_provider_key_routes_as_admin_proxy_route() { ); } +#[test] +fn admin_pool_selection_snapshot_buffers_request_body() { + let headers = headers(&[]); + let uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot" + .parse() + .expect("uri should parse"); + let decision = + classify_control_route(&http::Method::POST, &uri, &headers).expect("route should classify"); + let context = GatewayPublicRequestContext::from_request_parts( + "trace-pool-selection-snapshot", + &http::Method::POST, + &uri, + &headers, + Some(decision), + ); + + assert!(local_proxy_route_requires_buffered_body(&context)); +} + #[test] fn classifies_admin_pool_trailing_slash_routes_as_admin_proxy_route() { let headers = headers(&[]); @@ -140,6 +176,21 @@ fn classifies_admin_pool_trailing_slash_routes_as_admin_proxy_route() { assert_eq!(list.route_family.as_deref(), Some("pool_manage")); assert_eq!(list.route_kind.as_deref(), Some("list_keys")); + let selection_snapshot_uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot/" + .parse() + .expect("uri should parse"); + let selection_snapshot = + classify_control_route(&http::Method::POST, &selection_snapshot_uri, &headers) + .expect("route should classify"); + assert_eq!( + selection_snapshot.route_family.as_deref(), + Some("pool_manage") + ); + assert_eq!( + selection_snapshot.route_kind.as_deref(), + Some("create_selection_snapshot") + ); + let resolve_selection_uri: Uri = "/api/admin/pool/provider-1/keys/resolve-selection/" .parse() .expect("uri should parse"); diff --git a/apps/aether-gateway/src/handlers/admin/mod.rs b/apps/aether-gateway/src/handlers/admin/mod.rs index a085d1ace..6a3dc7f36 100644 --- a/apps/aether-gateway/src/handlers/admin/mod.rs +++ b/apps/aether-gateway/src/handlers/admin/mod.rs @@ -39,7 +39,12 @@ pub(crate) use self::provider::oauth::runtime::{ pub(crate) use self::provider::ops::providers::actions::admin_provider_ops_local_action_response; pub(crate) use self::provider::ops::providers::store_admin_provider_ops_balance_cache; pub(crate) use self::provider::pool::config::admin_provider_pool_config; -pub(crate) use self::provider::pool_admin::maybe_build_local_admin_pool_response; +pub(crate) use self::provider::pool_admin::{ + admin_pool_selection_snapshot_key, maybe_build_local_admin_pool_response, + AdminPoolSelectionSnapshot, AdminPoolSelectionSnapshotItem, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, +}; pub(crate) use self::provider::shared::payloads::{ OAUTH_ACCOUNT_BLOCK_PREFIX, OAUTH_REQUEST_FAILED_PREFIX, }; diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/batch_routes/action.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/batch_routes/action.rs index 2085ecd81..d1c7f5258 100644 --- a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/batch_routes/action.rs +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/batch_routes/action.rs @@ -1,16 +1,129 @@ use super::{ - admin_pool_provider_id_from_path, build_admin_pool_error_response, AdminPoolBatchActionRequest, - ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, + admin_pool_provider_id_from_path, build_admin_pool_error_response, + resolve_admin_pool_selection_snapshot_key_ids, AdminPoolBatchActionRequest, + AdminPoolSelectionSnapshotReference, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, ADMIN_POOL_PROVIDER_CATALOG_WRITER_UNAVAILABLE_DETAIL, + ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE, }; use crate::handlers::admin::request::{AdminAppState, AdminRequestContext}; use crate::GatewayError; +use aether_admin::provider::pool::{ + self as admin_provider_pool_pure, AdminPoolBatchSelectionRequest, +}; use axum::{ body::{Body, Bytes}, http, - response::Response, + response::{IntoResponse, Response}, + Json, }; +fn admin_pool_batch_snapshot_reference( + payload: &AdminPoolBatchActionRequest, +) -> Result, Response> { + let Some(selection) = payload.selection.as_ref() else { + return Ok(None); + }; + if !payload.key_ids.is_empty() { + return Err(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "selection and key_ids cannot be used together", + )); + } + + match selection { + AdminPoolBatchSelectionRequest::Snapshot { + snapshot_id, + expected_total, + } => { + let snapshot_id = snapshot_id.trim(); + if snapshot_id.is_empty() { + return Err(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "selection.snapshot_id should not be empty", + )); + } + let Some(expected_total) = expected_total else { + return Err(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "selection.expected_total is required", + )); + }; + Ok(Some(AdminPoolSelectionSnapshotReference { + snapshot_id: snapshot_id.to_string(), + expected_total: *expected_total, + })) + } + } +} + +async fn build_admin_pool_snapshot_batch_action_response( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + payload: AdminPoolBatchActionRequest, + reference: AdminPoolSelectionSnapshotReference, +) -> Result, GatewayError> { + let key_ids = match resolve_admin_pool_selection_snapshot_key_ids( + state, + request_context, + provider_id, + &reference, + ) + .await? + { + Ok(key_ids) => key_ids, + Err(response) => return Ok(response), + }; + + let Some(provider) = state + .read_provider_catalog_providers_by_ids(std::slice::from_ref(&provider_id.to_string())) + .await? + .into_iter() + .next() + else { + return Ok(build_admin_pool_error_response( + http::StatusCode::NOT_FOUND, + format!("Provider {provider_id} 不存在"), + )); + }; + + let mut affected = 0usize; + let mut action_label = None; + for chunk in key_ids.chunks(ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE) { + let chunk_payload = AdminPoolBatchActionRequest { + key_ids: chunk.to_vec(), + selection: None, + action: payload.action.clone(), + payload: payload.payload.clone(), + }; + let plan = match admin_provider_pool_pure::build_admin_pool_batch_action_plan(chunk_payload) + { + Ok(plan) => plan, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + action_label.get_or_insert(plan.action_label); + affected = affected.saturating_add( + state + .execute_admin_pool_batch_action_plan(&provider, plan) + .await?, + ); + } + + let action_label = action_label.unwrap_or("processed"); + Ok(Json( + admin_provider_pool_pure::build_admin_pool_batch_action_result_payload( + affected, + action_label, + ), + ) + .into_response()) +} + pub(super) async fn build_admin_pool_batch_action_response( state: &AdminAppState<'_>, request_context: &AdminRequestContext<'_>, @@ -54,6 +167,20 @@ pub(super) async fn build_admin_pool_batch_action_response( )); } }; + let reference = match admin_pool_batch_snapshot_reference(&payload) { + Ok(reference) => reference, + Err(response) => return Ok(response), + }; + if let Some(reference) = reference { + return build_admin_pool_snapshot_batch_action_response( + state, + request_context, + &provider_id, + payload, + reference, + ) + .await; + } state .build_admin_pool_batch_action_response(&provider_id, payload) diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/mod.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/mod.rs index 4cc02f666..6e419a63f 100644 --- a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/mod.rs +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/mod.rs @@ -27,6 +27,7 @@ mod read_resolve_selection; #[path = "read_routes/scores.rs"] mod read_scores; pub(crate) mod selection; +mod selection_snapshots; mod support; pub(crate) use self::batch_shared::{ @@ -35,12 +36,24 @@ pub(crate) use self::batch_shared::{ build_admin_pool_batch_delete_task_payload, AdminPoolBatchActionRequest, AdminPoolBatchImportRequest, }; +pub(crate) use self::selection_snapshots::{ + admin_pool_selection_snapshot_key, read_admin_pool_selection_snapshot, + resolve_admin_pool_selection_snapshot_key_ids, store_admin_pool_selection_snapshot, + validate_admin_pool_selection_snapshot_reference, AdminPoolSelectionSnapshot, + AdminPoolSelectionSnapshotItem, AdminPoolSelectionSnapshotReference, + ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, +}; pub(crate) use self::support::{ admin_pool_provider_id_from_path, admin_pool_provider_id_from_scores_path, - parse_admin_pool_key_sort, parse_admin_pool_page, parse_admin_pool_page_size, - parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_status_filter, - AdminPoolKeySort, AdminPoolKeySortDirection, AdminPoolKeySortField, - AdminPoolResolveSelectionRequest, ADMIN_POOL_BANNED_KEY_CLEANUP_EMPTY_MESSAGE, + parse_admin_pool_key_sort, parse_admin_pool_key_sort_values, parse_admin_pool_page, + parse_admin_pool_page_size, parse_admin_pool_page_size_value, parse_admin_pool_page_value, + parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_search_scope, + parse_admin_pool_search_scope_value, parse_admin_pool_status_filter, + parse_admin_pool_status_filter_value, AdminPoolKeySort, AdminPoolKeySortDirection, + AdminPoolKeySortField, AdminPoolResolveSelectionRequest, AdminPoolSearchScope, + ADMIN_POOL_BANNED_KEY_CLEANUP_EMPTY_MESSAGE, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, ADMIN_POOL_PROVIDER_CATALOG_WRITER_UNAVAILABLE_DETAIL, }; @@ -105,6 +118,16 @@ pub(crate) async fn maybe_build_local_admin_pool_response( read_keys::build_admin_pool_list_keys_response(state, request_context).await?, )); } + Some("create_selection_snapshot") => { + return Ok(Some( + read_keys::build_admin_pool_create_selection_snapshot_response( + state, + request_context, + request_body, + ) + .await?, + )); + } Some("scores") => { return Ok(Some( read_scores::build_admin_pool_scores_response(state, request_context).await?, diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/keys.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/keys.rs index 698e1f82a..a16e0ed49 100644 --- a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/keys.rs +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/keys.rs @@ -1,28 +1,34 @@ use super::{ admin_pool_provider_id_from_path, admin_provider_pool_config, build_admin_pool_error_response, - parse_admin_pool_key_sort, parse_admin_pool_page, parse_admin_pool_page_size, - parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_status_filter, - pool_payloads, pool_selection, read_admin_provider_pool_runtime_state, AdminPoolKeySort, - AdminPoolKeySortDirection, AdminPoolKeySortField, AdminProviderPoolRuntimeState, - ProviderCatalogKeyListOrder, ProviderCatalogKeyListQuery, - ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, + parse_admin_pool_key_sort, parse_admin_pool_key_sort_values, parse_admin_pool_page, + parse_admin_pool_page_size, parse_admin_pool_page_size_value, parse_admin_pool_page_value, + parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_search_scope, + parse_admin_pool_search_scope_value, parse_admin_pool_status_filter, + parse_admin_pool_status_filter_value, pool_payloads, pool_selection, + read_admin_provider_pool_runtime_state, store_admin_pool_selection_snapshot, AdminPoolKeySort, + AdminPoolKeySortDirection, AdminPoolKeySortField, AdminPoolSearchScope, + AdminPoolSelectionSnapshotItem, AdminProviderPoolRuntimeState, ProviderCatalogKeyListOrder, + ProviderCatalogKeyListQuery, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, }; use crate::ai_serving::{provider_key_pool_score_id, provider_key_pool_score_scope}; use crate::handlers::admin::provider::shared::support::AdminProviderPoolConfig; use crate::handlers::admin::request::{AdminAppState, AdminRequestContext}; -use crate::handlers::admin::shared::provider_key_status_snapshot_payload; +use crate::handlers::admin::shared::{provider_key_status_snapshot_payload, unix_secs_to_rfc3339}; use crate::provider_key_auth::provider_key_auth_semantics; use crate::GatewayError; use aether_admin::provider::pool as admin_provider_pool_pure; use aether_data_contracts::repository::pool_scores::{ GetPoolMemberScoresByIdsQuery, PoolMemberIdentity, StoredPoolMemberScore, }; -use aether_data_contracts::repository::provider_catalog::StoredProviderCatalogKey; +use aether_data_contracts::repository::provider_catalog::{ + StoredProviderCatalogKey, StoredProviderCatalogProvider, +}; use aether_data_contracts::repository::usage::{ ProviderApiKeyWindowUsageRequest, StoredProviderApiKeyWindowUsageSummary, }; use axum::{ - body::Body, + body::{Body, Bytes}, http, response::{IntoResponse, Response}, Json, @@ -34,6 +40,8 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; +const ADMIN_POOL_SELECTION_SNAPSHOT_TTL_SECONDS: u64 = 15 * 60; + type AdminPoolCodexCycleUsageByKey = BTreeMap>; @@ -293,6 +301,238 @@ fn admin_pool_repository_key_order(sort: AdminPoolKeySort) -> ProviderCatalogKey } } +fn admin_pool_key_matches_name_search( + key: &StoredProviderCatalogKey, + search: Option<&str>, +) -> bool { + let Some(search) = search else { + return true; + }; + let search = search.trim().to_ascii_lowercase(); + if search.is_empty() { + return true; + } + key.name.to_ascii_lowercase().contains(&search) || key.id.to_ascii_lowercase().contains(&search) +} + +fn admin_pool_normalized_expected_page_key_ids(values: Vec) -> Vec { + values + .into_iter() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .collect() +} + +fn admin_pool_sort_field_label(field: AdminPoolKeySortField) -> &'static str { + match field { + AdminPoolKeySortField::Default => "name", + AdminPoolKeySortField::ImportedAt => "imported_at", + AdminPoolKeySortField::LastUsedAt => "last_used_at", + AdminPoolKeySortField::Score => "score", + } +} + +fn admin_pool_sort_direction_label(direction: AdminPoolKeySortDirection) -> &'static str { + match direction { + AdminPoolKeySortDirection::Asc => "asc", + AdminPoolKeySortDirection::Desc => "desc", + } +} + +fn admin_pool_effective_search_scope( + requested_scope: AdminPoolSearchScope, + status: &str, + quick_selectors: &[String], + sort: AdminPoolKeySort, +) -> AdminPoolSearchScope { + let legacy_full_search_path = status != "all" + || !quick_selectors.is_empty() + || matches!(sort.field, AdminPoolKeySortField::Score); + match (requested_scope, legacy_full_search_path) { + (AdminPoolSearchScope::Auto, true) => AdminPoolSearchScope::Full, + (AdminPoolSearchScope::Auto, false) => AdminPoolSearchScope::Name, + (scope, _) => scope, + } +} + +async fn build_admin_pool_selection_snapshot_payload( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + search: Option<&str>, + quick_selectors: &[String], + status: &str, + search_scope: AdminPoolSearchScope, + sort: AdminPoolKeySort, + now_unix_secs: u64, + keys: &[StoredProviderCatalogKey], +) -> Result { + let mut items = Vec::with_capacity(keys.len()); + for (ordinal, key) in keys.iter().enumerate() { + let ordinal = u64::try_from(ordinal).map_err(|_| { + GatewayError::Internal("pool selection snapshot ordinal overflow".to_string()) + })?; + items.push(AdminPoolSelectionSnapshotItem { + ordinal, + key_id: key.id.clone(), + key_updated_at_unix_secs: key.updated_at_unix_secs, + }); + } + + let snapshot = store_admin_pool_selection_snapshot( + state, + request_context, + provider_id, + Some(json!({ + "provider_id": provider_id, + "search": search, + "search_scope": match search_scope { + AdminPoolSearchScope::Auto => "auto", + AdminPoolSearchScope::Name => "name", + AdminPoolSearchScope::Full => "full", + }, + "quick_selectors": quick_selectors, + "status": status, + "sort_by": admin_pool_sort_field_label(sort.field), + "sort_order": admin_pool_sort_direction_label(sort.direction), + })), + now_unix_secs, + ADMIN_POOL_SELECTION_SNAPSHOT_TTL_SECONDS, + items, + ) + .await?; + + Ok(json!({ + "id": snapshot.id, + "total": snapshot.total, + "status": snapshot.status, + "created_at_unix_secs": snapshot.created_at_unix_secs, + "created_at": unix_secs_to_rfc3339(snapshot.created_at_unix_secs), + "expires_at_unix_secs": snapshot.expires_at_unix_secs, + "expires_at": unix_secs_to_rfc3339(snapshot.expires_at_unix_secs), + })) +} + +async fn build_admin_pool_key_payload_items( + state: &AdminAppState<'_>, + provider: &StoredProviderCatalogProvider, + pool_config: Option, + keys: Vec, + preloaded_pool_scores_by_key_id: Option>, + now_unix_secs: u64, +) -> Result, GatewayError> { + let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); + let pool_scores_by_key_id = match preloaded_pool_scores_by_key_id { + Some(scores) => scores, + None => read_admin_pool_scores_by_key_id(state, &provider.id, &key_ids) + .await + .unwrap_or_default(), + }; + let endpoints = state + .list_provider_catalog_endpoints_by_provider_ids(std::slice::from_ref(&provider.id)) + .await?; + let runtime = match pool_config.as_ref() { + Some(pool_config) if !key_ids.is_empty() => { + read_admin_provider_pool_runtime_state( + state.runtime_state(), + &provider.id, + &key_ids, + pool_config, + None, + ) + .await + } + _ => AdminProviderPoolRuntimeState::default(), + }; + let codex_cycle_usage_by_key = read_admin_pool_codex_cycle_usage_by_key( + state, + &provider.provider_type, + &keys, + now_unix_secs, + ) + .await?; + + Ok(keys + .into_iter() + .map(|key| { + pool_payloads::build_admin_pool_key_payload( + state, + &provider.provider_type, + &endpoints, + &key, + &runtime, + pool_config.clone(), + pool_scores_by_key_id.get(&key.id), + codex_cycle_usage_by_key.get(&key.id), + now_unix_secs, + ) + }) + .collect::>()) +} + +async fn maybe_build_admin_pool_selection_snapshot_payload( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + search: Option<&str>, + quick_selectors: &[String], + status: &str, + search_scope: AdminPoolSearchScope, + sort: AdminPoolKeySort, + now_unix_secs: u64, + keys: &[StoredProviderCatalogKey], + page_offset: usize, + page_size: usize, + expected_total: Option, + expected_page_key_ids: Option<&[String]>, +) -> Result<(Option, Option), GatewayError> { + let total = keys.len(); + let mut mismatch = None; + if let Some(expected_total) = expected_total { + if expected_total != total { + mismatch = Some(json!({ + "reason": "total_changed", + "expected_total": expected_total, + "actual_total": total, + })); + } + } + if let Some(expected_page_key_ids) = expected_page_key_ids { + let actual_page_key_ids = keys + .iter() + .skip(page_offset) + .take(page_size) + .map(|key| key.id.clone()) + .collect::>(); + if expected_page_key_ids != actual_page_key_ids.as_slice() { + mismatch = Some(json!({ + "reason": "page_keys_changed", + "expected_total": expected_total.unwrap_or(total), + "actual_total": total, + })); + } + } + + Ok(( + Some( + build_admin_pool_selection_snapshot_payload( + state, + request_context, + provider_id, + search, + quick_selectors, + status, + search_scope, + sort, + now_unix_secs, + keys, + ) + .await?, + ), + mismatch, + )) +} + fn admin_pool_trimmed_string(value: Option<&Value>) -> Option { value .and_then(Value::as_str) @@ -491,6 +731,93 @@ fn admin_pool_key_visible_status_filter( "available" } +async fn read_admin_pool_filtered_sorted_keys( + state: &AdminAppState<'_>, + provider: &StoredProviderCatalogProvider, + pool_config: Option<&AdminProviderPoolConfig>, + search: Option<&str>, + search_scope: AdminPoolSearchScope, + quick_selectors: &[String], + status: &str, + sort: AdminPoolKeySort, + now_unix_secs: u64, +) -> Result< + ( + Vec, + Option>, + ), + GatewayError, +> { + let mut keys = state + .list_provider_catalog_keys_by_provider_ids(std::slice::from_ref(&provider.id)) + .await? + .into_iter() + .filter(|key| match search_scope { + AdminPoolSearchScope::Auto | AdminPoolSearchScope::Name => { + admin_pool_key_matches_name_search(key, search) + } + AdminPoolSearchScope::Full if search.is_some() => { + pool_selection::admin_pool_matches_search( + state, + key, + &provider.provider_type, + search, + ) + } + AdminPoolSearchScope::Full => true, + }) + .filter(|key| { + quick_selectors.iter().all(|selector| { + pool_selection::admin_pool_matches_quick_selector( + state, + key, + &provider.provider_type, + selector, + ) + }) + }) + .collect::>(); + + if status != "all" { + let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); + let runtime = match pool_config { + Some(pool_config) if !key_ids.is_empty() => { + read_admin_provider_pool_runtime_state( + state.runtime_state(), + &provider.id, + &key_ids, + pool_config, + None, + ) + .await + } + _ => AdminProviderPoolRuntimeState::default(), + }; + keys.retain(|key| { + admin_pool_key_visible_status_filter( + state, + key, + &provider.provider_type, + pool_config, + &runtime, + now_unix_secs, + ) == status + }); + } + + if matches!(sort.field, AdminPoolKeySortField::Score) { + let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); + let scores = read_admin_pool_scores_by_key_id(state, &provider.id, &key_ids) + .await + .unwrap_or_default(); + admin_pool_sort_keys_by_score(&mut keys, &scores, sort.direction); + Ok((keys, Some(scores))) + } else { + admin_pool_sort_keys_for_request(&mut keys, sort); + Ok((keys, None)) + } +} + pub(super) async fn build_admin_pool_list_keys_response( state: &AdminAppState<'_>, request_context: &AdminRequestContext<'_>, @@ -528,6 +855,15 @@ pub(super) async fn build_admin_pool_list_keys_response( } }; let search = parse_admin_pool_search(query).map(|value| value.to_ascii_lowercase()); + let search_scope = match parse_admin_pool_search_scope(query) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; let quick_selectors = admin_provider_pool_pure::admin_pool_sanitize_quick_selectors( parse_admin_pool_quick_selectors(query), ); @@ -566,114 +902,26 @@ pub(super) async fn build_admin_pool_list_keys_response( let page_offset = page.saturating_sub(1).saturating_mul(page_size); let sort_by_score = matches!(sort.field, AdminPoolKeySortField::Score); let now_unix_secs = admin_pool_current_unix_secs(); + let search_scope = + admin_pool_effective_search_scope(search_scope, &status, &quick_selectors, sort); - let (keys, total, preloaded_pool_scores_by_key_id) = if status != "all" { - let mut keys = state - .list_provider_catalog_keys_by_provider_ids(std::slice::from_ref(&provider.id)) - .await? - .into_iter() - .collect::>(); - if let Some(keyword) = search.as_ref() { - keys.retain(|key| { - pool_selection::admin_pool_matches_search( - state, - key, - &provider.provider_type, - Some(keyword), - ) - }); - } - if !quick_selectors.is_empty() { - keys.retain(|key| { - quick_selectors.iter().all(|selector| { - pool_selection::admin_pool_matches_quick_selector( - state, - key, - &provider.provider_type, - selector, - ) - }) - }); - } - - let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); - let runtime = match pool_config.as_ref() { - Some(pool_config) if !key_ids.is_empty() => { - read_admin_provider_pool_runtime_state( - state.runtime_state(), - &provider.id, - &key_ids, - pool_config, - None, - ) - .await - } - _ => AdminProviderPoolRuntimeState::default(), - }; - keys.retain(|key| { - admin_pool_key_visible_status_filter( - state, - key, - &provider.provider_type, - pool_config.as_ref(), - &runtime, - now_unix_secs, - ) == status - }); - - let preloaded_pool_scores_by_key_id = if sort_by_score { - let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); - let scores = read_admin_pool_scores_by_key_id(state, &provider.id, &key_ids) - .await - .unwrap_or_default(); - admin_pool_sort_keys_by_score(&mut keys, &scores, sort.direction); - Some(scores) - } else { - admin_pool_sort_keys_for_request(&mut keys, sort); - None - }; - let total = keys.len(); - let keys = keys - .into_iter() - .skip(page_offset) - .take(page_size) - .collect::>(); - (keys, total, preloaded_pool_scores_by_key_id) - } else if !quick_selectors.is_empty() || sort_by_score { - let mut keys = state - .list_provider_catalog_keys_by_provider_ids(std::slice::from_ref(&provider.id)) - .await? - .into_iter() - .filter(|key| { - pool_selection::admin_pool_matches_search( - state, - key, - &provider.provider_type, - search.as_deref(), - ) - }) - .filter(|key| { - quick_selectors.iter().all(|selector| { - pool_selection::admin_pool_matches_quick_selector( - state, - key, - &provider.provider_type, - selector, - ) - }) - }) - .collect::>(); - let preloaded_pool_scores_by_key_id = if sort_by_score { - let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); - let scores = read_admin_pool_scores_by_key_id(state, &provider.id, &key_ids) - .await - .unwrap_or_default(); - admin_pool_sort_keys_by_score(&mut keys, &scores, sort.direction); - Some(scores) - } else { - admin_pool_sort_keys_for_request(&mut keys, sort); - None - }; + let (keys, total, preloaded_pool_scores_by_key_id) = if status != "all" + || !quick_selectors.is_empty() + || sort_by_score + || (matches!(search_scope, AdminPoolSearchScope::Full) && search.is_some()) + { + let (keys, preloaded_pool_scores_by_key_id) = read_admin_pool_filtered_sorted_keys( + state, + &provider, + pool_config.as_ref(), + search.as_deref(), + search_scope, + &quick_selectors, + &status, + sort, + now_unix_secs, + ) + .await?; let total = keys.len(); let keys = keys .into_iter() @@ -695,61 +943,222 @@ pub(super) async fn build_admin_pool_list_keys_response( (key_page.items, key_page.total, None) }; - let key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); - let pool_scores_by_key_id = match preloaded_pool_scores_by_key_id { - Some(scores) => scores, - None => read_admin_pool_scores_by_key_id(state, &provider.id, &key_ids) - .await - .unwrap_or_default(), + let items = build_admin_pool_key_payload_items( + state, + &provider, + pool_config, + keys, + preloaded_pool_scores_by_key_id, + now_unix_secs, + ) + .await?; + + let payload = json!({ + "total": total, + "page": page, + "page_size": page_size, + "keys": items, + }); + + Ok(Json(payload).into_response()) +} + +pub(super) async fn build_admin_pool_create_selection_snapshot_response( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + request_body: Option<&Bytes>, +) -> Result, GatewayError> { + if !state.has_provider_catalog_data_reader() { + return Ok(build_admin_pool_error_response( + http::StatusCode::SERVICE_UNAVAILABLE, + ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, + )); + } + + let Some(provider_id) = admin_pool_provider_id_from_path(request_context.path()) else { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "provider_id 无效", + )); }; - let endpoints = state - .list_provider_catalog_endpoints_by_provider_ids(std::slice::from_ref(&provider.id)) - .await?; - let runtime = match pool_config.as_ref() { - Some(pool_config) if !key_ids.is_empty() => { - read_admin_provider_pool_runtime_state( - state.runtime_state(), - &provider.id, - &key_ids, - pool_config, - None, - ) - .await + let payload = match request_body { + Some(body) if !body.is_empty() => { + match serde_json::from_slice::< + admin_provider_pool_pure::AdminPoolCreateSelectionSnapshotRequest, + >(body) + { + Ok(value) => value, + Err(_) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "Invalid JSON request body", + )); + } + } + } + _ => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "Invalid JSON request body", + )); } - _ => AdminProviderPoolRuntimeState::default(), }; - let codex_cycle_usage_by_key = read_admin_pool_codex_cycle_usage_by_key( + + let page = match parse_admin_pool_page_value(payload.page) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + let page_size = match parse_admin_pool_page_size_value(payload.page_size) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + let search = payload.search.trim().to_ascii_lowercase(); + let search = (!search.is_empty()).then_some(search); + let search_scope = match parse_admin_pool_search_scope_value( + (!payload.search_scope.trim().is_empty()).then_some(payload.search_scope.as_str()), + ) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + let quick_selectors = + admin_provider_pool_pure::admin_pool_sanitize_quick_selectors(payload.quick_selectors); + let status_value = payload.status.trim(); + let status = match parse_admin_pool_status_filter_value( + (!status_value.is_empty()).then_some(status_value), + ) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + let sort = match parse_admin_pool_key_sort_values( + payload.sort_by.as_deref(), + payload.sort_order.as_deref(), + ) { + Ok(value) => value, + Err(detail) => { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + detail, + )); + } + }; + let Some(expected_total) = payload.expected_total else { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "expected_total is required", + )); + }; + let expected_page_key_ids = + admin_pool_normalized_expected_page_key_ids(payload.expected_page_key_ids); + + let Some(provider) = state + .read_provider_catalog_providers_by_ids(std::slice::from_ref(&provider_id)) + .await? + .into_iter() + .next() + else { + return Ok(build_admin_pool_error_response( + http::StatusCode::NOT_FOUND, + format!("Provider {provider_id} 不存在"), + )); + }; + + let pool_config = admin_provider_pool_config(&provider); + let page_offset = page.saturating_sub(1).saturating_mul(page_size); + let now_unix_secs = admin_pool_current_unix_secs(); + let search_scope = + admin_pool_effective_search_scope(search_scope, &status, &quick_selectors, sort); + let (keys, preloaded_pool_scores_by_key_id) = read_admin_pool_filtered_sorted_keys( state, - &provider.provider_type, - &keys, + &provider, + pool_config.as_ref(), + search.as_deref(), + search_scope, + &quick_selectors, + &status, + sort, now_unix_secs, ) .await?; + let total = keys.len(); + if total > ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL { + return Ok(build_admin_pool_error_response( + http::StatusCode::PAYLOAD_TOO_LARGE, + format!( + "筛选结果过大(最多支持 {} 个账号),请缩小筛选范围后重试", + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL + ), + )); + } - let items = keys - .into_iter() - .map(|key| { - pool_payloads::build_admin_pool_key_payload( - state, - &provider.provider_type, - &endpoints, - &key, - &runtime, - pool_config.clone(), - pool_scores_by_key_id.get(&key.id), - codex_cycle_usage_by_key.get(&key.id), - now_unix_secs, - ) - }) + let (selection_snapshot, selection_snapshot_mismatch) = + maybe_build_admin_pool_selection_snapshot_payload( + state, + request_context, + &provider.id, + search.as_deref(), + &quick_selectors, + &status, + search_scope, + sort, + now_unix_secs, + &keys, + page_offset, + page_size, + Some(expected_total), + Some(expected_page_key_ids.as_slice()), + ) + .await?; + + let page_keys = keys + .iter() + .skip(page_offset) + .take(page_size) + .cloned() .collect::>(); + let page_items = build_admin_pool_key_payload_items( + state, + &provider, + pool_config, + page_keys, + preloaded_pool_scores_by_key_id, + now_unix_secs, + ) + .await?; - Ok(Json(json!({ + let mut response_payload = json!({ "total": total, "page": page, "page_size": page_size, - "keys": items, - })) - .into_response()) + "keys": page_items, + }); + if let Some(selection_snapshot) = selection_snapshot { + response_payload["selection_snapshot"] = selection_snapshot; + } + if let Some(selection_snapshot_mismatch) = selection_snapshot_mismatch { + response_payload["selection_snapshot_mismatch"] = selection_snapshot_mismatch; + } + + Ok(Json(response_payload).into_response()) } #[cfg(test)] diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/resolve_selection.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/resolve_selection.rs index 123b1c91c..f72c59179 100644 --- a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/resolve_selection.rs +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/read_routes/resolve_selection.rs @@ -1,11 +1,13 @@ use super::{ admin_pool_provider_id_from_path, build_admin_pool_error_response, pool_selection, - AdminPoolResolveSelectionRequest, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, + resolve_admin_pool_selection_snapshot_key_ids, AdminPoolResolveSelectionRequest, + AdminPoolSelectionSnapshotReference, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL, }; use crate::handlers::admin::request::{AdminAppState, AdminRequestContext}; use crate::provider_key_auth::{provider_key_auth_semantics, provider_key_can_refresh_oauth}; use crate::GatewayError; use aether_admin::provider::pool as admin_provider_pool_pure; +use aether_data_contracts::repository::provider_catalog::StoredProviderCatalogKey; use axum::{ body::{Body, Bytes}, http, @@ -13,6 +15,30 @@ use axum::{ Json, }; use serde_json::json; +use std::collections::BTreeMap; + +const ADMIN_POOL_SELECTION_KEY_FETCH_CHUNK_SIZE: usize = 500; + +fn build_admin_pool_resolved_selection_item( + state: &AdminAppState<'_>, + key: &StoredProviderCatalogKey, + provider_type: &str, +) -> serde_json::Value { + let auth_semantics = provider_key_auth_semantics(key, provider_type); + let auth_config = state.parse_catalog_auth_config_json(key); + json!({ + "key_id": key.id, + "key_name": key.name, + "auth_type": key.auth_type, + "auth_type_by_format": key.auth_type_by_format, + "credential_kind": auth_semantics.credential_kind().as_str(), + "runtime_auth_kind": auth_semantics.runtime_auth_kind().as_str(), + "oauth_managed": auth_semantics.oauth_managed(), + "can_refresh_oauth": provider_key_can_refresh_oauth(auth_semantics, auth_config.as_ref()), + "can_export_oauth": auth_semantics.can_export_oauth(), + "can_edit_oauth": auth_semantics.can_edit_oauth(), + }) +} pub(super) async fn build_admin_pool_resolve_selection_response( state: &AdminAppState<'_>, @@ -60,6 +86,61 @@ pub(super) async fn build_admin_pool_resolve_selection_response( }; let provider_type = provider.provider_type.clone(); + if let Some(snapshot_id) = payload + .snapshot_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + { + let Some(expected_total) = payload.expected_total else { + return Ok(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "selection.expected_total is required", + )); + }; + let reference = AdminPoolSelectionSnapshotReference { + snapshot_id: snapshot_id.to_string(), + expected_total, + }; + let key_ids = match resolve_admin_pool_selection_snapshot_key_ids( + state, + request_context, + &provider.id, + &reference, + ) + .await? + { + Ok(key_ids) => key_ids, + Err(response) => return Ok(response), + }; + let mut keys_by_id = BTreeMap::new(); + for chunk in key_ids.chunks(ADMIN_POOL_SELECTION_KEY_FETCH_CHUNK_SIZE) { + for key in state + .list_provider_catalog_keys_by_ids(chunk) + .await? + .into_iter() + .filter(|key| key.provider_id == provider.id) + { + keys_by_id.insert(key.id.clone(), key); + } + } + let items = key_ids + .iter() + .filter_map(|key_id| keys_by_id.get(key_id)) + .map(|key| build_admin_pool_resolved_selection_item(state, key, &provider_type)) + .collect::>(); + let snapshot_total = key_ids.len(); + let missing_count = snapshot_total.saturating_sub(items.len()); + + return Ok(Json(json!({ + "total": items.len(), + "snapshot_total": snapshot_total, + "missing_count": missing_count, + "items": items, + })) + .into_response()); + } + let search = payload.search.trim(); let quick_selectors = admin_provider_pool_pure::admin_pool_sanitize_quick_selectors(payload.quick_selectors); @@ -92,22 +173,7 @@ pub(super) async fn build_admin_pool_resolve_selection_response( let items = keys .iter() - .map(|key| { - let auth_semantics = provider_key_auth_semantics(key, &provider_type); - let auth_config = state.parse_catalog_auth_config_json(key); - json!({ - "key_id": key.id, - "key_name": key.name, - "auth_type": key.auth_type, - "auth_type_by_format": key.auth_type_by_format, - "credential_kind": auth_semantics.credential_kind().as_str(), - "runtime_auth_kind": auth_semantics.runtime_auth_kind().as_str(), - "oauth_managed": auth_semantics.oauth_managed(), - "can_refresh_oauth": provider_key_can_refresh_oauth(auth_semantics, auth_config.as_ref()), - "can_export_oauth": auth_semantics.can_export_oauth(), - "can_edit_oauth": auth_semantics.can_edit_oauth(), - }) - }) + .map(|key| build_admin_pool_resolved_selection_item(state, key, &provider_type)) .collect::>(); Ok(Json(json!({ diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/selection_snapshots.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/selection_snapshots.rs new file mode 100644 index 000000000..f07590cab --- /dev/null +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/selection_snapshots.rs @@ -0,0 +1,355 @@ +use super::build_admin_pool_error_response; +use crate::handlers::admin::request::{AdminAppState, AdminRequestContext}; +use crate::GatewayError; +use aether_runtime_state::{RuntimeLockLease, RuntimeState}; +use axum::{body::Body, http, response::Response}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::{ + collections::BTreeSet, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::time::sleep; +use uuid::Uuid; + +pub(crate) const ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE: usize = 1_000; +pub(crate) const ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL: usize = 20_000; +pub(crate) const ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER: usize = 3; + +const ADMIN_POOL_SELECTION_SNAPSHOT_STATUS_READY: &str = "ready"; +const ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_TTL: Duration = Duration::from_secs(5); +const ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_RETRY_DELAY: Duration = Duration::from_millis(10); +const ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_RETRIES: usize = 100; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct AdminPoolSelectionSnapshotItem { + pub(crate) ordinal: u64, + pub(crate) key_id: String, + pub(crate) key_updated_at_unix_secs: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct AdminPoolSelectionSnapshot { + pub(crate) id: String, + pub(crate) provider_id: String, + pub(crate) created_by: Option, + pub(crate) filter_json: Option, + pub(crate) total: usize, + pub(crate) status: String, + pub(crate) created_at_unix_secs: u64, + pub(crate) expires_at_unix_secs: u64, + pub(crate) items: Vec, +} + +pub(crate) struct AdminPoolSelectionSnapshotReference { + pub(crate) snapshot_id: String, + pub(crate) expected_total: usize, +} + +fn current_unix_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|duration| duration.as_secs()) + .unwrap_or(0) +} + +fn current_admin_user_id(request_context: &AdminRequestContext<'_>) -> Option { + request_context + .decision() + .and_then(|decision| decision.admin_principal.as_ref()) + .map(|principal| principal.user_id.trim()) + .filter(|user_id| !user_id.is_empty()) + .map(ToOwned::to_owned) +} + +pub(crate) fn admin_pool_selection_snapshot_key(snapshot_id: &str) -> String { + format!("admin:pool:selection_snapshot:{}", snapshot_id.trim()) +} + +fn admin_pool_selection_snapshot_index_key(created_by: Option<&str>, provider_id: &str) -> String { + let owner = created_by + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or("anonymous"); + format!("admin:pool:selection_snapshot_index:{provider_id}:{owner}") +} + +fn admin_pool_selection_snapshot_index_lock_key( + created_by: Option<&str>, + provider_id: &str, +) -> String { + format!( + "admin:pool:selection_snapshot_index_lock:{provider_id}:{}", + created_by + .map(str::trim) + .filter(|value| !value.is_empty()) + .unwrap_or("anonymous") + ) +} + +async fn acquire_admin_pool_selection_snapshot_index_lock( + runtime: &RuntimeState, + lock_key: &str, +) -> Result, GatewayError> { + let owner = format!( + "aether-gateway-admin-pool-selection-snapshot-{}", + std::process::id() + ); + for attempt in 0..ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_RETRIES { + let lease = runtime + .lock_try_acquire( + lock_key, + &owner, + ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_TTL, + ) + .await + .map_err(|err| GatewayError::Internal(err.to_string()))?; + if lease.is_some() { + return Ok(lease); + } + if attempt + 1 < ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_RETRIES { + sleep(ADMIN_POOL_SELECTION_SNAPSHOT_INDEX_LOCK_RETRY_DELAY).await; + } + } + Ok(None) +} + +async fn release_admin_pool_selection_snapshot_index_lock( + runtime: &RuntimeState, + lease: Option, +) -> Result<(), GatewayError> { + let Some(lease) = lease else { + return Ok(()); + }; + runtime + .lock_release(&lease) + .await + .map(|_| ()) + .map_err(|err| GatewayError::Internal(err.to_string())) +} + +async fn update_admin_pool_selection_snapshot_index( + runtime: &RuntimeState, + index_key: &str, + snapshot_id: &str, + ttl: Duration, +) -> Result<(), GatewayError> { + let mut snapshot_ids = match runtime + .kv_get(index_key) + .await + .map_err(|err| GatewayError::Internal(err.to_string()))? + { + Some(raw) => serde_json::from_str::>(&raw).unwrap_or_default(), + None => Vec::new(), + }; + snapshot_ids.retain(|id| id != snapshot_id); + snapshot_ids.push(snapshot_id.to_string()); + + let stale_snapshot_ids = + if snapshot_ids.len() > ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER { + let overflow = snapshot_ids + .len() + .saturating_sub(ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER); + snapshot_ids.drain(0..overflow).collect::>() + } else { + Vec::new() + }; + if !stale_snapshot_ids.is_empty() { + let stale_keys = stale_snapshot_ids + .iter() + .map(|snapshot_id| admin_pool_selection_snapshot_key(snapshot_id)) + .collect::>(); + runtime + .kv_delete_many(&stale_keys) + .await + .map_err(|err| GatewayError::Internal(err.to_string()))?; + } + + let serialized_index = serde_json::to_string(&snapshot_ids) + .map_err(|err| GatewayError::Internal(err.to_string()))?; + runtime + .kv_set(index_key, serialized_index, Some(ttl)) + .await + .map_err(|err| GatewayError::Internal(err.to_string())) +} + +pub(crate) async fn store_admin_pool_selection_snapshot( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + filter_json: Option, + now_unix_secs: u64, + ttl_seconds: u64, + items: Vec, +) -> Result { + if items.len() > ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL { + return Err(GatewayError::Internal( + "pool selection snapshot exceeded max total before route validation".to_string(), + )); + } + + let created_by = current_admin_user_id(request_context); + let index_key = admin_pool_selection_snapshot_index_key(created_by.as_deref(), provider_id); + let lock_key = admin_pool_selection_snapshot_index_lock_key(created_by.as_deref(), provider_id); + let runtime = state.app().runtime_state(); + let lock = acquire_admin_pool_selection_snapshot_index_lock(runtime, &lock_key).await?; + if lock.is_none() { + return Err(GatewayError::Internal( + "pool selection snapshot index lock busy".to_string(), + )); + } + + let result = async { + let snapshot_id = Uuid::new_v4().to_string(); + let expires_at_unix_secs = now_unix_secs.saturating_add(ttl_seconds); + let snapshot = AdminPoolSelectionSnapshot { + id: snapshot_id, + provider_id: provider_id.to_string(), + created_by, + filter_json, + total: items.len(), + status: ADMIN_POOL_SELECTION_SNAPSHOT_STATUS_READY.to_string(), + created_at_unix_secs: now_unix_secs, + expires_at_unix_secs, + items, + }; + + let serialized_snapshot = serde_json::to_string(&snapshot) + .map_err(|err| GatewayError::Internal(err.to_string()))?; + let ttl = Duration::from_secs(ttl_seconds); + runtime + .kv_set( + &admin_pool_selection_snapshot_key(&snapshot.id), + serialized_snapshot, + Some(ttl), + ) + .await + .map_err(|err| GatewayError::Internal(err.to_string()))?; + update_admin_pool_selection_snapshot_index(runtime, &index_key, &snapshot.id, ttl).await?; + + Ok(snapshot) + } + .await; + + let release_result = release_admin_pool_selection_snapshot_index_lock(runtime, lock).await; + match (result, release_result) { + (Ok(snapshot), Ok(())) => Ok(snapshot), + (Err(err), _) => Err(err), + (Ok(_), Err(err)) => Err(err), + } +} + +pub(crate) async fn read_admin_pool_selection_snapshot( + state: &AdminAppState<'_>, + snapshot_id: &str, +) -> Result, GatewayError> { + let raw = state + .app() + .runtime_state() + .kv_get(&admin_pool_selection_snapshot_key(snapshot_id)) + .await + .map_err(|err| GatewayError::Internal(err.to_string()))?; + raw.map(|raw| serde_json::from_str::(&raw)) + .transpose() + .map_err(|err| GatewayError::Internal(err.to_string())) +} + +pub(crate) async fn validate_admin_pool_selection_snapshot_reference( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + reference: &AdminPoolSelectionSnapshotReference, +) -> Result>, GatewayError> { + let snapshot_id = reference.snapshot_id.trim(); + if snapshot_id.is_empty() { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::BAD_REQUEST, + "selection.snapshot_id should not be empty", + ))); + } + + let Some(snapshot) = read_admin_pool_selection_snapshot(state, snapshot_id).await? else { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::NOT_FOUND, + "selection snapshot 不存在或已清理", + ))); + }; + + if snapshot.provider_id != provider_id { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 与当前 Provider 不匹配", + ))); + } + if snapshot.status.trim() != ADMIN_POOL_SELECTION_SNAPSHOT_STATUS_READY + || snapshot.expires_at_unix_secs <= current_unix_secs() + { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 已过期,请重新选择", + ))); + } + if snapshot.total != reference.expected_total { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 数量与确认数量不一致,请刷新后重试", + ))); + } + + let current_admin_user_id = current_admin_user_id(request_context); + if snapshot.created_by.as_deref() != current_admin_user_id.as_deref() { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::FORBIDDEN, + "selection snapshot 不属于当前管理员", + ))); + } + + Ok(Ok(snapshot)) +} + +pub(crate) async fn resolve_admin_pool_selection_snapshot_key_ids( + state: &AdminAppState<'_>, + request_context: &AdminRequestContext<'_>, + provider_id: &str, + reference: &AdminPoolSelectionSnapshotReference, +) -> Result, Response>, GatewayError> { + let snapshot = match validate_admin_pool_selection_snapshot_reference( + state, + request_context, + provider_id, + reference, + ) + .await? + { + Ok(snapshot) => snapshot, + Err(response) => return Ok(Err(response)), + }; + + if snapshot.items.len() != snapshot.total { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 数据不完整,请重新选择", + ))); + } + + let mut key_ids = Vec::with_capacity(snapshot.total); + let mut seen_key_ids = BTreeSet::new(); + for item in snapshot.items { + if item.key_id.trim().is_empty() { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 数据不完整,请重新选择", + ))); + } + if !seen_key_ids.insert(item.key_id.clone()) { + return Ok(Err(build_admin_pool_error_response( + http::StatusCode::CONFLICT, + "selection snapshot 包含重复账号,请重新选择", + ))); + } + key_ids.push(item.key_id); + } + + Ok(Ok(key_ids)) +} diff --git a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/support.rs b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/support.rs index 7cad116b3..114dc2c69 100644 --- a/apps/aether-gateway/src/handlers/admin/provider/pool_admin/support.rs +++ b/apps/aether-gateway/src/handlers/admin/provider/pool_admin/support.rs @@ -35,6 +35,13 @@ pub(crate) struct AdminPoolKeySort { pub direction: AdminPoolKeySortDirection, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum AdminPoolSearchScope { + Auto, + Name, + Full, +} + impl Default for AdminPoolKeySort { fn default() -> Self { Self { @@ -52,34 +59,40 @@ pub(crate) fn build_admin_pool_error_response( } pub(crate) fn parse_admin_pool_page(query: Option<&str>) -> Result { - match query_param_value(query, "page") { - None => Ok(1), - Some(value) => { - let parsed = value + let value = query_param_value(query, "page") + .map(|value| { + value .parse::() - .map_err(|_| "page must be an integer between 1 and 10000".to_string())?; - if (1..=10_000).contains(&parsed) { - Ok(parsed) - } else { - Err("page must be an integer between 1 and 10000".to_string()) - } - } + .map_err(|_| "page must be an integer between 1 and 10000".to_string()) + }) + .transpose()?; + parse_admin_pool_page_value(value) +} + +pub(crate) fn parse_admin_pool_page_value(value: Option) -> Result { + match value { + None => Ok(1), + Some(value) if (1..=10_000).contains(&value) => Ok(value), + Some(_) => Err("page must be an integer between 1 and 10000".to_string()), } } pub(crate) fn parse_admin_pool_page_size(query: Option<&str>) -> Result { - match query_param_value(query, "page_size") { - None => Ok(50), - Some(value) => { - let parsed = value + let value = query_param_value(query, "page_size") + .map(|value| { + value .parse::() - .map_err(|_| "page_size must be an integer between 1 and 200".to_string())?; - if (1..=200).contains(&parsed) { - Ok(parsed) - } else { - Err("page_size must be an integer between 1 and 200".to_string()) - } - } + .map_err(|_| "page_size must be an integer between 1 and 200".to_string()) + }) + .transpose()?; + parse_admin_pool_page_size_value(value) +} + +pub(crate) fn parse_admin_pool_page_size_value(value: Option) -> Result { + match value { + None => Ok(50), + Some(value) if (1..=200).contains(&value) => Ok(value), + Some(_) => Err("page_size must be an integer between 1 and 200".to_string()), } } @@ -89,6 +102,28 @@ pub(crate) fn parse_admin_pool_search(query: Option<&str>) -> Option { .filter(|value| !value.is_empty()) } +pub(crate) fn parse_admin_pool_search_scope( + query: Option<&str>, +) -> Result { + let value = query_param_value(query, "search_scope"); + parse_admin_pool_search_scope_value(value.as_deref()) +} + +pub(crate) fn parse_admin_pool_search_scope_value( + value: Option<&str>, +) -> Result { + match value + .map(|value| value.trim().to_ascii_lowercase()) + .filter(|value| !value.is_empty()) + .as_deref() + { + None => Ok(AdminPoolSearchScope::Auto), + Some("name") => Ok(AdminPoolSearchScope::Name), + Some("full") => Ok(AdminPoolSearchScope::Full), + Some(_) => Err("search_scope must be one of: name, full".to_string()), + } +} + pub(crate) fn parse_admin_pool_quick_selectors(query: Option<&str>) -> Vec { query_param_value(query, "quick_selectors") .map(|value| { @@ -102,10 +137,12 @@ pub(crate) fn parse_admin_pool_quick_selectors(query: Option<&str>) -> Vec) -> Result { - let value = query_param_value(query, "status") - .unwrap_or_else(|| "all".to_string()) - .trim() - .to_ascii_lowercase(); + let value = query_param_value(query, "status"); + parse_admin_pool_status_filter_value(value.as_deref()) +} + +pub(crate) fn parse_admin_pool_status_filter_value(value: Option<&str>) -> Result { + let value = value.unwrap_or("all").trim().to_ascii_lowercase(); match value.as_str() { "all" | "available" @@ -134,7 +171,16 @@ pub(crate) fn parse_admin_pool_status_filter(query: Option<&str>) -> Result) -> Result { - let field = match query_param_value(query, "sort_by") + let sort_by = query_param_value(query, "sort_by"); + let sort_order = query_param_value(query, "sort_order"); + parse_admin_pool_key_sort_values(sort_by.as_deref(), sort_order.as_deref()) +} + +pub(crate) fn parse_admin_pool_key_sort_values( + sort_by: Option<&str>, + sort_order: Option<&str>, +) -> Result { + let field = match sort_by .map(|value| value.trim().to_ascii_lowercase()) .filter(|value| !value.is_empty()) .as_deref() @@ -150,7 +196,7 @@ pub(crate) fn parse_admin_pool_key_sort(query: Option<&str>) -> Result) -> && path.starts_with("/api/admin/pool/") && path.ends_with("/keys/batch-action") && path.matches('/').count() == 6) + || (request_context.method() == http::Method::POST + && path.starts_with("/api/admin/pool/") + && path.ends_with("/keys/selection-snapshot") + && path.matches('/').count() == 6) || (request_context.method() == http::Method::POST && path.starts_with("/api/admin/pool/") && path.ends_with("/keys/resolve-selection") diff --git a/apps/aether-gateway/src/handlers/admin/request/provider/tasks.rs b/apps/aether-gateway/src/handlers/admin/request/provider/tasks.rs index d324a66a1..7d0cac736 100644 --- a/apps/aether-gateway/src/handlers/admin/request/provider/tasks.rs +++ b/apps/aether-gateway/src/handlers/admin/request/provider/tasks.rs @@ -10,6 +10,8 @@ use axum::{ }; use serde_json::json; +const ADMIN_POOL_BATCH_KEY_FETCH_CHUNK_SIZE: usize = 500; + impl<'a> AdminAppState<'a> { pub(crate) async fn clear_admin_provider_pool_cooldown(&self, provider_id: &str, key_id: &str) { crate::handlers::admin::provider::pool::runtime::clear_admin_provider_pool_cooldown( @@ -306,9 +308,7 @@ impl<'a> AdminAppState<'a> { provider_id: &str, payload: aether_admin::provider::pool::AdminPoolBatchActionRequest, ) -> Result, GatewayError> { - use aether_admin::provider::pool::{ - self as admin_provider_pool_pure, AdminPoolBatchActionKind, - }; + use aether_admin::provider::pool as admin_provider_pool_pure; let Some(provider) = self .read_provider_catalog_providers_by_ids(std::slice::from_ref(&provider_id.to_string())) @@ -334,12 +334,36 @@ impl<'a> AdminAppState<'a> { } }; - let keys = self - .read_provider_catalog_keys_by_ids(&plan.key_ids) - .await? - .into_iter() - .filter(|key| key.provider_id == provider.id) - .collect::>(); + let action_label = plan.action_label; + let affected = self + .execute_admin_pool_batch_action_plan(&provider, plan) + .await?; + + Ok(Json( + admin_provider_pool_pure::build_admin_pool_batch_action_result_payload( + affected, + action_label, + ), + ) + .into_response()) + } + + pub(crate) async fn execute_admin_pool_batch_action_plan( + &self, + provider: &StoredProviderCatalogProvider, + plan: aether_admin::provider::pool::AdminPoolBatchActionPlan, + ) -> Result { + use aether_admin::provider::pool::AdminPoolBatchActionKind; + + let mut keys = Vec::new(); + for chunk in plan.key_ids.chunks(ADMIN_POOL_BATCH_KEY_FETCH_CHUNK_SIZE) { + keys.extend( + self.read_provider_catalog_keys_by_ids(chunk) + .await? + .into_iter() + .filter(|key| key.provider_id == provider.id), + ); + } if plan.action == AdminPoolBatchActionKind::Delete { let deleted_key_ids = keys.iter().map(|key| key.id.clone()).collect::>(); @@ -359,13 +383,7 @@ impl<'a> AdminAppState<'a> { self.cleanup_deleted_provider_catalog_refs(&provider.id, &[], &deleted_key_ids) .await?; - return Ok(Json( - admin_provider_pool_pure::build_admin_pool_batch_action_result_payload( - affected, - plan.action_label, - ), - ) - .into_response()); + return Ok(affected); } let mut affected = 0usize; @@ -386,12 +404,6 @@ impl<'a> AdminAppState<'a> { } } - Ok(Json( - admin_provider_pool_pure::build_admin_pool_batch_action_result_payload( - affected, - plan.action_label, - ), - ) - .into_response()) + Ok(affected) } } diff --git a/apps/aether-gateway/src/handlers/shared/request_utils.rs b/apps/aether-gateway/src/handlers/shared/request_utils.rs index aba7e9606..fb52bde8f 100644 --- a/apps/aether-gateway/src/handlers/shared/request_utils.rs +++ b/apps/aether-gateway/src/handlers/shared/request_utils.rs @@ -366,6 +366,7 @@ pub(crate) fn admin_proxy_local_requires_buffered_body( | (Some("users_manage"), http::Method::PATCH, Some("lock_user_api_key")) | (Some("pool_manage"), http::Method::POST, Some("batch_import_keys")) | (Some("pool_manage"), http::Method::POST, Some("batch_action_keys")) + | (Some("pool_manage"), http::Method::POST, Some("create_selection_snapshot")) | (Some("pool_manage"), http::Method::POST, Some("resolve_selection")) | (Some("usage_manage"), http::Method::POST, Some("replay")) | (Some("wallets_manage"), http::Method::POST, Some("adjust_balance")) diff --git a/apps/aether-gateway/src/tests/control/admin/pool.rs b/apps/aether-gateway/src/tests/control/admin/pool.rs index aaaf89819..16e13e001 100644 --- a/apps/aether-gateway/src/tests/control/admin/pool.rs +++ b/apps/aether-gateway/src/tests/control/admin/pool.rs @@ -1,4 +1,6 @@ +use std::collections::BTreeSet; use std::sync::{Arc, Mutex}; +use std::time::Duration; use aether_crypto::{encrypt_python_fernet_plaintext, DEVELOPMENT_ENCRYPTION_KEY}; use aether_data::repository::pool_scores::InMemoryPoolMemberScoreRepository; @@ -7,7 +9,9 @@ use aether_data::repository::usage::InMemoryUsageReadRepository; use aether_data_contracts::repository::pool_scores::{ PoolMemberHardState, PoolMemberIdentity, PoolMemberProbeStatus, StoredPoolMemberScore, }; -use aether_data_contracts::repository::provider_catalog::ProviderCatalogReadRepository; +use aether_data_contracts::repository::provider_catalog::{ + ProviderCatalogReadRepository, ProviderCatalogWriteRepository, +}; use aether_data_contracts::repository::usage::StoredRequestUsageAudit; use axum::body::{to_bytes, Body, Bytes}; use axum::routing::{any, get, post}; @@ -18,7 +22,12 @@ use serde_json::json; use super::super::{ build_router_with_state, sample_endpoint, sample_key, sample_provider, start_server, AppState, }; -use crate::admin_api::{maybe_build_local_admin_pool_response, AdminAppState, AdminRequestContext}; +use crate::admin_api::{ + admin_pool_selection_snapshot_key, maybe_build_local_admin_pool_response, AdminAppState, + AdminPoolSelectionSnapshot, AdminPoolSelectionSnapshotItem, AdminRequestContext, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER, + ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, +}; use crate::ai_serving::{provider_key_pool_score_id, provider_key_pool_score_scope}; use crate::audit::AdminAuditEvent; use crate::constants::{ @@ -77,6 +86,77 @@ async fn local_admin_pool_response( .expect("pool route should resolve locally") } +fn pool_test_state( + provider_catalog_repository: Arc, +) -> AppState { + AppState::new() + .expect("gateway should build") + .with_data_state_for_tests( + GatewayDataState::with_provider_catalog_repository_for_tests( + provider_catalog_repository, + ), + ) +} + +async fn response_json(response: axum::response::Response) -> serde_json::Value { + serde_json::from_slice( + &to_bytes(response.into_body(), usize::MAX) + .await + .expect("body should read"), + ) + .expect("json body should parse") +} + +fn list_payload_key_ids(payload: &serde_json::Value) -> Vec { + payload["keys"] + .as_array() + .expect("keys should be array") + .iter() + .filter_map(|item| item["key_id"].as_str()) + .map(ToOwned::to_owned) + .collect() +} + +async fn create_pool_selection_snapshot_payload( + state: &AppState, + provider_id: &str, + body: serde_json::Value, +) -> serde_json::Value { + let response = local_admin_pool_response( + state, + http::Method::POST, + &format!("/api/admin/pool/{provider_id}/keys/selection-snapshot"), + Some(body), + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + response_json(response).await +} + +async fn read_runtime_pool_selection_snapshot( + state: &AppState, + snapshot_id: &str, +) -> Option { + state + .runtime_kv_get(&admin_pool_selection_snapshot_key(snapshot_id)) + .await + .expect("snapshot runtime lookup should succeed") + .map(|raw| serde_json::from_str(&raw).expect("snapshot runtime payload should parse")) +} + +async fn write_runtime_pool_selection_snapshot( + state: &AppState, + snapshot: AdminPoolSelectionSnapshot, +) { + let key = admin_pool_selection_snapshot_key(&snapshot.id); + let value = serde_json::to_string(&snapshot).expect("snapshot should serialize"); + state + .runtime_state() + .kv_set(&key, value, Some(Duration::from_secs(60 * 60))) + .await + .expect("snapshot runtime write should succeed"); +} + fn sample_pool_member_score(provider_id: &str, key_id: &str, score: f64) -> StoredPoolMemberScore { let score_scope = provider_key_pool_score_scope(); let score_identity = PoolMemberIdentity::provider_api_key(provider_id, key_id); @@ -918,6 +998,1449 @@ async fn gateway_handles_admin_pool_list_keys_locally_with_trusted_admin_princip upstream_handle.abort(); } +#[tokio::test] +async fn gateway_pool_list_keys_keeps_legacy_full_search_for_default_filtered_scope() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut inactive_note_key = sample_key( + "key-openai-note", + "provider-openai", + "openai:chat", + "sk-note", + ); + inactive_note_key.name = "plain inactive account".to_string(); + inactive_note_key.note = Some("legacy-search-marker".to_string()); + inactive_note_key.is_active = false; + let mut active_note_key = sample_key( + "key-openai-active", + "provider-openai", + "openai:chat", + "sk-active", + ); + active_note_key.name = "plain active account".to_string(); + active_note_key.note = Some("legacy-search-marker".to_string()); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![inactive_note_key, active_note_key], + )); + let state = pool_test_state(provider_catalog_repository); + + let legacy_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=10&search=legacy-search-marker&status=inactive", + None, + ) + .await; + assert_eq!(legacy_response.status(), StatusCode::OK); + let legacy_payload = response_json(legacy_response).await; + assert_eq!(legacy_payload["total"], json!(1)); + assert_eq!( + list_payload_key_ids(&legacy_payload), + vec!["key-openai-note"] + ); + + let name_scope_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=10&search=legacy-search-marker&status=inactive&search_scope=name", + None, + ) + .await; + assert_eq!(name_scope_response.status(), StatusCode::OK); + let name_scope_payload = response_json(name_scope_response).await; + assert_eq!(name_scope_payload["total"], json!(0)); +} + +#[tokio::test] +async fn gateway_pool_create_selection_snapshot_freezes_filtered_result() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut first_key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + first_key.name = "alpha first".to_string(); + let mut second_key = sample_key("key-openai-b", "provider-openai", "openai:chat", "sk-b"); + second_key.name = "alpha second".to_string(); + let mut other_key = sample_key("key-openai-c", "provider-openai", "openai:chat", "sk-c"); + other_key.name = "beta other".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![first_key, second_key, other_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=1&search=alpha&status=all", + None, + ) + .await; + + assert_eq!(response.status(), StatusCode::OK); + let payload: serde_json::Value = serde_json::from_slice( + &to_bytes(response.into_body(), usize::MAX) + .await + .expect("body should read"), + ) + .expect("json body should parse"); + assert_eq!(payload["total"], json!(2)); + assert_eq!(payload["keys"].as_array().map(Vec::len), Some(1)); + assert!(payload.get("selection_snapshot").is_none()); + + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 1, + "search": "alpha", + "status": "all", + "expected_total": payload["total"], + "expected_page_key_ids": list_payload_key_ids(&payload), + }), + ) + .await; + assert_eq!(snapshot_payload["selection_snapshot"]["total"], json!(2)); + assert_eq!( + snapshot_payload["selection_snapshot"]["status"], + json!("ready") + ); + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + + let mut new_key = sample_key("key-openai-d", "provider-openai", "openai:chat", "sk-d"); + new_key.name = "alpha newly imported".to_string(); + provider_catalog_repository + .create_key(&new_key) + .await + .expect("new matching key should be stored"); + + let snapshot = read_runtime_pool_selection_snapshot(&state, &snapshot_id) + .await + .expect("snapshot should exist"); + assert_eq!(snapshot.total, 2); + assert_eq!(snapshot.created_by.as_deref(), Some("admin-user-123")); + + let snapshot_key_ids = snapshot + .items + .iter() + .map(|item| item.key_id.as_str()) + .collect::>(); + assert_eq!(snapshot_key_ids.len(), 2); + assert!(!snapshot_key_ids.contains(&"key-openai-d")); + + let resolve_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/resolve-selection", + Some(json!({ + "snapshot_id": snapshot_id, + "expected_total": 2 + })), + ) + .await; + assert_eq!(resolve_response.status(), StatusCode::OK); + let resolve_payload: serde_json::Value = serde_json::from_slice( + &to_bytes(resolve_response.into_body(), usize::MAX) + .await + .expect("body should read"), + ) + .expect("json body should parse"); + let resolved_ids = resolve_payload["items"] + .as_array() + .expect("resolved items should be array") + .iter() + .filter_map(|item| item["key_id"].as_str()) + .collect::>(); + assert_eq!(resolve_payload["total"], json!(2)); + assert_eq!(resolved_ids.len(), 2); + assert!(!resolved_ids.contains(&"key-openai-d")); +} + +#[tokio::test] +async fn gateway_pool_list_keys_stays_read_only_for_legacy_snapshot_query_params() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + key.name = "alpha first".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![key], + )); + let state = AppState::new() + .expect("gateway should build") + .with_data_state_for_tests( + GatewayDataState::with_provider_catalog_repository_for_tests( + provider_catalog_repository, + ), + ); + + let response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=1&search=alpha&status=all&include_selection_snapshot=true", + None, + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let payload = response_json(response).await; + assert_eq!(payload["total"], json!(1)); + assert!(payload.get("selection_snapshot").is_none()); + assert!(payload.get("selection_snapshot_mismatch").is_none()); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_uses_same_full_search_scope_as_list() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut oauth_key = sample_key("key-openai-oauth", "provider-openai", "openai:chat", "sk-a"); + oauth_key.name = "plain account".to_string(); + oauth_key.auth_type = "oauth".to_string(); + let mut note_key = sample_key("key-openai-note", "provider-openai", "openai:chat", "sk-b"); + note_key.name = "ordinary account".to_string(); + note_key.note = Some("contains-search-marker".to_string()); + let mut other_key = sample_key("key-openai-other", "provider-openai", "openai:chat", "sk-c"); + other_key.name = "unrelated account".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![oauth_key, note_key, other_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=10&search=oauth&search_scope=full&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload = response_json(list_response).await; + assert_eq!(list_payload["total"], json!(1)); + assert_eq!( + list_payload_key_ids(&list_payload), + vec!["key-openai-oauth"] + ); + + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 10, + "search": "oauth", + "search_scope": "full", + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&list_payload), + }), + ) + .await; + assert!(snapshot_payload + .get("selection_snapshot_mismatch") + .is_none()); + assert_eq!(snapshot_payload["selection_snapshot"]["total"], json!(1)); + + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present"); + let snapshot = read_runtime_pool_selection_snapshot(&state, snapshot_id) + .await + .expect("snapshot should exist"); + assert_eq!(snapshot.items.len(), 1); + assert_eq!(snapshot.items[0].key_id, "key-openai-oauth"); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_reports_drift_while_writing_current_snapshot() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut first_key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + first_key.name = "alpha first".to_string(); + let mut second_key = sample_key("key-openai-b", "provider-openai", "openai:chat", "sk-b"); + second_key.name = "alpha second".to_string(); + let mut third_key = sample_key("key-openai-c", "provider-openai", "openai:chat", "sk-c"); + third_key.name = "alpha third".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![first_key, second_key, third_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/selection-snapshot", + Some(json!({ + "page": 1, + "page_size": 2, + "search": "alpha", + "status": "all", + "expected_total": 2, + "expected_page_key_ids": ["key-openai-a", "key-openai-b"], + })), + ) + .await; + + assert_eq!(response.status(), StatusCode::OK); + let payload = response_json(response).await; + assert_eq!(payload["total"], json!(3)); + assert_eq!(payload["selection_snapshot"]["total"], json!(3)); + assert_eq!( + payload["selection_snapshot_mismatch"], + json!({ + "reason": "total_changed", + "expected_total": 2, + "actual_total": 3, + }) + ); + + let response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/selection-snapshot", + Some(json!({ + "page": 1, + "page_size": 2, + "search": "alpha", + "status": "all", + "expected_total": 3, + "expected_page_key_ids": ["key-openai-b", "key-openai-a"], + })), + ) + .await; + + assert_eq!(response.status(), StatusCode::OK); + let payload = response_json(response).await; + assert_eq!(payload["total"], json!(3)); + assert_eq!(payload["selection_snapshot"]["total"], json!(3)); + assert_eq!( + payload["selection_snapshot_mismatch"], + json!({ + "reason": "page_keys_changed", + "expected_total": 3, + "actual_total": 3, + }) + ); + + let snapshot_id = payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present"); + assert!( + read_runtime_pool_selection_snapshot(&state, snapshot_id) + .await + .is_some(), + "drifted selections should still write snapshots" + ); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_returns_current_page_keys_after_drift() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::new(); + for index in 0..5 { + let key_id = format!("key-openai-{index}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-{index}"), + ); + key.name = format!("alpha {index:03}"); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 50, + "search": "alpha", + "status": "all", + "expected_total": 4, + "expected_page_key_ids": [ + "key-openai-0", + "key-openai-1", + "key-openai-2", + "key-openai-3", + ], + }), + ) + .await; + + assert_eq!(payload["total"], json!(5)); + assert_eq!(payload["selection_snapshot"]["total"], json!(5)); + assert_eq!( + list_payload_key_ids(&payload), + vec![ + "key-openai-0", + "key-openai-1", + "key-openai-2", + "key-openai-3", + "key-openai-4", + ] + ); + assert_eq!( + payload["selection_snapshot_mismatch"], + json!({ + "reason": "page_keys_changed", + "expected_total": 4, + "actual_total": 5, + }) + ); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_uses_same_default_page_order_as_list() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut alpha_key = sample_key("key-openai-alpha", "provider-openai", "openai:chat", "sk-a"); + alpha_key.name = "Alpha same timestamp".to_string(); + alpha_key.created_at_unix_ms = Some(2_000); + let mut beta_key = sample_key("key-openai-beta", "provider-openai", "openai:chat", "sk-b"); + beta_key.name = "beta same timestamp".to_string(); + beta_key.created_at_unix_ms = Some(2_000); + let mut older_key = sample_key("key-openai-older", "provider-openai", "openai:chat", "sk-c"); + older_key.name = "older account".to_string(); + older_key.created_at_unix_ms = Some(1_000); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![older_key, beta_key, alpha_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=2&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload = response_json(list_response).await; + assert_eq!(list_payload["total"], json!(3)); + let list_page_key_ids = list_payload_key_ids(&list_payload); + assert_eq!(list_page_key_ids.len(), 2); + + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 2, + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_page_key_ids, + }), + ) + .await; + assert!(snapshot_payload + .get("selection_snapshot_mismatch") + .is_none()); + assert_eq!(snapshot_payload["selection_snapshot"]["total"], json!(3)); + + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present"); + let snapshot = read_runtime_pool_selection_snapshot(&state, snapshot_id) + .await + .expect("snapshot should exist"); + let snapshot_first_page_ids = snapshot + .items + .into_iter() + .take(2) + .map(|item| item.key_id) + .collect::>(); + assert_eq!(snapshot_first_page_ids, list_payload_key_ids(&list_payload)); +} + +#[tokio::test] +async fn gateway_pool_batch_action_uses_selection_snapshot_targets() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut first_key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + first_key.name = "alpha first".to_string(); + let mut second_key = sample_key("key-openai-b", "provider-openai", "openai:chat", "sk-b"); + second_key.name = "alpha second".to_string(); + let mut other_key = sample_key("key-openai-c", "provider-openai", "openai:chat", "sk-c"); + other_key.name = "beta other".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![first_key, second_key, other_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=1&search=alpha&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload: serde_json::Value = serde_json::from_slice( + &to_bytes(list_response.into_body(), usize::MAX) + .await + .expect("body should read"), + ) + .expect("json body should parse"); + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 1, + "search": "alpha", + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&list_payload), + }), + ) + .await; + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + assert_eq!(snapshot_payload["selection_snapshot"]["total"], json!(2)); + + let mismatch_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": snapshot_id.clone(), + "expected_total": 3 + }, + "action": "disable" + })), + ) + .await; + assert_eq!(mismatch_response.status(), StatusCode::CONFLICT); + + let mut new_key = sample_key("key-openai-d", "provider-openai", "openai:chat", "sk-d"); + new_key.name = "alpha newly imported".to_string(); + provider_catalog_repository + .create_key(&new_key) + .await + .expect("new matching key should be stored"); + + let action_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": snapshot_id, + "expected_total": 2 + }, + "action": "disable" + })), + ) + .await; + assert_eq!(action_response.status(), StatusCode::OK); + let action_payload: serde_json::Value = serde_json::from_slice( + &to_bytes(action_response.into_body(), usize::MAX) + .await + .expect("body should read"), + ) + .expect("json body should parse"); + assert_eq!(action_payload["affected"], json!(2)); + + let stored_keys = provider_catalog_repository + .list_keys_by_ids(&[ + "key-openai-a".to_string(), + "key-openai-b".to_string(), + "key-openai-d".to_string(), + ]) + .await + .expect("keys should list") + .into_iter() + .map(|key| (key.id, key.is_active)) + .collect::>(); + assert_eq!(stored_keys.get("key-openai-a"), Some(&false)); + assert_eq!(stored_keys.get("key-openai-b"), Some(&false)); + assert_eq!(stored_keys.get("key-openai-d"), Some(&true)); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_keeps_only_recent_runtime_snapshots() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + key.name = "alpha only".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let mut snapshot_ids = Vec::new(); + for _ in 0..=ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER { + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 1, + "search": "alpha", + "status": "all", + "expected_total": 1, + "expected_page_key_ids": ["key-openai-a"], + }), + ) + .await; + snapshot_ids.push( + snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(), + ); + } + + assert!( + read_runtime_pool_selection_snapshot(&state, &snapshot_ids[0]) + .await + .is_none(), + "oldest runtime snapshot should be pruned" + ); + assert!( + read_runtime_pool_selection_snapshot( + &state, + snapshot_ids + .last() + .expect("latest snapshot id should exist"), + ) + .await + .is_some(), + "latest runtime snapshot should remain" + ); +} + +#[tokio::test] +async fn gateway_pool_snapshot_batch_action_uses_runtime_snapshot_blob_membership() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut first_key = sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"); + first_key.name = "alpha first".to_string(); + let mut second_key = sample_key("key-openai-b", "provider-openai", "openai:chat", "sk-b"); + second_key.name = "alpha second".to_string(); + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + vec![first_key, second_key], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + write_runtime_pool_selection_snapshot( + &state, + AdminPoolSelectionSnapshot { + id: "snapshot-runtime".to_string(), + provider_id: "provider-openai".to_string(), + created_by: Some("admin-user-123".to_string()), + filter_json: None, + total: 2, + status: "ready".to_string(), + created_at_unix_secs: 100, + expires_at_unix_secs: 4_000_000_000, + items: vec![ + AdminPoolSelectionSnapshotItem { + ordinal: 0, + key_id: "key-openai-a".to_string(), + key_updated_at_unix_secs: None, + }, + AdminPoolSelectionSnapshotItem { + ordinal: 1, + key_id: "key-openai-b".to_string(), + key_updated_at_unix_secs: None, + }, + ], + }, + ) + .await; + + let action_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": "snapshot-runtime", + "expected_total": 2 + }, + "action": "disable" + })), + ) + .await; + assert_eq!(action_response.status(), StatusCode::OK); + let action_payload = response_json(action_response).await; + assert_eq!(action_payload["affected"], json!(2)); + + let stored_keys = provider_catalog_repository + .list_keys_by_ids(&["key-openai-a".to_string(), "key-openai-b".to_string()]) + .await + .expect("keys should list") + .into_iter() + .map(|key| (key.id, key.is_active)) + .collect::>(); + assert_eq!(stored_keys.get("key-openai-a"), Some(&false)); + assert_eq!(stored_keys.get("key-openai-b"), Some(&false)); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_handles_large_result_sets_consistently() { + const KEY_COUNT: usize = 5_000; + + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::with_capacity(KEY_COUNT + 25); + for index in 0..KEY_COUNT { + let key_id = format!("key-openai-bulk-{index:05}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-bulk-{index:05}"), + ); + key.name = format!("bulk alpha {index:05}"); + key.updated_at_unix_secs = Some(1_700_000_000 + index as u64); + keys.push(key); + } + for index in 0..25 { + let key_id = format!("key-openai-noise-{index:05}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-noise-{index:05}"), + ); + key.name = format!("noise beta {index:05}"); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=25&search=bulk%20alpha&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload = response_json(list_response).await; + assert_eq!(list_payload["total"], json!(KEY_COUNT)); + assert_eq!(list_payload["keys"].as_array().map(Vec::len), Some(25)); + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 25, + "search": "bulk alpha", + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&list_payload), + }), + ) + .await; + assert_eq!( + snapshot_payload["selection_snapshot"]["total"], + json!(KEY_COUNT) + ); + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + + let snapshot = read_runtime_pool_selection_snapshot(&state, &snapshot_id) + .await + .expect("snapshot should exist"); + let first_page = snapshot.items.iter().take(1_000).collect::>(); + assert_eq!(first_page.len(), 1_000); + assert_eq!(first_page.first().map(|item| item.ordinal), Some(0)); + assert_eq!( + first_page.first().map(|item| item.key_id.as_str()), + Some("key-openai-bulk-00000") + ); + assert_eq!(first_page.last().map(|item| item.ordinal), Some(999)); + + let tail_page = snapshot + .items + .iter() + .skip(4_000) + .take(1_100) + .collect::>(); + assert_eq!(tail_page.len(), 1_000); + assert_eq!(tail_page.first().map(|item| item.ordinal), Some(4_000)); + assert_eq!( + tail_page.last().map(|item| item.key_id.as_str()), + Some("key-openai-bulk-04999") + ); + + let resolve_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/resolve-selection", + Some(json!({ + "snapshot_id": snapshot_id, + "expected_total": KEY_COUNT + })), + ) + .await; + assert_eq!(resolve_response.status(), StatusCode::OK); + let resolve_payload = response_json(resolve_response).await; + let resolved_items = resolve_payload["items"] + .as_array() + .expect("resolved items should be array"); + assert_eq!(resolve_payload["total"], json!(KEY_COUNT)); + assert_eq!(resolved_items.len(), KEY_COUNT); + assert_eq!(resolved_items[0]["key_id"], json!("key-openai-bulk-00000")); + assert_eq!( + resolved_items[KEY_COUNT - 1]["key_id"], + json!("key-openai-bulk-04999") + ); + + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + let action_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": snapshot_id, + "expected_total": KEY_COUNT + }, + "action": "disable" + })), + ) + .await; + assert_eq!(action_response.status(), StatusCode::OK); + let action_payload = response_json(action_response).await; + assert_eq!(action_payload["affected"], json!(KEY_COUNT)); + + let sampled_keys = provider_catalog_repository + .list_keys_by_ids(&[ + "key-openai-bulk-00000".to_string(), + "key-openai-bulk-02500".to_string(), + "key-openai-bulk-04999".to_string(), + "key-openai-noise-00000".to_string(), + ]) + .await + .expect("sampled keys should list") + .into_iter() + .map(|key| (key.id, key.is_active)) + .collect::>(); + assert_eq!(sampled_keys.get("key-openai-bulk-00000"), Some(&false)); + assert_eq!(sampled_keys.get("key-openai-bulk-02500"), Some(&false)); + assert_eq!(sampled_keys.get("key-openai-bulk-04999"), Some(&false)); + assert_eq!(sampled_keys.get("key-openai-noise-00000"), Some(&true)); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_rejects_results_above_runtime_limit() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::with_capacity(ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL + 1); + for index in 0..=ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL { + let key_id = format!("key-openai-over-limit-{index:05}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-over-limit-{index:05}"), + ); + key.name = format!("over limit alpha {index:05}"); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/selection-snapshot", + Some(json!({ + "page": 1, + "page_size": 50, + "search": "over limit alpha", + "status": "all", + "expected_total": ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL + 1, + "expected_page_key_ids": (0..50) + .map(|index| format!("key-openai-over-limit-{index:05}")) + .collect::>(), + })), + ) + .await; + + assert_eq!(response.status(), StatusCode::PAYLOAD_TOO_LARGE); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn gateway_pool_selection_snapshot_stays_consistent_under_concurrent_growth() { + const INITIAL_KEYS: usize = 80; + const ADDED_KEYS: usize = 120; + const SNAPSHOT_REQUESTS: usize = 24; + + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::with_capacity(INITIAL_KEYS); + for index in 0..INITIAL_KEYS { + let key_id = format!("key-openai-initial-{index:03}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-initial-{index:03}"), + ); + key.name = format!("alpha concurrent initial {index:03}"); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + let start_barrier = Arc::new(tokio::sync::Barrier::new(SNAPSHOT_REQUESTS + 1)); + + let writer = tokio::spawn({ + let provider_catalog_repository = Arc::clone(&provider_catalog_repository); + let start_barrier = Arc::clone(&start_barrier); + async move { + start_barrier.wait().await; + for index in 0..ADDED_KEYS { + let key_id = format!("key-openai-added-{index:03}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-added-{index:03}"), + ); + key.name = format!("alpha concurrent added {index:03}"); + provider_catalog_repository + .create_key(&key) + .await + .expect("concurrent key should be stored"); + if index % 5 == 0 { + tokio::task::yield_now().await; + } + } + } + }); + + let mut handles = Vec::with_capacity(SNAPSHOT_REQUESTS); + for _ in 0..SNAPSHOT_REQUESTS { + let state = state.clone(); + let start_barrier = Arc::clone(&start_barrier); + handles.push(tokio::spawn(async move { + start_barrier.wait().await; + for _attempt in 0..20 { + let response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=7&search=alpha%20concurrent&status=all", + None, + ) + .await; + assert_eq!(response.status(), StatusCode::OK); + let payload = response_json(response).await; + let total = payload["total"] + .as_u64() + .expect("total should be numeric") as usize; + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 7, + "search": "alpha concurrent", + "status": "all", + "expected_total": total, + "expected_page_key_ids": list_payload_key_ids(&payload), + }), + ) + .await; + if let Some(snapshot) = snapshot_payload.get("selection_snapshot") { + let snapshot_total = snapshot["total"] + .as_u64() + .expect("snapshot total should be numeric") + as usize; + assert!( + (total..=INITIAL_KEYS + ADDED_KEYS).contains(&snapshot_total), + "snapshot total {snapshot_total} should include the listed {total} items and stay within growth bounds" + ); + let snapshot_id = snapshot["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + return (snapshot_id, snapshot_total); + } + tokio::task::yield_now().await; + } + panic!("selection snapshot should eventually stabilize") + })); + } + + let mut snapshots = Vec::with_capacity(SNAPSHOT_REQUESTS); + for handle in handles { + snapshots.push(handle.await.expect("snapshot request should join")); + } + writer.await.expect("concurrent writer should join"); + + let mut retained_snapshots = 0usize; + for (snapshot_id, total) in snapshots { + assert!( + (INITIAL_KEYS..=INITIAL_KEYS + ADDED_KEYS).contains(&total), + "snapshot {snapshot_id} total {total} should stay within concurrent growth bounds" + ); + let Some(snapshot) = read_runtime_pool_selection_snapshot(&state, &snapshot_id).await + else { + continue; + }; + retained_snapshots = retained_snapshots.saturating_add(1); + assert_eq!(snapshot.items.len(), total); + let unique_ids = snapshot + .items + .iter() + .map(|item| item.key_id.as_str()) + .collect::>(); + assert_eq!(unique_ids.len(), total); + } + assert!( + (1..=ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER) + .contains(&retained_snapshots), + "runtime should retain a bounded number of recent snapshots" + ); + + let final_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=10&search=alpha%20concurrent&status=all", + None, + ) + .await; + assert_eq!(final_response.status(), StatusCode::OK); + let final_payload = response_json(final_response).await; + assert_eq!(final_payload["total"], json!(INITIAL_KEYS + ADDED_KEYS)); + let final_snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 10, + "search": "alpha concurrent", + "status": "all", + "expected_total": final_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&final_payload), + }), + ) + .await; + assert_eq!( + final_snapshot_payload["selection_snapshot"]["total"], + json!(INITIAL_KEYS + ADDED_KEYS) + ); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_batch_action_uses_frozen_membership_after_churn() { + const ORIGINAL_KEYS: usize = 10; + + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::with_capacity(ORIGINAL_KEYS); + let mut original_ids = Vec::with_capacity(ORIGINAL_KEYS); + for index in 0..ORIGINAL_KEYS { + let key_id = format!("key-openai-freeze-{index:02}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-freeze-{index:02}"), + ); + key.name = format!("alpha frozen member {index:02}"); + original_ids.push(key_id); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=5&search=alpha%20frozen&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload = response_json(list_response).await; + assert_eq!(list_payload["total"], json!(ORIGINAL_KEYS)); + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 5, + "search": "alpha frozen", + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&list_payload), + }), + ) + .await; + assert_eq!( + snapshot_payload["selection_snapshot"]["total"], + json!(ORIGINAL_KEYS) + ); + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + + for index in 0..3 { + let key_id = format!("key-openai-new-match-{index:02}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-new-match-{index:02}"), + ); + key.name = format!("alpha frozen newly imported {index:02}"); + provider_catalog_repository + .create_key(&key) + .await + .expect("new matching key should be stored"); + } + provider_catalog_repository + .delete_key(&original_ids[0]) + .await + .expect("first original should delete"); + provider_catalog_repository + .delete_key(&original_ids[1]) + .await + .expect("second original should delete"); + + let mut renamed_original = provider_catalog_repository + .list_keys_by_ids(&[original_ids[2].clone()]) + .await + .expect("renamed original should list") + .into_iter() + .next() + .expect("renamed original should exist"); + renamed_original.name = "beta renamed after selection".to_string(); + provider_catalog_repository + .update_key(&renamed_original) + .await + .expect("renamed original should update"); + + let live_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=20&search=alpha%20frozen&status=all", + None, + ) + .await; + assert_eq!(live_response.status(), StatusCode::OK); + let live_payload = response_json(live_response).await; + assert_eq!(live_payload["total"], json!(ORIGINAL_KEYS)); + + let action_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": snapshot_id, + "expected_total": ORIGINAL_KEYS + }, + "action": "disable" + })), + ) + .await; + assert_eq!(action_response.status(), StatusCode::OK); + let action_payload = response_json(action_response).await; + assert_eq!(action_payload["affected"], json!(ORIGINAL_KEYS - 2)); + + let mut ids_to_check = original_ids[2..].to_vec(); + ids_to_check.extend((0..3).map(|index| format!("key-openai-new-match-{index:02}"))); + let stored_keys = provider_catalog_repository + .list_keys_by_ids(&ids_to_check) + .await + .expect("post-action keys should list") + .into_iter() + .map(|key| (key.id, key.is_active)) + .collect::>(); + + for key_id in &original_ids[2..] { + assert_eq!(stored_keys.get(key_id), Some(&false)); + } + for index in 0..3 { + assert_eq!( + stored_keys.get(&format!("key-openai-new-match-{index:02}")), + Some(&true) + ); + } +} + +#[tokio::test] +async fn gateway_pool_resolve_selection_reports_missing_snapshot_members_consistently() { + let provider = sample_provider("provider-openai", "openai", 10); + let mut keys = Vec::new(); + for index in 0..3 { + let key_id = format!("key-openai-resolve-{index}"); + let mut key = sample_key( + &key_id, + "provider-openai", + "openai:chat", + &format!("sk-resolve-{index}"), + ); + key.name = format!("alpha resolve member {index}"); + keys.push(key); + } + + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider], + Vec::new(), + keys, + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + + let list_response = local_admin_pool_response( + &state, + http::Method::GET, + "/api/admin/pool/provider-openai/keys?page=1&page_size=2&search=alpha%20resolve&status=all", + None, + ) + .await; + assert_eq!(list_response.status(), StatusCode::OK); + let list_payload = response_json(list_response).await; + assert_eq!(list_payload["total"], json!(3)); + let snapshot_payload = create_pool_selection_snapshot_payload( + &state, + "provider-openai", + json!({ + "page": 1, + "page_size": 2, + "search": "alpha resolve", + "status": "all", + "expected_total": list_payload["total"], + "expected_page_key_ids": list_payload_key_ids(&list_payload), + }), + ) + .await; + assert_eq!(snapshot_payload["selection_snapshot"]["total"], json!(3)); + let snapshot_id = snapshot_payload["selection_snapshot"]["id"] + .as_str() + .expect("snapshot id should be present") + .to_string(); + + provider_catalog_repository + .delete_key("key-openai-resolve-1") + .await + .expect("snapshot member should delete"); + + let resolve_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/resolve-selection", + Some(json!({ + "snapshot_id": snapshot_id, + "expected_total": 3 + })), + ) + .await; + assert_eq!(resolve_response.status(), StatusCode::OK); + let resolve_payload = response_json(resolve_response).await; + assert_eq!(resolve_payload["total"], json!(2)); + assert_eq!(resolve_payload["snapshot_total"], json!(3)); + assert_eq!(resolve_payload["missing_count"], json!(1)); + assert_eq!(resolve_payload["items"].as_array().map(Vec::len), Some(2)); +} + +#[tokio::test] +async fn gateway_pool_selection_snapshot_rejects_stale_or_invalid_references() { + fn snapshot( + id: &str, + provider_id: &str, + created_by: Option<&str>, + status: &str, + expires_at_unix_secs: u64, + items: Vec, + ) -> AdminPoolSelectionSnapshot { + AdminPoolSelectionSnapshot { + id: id.to_string(), + provider_id: provider_id.to_string(), + created_by: created_by.map(ToOwned::to_owned), + filter_json: None, + total: items.len(), + status: status.to_string(), + created_at_unix_secs: 100, + expires_at_unix_secs, + items, + } + } + + fn item(ordinal: u64, key_id: &str) -> AdminPoolSelectionSnapshotItem { + AdminPoolSelectionSnapshotItem { + ordinal, + key_id: key_id.to_string(), + key_updated_at_unix_secs: None, + } + } + + let provider = sample_provider("provider-openai", "openai", 10); + let other_provider = sample_provider("provider-other", "openai", 20); + let provider_catalog_repository = Arc::new(InMemoryProviderCatalogReadRepository::seed( + vec![provider, other_provider], + Vec::new(), + vec![ + sample_key("key-openai-a", "provider-openai", "openai:chat", "sk-a"), + sample_key("key-openai-b", "provider-openai", "openai:chat", "sk-b"), + ], + )); + let state = pool_test_state(Arc::clone(&provider_catalog_repository)); + for snapshot in [ + snapshot( + "snapshot-ready", + "provider-openai", + Some("admin-user-123"), + "ready", + 4_000_000_000, + vec![item(0, "key-openai-a")], + ), + snapshot( + "snapshot-other-provider", + "provider-other", + Some("admin-user-123"), + "ready", + 4_000_000_000, + vec![item(0, "key-openai-a")], + ), + snapshot( + "snapshot-expired", + "provider-openai", + Some("admin-user-123"), + "expired", + 4_000_000_000, + vec![item(0, "key-openai-a")], + ), + snapshot( + "snapshot-other-admin", + "provider-openai", + Some("admin-user-456"), + "ready", + 4_000_000_000, + vec![item(0, "key-openai-a")], + ), + snapshot( + "snapshot-duplicate", + "provider-openai", + Some("admin-user-123"), + "ready", + 4_000_000_000, + vec![item(0, "key-openai-a"), item(1, "key-openai-a")], + ), + ] { + write_runtime_pool_selection_snapshot(&state, snapshot).await; + } + let mut incomplete_snapshot = snapshot( + "snapshot-incomplete", + "provider-openai", + Some("admin-user-123"), + "ready", + 4_000_000_000, + vec![item(0, "key-openai-a")], + ); + incomplete_snapshot.total = 2; + write_runtime_pool_selection_snapshot(&state, incomplete_snapshot).await; + + let selection_with_key_ids_response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "key_ids": ["key-openai-b"], + "selection": { + "type": "snapshot", + "snapshot_id": "snapshot-ready", + "expected_total": 1 + }, + "action": "disable" + })), + ) + .await; + assert_eq!( + selection_with_key_ids_response.status(), + StatusCode::BAD_REQUEST + ); + + for (snapshot_id, expected_total, expected_status) in [ + ("snapshot-missing", 1usize, StatusCode::NOT_FOUND), + ("snapshot-other-provider", 1usize, StatusCode::CONFLICT), + ("snapshot-expired", 1usize, StatusCode::CONFLICT), + ("snapshot-other-admin", 1usize, StatusCode::FORBIDDEN), + ("snapshot-duplicate", 2usize, StatusCode::CONFLICT), + ("snapshot-incomplete", 2usize, StatusCode::CONFLICT), + ] { + let response = local_admin_pool_response( + &state, + http::Method::POST, + "/api/admin/pool/provider-openai/keys/batch-action", + Some(json!({ + "selection": { + "type": "snapshot", + "snapshot_id": snapshot_id, + "expected_total": expected_total + }, + "action": "disable" + })), + ) + .await; + assert_eq!( + response.status(), + expected_status, + "{snapshot_id} should be rejected with {expected_status}" + ); + } +} + #[tokio::test] async fn gateway_sorts_admin_pool_keys_by_imported_and_last_used_time() { let provider = sample_provider("provider-openai", "openai", 10).with_transport_fields( diff --git a/crates/aether-admin/src/provider/pool.rs b/crates/aether-admin/src/provider/pool.rs index f3514688e..c4f686190 100644 --- a/crates/aether-admin/src/provider/pool.rs +++ b/crates/aether-admin/src/provider/pool.rs @@ -14,6 +14,34 @@ pub struct AdminPoolResolveSelectionRequest { pub search: String, #[serde(default)] pub quick_selectors: Vec, + #[serde(default)] + pub snapshot_id: Option, + #[serde(default)] + pub expected_total: Option, +} + +#[derive(Debug, Default, Clone, serde::Deserialize)] +pub struct AdminPoolCreateSelectionSnapshotRequest { + #[serde(default)] + pub page: Option, + #[serde(default)] + pub page_size: Option, + #[serde(default)] + pub search: String, + #[serde(default)] + pub quick_selectors: Vec, + #[serde(default)] + pub status: String, + #[serde(default)] + pub search_scope: String, + #[serde(default)] + pub sort_by: Option, + #[serde(default)] + pub sort_order: Option, + #[serde(default)] + pub expected_total: Option, + #[serde(default)] + pub expected_page_key_ids: Vec, } #[derive(Debug, Default, Clone, serde::Deserialize)] @@ -21,11 +49,23 @@ pub struct AdminPoolBatchActionRequest { #[serde(default)] pub key_ids: Vec, #[serde(default)] + pub selection: Option, + #[serde(default)] pub action: String, #[serde(default)] pub payload: Option, } +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum AdminPoolBatchSelectionRequest { + Snapshot { + snapshot_id: String, + #[serde(default)] + expected_total: Option, + }, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AdminPoolBatchActionKind { Enable, @@ -377,6 +417,7 @@ pub fn build_admin_pool_batch_action_plan( } }; + let has_unresolved_selection = payload.selection.is_some(); let key_ids = payload .key_ids .into_iter() @@ -386,6 +427,9 @@ pub fn build_admin_pool_batch_action_plan( .into_iter() .collect::>(); if key_ids.is_empty() { + if has_unresolved_selection { + return Err("selection should be resolved to key_ids before execution".to_string()); + } return Err("key_ids should not be empty".to_string()); } diff --git a/frontend/src/api/endpoints/pool.ts b/frontend/src/api/endpoints/pool.ts index 78706d0c1..1e328d354 100644 --- a/frontend/src/api/endpoints/pool.ts +++ b/frontend/src/api/endpoints/pool.ts @@ -202,6 +202,24 @@ export interface PoolKeysPageResponse { page: number page_size: number keys: PoolKeyDetail[] + selection_snapshot?: PoolSelectionSnapshot | null + selection_snapshot_mismatch?: PoolSelectionSnapshotMismatch | null +} + +export interface PoolSelectionSnapshot { + id: string + total: number + status: 'ready' | 'expired' | string + created_at_unix_secs?: number + created_at?: string | null + expires_at_unix_secs?: number + expires_at?: string | null +} + +export interface PoolSelectionSnapshotMismatch { + reason?: 'total_changed' | 'page_keys_changed' | string + expected_total: number + actual_total: number } export interface PoolKeyScoreDetail { @@ -306,6 +324,30 @@ export interface PoolScoresQuery { export interface PoolKeySelectionRequest { search?: string quick_selectors?: string[] + snapshot_id?: string + expected_total?: number +} + +export interface PoolKeySelectionSnapshotRequest { + page?: number + page_size?: number + search?: string + status?: PoolKeysQuery['status'] + quick_selectors?: string[] + search_scope?: PoolKeysQuery['search_scope'] + sort_by?: PoolKeysQuery['sort_by'] + sort_order?: PoolKeysQuery['sort_order'] + expected_total: number + expected_page_key_ids: string[] +} + +export interface PoolKeySelectionSnapshotResponse { + total: number + page: number + page_size: number + keys?: PoolKeyDetail[] + selection_snapshot?: PoolSelectionSnapshot | null + selection_snapshot_mismatch?: PoolSelectionSnapshotMismatch | null } export interface PoolKeySelectionItem { @@ -324,11 +366,18 @@ export interface PoolKeySelectionItem { export interface PoolKeySelectionResponse { total: number + snapshot_total?: number + missing_count?: number items: PoolKeySelectionItem[] } export interface PoolBatchAction { - key_ids: string[] + key_ids?: string[] + selection?: { + type: 'snapshot' + snapshot_id: string + expected_total: number + } action: | 'enable' | 'disable' @@ -393,6 +442,18 @@ export async function listPoolKeys( ) } +export async function createPoolKeySelectionSnapshot( + providerId: string, + body: PoolKeySelectionSnapshotRequest, +): Promise { + const response = await client.post( + `/api/admin/pool/${providerId}/keys/selection-snapshot`, + body, + { timeout: POOL_BATCH_ACTION_TIMEOUT_MS }, + ) + return response.data +} + export async function listPoolScores( providerId: string, params: PoolScoresQuery = {}, diff --git a/frontend/src/features/pool/components/PoolAccountBatchDialog.vue b/frontend/src/features/pool/components/PoolAccountBatchDialog.vue index bb5ed3f5a..2b30ab0ea 100644 --- a/frontend/src/features/pool/components/PoolAccountBatchDialog.vue +++ b/frontend/src/features/pool/components/PoolAccountBatchDialog.vue @@ -15,7 +15,7 @@ variant="ghost" size="sm" class="h-7 px-2 text-[11px]" - :disabled="loading || executing || !hasActiveFilters" + :disabled="interactionBusy || !hasActiveFilters" @click="clearFilters" > 重置筛选 @@ -30,7 +30,7 @@ size="sm" class="h-8 px-2.5 text-[11px]" :class="activeQuickSelectorSet.has(option.value) ? 'border-primary/70 bg-primary/10 text-primary' : ''" - :disabled="loading || executing" + :disabled="interactionBusy" @click="toggleQuickSelector(option.value)" > {{ option.label }} @@ -43,13 +43,14 @@ :model-value="searchText" placeholder="搜索账号名 / 套餐 / 额度 / 代理状态" class="h-8 flex-1" + :disabled="interactionBusy" @update:model-value="(v) => searchText = String(v || '')" /> +
+ + 正在生成筛选结果快照... +
@@ -118,7 +126,7 @@ >
@@ -169,7 +177,7 @@ variant="ghost" size="icon" class="h-7 w-7" - :disabled="currentPage <= 1" + :disabled="currentPage <= 1 || interactionBusy || selectAllFiltered" @click="goToPage(1)" > @@ -178,7 +186,7 @@ variant="ghost" size="icon" class="h-7 w-7" - :disabled="currentPage <= 1" + :disabled="currentPage <= 1 || interactionBusy || selectAllFiltered" @click="goToPage(currentPage - 1)" > @@ -187,7 +195,7 @@ variant="ghost" size="icon" class="h-7 w-7" - :disabled="currentPage >= totalPages" + :disabled="currentPage >= totalPages || interactionBusy || selectAllFiltered" @click="goToPage(currentPage + 1)" > @@ -196,7 +204,7 @@ variant="ghost" size="icon" class="h-7 w-7" - :disabled="currentPage >= totalPages" + :disabled="currentPage >= totalPages || interactionBusy || selectAllFiltered" @click="goToPage(totalPages)" > @@ -216,6 +224,7 @@
@@ -260,7 +269,7 @@