Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions apps/aether-gateway/src/admin_api.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
pub(crate) use crate::handlers::admin::{
admin_provider_ops_local_action_response, admin_provider_pool_config,
build_internal_control_error_response, create_provider_oauth_catalog_key,
find_duplicate_provider_oauth_key, maybe_build_local_admin_pool_response,
maybe_build_local_admin_response, persist_provider_quota_refresh_state,
provider_oauth_maintenance_endpoint_for_provider, provider_oauth_runtime_endpoint_for_provider,
provider_quota_refresh_endpoint_for_provider, provider_type_supports_quota_refresh,
reconcile_admin_fixed_provider_template_endpoints,
admin_pool_selection_snapshot_key, admin_provider_ops_local_action_response,
admin_provider_pool_config, build_internal_control_error_response,
create_provider_oauth_catalog_key, find_duplicate_provider_oauth_key,
maybe_build_local_admin_pool_response, maybe_build_local_admin_response,
persist_provider_quota_refresh_state, provider_oauth_maintenance_endpoint_for_provider,
provider_oauth_runtime_endpoint_for_provider, provider_quota_refresh_endpoint_for_provider,
provider_type_supports_quota_refresh, reconcile_admin_fixed_provider_template_endpoints,
refresh_provider_oauth_account_state_after_update, refresh_provider_pool_quota_locally,
store_admin_provider_ops_balance_cache, update_existing_provider_oauth_catalog_key,
AdminAppState, AdminGatewayProviderTransportSnapshot, AdminLocalOAuthRefreshError,
AdminRequestContext, AdminRouteRequest, AdminRouteResponse, AdminRouteResult,
AdminStatsTimeRange, AdminStatsUsageFilter, OAUTH_ACCOUNT_BLOCK_PREFIX,
AdminPoolSelectionSnapshot, AdminPoolSelectionSnapshotItem, AdminRequestContext,
AdminRouteRequest, AdminRouteResponse, AdminRouteResult, AdminStatsTimeRange,
AdminStatsUsageFilter, ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER,
ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL, OAUTH_ACCOUNT_BLOCK_PREFIX,
OAUTH_REQUEST_FAILED_PREFIX,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,18 @@ pub(super) fn classify_admin_observability_family_route(
"admin:pool",
false,
))
} else if method == http::Method::POST
&& normalized_path_no_trailing.starts_with("/api/admin/pool/")
&& normalized_path_no_trailing.ends_with("/keys/selection-snapshot")
&& normalized_path_no_trailing.matches('/').count() == 6
{
Some(classified(
"admin_proxy",
"pool_manage",
"create_selection_snapshot",
"admin:pool",
false,
))
} else if method == http::Method::POST
&& normalized_path_no_trailing.starts_with("/api/admin/pool/")
&& normalized_path_no_trailing.ends_with("/keys/resolve-selection")
Expand Down
53 changes: 52 additions & 1 deletion apps/aether-gateway/src/control/tests/admin_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use http::Uri;

use super::{classify_control_route, headers};
use crate::handlers::shared::local_proxy_route_requires_buffered_body;

use super::{classify_control_route, headers, GatewayPublicRequestContext};

#[test]
fn classifies_admin_pool_overview_as_admin_proxy_route() {
Expand Down Expand Up @@ -82,6 +84,21 @@ fn classifies_admin_pool_provider_key_routes_as_admin_proxy_route() {
Some("batch_action_keys")
);

let selection_snapshot_uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot"
.parse()
.expect("uri should parse");
let selection_snapshot =
classify_control_route(&http::Method::POST, &selection_snapshot_uri, &headers)
.expect("route should classify");
assert_eq!(
selection_snapshot.route_family.as_deref(),
Some("pool_manage")
);
assert_eq!(
selection_snapshot.route_kind.as_deref(),
Some("create_selection_snapshot")
);

let resolve_selection_uri: Uri = "/api/admin/pool/provider-1/keys/resolve-selection"
.parse()
.expect("uri should parse");
Expand Down Expand Up @@ -128,6 +145,25 @@ fn classifies_admin_pool_provider_key_routes_as_admin_proxy_route() {
);
}

#[test]
fn admin_pool_selection_snapshot_buffers_request_body() {
let headers = headers(&[]);
let uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot"
.parse()
.expect("uri should parse");
let decision =
classify_control_route(&http::Method::POST, &uri, &headers).expect("route should classify");
let context = GatewayPublicRequestContext::from_request_parts(
"trace-pool-selection-snapshot",
&http::Method::POST,
&uri,
&headers,
Some(decision),
);

assert!(local_proxy_route_requires_buffered_body(&context));
}

#[test]
fn classifies_admin_pool_trailing_slash_routes_as_admin_proxy_route() {
let headers = headers(&[]);
Expand All @@ -140,6 +176,21 @@ fn classifies_admin_pool_trailing_slash_routes_as_admin_proxy_route() {
assert_eq!(list.route_family.as_deref(), Some("pool_manage"));
assert_eq!(list.route_kind.as_deref(), Some("list_keys"));

let selection_snapshot_uri: Uri = "/api/admin/pool/provider-1/keys/selection-snapshot/"
.parse()
.expect("uri should parse");
let selection_snapshot =
classify_control_route(&http::Method::POST, &selection_snapshot_uri, &headers)
.expect("route should classify");
assert_eq!(
selection_snapshot.route_family.as_deref(),
Some("pool_manage")
);
assert_eq!(
selection_snapshot.route_kind.as_deref(),
Some("create_selection_snapshot")
);

let resolve_selection_uri: Uri = "/api/admin/pool/provider-1/keys/resolve-selection/"
.parse()
.expect("uri should parse");
Expand Down
7 changes: 6 additions & 1 deletion apps/aether-gateway/src/handlers/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ pub(crate) use self::provider::oauth::runtime::{
pub(crate) use self::provider::ops::providers::actions::admin_provider_ops_local_action_response;
pub(crate) use self::provider::ops::providers::store_admin_provider_ops_balance_cache;
pub(crate) use self::provider::pool::config::admin_provider_pool_config;
pub(crate) use self::provider::pool_admin::maybe_build_local_admin_pool_response;
pub(crate) use self::provider::pool_admin::{
admin_pool_selection_snapshot_key, maybe_build_local_admin_pool_response,
AdminPoolSelectionSnapshot, AdminPoolSelectionSnapshotItem,
ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER,
ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL,
};
pub(crate) use self::provider::shared::payloads::{
OAUTH_ACCOUNT_BLOCK_PREFIX, OAUTH_REQUEST_FAILED_PREFIX,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,129 @@
use super::{
admin_pool_provider_id_from_path, build_admin_pool_error_response, AdminPoolBatchActionRequest,
ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL,
admin_pool_provider_id_from_path, build_admin_pool_error_response,
resolve_admin_pool_selection_snapshot_key_ids, AdminPoolBatchActionRequest,
AdminPoolSelectionSnapshotReference, ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL,
ADMIN_POOL_PROVIDER_CATALOG_WRITER_UNAVAILABLE_DETAIL,
ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE,
};
use crate::handlers::admin::request::{AdminAppState, AdminRequestContext};
use crate::GatewayError;
use aether_admin::provider::pool::{
self as admin_provider_pool_pure, AdminPoolBatchSelectionRequest,
};
use axum::{
body::{Body, Bytes},
http,
response::Response,
response::{IntoResponse, Response},
Json,
};

fn admin_pool_batch_snapshot_reference(
payload: &AdminPoolBatchActionRequest,
) -> Result<Option<AdminPoolSelectionSnapshotReference>, Response<Body>> {
let Some(selection) = payload.selection.as_ref() else {
return Ok(None);
};
if !payload.key_ids.is_empty() {
return Err(build_admin_pool_error_response(
http::StatusCode::BAD_REQUEST,
"selection and key_ids cannot be used together",
));
}

match selection {
AdminPoolBatchSelectionRequest::Snapshot {
snapshot_id,
expected_total,
} => {
let snapshot_id = snapshot_id.trim();
if snapshot_id.is_empty() {
return Err(build_admin_pool_error_response(
http::StatusCode::BAD_REQUEST,
"selection.snapshot_id should not be empty",
));
}
let Some(expected_total) = expected_total else {
return Err(build_admin_pool_error_response(
http::StatusCode::BAD_REQUEST,
"selection.expected_total is required",
));
};
Ok(Some(AdminPoolSelectionSnapshotReference {
snapshot_id: snapshot_id.to_string(),
expected_total: *expected_total,
}))
}
}
}

async fn build_admin_pool_snapshot_batch_action_response(
state: &AdminAppState<'_>,
request_context: &AdminRequestContext<'_>,
provider_id: &str,
payload: AdminPoolBatchActionRequest,
reference: AdminPoolSelectionSnapshotReference,
) -> Result<Response<Body>, GatewayError> {
let key_ids = match resolve_admin_pool_selection_snapshot_key_ids(
state,
request_context,
provider_id,
&reference,
)
.await?
{
Ok(key_ids) => key_ids,
Err(response) => return Ok(response),
};

let Some(provider) = state
.read_provider_catalog_providers_by_ids(std::slice::from_ref(&provider_id.to_string()))
.await?
.into_iter()
.next()
else {
return Ok(build_admin_pool_error_response(
http::StatusCode::NOT_FOUND,
format!("Provider {provider_id} 不存在"),
));
};

let mut affected = 0usize;
let mut action_label = None;
for chunk in key_ids.chunks(ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE) {
let chunk_payload = AdminPoolBatchActionRequest {
key_ids: chunk.to_vec(),
selection: None,
action: payload.action.clone(),
payload: payload.payload.clone(),
};
let plan = match admin_provider_pool_pure::build_admin_pool_batch_action_plan(chunk_payload)
{
Ok(plan) => plan,
Err(detail) => {
return Ok(build_admin_pool_error_response(
http::StatusCode::BAD_REQUEST,
detail,
));
}
};
action_label.get_or_insert(plan.action_label);
affected = affected.saturating_add(
state
.execute_admin_pool_batch_action_plan(&provider, plan)
.await?,
);
}

let action_label = action_label.unwrap_or("processed");
Ok(Json(
admin_provider_pool_pure::build_admin_pool_batch_action_result_payload(
affected,
action_label,
),
)
.into_response())
}

pub(super) async fn build_admin_pool_batch_action_response(
state: &AdminAppState<'_>,
request_context: &AdminRequestContext<'_>,
Expand Down Expand Up @@ -54,6 +167,20 @@ pub(super) async fn build_admin_pool_batch_action_response(
));
}
};
let reference = match admin_pool_batch_snapshot_reference(&payload) {
Ok(reference) => reference,
Err(response) => return Ok(response),
};
if let Some(reference) = reference {
return build_admin_pool_snapshot_batch_action_response(
state,
request_context,
&provider_id,
payload,
reference,
)
.await;
}

state
.build_admin_pool_batch_action_response(&provider_id, payload)
Expand Down
31 changes: 27 additions & 4 deletions apps/aether-gateway/src/handlers/admin/provider/pool_admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod read_resolve_selection;
#[path = "read_routes/scores.rs"]
mod read_scores;
pub(crate) mod selection;
mod selection_snapshots;
mod support;

pub(crate) use self::batch_shared::{
Expand All @@ -35,12 +36,24 @@ pub(crate) use self::batch_shared::{
build_admin_pool_batch_delete_task_payload, AdminPoolBatchActionRequest,
AdminPoolBatchImportRequest,
};
pub(crate) use self::selection_snapshots::{
admin_pool_selection_snapshot_key, read_admin_pool_selection_snapshot,
resolve_admin_pool_selection_snapshot_key_ids, store_admin_pool_selection_snapshot,
validate_admin_pool_selection_snapshot_reference, AdminPoolSelectionSnapshot,
AdminPoolSelectionSnapshotItem, AdminPoolSelectionSnapshotReference,
ADMIN_POOL_SELECTION_SNAPSHOT_ITEM_PAGE_SIZE,
ADMIN_POOL_SELECTION_SNAPSHOT_MAX_ACTIVE_PER_ADMIN_PROVIDER,
ADMIN_POOL_SELECTION_SNAPSHOT_MAX_TOTAL,
};
pub(crate) use self::support::{
admin_pool_provider_id_from_path, admin_pool_provider_id_from_scores_path,
parse_admin_pool_key_sort, parse_admin_pool_page, parse_admin_pool_page_size,
parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_status_filter,
AdminPoolKeySort, AdminPoolKeySortDirection, AdminPoolKeySortField,
AdminPoolResolveSelectionRequest, ADMIN_POOL_BANNED_KEY_CLEANUP_EMPTY_MESSAGE,
parse_admin_pool_key_sort, parse_admin_pool_key_sort_values, parse_admin_pool_page,
parse_admin_pool_page_size, parse_admin_pool_page_size_value, parse_admin_pool_page_value,
parse_admin_pool_quick_selectors, parse_admin_pool_search, parse_admin_pool_search_scope,
parse_admin_pool_search_scope_value, parse_admin_pool_status_filter,
parse_admin_pool_status_filter_value, AdminPoolKeySort, AdminPoolKeySortDirection,
AdminPoolKeySortField, AdminPoolResolveSelectionRequest, AdminPoolSearchScope,
ADMIN_POOL_BANNED_KEY_CLEANUP_EMPTY_MESSAGE,
ADMIN_POOL_PROVIDER_CATALOG_READER_UNAVAILABLE_DETAIL,
ADMIN_POOL_PROVIDER_CATALOG_WRITER_UNAVAILABLE_DETAIL,
};
Expand Down Expand Up @@ -105,6 +118,16 @@ pub(crate) async fn maybe_build_local_admin_pool_response(
read_keys::build_admin_pool_list_keys_response(state, request_context).await?,
));
}
Some("create_selection_snapshot") => {
return Ok(Some(
read_keys::build_admin_pool_create_selection_snapshot_response(
state,
request_context,
request_body,
)
.await?,
));
}
Some("scores") => {
return Ok(Some(
read_scores::build_admin_pool_scores_response(state, request_context).await?,
Expand Down
Loading
Loading