diff --git a/crates/control-plane-api/src/server/public/graphql/mod.rs b/crates/control-plane-api/src/server/public/graphql/mod.rs index be94746c1fc..1b4a8820f3b 100644 --- a/crates/control-plane-api/src/server/public/graphql/mod.rs +++ b/crates/control-plane-api/src/server/public/graphql/mod.rs @@ -36,6 +36,7 @@ mod live_spec_refs; mod live_specs; mod prefixes; mod publication_history; +mod service_accounts; pub mod status; mod storage_mappings; @@ -64,6 +65,7 @@ pub struct QueryRoot( data_planes::DataPlanesQuery, invite_links::InviteLinksQuery, connectors::ConnectorsQuery, + service_accounts::ServiceAccountsQuery, ); // Represents the portion of the GraphQL schema that deals with mutations. @@ -73,6 +75,7 @@ pub struct MutationRoot( alert_configs::AlertConfigsMutation, alert_subscriptions::AlertSubscriptionsMutation, invite_links::InviteLinksMutation, + service_accounts::ServiceAccountsMutation, ); pub fn create_schema(alert_config_defaults: models::AlertConfig) -> GraphQLSchema { diff --git a/crates/control-plane-api/src/server/public/graphql/service_accounts.rs b/crates/control-plane-api/src/server/public/graphql/service_accounts.rs new file mode 100644 index 00000000000..86eabcd259d --- /dev/null +++ b/crates/control-plane-api/src/server/public/graphql/service_accounts.rs @@ -0,0 +1,937 @@ +use super::{TimestampCursor, filters}; +use async_graphql::{Context, types::connection}; + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct ServiceAccount { + pub user_id: uuid::Uuid, + pub display_name: String, + pub prefix: models::Prefix, + pub capability: models::Capability, + pub created_by: uuid::Uuid, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub last_used_at: Option>, + pub disabled_at: Option>, + pub api_keys: Vec, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct ApiKeyInfo { + pub key_id: models::Id, + pub label: String, + pub created_by: uuid::Uuid, + pub created_at: chrono::DateTime, + pub expires_at: chrono::DateTime, + pub last_used_at: Option>, +} + +#[derive(Debug, Clone, async_graphql::SimpleObject)] +pub struct CreateApiKeyResult { + pub key_id: models::Id, + pub secret: String, +} + +pub type PaginatedServiceAccounts = connection::Connection< + TimestampCursor, + ServiceAccount, + connection::EmptyFields, + connection::EmptyFields, + connection::DefaultConnectionName, + connection::DefaultEdgeName, + connection::DisableNodesField, +>; + +#[derive(Debug, Clone, Default, async_graphql::InputObject)] +pub struct ServiceAccountsFilter { + pub catalog_prefix: Option, +} + +#[derive(Debug, Default)] +pub struct ServiceAccountsQuery; + +const DEFAULT_PAGE_SIZE: usize = 25; +const MAX_PREFIXES: usize = 20; + +#[async_graphql::Object] +impl ServiceAccountsQuery { + async fn service_accounts( + &self, + ctx: &Context<'_>, + filter: Option, + after: Option, + first: Option, + ) -> async_graphql::Result { + let env = ctx.data::()?; + + let prefix_starts_with = filter + .and_then(|f| f.catalog_prefix) + .and_then(|f| f.starts_with); + + let admin_prefixes = super::authorized_prefixes::authorized_prefixes( + &env.snapshot().role_grants, + &env.snapshot().user_grants, + env.claims()?.sub, + models::Capability::Admin, + prefix_starts_with.as_deref(), + ); + + if admin_prefixes.is_empty() { + return Ok(PaginatedServiceAccounts::new(false, false)); + } + if admin_prefixes.len() > MAX_PREFIXES { + return Err(async_graphql::Error::new( + "Too many admin prefixes; narrow results with a prefix filter", + )); + } + + connection::query_with::( + after, + None, + first, + None, + |after, _, first, _| async move { + let after_created_at = after.map(|c| c.0); + let limit = first.unwrap_or(DEFAULT_PAGE_SIZE); + + let sa_rows = sqlx::query!( + r#" + SELECT + sa.user_id, + sa.display_name, + sa.prefix AS "prefix!: String", + sa.capability AS "capability!: models::Capability", + sa.created_by, + sa.created_at AS "created_at!: chrono::DateTime", + sa.updated_at AS "updated_at!: chrono::DateTime", + sa.last_used_at AS "last_used_at: chrono::DateTime", + sa.disabled_at AS "disabled_at: chrono::DateTime" + FROM internal.service_accounts sa + WHERE sa.prefix::text ^@ ANY($1) + AND ($2::timestamptz IS NULL OR sa.created_at < $2) + ORDER BY sa.created_at DESC + LIMIT $3 + 1 + "#, + &admin_prefixes, + after_created_at, + limit as i64, + ) + .fetch_all(&env.pg_pool) + .await?; + + let has_next = sa_rows.len() > limit; + + let user_ids: Vec = sa_rows + .iter() + .take(limit) + .map(|r| r.user_id) + .collect(); + + let key_rows = if user_ids.is_empty() { + vec![] + } else { + sqlx::query!( + r#" + SELECT + ak.id AS "id!: models::Id", + ak.service_account_id, + ak.label, + ak.created_by, + ak.created_at AS "created_at!: chrono::DateTime", + ak.expires_at AS "expires_at!: chrono::DateTime", + ak.last_used_at AS "last_used_at: chrono::DateTime" + FROM internal.api_keys ak + WHERE ak.service_account_id = ANY($1) + ORDER BY ak.created_at DESC + "#, + &user_ids, + ) + .fetch_all(&env.pg_pool) + .await? + }; + + let mut keys_by_sa: std::collections::HashMap> = + std::collections::HashMap::new(); + for kr in key_rows { + keys_by_sa + .entry(kr.service_account_id) + .or_default() + .push(ApiKeyInfo { + key_id: kr.id, + label: kr.label, + created_by: kr.created_by, + created_at: kr.created_at, + expires_at: kr.expires_at, + last_used_at: kr.last_used_at, + }); + } + + let edges: Vec<_> = sa_rows + .into_iter() + .take(limit) + .map(|r| { + let api_keys = keys_by_sa.remove(&r.user_id).unwrap_or_default(); + connection::Edge::new( + TimestampCursor(r.created_at), + ServiceAccount { + user_id: r.user_id, + display_name: r.display_name, + prefix: models::Prefix::new(&r.prefix), + capability: r.capability, + created_by: r.created_by, + created_at: r.created_at, + updated_at: r.updated_at, + last_used_at: r.last_used_at, + disabled_at: r.disabled_at, + api_keys, + }, + ) + }) + .collect(); + + let mut conn = + connection::Connection::new(after_created_at.is_some(), has_next); + conn.edges = edges; + Ok(conn) + }, + ) + .await + } +} + +#[derive(Debug, Default)] +pub struct ServiceAccountsMutation; + +#[async_graphql::Object] +impl ServiceAccountsMutation { + /// Create a service account with a grant to the specified prefix. + /// + /// The caller must have admin capability on the prefix. + /// Creates an auth.users row, an internal.service_accounts row, + /// and a user_grants row for the service account. + async fn create_service_account( + &self, + ctx: &Context<'_>, + prefix: models::Prefix, + capability: models::Capability, + display_name: String, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + if let Err(err) = validator::Validate::validate(&prefix) { + return Err(async_graphql::Error::new(format!( + "invalid catalog prefix: {err}" + ))); + } + + verify_authorization(env, prefix.as_str()).await?; + + let mut txn = env.pg_pool.begin().await?; + + let sa_user_id = uuid::Uuid::new_v4(); + + sqlx::query!( + r#" + INSERT INTO auth.users (id, email, raw_user_meta_data) + VALUES ($1, $2, $3) + "#, + sa_user_id, + format!("sa+{}@service.estuary.dev", sa_user_id), + serde_json::json!({ + "full_name": display_name, + "is_service_account": true, + }), + ) + .execute(&mut *txn) + .await?; + + let now = sqlx::query_scalar!( + r#" + INSERT INTO internal.service_accounts (user_id, prefix, capability, display_name, created_by) + VALUES ($1, $2::text::catalog_prefix, $3, $4, $5) + RETURNING created_at AS "created_at!: chrono::DateTime" + "#, + sa_user_id, + prefix.as_str(), + capability as models::Capability, + display_name, + claims.sub, + ) + .fetch_one(&mut *txn) + .await?; + + crate::grants::upsert_user_grant( + sa_user_id, + prefix.as_str(), + capability, + Some("service account grant".to_string()), + &mut txn, + ) + .await?; + + txn.commit().await?; + + tracing::info!( + %prefix, + ?capability, + %claims.sub, + %sa_user_id, + "created service account" + ); + + Ok(ServiceAccount { + user_id: sa_user_id, + display_name, + prefix, + capability, + created_by: claims.sub, + created_at: now, + updated_at: now, + last_used_at: None, + disabled_at: None, + api_keys: vec![], + }) + } + + /// Disable a service account, revoking all API keys and grants. + /// + /// The caller must have admin capability on the service account's prefix. + /// The auth.users row is preserved for audit trail / FK integrity. + async fn disable_service_account( + &self, + ctx: &Context<'_>, + user_id: uuid::Uuid, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + verify_authorization(env, &sa.prefix).await?; + + if sa.disabled_at.is_some() { + return Err(async_graphql::Error::new("service account is already disabled")); + } + + let mut txn = env.pg_pool.begin().await?; + + sqlx::query!( + "UPDATE internal.service_accounts SET disabled_at = now(), updated_at = now() WHERE user_id = $1", + user_id, + ) + .execute(&mut *txn) + .await?; + + sqlx::query!("DELETE FROM internal.api_keys WHERE service_account_id = $1", user_id) + .execute(&mut *txn) + .await?; + + sqlx::query!("DELETE FROM public.user_grants WHERE user_id = $1", user_id) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + + tracing::info!( + %user_id, + prefix = %sa.prefix, + %claims.sub, + "disabled service account" + ); + + Ok(true) + } + + /// Re-enable a disabled service account, restoring its user_grants row. + /// + /// Does NOT restore previously revoked API keys — new ones must be minted. + async fn enable_service_account( + &self, + ctx: &Context<'_>, + user_id: uuid::Uuid, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + verify_authorization(env, &sa.prefix).await?; + + if sa.disabled_at.is_none() { + return Err(async_graphql::Error::new( + "service account is not disabled", + )); + } + + let mut txn = env.pg_pool.begin().await?; + + sqlx::query!( + "UPDATE internal.service_accounts SET disabled_at = NULL, updated_at = now() WHERE user_id = $1", + user_id, + ) + .execute(&mut *txn) + .await?; + + crate::grants::upsert_user_grant( + user_id, + &sa.prefix, + sa.capability, + Some("service account grant".to_string()), + &mut txn, + ) + .await?; + + txn.commit().await?; + + tracing::info!( + %user_id, + prefix = %sa.prefix, + %claims.sub, + "enabled service account" + ); + + Ok(true) + } + + /// Create an API key for a service account. + /// + /// Returns the key_id and the plaintext secret (flow_sa_...). + /// The secret is returned exactly once and cannot be retrieved again. + async fn create_api_key( + &self, + ctx: &Context<'_>, + user_id: uuid::Uuid, + label: String, + #[graphql(desc = "ISO 8601 duration for key validity (e.g. P90D, P1Y)")] + valid_for: String, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let sa = lookup_service_account(&env.pg_pool, user_id).await?; + verify_authorization(env, &sa.prefix).await?; + + if sa.disabled_at.is_some() { + return Err(async_graphql::Error::new( + "cannot create API key for a disabled service account", + )); + } + + let row = sqlx::query!( + r#" + WITH new_key AS ( + SELECT + internal.id_generator() AS id, + gen_random_uuid()::text AS secret + ) + INSERT INTO internal.api_keys (id, service_account_id, secret_hash, label, expires_at, created_by) + SELECT + nk.id, + $1, + crypt(nk.secret, gen_salt('bf')), + $2, + now() + $3::interval, + $4 + FROM new_key nk + RETURNING + id AS "id!: models::Id", + (SELECT secret FROM new_key) AS "secret!: String" + "#, + user_id, + label, + valid_for, + claims.sub, + ) + .fetch_one(&env.pg_pool) + .await?; + + use base64::Engine; + let payload = format!("{}:{}", row.id, row.secret); + let encoded = base64::engine::general_purpose::STANDARD.encode(payload.as_bytes()); + let full_secret = format!("flow_sa_{encoded}"); + + tracing::info!( + key_id = %row.id, + %user_id, + %label, + %claims.sub, + "created api key for service account" + ); + + Ok(CreateApiKeyResult { + key_id: row.id, + secret: full_secret, + }) + } + + /// Revoke (delete) an API key. + /// + /// The caller must have admin capability on the owning service account's prefix. + async fn revoke_api_key( + &self, + ctx: &Context<'_>, + key_id: models::Id, + ) -> async_graphql::Result { + let env = ctx.data::()?; + let claims = env.claims()?; + + let key_owner = sqlx::query!( + r#" + SELECT ak.service_account_id + FROM internal.api_keys ak + WHERE ak.id = $1 + "#, + key_id as models::Id, + ) + .fetch_optional(&env.pg_pool) + .await?; + + let key_owner = match key_owner { + Some(row) => row.service_account_id, + None => return Err(async_graphql::Error::new("API key not found")), + }; + + let sa = lookup_service_account(&env.pg_pool, key_owner).await?; + verify_authorization(env, &sa.prefix).await?; + + sqlx::query!("DELETE FROM internal.api_keys WHERE id = $1", key_id as models::Id) + .execute(&env.pg_pool) + .await?; + + tracing::info!( + %key_id, + service_account = %key_owner, + %claims.sub, + "revoked api key" + ); + + Ok(true) + } +} + +struct ServiceAccountRow { + prefix: String, + capability: models::Capability, + disabled_at: Option>, +} + +async fn lookup_service_account( + pg_pool: &sqlx::PgPool, + user_id: uuid::Uuid, +) -> async_graphql::Result { + let row = sqlx::query!( + r#" + SELECT + prefix AS "prefix!: String", + capability AS "capability!: models::Capability", + disabled_at AS "disabled_at: chrono::DateTime" + FROM internal.service_accounts + WHERE user_id = $1 + "#, + user_id, + ) + .fetch_optional(pg_pool) + .await?; + + match row { + Some(r) => Ok(ServiceAccountRow { + prefix: r.prefix, + capability: r.capability, + disabled_at: r.disabled_at, + }), + None => Err(async_graphql::Error::new("service account not found")), + } +} + +async fn verify_authorization( + envelope: &crate::Envelope, + catalog_prefix: &str, +) -> async_graphql::Result<()> { + let policy_result = crate::server::evaluate_names_authorization( + envelope.snapshot(), + envelope.claims()?, + models::Capability::Admin, + [catalog_prefix], + ); + let (_expiry, ()) = envelope.authorization_outcome(policy_result).await?; + Ok(()) +} + +#[cfg(test)] +mod test { + use crate::test_server; + + #[sqlx::test( + migrations = "../../supabase/migrations", + fixtures(path = "../../../fixtures", scripts("data_planes", "alice")) + )] + async fn test_service_account_lifecycle(pool: sqlx::PgPool) { + let _guard = test_server::init(); + + let server = test_server::TestServer::start( + pool.clone(), + test_server::snapshot(pool.clone(), true).await, + ) + .await; + + let alice_token = server.make_access_token( + uuid::Uuid::from_bytes([0x11; 16]), + Some("alice@example.test"), + ); + + // Create a bob user who does NOT have admin on aliceCo/. + sqlx::query("INSERT INTO auth.users (id, email) VALUES ('22222222-2222-2222-2222-222222222222', 'bob@example.test')") + .execute(&pool) + .await + .unwrap(); + + let bob_token = + server.make_access_token(uuid::Uuid::from_bytes([0x22; 16]), Some("bob@example.test")); + + // === Create a service account === + let create_response: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($prefix: Prefix!, $capability: Capability!, $name: String!) { + createServiceAccount( + prefix: $prefix + capability: $capability + displayName: $name + ) { + userId + displayName + prefix + capability + disabledAt + apiKeys { keyId } + } + }"#, + "variables": { + "prefix": "aliceCo/", + "capability": "admin", + "name": "CI Deploy Bot" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + create_response["errors"].is_null(), + "create should succeed: {create_response}" + ); + let sa = &create_response["data"]["createServiceAccount"]; + let sa_user_id = sa["userId"].as_str().expect("should have userId"); + assert_eq!(sa["displayName"], "CI Deploy Bot"); + assert_eq!(sa["prefix"], "aliceCo/"); + assert_eq!(sa["capability"], "admin"); + assert!(sa["disabledAt"].is_null()); + assert_eq!(sa["apiKeys"].as_array().unwrap().len(), 0); + + // === Bob cannot create a service account for aliceCo/ === + let unauthorized: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation { + createServiceAccount( + prefix: "aliceCo/" + capability: read + displayName: "hacker bot" + ) { userId } + }"# + }), + Some(&bob_token), + ) + .await; + + assert!(unauthorized["errors"].is_array()); + + // === Create an API key === + let create_key: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey( + userId: $userId + label: $label + validFor: $validFor + ) { + keyId + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "GitHub Actions", + "validFor": "P90D" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + create_key["errors"].is_null(), + "create key should succeed: {create_key}" + ); + let key_data = &create_key["data"]["createApiKey"]; + let key_id = key_data["keyId"].as_str().expect("should have keyId"); + let secret = key_data["secret"].as_str().expect("should have secret"); + assert!(secret.starts_with("flow_sa_")); + + // === Exchange the API key for an access token === + let exchange_result = sqlx::query!( + "SELECT generate_access_token(api_key := $1) AS token", + secret, + ) + .fetch_one(&pool) + .await + .unwrap(); + + let token_json: serde_json::Value = + serde_json::from_value(exchange_result.token.unwrap()).unwrap(); + assert!(token_json["access_token"].is_string()); + + // === List service accounts === + let list: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + serviceAccounts(filter: { catalogPrefix: { startsWith: "aliceCo/" } }) { + edges { + node { + userId + displayName + prefix + capability + apiKeys { keyId label } + } + } + } + }"# + }), + Some(&alice_token), + ) + .await; + + let edges = list["data"]["serviceAccounts"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!(edges.len(), 1); + assert_eq!(edges[0]["node"]["displayName"], "CI Deploy Bot"); + assert_eq!(edges[0]["node"]["apiKeys"].as_array().unwrap().len(), 1); + + // Bob sees no service accounts. + let bob_list: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + query { + serviceAccounts { edges { node { userId } } } + }"# + }), + Some(&bob_token), + ) + .await; + + let bob_edges = bob_list["data"]["serviceAccounts"]["edges"] + .as_array() + .expect("should have edges"); + assert_eq!(bob_edges.len(), 0); + + // === Revoke the API key === + let revoke: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($keyId: Id!) { + revokeApiKey(keyId: $keyId) + }"#, + "variables": { "keyId": key_id } + }), + Some(&alice_token), + ) + .await; + + assert!(revoke["errors"].is_null(), "revoke should succeed: {revoke}"); + + // Exchanging the revoked key fails. + let exchange_fail = sqlx::query!( + "SELECT generate_access_token(api_key := $1) AS token", + secret, + ) + .fetch_one(&pool) + .await; + + assert!(exchange_fail.is_err()); + + // === Create a new key and then disable the service account === + let create_key2: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(userId: $userId, label: $label, validFor: $validFor) { + keyId + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "temp key", + "validFor": "P30D" + } + }), + Some(&alice_token), + ) + .await; + + let secret2 = create_key2["data"]["createApiKey"]["secret"] + .as_str() + .unwrap(); + + let disable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + disableServiceAccount(userId: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!( + disable["errors"].is_null(), + "disable should succeed: {disable}" + ); + + // API key from disabled account fails. + let exchange_disabled = sqlx::query!( + "SELECT generate_access_token(api_key := $1) AS token", + secret2, + ) + .fetch_one(&pool) + .await; + + assert!(exchange_disabled.is_err()); + + // Cannot create key for disabled account. + let key_while_disabled: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(userId: $userId, label: $label, validFor: $validFor) { + keyId + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "should fail", + "validFor": "P30D" + } + }), + Some(&alice_token), + ) + .await; + + assert!(key_while_disabled["errors"].is_array()); + + // Disabling again fails. + let disable_again: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + disableServiceAccount(userId: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!(disable_again["errors"].is_array()); + + // === Re-enable the service account === + let enable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + enableServiceAccount(userId: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!( + enable["errors"].is_null(), + "enable should succeed: {enable}" + ); + + // Re-enabled account can have new keys created. + let key_after_enable: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!, $label: String!, $validFor: String!) { + createApiKey(userId: $userId, label: $label, validFor: $validFor) { + keyId + secret + } + }"#, + "variables": { + "userId": sa_user_id, + "label": "post-enable key", + "validFor": "P90D" + } + }), + Some(&alice_token), + ) + .await; + + assert!( + key_after_enable["errors"].is_null(), + "create key after enable should succeed: {key_after_enable}" + ); + + let secret3 = key_after_enable["data"]["createApiKey"]["secret"] + .as_str() + .unwrap(); + + // Exchange works again. + let exchange_reenabled = sqlx::query!( + "SELECT generate_access_token(api_key := $1) AS token", + secret3, + ) + .fetch_one(&pool) + .await; + + assert!(exchange_reenabled.is_ok()); + + // Enabling an already enabled account fails. + let enable_again: serde_json::Value = server + .graphql( + &serde_json::json!({ + "query": r#" + mutation($userId: UUID!) { + enableServiceAccount(userId: $userId) + }"#, + "variables": { "userId": sa_user_id } + }), + Some(&alice_token), + ) + .await; + + assert!(enable_again["errors"].is_array()); + } +} diff --git a/supabase/migrations/20260513000000_service_accounts.sql b/supabase/migrations/20260513000000_service_accounts.sql new file mode 100644 index 00000000000..bea97e3ded5 --- /dev/null +++ b/supabase/migrations/20260513000000_service_accounts.sql @@ -0,0 +1,198 @@ +begin; + +-- Service accounts: non-human identities that authenticate via API keys +-- and are authorized through the same user_grants / role_grants system as humans. + +create table internal.service_accounts ( + user_id uuid primary key references auth.users (id), + prefix public.catalog_prefix not null, + capability public.grant_capability not null + constraint valid_capability check ( + capability = any (array[ + 'read'::public.grant_capability, + 'write'::public.grant_capability, + 'admin'::public.grant_capability + ]) + ), + display_name text not null, + created_by uuid not null references auth.users (id), + last_used_at timestamptz, + disabled_at timestamptz, + created_at timestamptz not null default now(), + updated_at timestamptz not null default now() +); + +comment on table internal.service_accounts is + 'Non-human identities that authenticate via API keys and are authorized through user_grants.'; + +-- API keys: long-lived credentials for service accounts, exchanged for short-lived JWTs +-- via generate_access_token. + +create table internal.api_keys ( + id public.flowid primary key not null default internal.id_generator(), + service_account_id uuid not null references auth.users (id), + secret_hash text not null, + label text not null, + expires_at timestamptz not null, + created_by uuid not null references auth.users (id), + last_used_at timestamptz, + created_at timestamptz not null default now() +); + +create index api_keys_service_account_id on internal.api_keys (service_account_id); + +comment on table internal.api_keys is + 'Long-lived credentials for service accounts, exchanged via generate_access_token for short-lived JWTs.'; + +-- Extend generate_access_token to accept API keys alongside refresh tokens. +-- The two input shapes are mutually exclusive and dispatched by which parameter is non-null. +-- Must drop the old 2-arg signature first since PostgreSQL treats different arities as separate overloads. + +drop function public.generate_access_token(public.flowid, text); + +create or replace function public.generate_access_token( + refresh_token_id public.flowid default null, + secret text default null, + api_key text default null +) returns json + language plpgsql security definer +as $$ +declare + rt refresh_tokens; + rt_new_secret text; + access_token text; + ak internal.api_keys; + ak_id public.flowid; + ak_secret text; + decoded bytea; + separator_pos int; +begin + + -- Dispatch: API key path vs refresh token path + if api_key is not null then + -- Strip the flow_sa_ prefix + if not starts_with(api_key, 'flow_sa_') then + raise 'api_key must start with flow_sa_'; + end if; + + decoded = decode(substring(api_key from 9), 'base64'); + separator_pos = position(':' in convert_from(decoded, 'UTF8')); + + if separator_pos = 0 then + raise 'malformed api_key payload'; + end if; + + ak_id = left(convert_from(decoded, 'UTF8'), separator_pos - 1)::public.flowid; + ak_secret = substring(convert_from(decoded, 'UTF8') from separator_pos + 1); + + select * into ak from internal.api_keys where internal.api_keys.id = ak_id; + + if not found then + raise 'invalid api key'; + end if; + + if ak.secret_hash <> crypt(ak_secret, ak.secret_hash) then + raise 'invalid api key'; + end if; + + if ak.expires_at < now() then + raise 'api key has expired'; + end if; + + -- Verify the service account is not disabled + if exists ( + select 1 from internal.service_accounts + where user_id = ak.service_account_id and disabled_at is not null + ) then + raise 'service account is disabled'; + end if; + + -- Update last_used_at on the key and the service account + update internal.api_keys set last_used_at = now() where id = ak.id; + update internal.service_accounts set last_used_at = now() where user_id = ak.service_account_id; + + select sign(json_build_object( + 'exp', trunc(extract(epoch from (now() + interval '1 hour'))), + 'iat', trunc(extract(epoch from (now()))), + 'sub', ak.service_account_id, + 'aud', 'authenticated', + 'role', 'authenticated' + ), internal.access_token_jwt_secret()) into access_token + limit 1; + + return json_build_object('access_token', access_token); + + elsif refresh_token_id is not null and secret is not null then + -- Original refresh token path (unchanged behavior) + select * into rt from refresh_tokens where + refresh_tokens.id = refresh_token_id; + + if not found then + raise 'could not find refresh_token with the given `refresh_token_id`'; + end if; + + if rt.hash <> crypt(secret, rt.hash) then + raise 'invalid secret provided'; + end if; + + if (rt.updated_at + rt.valid_for) < now() then + raise 'refresh_token has expired.'; + end if; + + select sign(json_build_object( + 'exp', trunc(extract(epoch from (now() + interval '1 hour'))), + 'iat', trunc(extract(epoch from (now()))), + 'sub', rt.user_id, + 'aud', 'authenticated', + 'role', 'authenticated' + ), internal.access_token_jwt_secret()) into access_token + limit 1; + + if rt.multi_use = false then + rt_new_secret = gen_random_uuid(); + update refresh_tokens + set + hash = crypt(rt_new_secret, gen_salt('bf')), + uses = (uses + 1), + updated_at = clock_timestamp() + where refresh_tokens.id = rt.id; + else + update refresh_tokens + set + uses = (uses + 1), + updated_at = clock_timestamp() + where refresh_tokens.id = rt.id; + end if; + + if rt_new_secret is null then + return json_build_object( + 'access_token', access_token + ); + else + return json_build_object( + 'access_token', access_token, + 'refresh_token', json_build_object( + 'id', rt.id, + 'secret', rt_new_secret + ) + ); + end if; + + else + raise 'must provide either api_key or (refresh_token_id, secret)'; + end if; + +commit; +end +$$; + +alter function public.generate_access_token(public.flowid, text, text) owner to postgres; + +comment on function public.generate_access_token(public.flowid, text, text) is ' +Generates a short-lived access token (JWT) from either: + - A refresh token (refresh_token_id + secret): existing behavior, rotates single-use tokens. + - An API key (api_key): for service accounts, validates the key and mints a JWT. +The two input shapes are mutually exclusive. +'; + +commit;