diff --git a/.env.example b/.env.example index 35440abbb..773c35ac2 100644 --- a/.env.example +++ b/.env.example @@ -38,6 +38,12 @@ AUTH_PROVIDER=DISABLED AUTH_Z_PROVIDER=DISABLED WORKER_ID=1 # MASTER_ENCRYPTION_KEY - add this for enabling secrets in local +# REDIS_URL="http://localhost:6379" +REDIS_POOL_SIZE="10" +REDIS_MAX_ATTEMPTS="10" +REDIS_CONN_TIMEOUT="1000" +# TTL in seconds +REDIS_KEY_TTL=604800 ################################################ ## Following values are to be set in KMS and not directly in ENV diff --git a/.github/workflows/ci_check_pr.yaml b/.github/workflows/ci_check_pr.yaml index 897cfca27..3a31525ca 100644 --- a/.github/workflows/ci_check_pr.yaml +++ b/.github/workflows/ci_check_pr.yaml @@ -44,7 +44,7 @@ env: jobs: formatting: name: Check formatting - runs-on: anton1 + runs-on: codebuild-superposition-${{ github.run_id }}-${{ github.run_attempt }} steps: - name: Checkout repository uses: actions/checkout@v4 @@ -101,7 +101,7 @@ jobs: test: name: Testing - runs-on: anton1 + runs-on: codebuild-superposition-${{ github.run_id }}-${{ github.run_attempt }} services: postgres: image: public.ecr.aws/docker/library/postgres:15-alpine3.21 @@ -195,7 +195,7 @@ jobs: java-build: name: Java build - runs-on: anton1 + runs-on: codebuild-superposition-${{ github.run_id }}-${{ github.run_attempt }} defaults: run: working-directory: clients/java @@ -252,7 +252,7 @@ jobs: provider-tests: name: Provider Tests - runs-on: anton1 + runs-on: codebuild-superposition-${{ github.run_id }}-${{ github.run_attempt }} strategy: matrix: provider: diff --git a/Cargo.lock b/Cargo.lock index 3e5c8a95a..01cefb8c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1641,7 +1641,6 @@ dependencies = [ "superposition_derives", "superposition_macros", "superposition_types", - "uuid", ] [[package]] @@ -2281,6 +2280,7 @@ dependencies = [ "cac_client", "chrono", "experimentation_client", + "fred", "juspay_diesel", "log", "reqwest", diff --git a/crates/context_aware_config/Cargo.toml 
b/crates/context_aware_config/Cargo.toml index 9cc13f4bc..1f52dd3d4 100644 --- a/crates/context_aware_config/Cargo.toml +++ b/crates/context_aware_config/Cargo.toml @@ -17,7 +17,7 @@ blake3 = { workspace = true } cac_client = { path = "../cac_client" } chrono = { workspace = true } diesel = { workspace = true, features = ["numeric"] } -fred = { workspace = true, optional = true, features = ["metrics"] } +fred = { workspace = true, features = ["metrics"] } itertools = { workspace = true } jsonlogic = { workspace = true } jsonschema = { workspace = true } @@ -35,11 +35,9 @@ superposition_types = { workspace = true, features = [ "diesel_derives", "server", ] } -uuid = { workspace = true } [features] disable_db_data_validation = ["superposition_types/disable_db_data_validation"] -high-performance-mode = ["dep:fred"] [lints] diff --git a/crates/context_aware_config/src/api/config/handlers.rs b/crates/context_aware_config/src/api/config/handlers.rs index 27a6157ba..34f1052f3 100644 --- a/crates/context_aware_config/src/api/config/handlers.rs +++ b/crates/context_aware_config/src/api/config/handlers.rs @@ -1,22 +1,16 @@ use std::collections::HashMap; -#[cfg(feature = "high-performance-mode")] -use actix_http::StatusCode; -#[cfg(feature = "high-performance-mode")] -use actix_web::http::header::ContentType; use actix_web::{ HttpRequest, HttpResponse, Scope, get, put, routes, web::{Data, Header, Json, Path, Query}, }; +use chrono::{DateTime, Utc}; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; -#[cfg(feature = "high-performance-mode")] -use fred::interfaces::KeysInterface; use itertools::Itertools; use serde_json::{Map, Value, json}; -#[cfg(feature = "high-performance-mode")] -use service_utils::service::types::AppHeader; use service_utils::{ helpers::fetch_dimensions_info_map, + redis::{CONFIG_KEY_SUFFIX, LAST_MODIFIED_KEY_SUFFIX, read_through_cache}, service::types::{AppState, DbConnection, WorkspaceContext}, }; use superposition_core::{ @@ -24,8 
+18,6 @@ use superposition_core::{ serialize_to_toml, }; use superposition_derives::authorized; -#[cfg(feature = "high-performance-mode")] -use superposition_macros::response_error; use superposition_macros::{bad_argument, unexpected_error}; use superposition_types::{ Cac, Condition, Config, Context, DBConnection, DimensionInfo, OverrideWithKeys, @@ -45,33 +37,31 @@ use superposition_types::{ }, schema::config_versions::dsl as config_versions, }, - result as superposition, + result::{self as superposition}, }; -use crate::api::context::{self, helpers::query_description}; use crate::{ - api::config::helpers::{ - add_audit_id_to_header, add_config_version_to_header, - add_last_modified_to_header, generate_config_from_version, get_config_version, - get_max_created_at, is_not_modified, + api::{ + config::helpers::{ + add_config_version_to_header, add_last_modified_to_header, + generate_config_from_version, get_config_version, get_max_created_at, + is_not_modified, + }, + context::{self, helpers::query_description}, }, helpers::{generate_cac, generate_detailed_cac}, }; use super::helpers::{apply_prefix_filter_to_config, resolve, setup_query_data}; -#[allow(clippy::let_and_return)] pub fn endpoints() -> Scope { - let scope = Scope::new("") + Scope::new("") .service(get_handler) .service(get_toml_handler) .service(resolve_handler) .service(reduce_handler) .service(list_version_handler) - .service(get_version_handler); - #[cfg(feature = "high-performance-mode")] - let scope = scope.service(get_fast_handler); - scope + .service(get_version_handler) } fn generate_subsets(map: &Map) -> Vec> { @@ -476,103 +466,6 @@ async fn reduce_handler( Ok(HttpResponse::Ok().json(config)) } -#[cfg(feature = "high-performance-mode")] -#[authorized] -#[get("/fast")] -async fn get_fast_handler( - workspace_context: WorkspaceContext, - state: Data, -) -> superposition::Result { - use fred::interfaces::MetricsInterface; - - log::debug!("Started redis fetch"); - let config_key = 
format!("{}::cac_config", *workspace_context.schema_name); - let last_modified_at_key = format!( - "{}::cac_config::last_modified_at", - *workspace_context.schema_name - ); - let audit_id_key = - format!("{}::cac_config::audit_id", *workspace_context.schema_name); - let config_version_key = format!( - "{}::cac_config::config_version", - *workspace_context.schema_name - ); - let client = state.redis.next_connected(); - let config = client.get::(config_key).await; - let metrics = client.take_latency_metrics(); - let network_metrics = client.take_network_latency_metrics(); - log::trace!( - "Network metrics for config fetch in milliseconds :: max: {}, min: {}, avg: {}; Latency metrics :: max: {}, min: {}, avg: {}", - network_metrics.max, - network_metrics.min, - network_metrics.avg, - metrics.max, - metrics.min, - metrics.avg - ); - match config { - Ok(config) => { - let mut response = HttpResponse::Ok(); - if let Ok(max_created_at) = - client.get::(last_modified_at_key).await - { - let metrics = client.take_latency_metrics(); - let network_metrics = client.take_network_latency_metrics(); - log::trace!( - "Network metrics max-created-by fetch in milliseconds :: max: {}, min: {}, avg: {}; Latency metrics :: max: {}, min: {}, avg: {}", - network_metrics.max, - network_metrics.min, - network_metrics.avg, - metrics.max, - metrics.min, - metrics.avg - ); - response - .insert_header((AppHeader::LastModified.to_string(), max_created_at)); - } - if let Ok(audit_id) = client.get::(audit_id_key).await { - let metrics = client.take_latency_metrics(); - let network_metrics = client.take_network_latency_metrics(); - log::trace!( - "Network metrics for audit ID in milliseconds :: max: {}, min: {}, avg: {}; Latency metrics :: max: {}, min: {}, avg: {}", - network_metrics.max, - network_metrics.min, - network_metrics.avg, - metrics.max, - metrics.min, - metrics.avg - ); - response.insert_header((AppHeader::XAuditId.to_string(), audit_id)); - } - if let Ok(config_version) = - 
client.get::, String>(config_version_key).await - { - let metrics = client.take_latency_metrics(); - let network_metrics = client.take_network_latency_metrics(); - log::trace!( - "Network metrics for version ID in milliseconds :: max: {}, min: {}, avg: {}; Latency metrics :: max: {}, min: {}, avg: {}", - network_metrics.max, - network_metrics.min, - network_metrics.avg, - metrics.max, - metrics.min, - metrics.avg - ); - add_config_version_to_header(&config_version, &mut response); - } - response.insert_header(ContentType::json()); - Ok(response.body(config)) - } - Err(err) => { - log::error!("Could not get config in redis due to {}", err); - Err(response_error!( - StatusCode::INTERNAL_SERVER_ERROR, - "could not fetch config, please try /config API" - )) - } - } -} - #[authorized] #[routes] #[get("")] @@ -580,18 +473,25 @@ async fn get_fast_handler( async fn get_handler( req: HttpRequest, body: Option>, - db_conn: DbConnection, dimension_params: DimensionQuery, query_filters: superposition_query::Query, workspace_context: WorkspaceContext, + state: Data, ) -> superposition::Result { - let DbConnection(mut conn) = db_conn; - - let max_created_at = get_max_created_at(&mut conn, &workspace_context.schema_name) - .map_err(|e| log::error!("failed to fetch max timestamp from event_log: {e}")) - .ok(); - - log::info!("Max created at: {max_created_at:?}"); + let mut response = HttpResponse::Ok(); + let is_smithy = req.method() != actix_web::http::Method::GET; + let schema_name = &workspace_context.schema_name; + let max_created_at = read_through_cache::>( + format!("{}{LAST_MODIFIED_KEY_SUFFIX}", **schema_name), + schema_name, + &state.redis, + &state.db_pool, + |conn| get_max_created_at(conn, schema_name), + ) + .await + .ok(); + + log::trace!("Max created at: {max_created_at:?}"); let is_not_modified = is_not_modified(max_created_at, &req); @@ -600,33 +500,43 @@ async fn get_handler( } let query_filters = query_filters.into_inner(); - let mut version = 
get_config_version(&query_filters.version, &workspace_context)?; - - let mut config = generate_config_from_version( - &mut version, - &mut conn, - &workspace_context.schema_name, - )?; - + let version = + get_config_version(&query_filters.version, &workspace_context, &state).await?; + + let mut config = read_through_cache::( + format!("{}::{}{CONFIG_KEY_SUFFIX}", **schema_name, version), + schema_name, + &state.redis, + &state.db_pool, + |conn| { + generate_config_from_version( + &mut Some(version), + conn, + &workspace_context.schema_name, + ) + .map_err(|err| { + log::error!("failed to generate config from version with error: {}", err); + // can't throw the AppError from here because fetch_from_redis_else_writeback + // expects a DieselResult error type, so we log the actual error and return NotFound + // which will trigger generate_cac in the fallback and if + // that also fails then it will return the actual error + diesel::result::Error::NotFound + }) + }, + ) + .await + .map_err(|e| unexpected_error!("failed to generate config: {}", e))?; config = apply_prefix_filter_to_config(&query_filters.prefix, config)?; - let is_smithy: bool; let context = if req.method() == actix_web::http::Method::GET { - is_smithy = false; dimension_params.into_inner() } else { - // Assuming smithy. 
- is_smithy = true; body.map_or_else(QueryMap::default, |body| body.into_inner().context.into()) }; if !context.is_empty() { config = config.filter_by_dimensions(&context); } - - let mut response = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, is_smithy, &mut response); - add_audit_id_to_header(&mut conn, &mut response, &workspace_context.schema_name); - add_config_version_to_header(&version, &mut response); - + add_config_version_to_header(&Some(version), &mut response); Ok(response.json(config)) } @@ -659,7 +569,6 @@ async fn get_toml_handler( let mut response = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, false, &mut response); - add_audit_id_to_header(&mut conn, &mut response, &workspace_context.schema_name); response.insert_header(("Content-Type", "application/toml")); Ok(response.body(toml_str)) @@ -674,46 +583,77 @@ async fn resolve_handler( req: HttpRequest, body: Option>, merge_strategy: Header, - db_conn: DbConnection, dimension_params: DimensionQuery, query_filters: superposition_query::Query, workspace_context: WorkspaceContext, state: Data, ) -> superposition::Result { - let DbConnection(mut conn) = db_conn; let query_filters = query_filters.into_inner(); - - let max_created_at = get_max_created_at(&mut conn, &workspace_context.schema_name) - .map_err(|e| log::error!("failed to fetch max timestamp from event_log : {e}")) - .ok(); + let schema_name = &workspace_context.schema_name; + + let max_created_at = read_through_cache::>( + format!("{}{LAST_MODIFIED_KEY_SUFFIX}", **schema_name), + schema_name, + &state.redis, + &state.db_pool, + |conn| get_max_created_at(conn, schema_name), + ) + .await + .ok(); if is_not_modified(max_created_at, &req) { return Ok(HttpResponse::NotModified().finish()); } - let mut config_version = - get_config_version(&query_filters.version, &workspace_context)?; - let mut config = generate_config_from_version( - &mut config_version, - &mut conn, - &workspace_context.schema_name, - )?; + let 
config_version = + get_config_version(&query_filters.version, &workspace_context, &state).await?; + + let mut config = read_through_cache::( + format!("{}::{}{CONFIG_KEY_SUFFIX}", **schema_name, config_version,), + schema_name, + &state.redis, + &state.db_pool, + |conn| { + generate_config_from_version( + &mut Some(config_version), + conn, + &workspace_context.schema_name, + ) + .map_err(|err| { + log::error!("failed to generate config from version with error: {}", err); + // can't throw the AppError from here because fetch_from_redis_else_writeback + // expects a DieselResult error type, so we log the actual error and return NotFound + // which will trigger generate_cac in the fallback and if + // that also fails then it will return the actual error + diesel::result::Error::NotFound + }) + }, + ) + .await + .map_err(|e| unexpected_error!("failed to generate config: {}", e))?; + let (is_smithy, query_data) = setup_query_data(&req, &body, &dimension_params)?; - let resolved_config = resolve( - &mut config, - query_data, - merge_strategy, - &mut conn, - &query_filters, - &workspace_context, - &state.master_encryption_key, - )?; + let resolved_config = { + let mut conn = state.db_pool.get().map_err(|e| { + log::error!("Unable to get db connection from pool, error: {e}"); + unexpected_error!("Unable to get db connection from pool, error: {}", e) + })?; + // TODO: resolve doesn't return diesel::error, figure that out + resolve( + &mut config, + query_data, + merge_strategy, + &mut conn, + &query_filters, + &workspace_context, + &state.master_encryption_key, + )? 
+ }; let mut resp = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, is_smithy, &mut resp); - add_audit_id_to_header(&mut conn, &mut resp, &workspace_context.schema_name); - add_config_version_to_header(&config_version, &mut resp); + add_config_version_to_header(&Some(config_version), &mut resp); Ok(resp.json(resolved_config)) } diff --git a/crates/context_aware_config/src/api/config/helpers.rs b/crates/context_aware_config/src/api/config/helpers.rs index eecc32e5d..84af4b414 100644 --- a/crates/context_aware_config/src/api/config/helpers.rs +++ b/crates/context_aware_config/src/api/config/helpers.rs @@ -1,26 +1,24 @@ use actix_http::header::HeaderValue; use actix_web::{ HttpRequest, HttpResponseBuilder, - web::{Header, Json}, + web::{Data, Header, Json}, }; use cac_client::{eval_cac, eval_cac_with_reasoning}; use chrono::{DateTime, Timelike, Utc}; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, dsl::max}; use serde_json::{Map, Value}; -use service_utils::service::types::{ - AppHeader, EncryptionKey, SchemaName, WorkspaceContext, +use service_utils::{ + redis::{CONFIG_VERSION_KEY_SUFFIX, read_through_cache}, + service::types::{AppHeader, AppState, EncryptionKey, SchemaName, WorkspaceContext}, }; use superposition_macros::{bad_argument, db_error, unexpected_error}; use superposition_types::{ Config, DBConnection, api::config::{ContextPayload, MergeStrategy, ResolveConfigQuery}, custom_query::{CommaSeparatedStringQParams, DimensionQuery, QueryMap}, - database::schema::{ - config_versions::dsl as config_versions, event_log::dsl as event_log, - }, + database::schema::config_versions::dsl as config_versions, result as superposition, }; -use uuid::Uuid; use crate::helpers::{evaluate_remote_cohorts, generate_cac}; @@ -35,45 +33,40 @@ pub fn apply_prefix_filter_to_config( Ok(config) } -pub fn get_config_version( +pub async fn get_config_version( version: &Option, workspace_context: &WorkspaceContext, -) -> superposition::Result> { - 
version.as_ref().map_or_else( - || Ok(workspace_context.settings.config_version), - |version| { - if *version == *"latest" { - log::trace!("latest config request"); - return Ok(None); - } - version.parse::().map_or_else( - |e| { - log::error!( - "failed to decode version as integer: {version}, error: {e}" - ); - Err(bad_argument!("version is not of type integer")) + state: &Data, +) -> superposition::Result { + match version.as_ref() { + Some(v) if *v != *"latest" => v.parse::().map_or_else( + |e| { + log::error!("failed to decode version as integer: {v}, error: {e}"); + Err(bad_argument!("version is not of type integer")) + }, + Ok, + ), + _ => match workspace_context.settings.config_version { + Some(v) => Ok(v), + None => read_through_cache::( + format!( + "{}{CONFIG_VERSION_KEY_SUFFIX}", + *workspace_context.schema_name + ), + &workspace_context.schema_name, + &state.redis, + &state.db_pool, + |conn| { + config_versions::config_versions + .select(config_versions::id) + .order_by(config_versions::created_at.desc()) + .schema_name(&workspace_context.schema_name) + .first::(conn) }, - |v| Ok(Some(v)), ) + .await + .map_err(|e| unexpected_error!("Config version not found due to: {}", e)), }, - ) -} - -pub fn add_audit_id_to_header( - conn: &mut DBConnection, - resp_builder: &mut HttpResponseBuilder, - schema_name: &SchemaName, -) { - if let Ok(uuid) = event_log::event_log - .select(event_log::id) - .filter(event_log::table_name.eq("contexts")) - .order_by(event_log::timestamp.desc()) - .schema_name(schema_name) - .first::(conn) - { - resp_builder.insert_header((AppHeader::XAuditId.to_string(), uuid.to_string())); - } else { - log::error!("Failed to fetch contexts from event_log"); } } diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index 4660569e9..0f1eabd96 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs 
@@ -8,7 +8,7 @@ use bigdecimal::BigDecimal; use chrono::Utc; use diesel::{ Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, - SelectableHelper, delete, + SelectableHelper, dsl::sql, sql_types::{Bool, Text}, }; @@ -48,15 +48,12 @@ use superposition_types::{ result::{self as superposition, AppError}, }; -use crate::helpers::add_config_version; -#[cfg(feature = "high-performance-mode")] -use crate::helpers::put_config_in_redis; use crate::{ api::context::{ helpers::{query_description, validate_ctx}, operations, }, - helpers::validate_change_reason, + helpers::{add_config_version, put_config_in_redis, validate_change_reason}, }; pub fn endpoints() -> Scope { @@ -141,15 +138,13 @@ async fn create_handler( })?; let DbConnection(mut conn) = db_conn; - - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &put_response, @@ -226,15 +221,13 @@ async fn update_handler( })?; let DbConnection(mut conn) = db_conn; - - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &override_resp, @@ -335,14 +328,13 @@ async fn move_handler( let DbConnection(mut conn) = db_conn; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: vec![&move_response.deleted_context, &move_response.context], @@ -598,16 +590,13 @@ async fn delete_handler( })?; let DbConnection(mut conn) = db_conn; - - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; - + .await; let data = 
WebhookData { payload: &deleted_ctx, resource: Resource::Context, @@ -739,7 +728,7 @@ async fn bulk_operations_handler( response.push(ContextBulkResponse::Replace(update_resp)); } ContextAction::Delete(ctx_id) => { - let deleted_ctx = delete(contexts) + let deleted_ctx = diesel::delete(contexts) .filter(id.eq(&ctx_id)) .schema_name(&workspace_context.schema_name) .get_result::(transaction_conn) @@ -836,14 +825,13 @@ async fn bulk_operations_handler( Ok((response, version_id)) })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &webhook_contexts, @@ -954,15 +942,13 @@ async fn weight_recompute_handler( let version_id = add_config_version(&state, tags, config_version_desc, transaction_conn, &workspace_context.schema_name)?; Ok(version_id) })?; - - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( config_version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &response, diff --git a/crates/context_aware_config/src/api/default_config/handlers.rs b/crates/context_aware_config/src/api/default_config/handlers.rs index a1074e215..8c7b2ac76 100644 --- a/crates/context_aware_config/src/api/default_config/handlers.rs +++ b/crates/context_aware_config/src/api/default_config/handlers.rs @@ -43,8 +43,6 @@ use superposition_types::{ result as superposition, }; -#[cfg(feature = "high-performance-mode")] -use crate::helpers::put_config_in_redis; use crate::{ api::{ context::helpers::validation_function_executor, @@ -53,7 +51,7 @@ use crate::{ types::FunctionInfo, }, }, - helpers::{add_config_version, validate_change_reason}, + helpers::{add_config_version, put_config_in_redis, validate_change_reason}, }; pub fn endpoints() -> Scope { @@ -166,14 +164,13 @@ async fn create_handler( Ok(version_id) })?; - 
#[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &default_config, @@ -322,14 +319,13 @@ async fn update_handler( Ok((val, version_id)) })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &db_row, @@ -557,14 +553,13 @@ async fn delete_handler( } })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &default_config, diff --git a/crates/context_aware_config/src/api/dimension/handlers.rs b/crates/context_aware_config/src/api/dimension/handlers.rs index bb3ad8a17..88bfffbea 100644 --- a/crates/context_aware_config/src/api/dimension/handlers.rs +++ b/crates/context_aware_config/src/api/dimension/handlers.rs @@ -38,7 +38,6 @@ use superposition_types::{ }; use crate::api::dimension::validations::allow_primitive_types; -#[cfg(feature = "high-performance-mode")] use crate::helpers::put_config_in_redis; use crate::{ api::dimension::{ @@ -242,14 +241,13 @@ async fn create_handler( } })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &inserted_dimension, @@ -480,14 +478,13 @@ async fn update_handler( Ok((result, is_mandatory, version_id)) })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; + .await; let data = WebhookData { payload: &result, @@ -668,15 +665,13 @@ async fn delete_handler( 
} })?; - #[cfg(feature = "high-performance-mode")] - put_config_in_redis( + let _ = put_config_in_redis( version_id, &state, &workspace_context.schema_name, &mut conn, ) - .await?; - + .await; let data = WebhookData { payload: &dimension_data, resource: Resource::Dimension, diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index 67554bc7e..b9495cf56 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -4,20 +4,19 @@ use actix_web::{ http::header::{HeaderMap, HeaderName, HeaderValue}, web::Data, }; -#[cfg(feature = "high-performance-mode")] -use chrono::DateTime; -use chrono::Utc; +use chrono::{DateTime, Utc}; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; -#[cfg(feature = "high-performance-mode")] -use fred::interfaces::KeysInterface; +use fred::{interfaces::KeysInterface, types::Expiration}; use serde_json::{Map, Value, json}; +use service_utils::{ + helpers::get_from_env_or_default, + redis::{CONFIG_KEY_SUFFIX, CONFIG_VERSION_KEY_SUFFIX, LAST_MODIFIED_KEY_SUFFIX}, +}; use service_utils::{ helpers::{fetch_dimensions_info_map, generate_snowflake_id}, service::types::{AppState, EncryptionKey, SchemaName, WorkspaceContext}, }; use superposition_macros::{db_error, unexpected_error, validation_error}; -#[cfg(feature = "high-performance-mode")] -use superposition_types::database::schema::event_log::dsl as event_log; use superposition_types::{ Cac, Condition, Config, Context, DBConnection, DefaultConfigInfo, DefaultConfigsWithSchema, DetailedConfig, DimensionInfo, OverrideWithKeys, Overrides, @@ -43,9 +42,6 @@ use superposition_types::{ result as superposition, }; -#[cfg(feature = "high-performance-mode")] -use uuid::Uuid; - use crate::{ api::{ context::helpers::validation_function_executor, @@ -226,46 +222,64 @@ pub fn add_config_version( Ok(version_id) } -#[cfg(feature = "high-performance-mode")] pub async fn put_config_in_redis( 
version_id: i64, state: &Data, schema_name: &SchemaName, db_conn: &mut DBConnection, ) -> superposition::Result<()> { + // Only perform Redis operations if Redis is configured + let redis_pool = match &state.redis { + Some(pool) => pool, + None => { + log::debug!("Redis not configured, skipping cache update"); + return Ok(()); + } + }; + let key_ttl: i64 = get_from_env_or_default("REDIS_KEY_TTL", 604800); + let expiration = Some(Expiration::EX(key_ttl)); let raw_config = generate_cac(db_conn, schema_name)?; - let parsed_config = serde_json::to_string(&json!(raw_config)).map_err(|e| { + let parsed_config = serde_json::to_string(&raw_config).map_err(|e| { log::error!("failed to convert cac config to string: {}", e); unexpected_error!("could not convert cac config to string") })?; - let config_key = format!("{}::cac_config", **schema_name); - let last_modified_at_key = format!("{}::cac_config::last_modified_at", **schema_name); - let audit_id_key = format!("{}::cac_config::audit_id", **schema_name); - let config_version_key = format!("{}::cac_config::config_version", **schema_name); + let config_key = format!("{}::{}{CONFIG_KEY_SUFFIX}", **schema_name, version_id); + let last_modified_at_key = format!("{}{LAST_MODIFIED_KEY_SUFFIX}", **schema_name); + let config_version_key = format!("{}{CONFIG_VERSION_KEY_SUFFIX}", **schema_name); let last_modified = DateTime::to_rfc2822(&Utc::now()); - let _ = state - .redis - .set::<(), String, String>(config_key, parsed_config, None, None, false) - .await; - let _ = state - .redis - .set::<(), String, String>(last_modified_at_key, last_modified, None, None, false) - .await; - if let Ok(uuid) = event_log::event_log - .select(event_log::id) - .filter(event_log::table_name.eq("contexts")) - .order_by(event_log::timestamp.desc()) - .first::(db_conn) - { - let _ = state - .redis - .set::<(), String, String>(audit_id_key, uuid.to_string(), None, None, false) - .await; - } - let _ = state - .redis - .set::<(), String, 
i64>(config_version_key, version_id, None, None, false) - .await; + redis_pool + .set::<(), String, String>( + config_key, + parsed_config, + expiration.clone(), + None, + false, + ) + .await + .map_err(|e| { + log::warn!("failed to set config in redis: {}", e); + unexpected_error!("failed to set config in redis") + })?; + redis_pool + .set::<(), String, String>( + last_modified_at_key, + last_modified, + expiration.clone(), + None, + false, + ) + .await + .map_err(|e| { + log::warn!("failed to set last_modified_key in redis: {}", e); + unexpected_error!("failed to set last_modified_key in redis") + })?; + redis_pool + .set::<(), String, i64>(config_version_key, version_id, expiration, None, false) + .await + .map_err(|e| { + log::warn!("failed to set config_version_key in redis: {}", e); + unexpected_error!("failed to set config_version_keyx in redis") + })?; Ok(()) } diff --git a/crates/experimentation_platform/Cargo.toml b/crates/experimentation_platform/Cargo.toml index 4ec80e083..efad1b1ab 100644 --- a/crates/experimentation_platform/Cargo.toml +++ b/crates/experimentation_platform/Cargo.toml @@ -16,6 +16,7 @@ cac_client = { path = "../cac_client" } chrono = { workspace = true } diesel = { workspace = true } experimentation_client = { path = "../experimentation_client" } +fred = { workspace = true, features = ["metrics"] } log = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/crates/experimentation_platform/src/api/experiment_groups/handlers.rs b/crates/experimentation_platform/src/api/experiment_groups/handlers.rs index 4ffe7f970..d181a6f39 100644 --- a/crates/experimentation_platform/src/api/experiment_groups/handlers.rs +++ b/crates/experimentation_platform/src/api/experiment_groups/handlers.rs @@ -9,13 +9,15 @@ use diesel::{ }; use serde_json::Value; use service_utils::{ + db::run_query, helpers::{generate_snowflake_id, get_from_env_or_default}, + redis::{EXPERIMENT_GROUPS_LIST_KEY_SUFFIX, read_through_cache}, 
service::types::{AppState, DbConnection, WorkspaceContext}, }; use superposition_derives::authorized; use superposition_macros::{bad_argument, unexpected_error}; use superposition_types::{ - PaginatedResponse, SortBy, User, + DBConnection, IsEmpty, PaginatedResponse, SortBy, User, api::experiment_groups::{ ExpGroupCreateRequest, ExpGroupFilters, ExpGroupMemberRequest, ExpGroupUpdateRequest, SortOn, @@ -39,7 +41,8 @@ use superposition_types::{ use crate::api::{ experiment_groups::helpers::{ add_members, create_system_generated_experiment_group, - fetch_and_validate_members, fetch_experiment_group, remove_members, + fetch_and_validate_members, fetch_experiment_group, + put_experiment_groups_in_redis, remove_members, validate_experiment_group_constraints, }, experiments::{ @@ -140,6 +143,12 @@ async fn create_handler( .get_result::(transaction_conn)?; Ok(new_experiment_group) })?; + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut conn, + &workspace_context.schema_name, + ) + .await; Ok(Json(new_experiment_group)) } @@ -184,6 +193,12 @@ async fn update_handler( .returning(ExperimentGroup::as_returning()) .schema_name(&workspace_context.schema_name) .get_result(&mut conn)?; + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut conn, + &workspace_context.schema_name, + ) + .await; Ok(Json(updated_group)) } @@ -233,6 +248,12 @@ async fn add_members_handler( &user, ) })?; + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut conn, + &workspace_context.schema_name, + ) + .await; Ok(experiment_group) } @@ -275,6 +296,12 @@ async fn remove_members_handler( &user, ) })?; + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut conn, + &workspace_context.schema_name, + ) + .await; Ok(experiment_group) } @@ -284,9 +311,50 @@ async fn list_handler( workspace_context: WorkspaceContext, pagination_params: superposition_query::Query, filters: superposition_query::Query, - db_conn: DbConnection, + state: Data, ) -> superposition::Result>> 
{ - let DbConnection(mut conn) = db_conn; + let key = format!( + "{}{}", + *workspace_context.schema_name, EXPERIMENT_GROUPS_LIST_KEY_SUFFIX + ); + let read_from_redis = pagination_params.all.is_some_and(|e| e) && filters.is_empty(); + if read_from_redis { + read_through_cache::>( + key, + &workspace_context.schema_name, + &state.redis, + &state.db_pool, + |conn| { + list_experiment_groups_db( + &pagination_params, + filters, + conn, + &workspace_context, + ) + }, + ) + .await + .map(Json) + .map_err(|e| unexpected_error!(e)) + } else { + run_query(&state.db_pool, |conn| { + list_experiment_groups_db( + &pagination_params, + filters, + conn, + &workspace_context, + ) + }) + .map(Json) + } +} + +fn list_experiment_groups_db( + pagination_params: &superposition_query::Query, + filters: superposition_query::Query, + conn: &mut DBConnection, + workspace_context: &WorkspaceContext, +) -> superposition::DieselResult> { let query_builder = |filters: &ExpGroupFilters| { let mut builder = experiment_groups::experiment_groups .schema_name(&workspace_context.schema_name) @@ -323,22 +391,21 @@ async fn list_handler( (SortOn::Name, SortBy::Asc) => base_query.order(experiment_groups::name.asc()), }; if let Some(true) = pagination_params.all { - let result: ExperimentGroups = - base_query.get_results::(&mut conn)?; - return Ok(Json(PaginatedResponse::all(result))); + let result: ExperimentGroups = base_query.get_results::(conn)?; + return Ok(PaginatedResponse::all(result)); } - let total_items = count_query.count().get_result(&mut conn)?; + let total_items = count_query.count().get_result(conn)?; let limit = pagination_params.count.unwrap_or(10); let offset = (pagination_params.page.unwrap_or(1) - 1) * limit; let query = base_query.limit(limit).offset(offset); - let data = query.load::(&mut conn)?; + let data = query.load::(conn)?; let total_pages = (total_items as f64 / limit as f64).ceil() as i64; - Ok(Json(PaginatedResponse { + Ok(PaginatedResponse { total_pages, total_items, 
data, - })) + }) } #[authorized] @@ -364,30 +431,39 @@ async fn delete_handler( exp_group_id: web::Path, mut db_conn: DbConnection, user: User, + state: Data, ) -> superposition::Result> { let id = exp_group_id.into_inner(); - db_conn.transaction::, superposition::AppError, _>(|conn| { - let marked_group = diesel::update(experiment_groups::experiment_groups) - .filter(experiment_groups::id.eq(&id)) - .set(( - experiment_groups::last_modified_by.eq(user.get_email()), - experiment_groups::last_modified_at.eq(chrono::Utc::now()), - )) - .returning(ExperimentGroup::as_returning()) - .schema_name(&workspace_context.schema_name) - .get_result(conn)?; - if !marked_group.member_experiment_ids.is_empty() { - return Err(bad_argument!( - "Cannot delete experiment group {} since it has members", - marked_group.name - )); - } - diesel::delete(experiment_groups::experiment_groups) - .filter(experiment_groups::id.eq(&id)) - .schema_name(&workspace_context.schema_name) - .execute(conn)?; - Ok(Json(marked_group)) - }) + let result = db_conn + .transaction::, superposition::AppError, _>(|conn| { + let marked_group = diesel::update(experiment_groups::experiment_groups) + .filter(experiment_groups::id.eq(&id)) + .set(( + experiment_groups::last_modified_by.eq(user.get_email()), + experiment_groups::last_modified_at.eq(chrono::Utc::now()), + )) + .returning(ExperimentGroup::as_returning()) + .schema_name(&workspace_context.schema_name) + .get_result(conn)?; + if !marked_group.member_experiment_ids.is_empty() { + return Err(bad_argument!( + "Cannot delete experiment group {} since it has members", + marked_group.name + )); + } + diesel::delete(experiment_groups::experiment_groups) + .filter(experiment_groups::id.eq(&id)) + .schema_name(&workspace_context.schema_name) + .execute(conn)?; + Ok(Json(marked_group)) + }); + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut db_conn, + &workspace_context.schema_name, + ) + .await; + result } // Remove this after backfilling 
experiment groups @@ -447,6 +523,12 @@ async fn backfill_handler( } Ok(results) })?; + let _ = put_experiment_groups_in_redis( + &state.redis, + &mut conn, + &workspace_context.schema_name, + ) + .await; Ok(Json(experiment_groups)) } diff --git a/crates/experimentation_platform/src/api/experiment_groups/helpers.rs b/crates/experimentation_platform/src/api/experiment_groups/helpers.rs index 66ff15b47..2d51a59ea 100644 --- a/crates/experimentation_platform/src/api/experiment_groups/helpers.rs +++ b/crates/experimentation_platform/src/api/experiment_groups/helpers.rs @@ -4,14 +4,19 @@ use actix_web::web::{Data, Json}; use diesel::{ BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper, }; +use fred::{ + prelude::{KeysInterface, RedisPool}, + types::Expiration, +}; use serde_json::Value; use service_utils::{ - helpers::generate_snowflake_id, + helpers::{generate_snowflake_id, get_from_env_or_default}, + redis::EXPERIMENT_GROUPS_LIST_KEY_SUFFIX, service::types::{AppState, SchemaName, WorkspaceContext}, }; use superposition_macros::{bad_argument, unexpected_error}; use superposition_types::{ - Condition, DBConnection, User, + Condition, DBConnection, PaginatedResponse, User, api::experiment_groups::ExpGroupMemberRequest, database::{ models::{ @@ -452,3 +457,44 @@ pub fn fetch_experiment_group( .get_result::(conn)?; Ok(experiment_group) } + +pub async fn put_experiment_groups_in_redis( + redis_pool: &Option, + conn: &mut DBConnection, + schema_name: &SchemaName, +) -> superposition::Result<()> { + let pool = match redis_pool { + Some(pool) => pool, + None => { + log::debug!("Redis not configured, skipping experiment groups cache update"); + return Ok(()); + } + }; + + let experiment_group_list: Vec = + experiment_groups::experiment_groups + .order(experiment_groups::last_modified_at.desc()) + .schema_name(schema_name) + .load::(conn)?; + + let paginated_response = PaginatedResponse::all(experiment_group_list); + + let serialized = 
serde_json::to_string(&paginated_response).map_err(|e| { + log::error!("Failed to serialize experiment groups for redis: {}", e); + unexpected_error!("Failed to serialize experiment groups for redis: {}", e) + })?; + + let key = format!("{}{EXPERIMENT_GROUPS_LIST_KEY_SUFFIX}", **schema_name); + let key_ttl: i64 = get_from_env_or_default("REDIS_KEY_TTL", 604800); + let expiration = Some(Expiration::EX(key_ttl)); + pool.next_connected() + .set::<(), String, String>(key, serialized, expiration, None, false) + .await + .map_err(|e| { + log::warn!("Failed to write experiment groups to redis: {}", e); + unexpected_error!("Failed to write experiment groups to redis: {}", e) + })?; + + log::debug!("Successfully updated experiment groups cache in Redis"); + Ok(()) +} diff --git a/crates/experimentation_platform/src/api/experiments/handlers.rs b/crates/experimentation_platform/src/api/experiments/handlers.rs index e995a539b..031d7c25d 100644 --- a/crates/experimentation_platform/src/api/experiments/handlers.rs +++ b/crates/experimentation_platform/src/api/experiments/handlers.rs @@ -23,10 +23,15 @@ use experimentation_client::{ use reqwest::{Method, StatusCode}; use serde_json::{Map, Value}; use service_utils::{ + db::run_query, helpers::{ WebhookData, construct_request_headers, execute_webhook_call, fetch_dimensions_info_map, generate_snowflake_id, request, }, + redis::{ + EXPERIMENT_GROUPS_LIST_KEY_SUFFIX, EXPERIMENTS_LAST_MODIFIED_KEY_SUFFIX, + EXPERIMENTS_LIST_KEY_SUFFIX, read_through_cache, + }, service::types::{ AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext, }, @@ -34,8 +39,8 @@ use service_utils::{ use superposition_derives::authorized; use superposition_macros::{bad_argument, unexpected_error}; use superposition_types::{ - Cac, Condition, Contextual, DimensionInfo, Exp, ListResponse, Overrides, - PaginatedResponse, Resource, SortBy, User, + Cac, Condition, Contextual, DBConnection, DimensionInfo, Exp, ListResponse, + Overrides, PaginatedResponse, 
Resource, SortBy, User, api::{ DimensionMatchStrategy, context::{ @@ -81,8 +86,8 @@ use crate::api::{ }, experiments::{ helpers::{ - fetch_and_validate_change_reason_with_function, validate_control_overrides, - validate_delete_experiment_variants, + fetch_and_validate_change_reason_with_function, put_experiments_in_redis, + validate_control_overrides, validate_delete_experiment_variants, }, types::StartedByChangeSet, }, @@ -374,6 +379,10 @@ async fn create_handler( Ok(inserted_experiment) })?; + // Update Redis cache with the list of active experiments + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; let response = ExperimentResponse::from(inserted_experiment); let data = WebhookData { @@ -430,6 +439,11 @@ async fn conclude_handler( ) .await?; + // Update Redis cache with the list of active experiments + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let experiment_response = ExperimentResponse::from(response); let data = WebhookData { @@ -697,6 +711,11 @@ async fn discard_handler( ) .await?; + // Update Redis cache with the list of active experiments + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let experiment_response = ExperimentResponse::from(response); let data = WebhookData { @@ -828,18 +847,29 @@ pub async fn discard( } pub async fn get_applicable_variants_helper( - db_conn: &mut PooledConnection>, context: Map, dimensions_info: &HashMap, identifier: String, workspace_context: &WorkspaceContext, + app_state: &Data, ) -> superposition::Result<(Vec, HashMap)> { use superposition_types::database::schema::experiments::dsl; - let experiment_groups = experiment_groups::experiment_groups - .schema_name(&workspace_context.schema_name) - .load::(db_conn)?; - + let experiment_groups = read_through_cache::>( + format!( + "{}{EXPERIMENT_GROUPS_LIST_KEY_SUFFIX}", + 
*workspace_context.schema_name + ), + &workspace_context.schema_name, + &app_state.redis, + &app_state.db_pool, + |conn| { + experiment_groups::experiment_groups + .schema_name(&workspace_context.schema_name) + .load::(conn) + }, + ) + .await?; let context = evaluate_local_cohorts(dimensions_info, &context); let buckets = @@ -850,21 +880,41 @@ pub async fn get_applicable_variants_helper( .filter_map(|(_, bucket)| bucket.experiment_id.parse::().ok()) .collect::>(); - let exps = dsl::experiments - .filter( - dsl::id - .eq_any(exp_ids) - .and(dsl::status.eq(ExperimentStatusType::INPROGRESS)), - ) - .schema_name(&workspace_context.schema_name) - .load::(db_conn)? - .into_iter() - .map(|exp| { - let exp_response = ExperimentResponse::from(exp); - let id = exp_response.id.clone(); - (id, exp_response) - }) - .collect::>(); + let exps = read_through_cache::>( + format!( + "{}{EXPERIMENTS_LIST_KEY_SUFFIX}", + *workspace_context.schema_name + ), + &workspace_context.schema_name, + &app_state.redis, + &app_state.db_pool, + |conn| { + let experiments = dsl::experiments + .filter( + dsl::id + .eq_any(&exp_ids) + .and(dsl::status.eq(ExperimentStatusType::INPROGRESS)), + ) + .schema_name(&workspace_context.schema_name) + .load::(conn)?; + Ok(PaginatedResponse { + data: experiments + .into_iter() + .map(ExperimentResponse::from) + .collect(), + total_pages: 1, + total_items: exp_ids.len() as i64, + }) + }, + ) + .await? 
+ .data + .into_iter() + .map(|exp| { + let id = exp.id.clone(); + (id, exp) + }) + .collect::>(); let applicable_variants = get_applicable_variants_from_group_response(&exps, &context, &buckets); @@ -879,12 +929,11 @@ pub async fn get_applicable_variants_helper( async fn get_applicable_variants_handler( workspace_context: WorkspaceContext, req: HttpRequest, - db_conn: DbConnection, + state: Data, req_body: Option>, query_data: Option>, dimension_params: Option>, ) -> superposition::Result>, Json>>> { - let DbConnection(mut conn) = db_conn; let (context, identifier) = match (req.method().clone(), query_data, dimension_params, req_body) { ( @@ -905,17 +954,17 @@ async fn get_applicable_variants_handler( } }; - let dimensions_info = - fetch_dimensions_info_map(&mut conn, &workspace_context.schema_name)?; + let di = run_query(&state.db_pool, |conn| { + fetch_dimensions_info_map(conn, &workspace_context.schema_name) + })?; let (applicable_variants, exps) = get_applicable_variants_helper( - &mut conn, context, - &dimensions_info, + &di, identifier, &workspace_context, + &state, ) .await?; - let variants = exps .into_iter() .filter_map(|(_, experiment)| { @@ -943,15 +992,25 @@ async fn list_handler( pagination_params: superposition_query::Query, filters: superposition_query::Query, dimension_params: DimensionQuery, - db_conn: DbConnection, + state: Data, ) -> superposition::Result { - let DbConnection(mut conn) = db_conn; - - let max_event_timestamp: Option> = event_log::event_log - .filter(event_log::table_name.eq("experiments")) - .select(diesel::dsl::max(event_log::timestamp)) - .schema_name(&workspace_context.schema_name) - .first(&mut conn)?; + let max_event_timestamp = read_through_cache::>>( + format!( + "{}{EXPERIMENTS_LAST_MODIFIED_KEY_SUFFIX}", + *workspace_context.schema_name + ), + &workspace_context.schema_name, + &state.redis, + &state.db_pool, + |conn| { + event_log::event_log + .filter(event_log::table_name.eq("experiments")) + 
.select(diesel::dsl::max(event_log::timestamp)) + .schema_name(&workspace_context.schema_name) + .first(conn) + }, + ) + .await?; let last_modified = req .headers() @@ -966,7 +1025,55 @@ async fn list_handler( if max_event_timestamp.is_some() && max_event_timestamp < last_modified { return Ok(HttpResponse::NotModified().finish()); }; + let show_all = pagination_params.all.unwrap_or_default(); + let read_from_redis = show_all + && filters + .status + .clone() + .is_some_and(|v| *v == ExperimentStatusType::active_list()) + && dimension_params.is_empty(); + if read_from_redis { + let response = read_through_cache::>( + format!( + "{}{EXPERIMENTS_LIST_KEY_SUFFIX}", + *workspace_context.schema_name + ), + &workspace_context.schema_name, + &state.redis, + &state.db_pool, + |conn| { + list_experiments_db( + pagination_params.clone(), + filters.clone(), + dimension_params.clone(), + conn, + &workspace_context, + ) + }, + ) + .await?; + Ok(HttpResponse::Ok().json(response)) + } else { + let paginated_response = run_query(&state.db_pool, |conn| { + list_experiments_db( + pagination_params, + filters, + dimension_params, + conn, + &workspace_context, + ) + })?; + Ok(HttpResponse::Ok().json(paginated_response)) + } +} +fn list_experiments_db( + pagination_params: superposition_query::Query, + filters: superposition_query::Query, + dimension_params: DimensionQuery, + conn: &mut DBConnection, + workspace_context: &WorkspaceContext, +) -> superposition::DieselResult> { let dimension_params = dimension_params.into_inner(); let query_builder = |filters: &ExperimentListFilters| { @@ -1034,7 +1141,7 @@ async fn list_handler( || filters.global_experiments_only.unwrap_or_default(); let paginated_response = if perform_in_memory_filter { - let all_experiments: Vec = base_query.load(&mut conn)?; + let all_experiments: Vec = base_query.load(conn)?; let filtered_experiments = if filters.global_experiments_only.unwrap_or_default() { all_experiments @@ -1043,7 +1150,7 @@ async fn 
list_handler( .collect() } else { let dimensions_info = - fetch_dimensions_info_map(&mut conn, &workspace_context.schema_name)?; + fetch_dimensions_info_map(conn, &workspace_context.schema_name)?; let dimension_params = evaluate_local_cohorts_skip_unresolved( &dimensions_info, &dimension_params, @@ -1087,13 +1194,13 @@ async fn list_handler( } } } else if show_all { - let result = base_query.load::(&mut conn)?; + let result = base_query.load::(conn)?; PaginatedResponse::all(result.into_iter().map(ExperimentResponse::from).collect()) } else { let count_query = query_builder(&filters); - let number_of_experiments = count_query.count().get_result(&mut conn)?; + let number_of_experiments = count_query.count().get_result(conn)?; let query = base_query.limit(limit).offset(offset); - let experiment_list = query.load::(&mut conn)?; + let experiment_list = query.load::(conn)?; PaginatedResponse { total_pages: (number_of_experiments as f64 / limit as f64).ceil() as i64, @@ -1105,7 +1212,7 @@ async fn list_handler( } }; - Ok(HttpResponse::Ok().json(paginated_response)) + Ok(paginated_response) } #[authorized] @@ -1291,6 +1398,10 @@ async fn ramp_handler( let (_, config_version_id) = fetch_cac_config(&state, &workspace_context).await?; let experiment_response = ExperimentResponse::from(updated_experiment); + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let webhook_event = if matches!(experiment.status, ExperimentStatusType::CREATED) { WebhookEvent::ExperimentStarted } else { @@ -1632,6 +1743,11 @@ async fn update_handler( Ok(updated_experiment) })?; + // Update Redis cache with active experiments and experiment groups + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let experiment_response = ExperimentResponse::from(updated_experiment); let data = WebhookData { @@ -1684,6 +1800,11 @@ async fn pause_handler( ) .await?; + // Update Redis cache with active 
experiments and experiment groups + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let experiment_response = ExperimentResponse::from(response); let data = WebhookData { @@ -1772,6 +1893,11 @@ async fn resume_handler( ) .await?; + // Update Redis cache with active experiments and experiment groups + let _ = + put_experiments_in_redis(&state.redis, &mut conn, &workspace_context.schema_name) + .await; + let experiment_response = ExperimentResponse::from(response); let data = WebhookData { diff --git a/crates/experimentation_platform/src/api/experiments/helpers.rs b/crates/experimentation_platform/src/api/experiments/helpers.rs index 0f3ff8a24..316be8e0e 100644 --- a/crates/experimentation_platform/src/api/experiments/helpers.rs +++ b/crates/experimentation_platform/src/api/experiments/helpers.rs @@ -9,17 +9,24 @@ use diesel::{ pg::PgConnection, r2d2::{ConnectionManager, PooledConnection}, }; +use fred::{ + prelude::{KeysInterface, RedisPool}, + types::Expiration, +}; use serde_json::{Map, Value}; -use service_utils::service::types::{ - AppState, ExperimentationFlags, SchemaName, WorkspaceContext, +use service_utils::{ + helpers::get_from_env_or_default, + redis::EXPERIMENTS_LIST_KEY_SUFFIX, + service::types::{AppState, ExperimentationFlags, SchemaName, WorkspaceContext}, }; use superposition_macros::{bad_argument, unexpected_error}; use superposition_types::{ - Condition, Config, DBConnection, Exp, Overrides, User, + Condition, Config, DBConnection, Exp, Overrides, PaginatedResponse, User, api::{ I64Update, config::{ConfigQuery, ResolveConfigQuery}, experiment_groups::ExpGroupMemberRequest, + experiments::ExperimentResponse, functions::{ CHANGE_REASON_VALIDATION_FN_NAME, FunctionExecutionRequest, FunctionExecutionResponse, Stage, @@ -795,3 +802,51 @@ pub async fn fetch_and_validate_change_reason_with_function( } } } + +pub async fn put_experiments_in_redis( + redis_pool: &Option, + conn: &mut DBConnection, + 
schema_name: &SchemaName, +) -> superposition::Result<()> { + let pool = match redis_pool { + Some(pool) => pool, + None => { + log::debug!("Redis not configured, skipping experiments cache update"); + return Ok(()); + } + }; + + let active_statuses = ExperimentStatusType::active_list(); + + let experiment_list: Vec = experiments::experiments + .filter(experiments::status.eq_any(active_statuses)) + .order(experiments::last_modified.desc()) + .schema_name(schema_name) + .load::(conn)?; + + let experiment_responses: Vec = experiment_list + .into_iter() + .map(ExperimentResponse::from) + .collect(); + + let paginated_response = PaginatedResponse::all(experiment_responses); + + let serialized = serde_json::to_string(&paginated_response).map_err(|e| { + log::error!("Failed to serialize experiments for redis: {}", e); + unexpected_error!("Failed to serialize experiments for redis: {}", e) + })?; + + let key = format!("{}{EXPERIMENTS_LIST_KEY_SUFFIX}", **schema_name); + let key_ttl: i64 = get_from_env_or_default("REDIS_KEY_TTL", 604800); + let expiration = Some(Expiration::EX(key_ttl)); + pool.next_connected() + .set::<(), String, String>(key, serialized, expiration, None, false) + .await + .map_err(|e| { + log::warn!("Failed to write experiments to redis: {}", e); + unexpected_error!("Failed to write experiments to redis: {}", e) + })?; + + log::debug!("Successfully updated experiments cache in Redis"); + Ok(()) +} diff --git a/crates/service_utils/Cargo.toml b/crates/service_utils/Cargo.toml index 067d52ff0..336fce23c 100644 --- a/crates/service_utils/Cargo.toml +++ b/crates/service_utils/Cargo.toml @@ -17,7 +17,7 @@ base64 = { workspace = true } chrono = { workspace = true } derive_more = { workspace = true } diesel = { workspace = true } -fred = { workspace = true, optional = true } +fred = { workspace = true, features = ["metrics"] } futures-util = { workspace = true } log = { workspace = true } once_cell = { workspace = true } @@ -41,8 +41,6 @@ url = { workspace = 
true } urlencoding = "~2.1.2" tracing-actix-web = { workspace = true } -[features] -high-performance-mode = ["dep:fred"] [lints] workspace = true diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index cfb967fc5..a04ca74b4 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -13,7 +13,7 @@ use diesel::{ RunQueryDsl, SelectableHelper, }; -use log::info; +use log::warn; use once_cell::sync::Lazy; use regex::Regex; use reqwest::{ @@ -80,7 +80,7 @@ where match std::env::var(name) { Ok(env) => env.parse().unwrap(), Err(err) => { - info!( + warn!( "{name} ENV failed to load due to {err}, using default value {default}" ); default @@ -213,11 +213,10 @@ pub fn parse_config_tags( pub fn get_workspace( workspace_schema_name: &SchemaName, db_conn: &mut DBConnection, -) -> result::Result { - let workspace = workspaces::dsl::workspaces +) -> Result { + workspaces::dsl::workspaces .filter(workspaces::workspace_schema_name.eq(workspace_schema_name.to_string())) - .get_result::(db_conn)?; - Ok(workspace) + .get_result::(db_conn) } fn has_pattern_in_headers(headers: &CustomHeaders) -> (bool, bool) { @@ -465,7 +464,7 @@ where pub fn fetch_dimensions_info_map( conn: &mut DBConnection, schema_name: &SchemaName, -) -> result::Result> { +) -> result::DieselResult> { let dimensions_map = dimensions::table .select((dimension, DimensionInfo::as_select())) .schema_name(schema_name) diff --git a/crates/service_utils/src/lib.rs b/crates/service_utils/src/lib.rs index 772b7a016..5f227dc5e 100644 --- a/crates/service_utils/src/lib.rs +++ b/crates/service_utils/src/lib.rs @@ -5,4 +5,5 @@ pub mod encryption; pub mod extensions; pub mod helpers; pub mod middlewares; +pub mod redis; pub mod service; diff --git a/crates/service_utils/src/middlewares/workspace_context.rs b/crates/service_utils/src/middlewares/workspace_context.rs index d1c76af19..cd65d0338 100644 --- 
a/crates/service_utils/src/middlewares/workspace_context.rs +++ b/crates/service_utils/src/middlewares/workspace_context.rs @@ -10,9 +10,11 @@ use actix_web::{ }; use futures_util::future::LocalBoxFuture; use regex::Regex; -use superposition_macros::{bad_argument, unexpected_error}; +use superposition_macros::bad_argument; +use superposition_types::database::models::Workspace; use crate::helpers::get_workspace; +use crate::redis::read_through_cache; use crate::{ extensions::HttpRequestExt, service::types::{AppState, OrganisationId, SchemaName, WorkspaceContext}, @@ -136,15 +138,15 @@ where } (true, Some(workspace_id)) => { let schema = format!("{}_{}", *organisation, *workspace_id); - let schema_name = SchemaName(schema); - let workspace_settings = { - let mut db_conn = app_state - .db_pool - .get() - .map_err(|err| unexpected_error!("{}", err))?; - - get_workspace(&schema_name, &mut db_conn)? - }; + let schema_name = SchemaName(schema.clone()); + let workspace_settings = read_through_cache::( + schema, + &schema_name, + &app_state.redis, + &app_state.db_pool, + |db_conn| get_workspace(&schema_name, db_conn), + ) + .await?; req.extensions_mut().insert(workspace_id.clone()); req.extensions_mut().insert(WorkspaceContext { diff --git a/crates/service_utils/src/redis.rs b/crates/service_utils/src/redis.rs new file mode 100644 index 000000000..8663a2616 --- /dev/null +++ b/crates/service_utils/src/redis.rs @@ -0,0 +1,112 @@ +use fred::{ + prelude::{KeysInterface, RedisClient, RedisPool}, + types::Expiration, +}; +use serde::{Serialize, de::DeserializeOwned}; +use superposition_types::{DBConnection, result as superposition}; + +use crate::{ + db::{PgSchemaConnectionPool, run_query}, + helpers::get_from_env_or_default, + service::types::SchemaName, +}; + +pub const LAST_MODIFIED_KEY_SUFFIX: &str = "::cac_config::last_modified_at"; +pub const CONFIG_VERSION_KEY_SUFFIX: &str = "::cac_config::config_version"; +pub const CONFIG_KEY_SUFFIX: &str = "::cac_config"; +pub const 
EXPERIMENTS_LIST_KEY_SUFFIX: &str = "::experiments_list"; +pub const EXPERIMENTS_LAST_MODIFIED_KEY_SUFFIX: &str = "::experiments::last_modified_at"; +pub const EXPERIMENT_GROUPS_LIST_KEY_SUFFIX: &str = "::experiment_groups_list"; + +/// Fetch data from Redis if available, else fall back to database call and write back to Redis +/// if redis is disabled read from the database directly +/// the fallback function is expected to return Result +/// You can use move closures to capture variables in the run_query +pub async fn read_through_cache( + key: String, + schema_name: &SchemaName, + redis_pool: &Option, + db_pool: &PgSchemaConnectionPool, + fallback_fn: impl FnOnce(&mut DBConnection) -> superposition::DieselResult, +) -> superposition::Result +where + T: Serialize + DeserializeOwned, +{ + let Some(pool) = redis_pool else { + log::trace!("Redis pool not configured, using fallback"); + return run_query(db_pool, fallback_fn); + }; + + let client = pool.next_connected(); + + if let Ok(data) = get_data_from_redis(key.clone(), client).await { + return Ok(data); + } + + log::info!( + "Cache miss for schema {}, falling back to DB", + **schema_name, + ); + + let data = run_query(db_pool, fallback_fn)?; + + // Best-effort writeback — don't fail the request if Redis write fails + if let Ok(serialized) = serde_json::to_string(&data) { + let key_ttl: i64 = get_from_env_or_default("REDIS_KEY_TTL", 604800); + let _ = client + .set::<(), String, String>( + key, + serialized, + Some(Expiration::EX(key_ttl)), + None, + false, + ) + .await + .map_err(|e| log::error!("Failed to write back to Redis: {e}")); + } else { + log::error!("Failed to serialize data for Redis writeback"); + } + + Ok(data) +} + +pub async fn get_data_from_redis( + key_name: String, + client: &RedisClient, +) -> Result +where + T: DeserializeOwned, +{ + use fred::interfaces::MetricsInterface; + + log::debug!("Started redis fetch for config"); + let data = { + // this block is so that the client connection is 
dropped + // before we move on to parsing the config + let data = client + .get::(key_name.clone()) + .await + .map_err(|e| { + log::error!("Failed to fetch {key_name} from redis: {}", e); + format!("Failed to fetch {key_name} from redis due to: {}", e) + })?; + let metrics = client.take_latency_metrics(); + let network_metrics = client.take_network_latency_metrics(); + log::trace!( + "Network metrics for data fetch in milliseconds :: max: {}, min: {}, avg: {}; Latency metrics :: max: {}, min: {}, avg: {}", + network_metrics.max, + network_metrics.min, + network_metrics.avg, + metrics.max, + metrics.min, + metrics.avg + ); + data + }; + + let value = serde_json::from_str::(&data).map_err(|e| { + log::error!("Failed to parse value from redis: {}", e); + format!("Failed to parse value from redis due to: {}", e) + })?; + Ok(value) +} diff --git a/crates/service_utils/src/service/types.rs b/crates/service_utils/src/service/types.rs index 4ae3804ae..ea2858f1b 100644 --- a/crates/service_utils/src/service/types.rs +++ b/crates/service_utils/src/service/types.rs @@ -34,7 +34,6 @@ pub enum AppEnv { #[strum(serialize_all = "kebab-case")] pub enum AppHeader { XConfigVersion, - XAuditId, LastModified, } @@ -53,8 +52,7 @@ pub struct AppState { pub tenant_middleware_exclusion_list: HashSet, pub service_prefix: String, pub superposition_token: String, - #[cfg(feature = "high-performance-mode")] - pub redis: fred::clients::RedisPool, + pub redis: Option, pub http_client: reqwest::Client, pub master_encryption_key: Option, } diff --git a/crates/superposition/Cargo.toml b/crates/superposition/Cargo.toml index 9126862b8..3c7b1a1a6 100644 --- a/crates/superposition/Cargo.toml +++ b/crates/superposition/Cargo.toml @@ -17,7 +17,7 @@ context_aware_config = { path = "../context_aware_config" } diesel = { workspace = true } dotenv = "0.15.0" experimentation_platform = { path = "../experimentation_platform" } -fred = { workspace = true, optional = true } +fred = { workspace = true } 
frontend = { path = "../frontend" } idgenerator = "2.0.0" leptos = { workspace = true } @@ -42,12 +42,5 @@ tracing-subscriber = { workspace = true } tracing-actix-web = { workspace = true } json-subscriber = { version = "0.2.7", features = ["tracing-log"] } -[features] -high-performance-mode = [ - "context_aware_config/high-performance-mode", - "service_utils/high-performance-mode", - "dep:fred", -] - [lints] workspace = true diff --git a/crates/superposition/src/app_state.rs b/crates/superposition/src/app_state.rs index 016a1d3fb..0083ac256 100644 --- a/crates/superposition/src/app_state.rs +++ b/crates/superposition/src/app_state.rs @@ -1,12 +1,9 @@ use std::{ collections::HashSet, sync::{Arc, Mutex}, + time::Duration, }; -#[cfg(feature = "high-performance-mode")] -use std::time::Duration; - -#[cfg(feature = "high-performance-mode")] use fred::{ clients::RedisPool, interfaces::ClientLike, @@ -39,41 +36,45 @@ pub async fn get( let snowflake_generator = Arc::new(Mutex::new(SnowflakeIdGenerator::new(1, 1))); - #[cfg(feature = "high-performance-mode")] - let redis_pool = { - let redis_url = - get_from_env_or_default("REDIS_URL", String::from("http://localhost:6379")); - let redis_pool_size = get_from_env_or_default("REDIS_POOL_SIZE", 10); - let redis_max_attempts = get_from_env_or_default("REDIS_MAX_ATTEMPTS", 10); - let redis_connection_timeout = - get_from_env_or_default("REDIS_CONN_TIMEOUT", 1000); - let config = RedisConfig::from_url(&redis_url).unwrap_or_else(|_| { - panic!("Failed to create RedisConfig from url {}", redis_url) - }); - let reconnect_policy = ReconnectPolicy::new_constant(redis_max_attempts, 100); - let redis_pool = RedisPool::new( - config, - Some(PerformanceConfig { - auto_pipeline: true, - ..Default::default() - }), - Some(ConnectionConfig { - connection_timeout: Duration::from_millis(redis_connection_timeout), - ..Default::default() - }), - Some(reconnect_policy), - redis_pool_size, - ) - .map_err(|e| format!("Could not connect to redis due 
to {e}")) - .unwrap(); + // Initialize Redis pool only if REDIS_URL is explicitly set (not default) + let redis_pool = match std::env::var("REDIS_URL") { + Ok(redis_url) if !redis_url.is_empty() => { + let redis_pool_size = get_from_env_or_default("REDIS_POOL_SIZE", 10); + let redis_max_attempts = get_from_env_or_default("REDIS_MAX_ATTEMPTS", 10); + let redis_connection_timeout = + get_from_env_or_default("REDIS_CONN_TIMEOUT", 1000); + let config = RedisConfig::from_url(&redis_url).unwrap_or_else(|_| { + panic!("Failed to create RedisConfig from url {}", redis_url) + }); + let reconnect_policy = ReconnectPolicy::new_constant(redis_max_attempts, 100); + let redis_pool = RedisPool::new( + config, + Some(PerformanceConfig { + auto_pipeline: true, + ..Default::default() + }), + Some(ConnectionConfig { + connection_timeout: Duration::from_millis(redis_connection_timeout), + ..Default::default() + }), + Some(reconnect_policy), + redis_pool_size, + ) + .map_err(|e| format!("Could not connect to redis due to {e}")) + .unwrap(); - redis_pool.connect(); - redis_pool - .wait_for_connect() - .await - .expect("Failed to connect to Redis"); + redis_pool.connect(); + redis_pool + .wait_for_connect() + .await + .expect("Failed to connect to Redis"); - redis_pool + Some(redis_pool) + } + _ => { + log::info!("REDIS_URL not set, Redis caching disabled"); + None + } }; AppState { @@ -106,7 +107,6 @@ pub async fn get( .collect::>(), service_prefix, superposition_token: get_superposition_token(kms_client, &app_env).await, - #[cfg(feature = "high-performance-mode")] redis: redis_pool, http_client: reqwest::Client::new(), master_encryption_key, diff --git a/crates/superposition/src/resolve/handlers.rs b/crates/superposition/src/resolve/handlers.rs index dc54bdc28..014eab07c 100644 --- a/crates/superposition/src/resolve/handlers.rs +++ b/crates/superposition/src/resolve/handlers.rs @@ -2,16 +2,22 @@ use actix_web::{ HttpRequest, HttpResponse, Scope, routes, web::{Data, Header, Json}, }; 
+use chrono::{DateTime, Utc}; use context_aware_config::api::config::helpers::{ - add_audit_id_to_header, add_config_version_to_header, add_last_modified_to_header, + add_config_version_to_header, add_last_modified_to_header, generate_config_from_version, get_config_version, get_max_created_at, is_not_modified, resolve, setup_query_data, }; use experimentation_platform::api::experiments::handlers::get_applicable_variants_helper; use serde_json::{Map, Value}; -use service_utils::service::types::{AppState, DbConnection, WorkspaceContext}; +use service_utils::{ + redis::{CONFIG_KEY_SUFFIX, LAST_MODIFIED_KEY_SUFFIX, read_through_cache}, + service::types::{AppState, WorkspaceContext}, +}; use superposition_derives::authorized; +use superposition_macros::unexpected_error; use superposition_types::{ + Config, api::config::{ContextPayload, MergeStrategy, ResolveConfigQuery}, custom_query::{self as superposition_query, CustomQuery, DimensionQuery, QueryMap}, result as superposition, @@ -32,65 +38,93 @@ async fn resolve_with_exp_handler( req: HttpRequest, body: Option>, merge_strategy: Header, - db_conn: DbConnection, dimension_params: DimensionQuery, query_filters: superposition_query::Query, identifier_query: superposition_query::Query, workspace_context: WorkspaceContext, state: Data, ) -> superposition::Result { - let DbConnection(mut conn) = db_conn; let query_filters = query_filters.into_inner(); let identifier_query = identifier_query.into_inner(); - let max_created_at = get_max_created_at(&mut conn, &workspace_context.schema_name) - .map_err(|e| log::error!("failed to fetch max timestamp from event_log : {e}")) - .ok(); + let schema_name = workspace_context.schema_name.clone(); + + let max_created_at = read_through_cache::>( + format!("{}{LAST_MODIFIED_KEY_SUFFIX}", *schema_name), + &schema_name, + &state.redis, + &state.db_pool, + |conn| get_max_created_at(conn, &schema_name), + ) + .await + .ok(); if identifier_query.identifier.is_none() && 
is_not_modified(max_created_at, &req) { return Ok(HttpResponse::NotModified().finish()); } let (is_smithy, mut query_data) = setup_query_data(&req, &body, &dimension_params)?; - let mut config_version = - get_config_version(&query_filters.version, &workspace_context)?; - // This is needed as `generate_config_from_version` updates config_version value - // in case nothing was found either from query params or workspace settings - // This value is separately needed, as in the following check the value before the modification is required - let config_ver = config_version.to_owned(); + let config_version = + get_config_version(&query_filters.version, &workspace_context, &state).await?; - let mut config = generate_config_from_version( - &mut config_version, - &mut conn, - &workspace_context.schema_name, - )?; + let mut config = read_through_cache::( + format!("{}::{}{CONFIG_KEY_SUFFIX}", *schema_name, config_version), + &schema_name, + &state.redis, + &state.db_pool, + |conn| { + generate_config_from_version( + &mut Some(config_version), + conn, + &workspace_context.schema_name, + ) + .map_err(|err| { + log::error!("failed to generate config from version with error: {}", err); + // can't throw the AppError from here because read_through_cache + // expects a DieselResult error type, so we log the actual error and return NotFound + // which will trigger generate_cac in the fallback and if + // that also fails then it will return the actual error + diesel::result::Error::NotFound + }) + }, + ) + .await + .map_err(|e| unexpected_error!("failed to generate config: {}", e))?; - if let (None, Some(identifier)) = (config_ver, identifier_query.identifier) { + if let (None, Some(identifier)) = + (&query_filters.version, identifier_query.identifier) + { let context_map: &Map = &query_data; let (applicable_variants, _) = get_applicable_variants_helper( - &mut conn, context_map.clone(), &config.dimensions, identifier, &workspace_context, + &state, ) .await?;
query_data.insert("variantIds".to_string(), applicable_variants.into()); - } + }; - let resolved_config = resolve( - &mut config, - query_data, - merge_strategy, - &mut conn, - &query_filters, - &workspace_context, - &state.master_encryption_key, - )?; + let resolved_config = { + let mut conn = state.db_pool.get().map_err(|e| { + log::error!("Unable to get db connection from pool, error: {e}"); + unexpected_error!("Unable to get db connection from pool: {}", e) + })?; + resolve( + &mut config, + query_data, + merge_strategy, + &mut conn, + &query_filters, + &workspace_context, + &state.master_encryption_key, + )? + }; let mut resp = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, is_smithy, &mut resp); - add_audit_id_to_header(&mut conn, &mut resp, &workspace_context.schema_name); - add_config_version_to_header(&config_version, &mut resp); + + add_config_version_to_header(&Some(config_version), &mut resp); Ok(resp.json(resolved_config)) } diff --git a/crates/superposition/src/resolve/types.rs b/crates/superposition/src/resolve/types.rs index 045bdf647..a9bfcf24f 100644 --- a/crates/superposition/src/resolve/types.rs +++ b/crates/superposition/src/resolve/types.rs @@ -2,6 +2,8 @@ use serde::Deserialize; use superposition_derives::{IsEmpty, QueryParam}; use superposition_types::{IsEmpty, custom_query::QueryParam}; +/// Query param for targeting key or the identifier to be used in experiments. 
+/// Also known as toss #[derive(Deserialize, IsEmpty, QueryParam, Default)] pub struct IdentifierQuery { #[query_param(skip_if_empty)] diff --git a/crates/superposition/src/workspace/handlers.rs b/crates/superposition/src/workspace/handlers.rs index d28447bfb..da1cec714 100644 --- a/crates/superposition/src/workspace/handlers.rs +++ b/crates/superposition/src/workspace/handlers.rs @@ -11,6 +11,7 @@ use diesel::{ connection::SimpleConnection, r2d2::{ConnectionManager, PooledConnection}, }; +use fred::{prelude::KeysInterface, types::Expiration}; use regex::Regex; use service_utils::{ encryption::{ @@ -163,6 +164,9 @@ async fn create_handler( setup_workspace_schema(transaction_conn, &workspace_schema_name)?; Ok(inserted_workspace.remove(0)) })?; + + put_workspace_in_redis(&created_workspace, &state, &workspace_schema_name).await; + let response = WorkspaceResponse::from(created_workspace); Ok(Json(response)) } @@ -174,6 +178,7 @@ async fn create_handler( async fn update_handler( workspace_name: web::Path, request: Json, + app_state: Data, db_conn: DbConnection, org_id: OrganisationId, user: User, @@ -193,28 +198,59 @@ async fn update_handler( .first::(&mut conn)?; } - let updated_workspace = - conn.transaction::(|transaction_conn| { - let updated_workspace = diesel::update(workspaces::table) - .filter(workspaces::organisation_id.eq(&org_id.0)) - .filter(workspaces::workspace_name.eq(workspace_name)) - .set(( - request, - workspaces::last_modified_by.eq(user.get_email()), - workspaces::last_modified_at.eq(timestamp), - )) - .get_result::(transaction_conn) - .map_err(|err| { - log::error!("failed to update workspace with error: {}", err); - err - })?; - - Ok(updated_workspace) - })?; - let response = WorkspaceResponse::from(updated_workspace); + conn.transaction::<(), superposition::AppError, _>(|transaction_conn| { + diesel::update(workspaces::table) + .filter(workspaces::organisation_id.eq(&org_id.0)) + .filter(workspaces::workspace_name.eq(workspace_name)) + .set(( 
+ request, + workspaces::last_modified_by.eq(user.get_email()), + workspaces::last_modified_at.eq(timestamp), + )) + .execute(transaction_conn) + .map_err(|err| { + log::error!("failed to update workspace with error: {}", err); + err + })?; + Ok(()) + })?; + + let workspace = get_workspace(&schema_name, &mut conn)?; + put_workspace_in_redis(&workspace, &app_state, &schema_name.0).await; + + let response = WorkspaceResponse::from(workspace); Ok(Json(response)) } +async fn put_workspace_in_redis( + workspace: &Workspace, + state: &Data, + schema_name: &str, +) { + let redis_pool = match &state.redis { + Some(pool) => pool, + None => { + log::debug!("Redis not configured, skipping workspace cache update"); + return; + } + }; + + let key_ttl: i64 = + service_utils::helpers::get_from_env_or_default("REDIS_KEY_TTL", 604800); + let expiration = Some(Expiration::EX(key_ttl)); + + if let Ok(serialized) = serde_json::to_string(workspace) { + let client = redis_pool.next_connected(); + + if let Err(e) = client + .set::<(), &str, String>(schema_name, serialized, expiration, None, false) + .await + { + log::warn!("Failed to update Redis cache with workspace: {}", e); + } + }; +} + #[authorized] #[get("")] async fn list_handler( @@ -352,6 +388,10 @@ async fn migrate_schema_handler( Ok(()) })?; + // Refetch workspace after transaction to get updated data + let workspace = get_workspace(&schema_name, &mut conn)?; + put_workspace_in_redis(&workspace, &state, &schema_name.0).await; + let response = WorkspaceResponse::from(workspace); Ok(Json(response)) } @@ -377,7 +417,7 @@ pub async fn rotate_encryption_key_handler( let schema_name = SchemaName(format!("{}_{}", *org_id, workspace_name.into_inner())); let workspace = get_workspace(&schema_name, &mut conn)?; let workspace_context = WorkspaceContext { - schema_name, + schema_name: schema_name.clone(), organisation_id: org_id, workspace_id: WorkspaceId(workspace.workspace_name.clone()), settings: workspace, @@ -393,6 +433,10 @@ pub 
async fn rotate_encryption_key_handler( ) })?; + // Refetch workspace after transaction to get updated data + let workspace = get_workspace(&schema_name, &mut conn)?; + put_workspace_in_redis(&workspace, &state, &schema_name.0).await; + Ok(Json(KeyRotationResponse { total_secrets_re_encrypted, })) diff --git a/crates/superposition_types/src/result.rs b/crates/superposition_types/src/result.rs index 410e558db..fe99ae170 100644 --- a/crates/superposition_types/src/result.rs +++ b/crates/superposition_types/src/result.rs @@ -147,3 +147,5 @@ impl std::fmt::Debug for AppError { error_chain_fmt(self, f) } } + +pub type DieselResult = core::result::Result; diff --git a/docker-compose.yaml b/docker-compose.yaml index 2fa044bf4..add7e3cbc 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,26 +11,14 @@ services: - ./docker-compose/postgres/data:/var/lib/postgresql/data restart: on-failure - # localstack: - # build: ./docker-compose/localstack/ - # container_name: superposition_localstack - # ports: - # - "4510-4559:4510-4559" # external service port range - # - "4566:4566" # LocalStack Edge Proxy - # - "4571:4571" - # environment: - # LOCALSTACK_SERVICES: s3, sns, sqs, logs, cloudwatch, kms - # AWS_DEFAULT_REGION: ap-south-1 - # EDGE_PORT: 4566 - -# redis: -# image: redis:7 -# container_name: superposition_redis -# restart: on-failure -# ports: -# - 6379:6379 -# command: redis-server -# network_mode: bridge + redis: + image: redis:7 + container_name: superposition_redis + restart: on-failure + ports: + - 6379:6379 + command: redis-server + network_mode: bridge # app: # image: ghcr.io/juspay/superposition:latest