diff --git a/Cargo.lock b/Cargo.lock index 2bc45c657..14fc159b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5658,6 +5658,7 @@ dependencies = [ "superposition_types", "thiserror 1.0.58", "tokio", + "tokio-util", "uuid", ] diff --git a/crates/superposition_provider/Cargo.toml b/crates/superposition_provider/Cargo.toml index 52bc13c91..2eb88ad35 100644 --- a/crates/superposition_provider/Cargo.toml +++ b/crates/superposition_provider/Cargo.toml @@ -10,26 +10,21 @@ readme = "README.md" description = "Open feature provider for Superposition" [dependencies] -serde = { workspace = true } -serde_json = { workspace = true } -thiserror = "1.0" -tokio = { version = "1.0", features = ["full"] } async-trait = "0.1" +aws-smithy-types = { version = "1.3.0" } +chrono = { workspace = true } log = { workspace = true } +open-feature = "0.2.5" reqwest = { workspace = true } -chrono = { workspace = true } -uuid = { workspace = true } -aws-smithy-types = { version = "1.3.0" } +serde = { workspace = true } +serde_json = { workspace = true } superposition_core = { workspace = true } - -# OpenFeature Rust SDK -open-feature = "0.2.5" - -# Generated Smithy Rust SDK for Superposition superposition_sdk = { workspace = true, features = ["behavior-version-latest"] } - -# Superposition types for proper type conversion superposition_types = { workspace = true } +thiserror = "1.0" +tokio = { version = "1.0", features = ["full"] } +tokio-util = "0.7" +uuid = { workspace = true } [lints] diff --git a/crates/superposition_provider/src/client.rs b/crates/superposition_provider/src/client.rs index 8027d22bd..f4f1273c1 100644 --- a/crates/superposition_provider/src/client.rs +++ b/crates/superposition_provider/src/client.rs @@ -2,6 +2,10 @@ use std::collections::HashMap; use std::sync::Arc; use log::{debug, error, info, warn}; +pub use open_feature::{ + provider::{ProviderMetadata, ProviderStatus, ResolutionDetails}, + EvaluationContext, +}; use serde_json::Value; use superposition_core::experiment::ExperimentGroups; use superposition_core::{ @@ -10,26 +14,20 @@ use superposition_core::{ use superposition_types::{Config, DimensionInfo}; use tokio::join; use tokio::sync::RwLock; -use tokio::task::JoinHandle; use tokio::time::{sleep, Duration}; +use tokio_util::sync::CancellationToken; use crate::types::*; use crate::utils::ConversionUtils; -pub use open_feature::{ - provider::{ProviderMetadata, ProviderStatus, ResolutionDetails}, - EvaluationContext, -}; - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CacConfig { superposition_options: SuperpositionOptions, options: ConfigurationOptions, fallback_config: Option>, cached_config: Arc>>, last_updated: Arc>>>, - evaluation_cache: RwLock>>, - polling_task: RwLock>>, + polling_task_cancellation_token: Arc>>, } impl CacConfig { @@ -43,8 +41,7 @@ impl CacConfig { options, cached_config: Arc::new(RwLock::new(None)), last_updated: Arc::new(RwLock::new(None)), - evaluation_cache: RwLock::new(HashMap::new()), - polling_task: RwLock::new(None), + polling_task_cancellation_token: Arc::new(RwLock::new(None)), } } @@ -85,9 +82,10 @@ impl CacConfig { polling_strategy.interval, polling_strategy.timeout.unwrap_or(30) ); - let task = self.start_polling(polling_strategy.interval).await; - let mut polling_task = self.polling_task.write().await; - *polling_task = Some(task); + let task_token = self.start_polling(polling_strategy.interval).await; + let mut polling_task_cancellation_token = + self.polling_task_cancellation_token.write().await; + *polling_task_cancellation_token = Some(task_token); } RefreshStrategy::OnDemand(on_demand_strategy) => { info!( @@ -102,28 +100,39 @@ impl CacConfig { Ok(()) } - async fn start_polling(&self, interval: u64) -> JoinHandle<()> { + async fn start_polling(&self, interval: u64) -> CancellationToken { let superposition_options = self.superposition_options.clone(); let cached_config = self.cached_config.clone(); let last_updated = self.last_updated.clone(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_clone = cancellation_token.clone(); tokio::spawn(async move { - loop { - match Self::get_config_static(&superposition_options).await { - Ok(config) => { - let mut cached = cached_config.write().await; - *cached = Some(config); - let mut updated = last_updated.write().await; - *updated = Some(chrono::Utc::now()); - debug!("CAC config updated via polling"); + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + info!("shutting down polling task gracefully"); + }, + _ = async { + loop { + match Self::get_config_static(&superposition_options).await { + Ok(config) => { + let mut cached = cached_config.write().await; + *cached = Some(config); + let mut updated = last_updated.write().await; + *updated = Some(chrono::Utc::now()); + debug!("CAC config updated via polling"); + } + Err(e) => { + error!("Polling error: {}", e); + } + } + sleep(Duration::from_secs(interval)).await; } - Err(e) => { - error!("Polling error: {}", e); - } - } - sleep(Duration::from_secs(interval)).await; + } => {} } - }) + }); + + cancellation_token } pub async fn on_demand_config(&self, ttl: u64, use_stale: bool) -> Result { @@ -247,31 +256,29 @@ impl CacConfig { pub async fn close(&self) -> Result<()> { // Stop polling task - let mut polling_task = self.polling_task.write().await; - if let Some(task) = polling_task.take() { - task.abort(); + let mut polling_task_cancellation_token = + self.polling_task_cancellation_token.write().await; + if let Some(task) = polling_task_cancellation_token.take() { + task.cancel(); } // Clear caches let mut cached_config = self.cached_config.write().await; *cached_config = None; - let mut evaluation_cache = self.evaluation_cache.write().await; - evaluation_cache.clear(); Ok(()) } } /// Experimentation Configuration client -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ExperimentationConfig { superposition_options: SuperpositionOptions, options: ExperimentationOptions, cached_experiments: Arc>>, cached_experiment_groups: Arc>>, last_updated: Arc>>>, - evaluation_cache: RwLock>>, - polling_task: RwLock>>, + polling_task_cancellation_token: Arc>>, } impl ExperimentationConfig { @@ -285,8 +292,7 @@ impl ExperimentationConfig { cached_experiments: Arc::new(RwLock::new(None)), cached_experiment_groups: Arc::new(RwLock::new(None)), last_updated: Arc::new(RwLock::new(None)), - evaluation_cache: RwLock::new(HashMap::new()), - polling_task: RwLock::new(None), + polling_task_cancellation_token: Arc::new(RwLock::new(None)), } } @@ -334,9 +340,10 @@ impl ExperimentationConfig { "Using PollingStrategy for experiments: interval={}s", polling_strategy.interval ); - let task = self.start_polling(polling_strategy.interval).await; - let mut polling_task = self.polling_task.write().await; - *polling_task = Some(task); + let task_token = self.start_polling(polling_strategy.interval).await; + let mut polling_task_cancellation_token = + self.polling_task_cancellation_token.write().await; + *polling_task_cancellation_token = Some(task_token); } RefreshStrategy::OnDemand(on_demand_strategy) => { info!( @@ -349,41 +356,52 @@ impl ExperimentationConfig { Ok(()) } - async fn start_polling(&self, interval: u64) -> JoinHandle<()> { + async fn start_polling(&self, interval: u64) -> CancellationToken { let superposition_options = self.superposition_options.clone(); let cached_experiments = self.cached_experiments.clone(); let cached_experiment_groups = self.cached_experiment_groups.clone(); let last_updated = self.last_updated.clone(); + let cancellation_token = CancellationToken::new(); + let cancellation_token_clone = cancellation_token.clone(); tokio::spawn(async move { - loop { - let (experiments_result, groups_result) = join!( - Self::get_experiments_static(&superposition_options), - Self::get_experiment_groups_static(&superposition_options) - ); - match (experiments_result, groups_result) { - (Ok(Some(experiments)), Ok(Some(experiment_groups))) => { - let mut cached = cached_experiments.write().await; - *cached = Some(experiments); - let mut cached_groups = cached_experiment_groups.write().await; - *cached_groups = Some(experiment_groups); - let mut updated = last_updated.write().await; - *updated = Some(chrono::Utc::now()); - debug!("Experiments and Experiment Groups updated via polling"); - } - (Ok(None), Ok(None)) => { - warn!( - "No experiments or experiment groups returned from polling" + tokio::select! { + _ = cancellation_token_clone.cancelled() => { + info!("shutting down polling task gracefully"); + }, + _ = async { + loop { + let (experiments_result, groups_result) = join!( + Self::get_experiments_static(&superposition_options), + Self::get_experiment_groups_static(&superposition_options) ); + match (experiments_result, groups_result) { + (Ok(Some(experiments)), Ok(Some(experiment_groups))) => { + let mut cached = cached_experiments.write().await; + *cached = Some(experiments); + let mut cached_groups = cached_experiment_groups.write().await; + *cached_groups = Some(experiment_groups); + let mut updated = last_updated.write().await; + *updated = Some(chrono::Utc::now()); + debug!("Experiments and Experiment Groups updated via polling"); + } + (Ok(None), Ok(None)) => { + warn!( + "No experiments or experiment groups returned from polling" + ); + } + (Err(e), _) | (_, Err(e)) => { + error!("Polling error: {}", e); + } + _ => {} + } + sleep(Duration::from_secs(interval)).await; } - (Err(e), _) | (_, Err(e)) => { - error!("Polling error: {}", e); - } - _ => {} - } - sleep(Duration::from_secs(interval)).await; + } => {} } - }) + }); + + cancellation_token } pub async fn on_demand_config( @@ -550,16 +568,15 @@ impl ExperimentationConfig { pub async fn close(&self) -> Result<()> { // Stop polling task - let mut polling_task = self.polling_task.write().await; - if let Some(task) = polling_task.take() { - task.abort(); + let mut polling_task_cancellation_token = + self.polling_task_cancellation_token.write().await; + if let Some(token) = polling_task_cancellation_token.take() { + token.cancel(); } // Clear caches let mut cached_experiments = self.cached_experiments.write().await; *cached_experiments = None; - let mut evaluation_cache = self.evaluation_cache.write().await; - evaluation_cache.clear(); Ok(()) } diff --git a/crates/superposition_provider/src/provider.rs b/crates/superposition_provider/src/provider.rs index dd2a31720..7fe4be3c0 100644 --- a/crates/superposition_provider/src/provider.rs +++ b/crates/superposition_provider/src/provider.rs @@ -1,4 +1,7 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use async_trait::async_trait; use log::{error, info}; @@ -16,10 +19,10 @@ use crate::client::{CacConfig, ExperimentationConfig}; use crate::types::*; use crate::utils::ConversionUtils; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct SuperpositionProvider { metadata: ProviderMetadata, - status: RwLock, + status: Arc>, cac_config: Option, exp_config: Option, } @@ -56,7 +59,7 @@ impl SuperpositionProvider { metadata: ProviderMetadata { name: "SuperpositionProvider".to_string(), }, - status: RwLock::new(ProviderStatus::NotReady), + status: Arc::new(RwLock::new(ProviderStatus::NotReady)), cac_config: Some(cac_config), exp_config, } @@ -195,8 +198,11 @@ impl FeatureProvider for SuperpositionProvider { async fn initialize(&mut self, _context: &EvaluationContext) { info!("Initializing SuperpositionProvider..."); { - let mut status = self.status.write().await; - *status = ProviderStatus::NotReady; + let status = self.status.read().await; + if *status == ProviderStatus::Ready { + info!("SuperpositionProvider is already initialized"); + return; + } } if (self.init().await).is_err() { let mut status = self.status.write().await; diff --git a/crates/superposition_provider/tests/integration_test.rs b/crates/superposition_provider/tests/integration_test.rs index 328466d35..9e0f9f811 100644 --- a/crates/superposition_provider/tests/integration_test.rs +++ b/crates/superposition_provider/tests/integration_test.rs @@ -1,4 +1,5 @@ -use open_feature::{EvaluationContext, OpenFeature}; +use open_feature::{provider::FeatureProvider, EvaluationContext, OpenFeature}; +use serde_json::Value; use superposition_provider::{ ExperimentationOptions, OnDemandStrategy, RefreshStrategy, SuperpositionProvider, SuperpositionProviderOptions, @@ -393,6 +394,28 @@ async fn run_provider_tests(org_id: &str, workspace_id: &str) { }; let provider = SuperpositionProvider::new(provider_options); + // Test 0: Verify provider clone works (sanity check) + println!("Test 0: Verify provider clone works (sanity check)"); + { + let mut provider_clone = provider.clone(); + provider_clone + .initialize(&EvaluationContext::default()) + .await; + let ctx = EvaluationContext::default().with_custom_field("name", "karbik"); + let all_fields = provider_clone.resolve_full_config(&ctx).await.unwrap(); + + assert_eq!( + all_fields.get("price").unwrap(), + &Value::from(1), + "Price should be 1 for karbik" + ); + assert_eq!( + all_fields.get("currency").unwrap(), + &Value::from("Rupee"), + "Currency should be default Rupee" + ); + println!(" ✓ Test passed\n"); + } // Set provider as the global provider let mut api = OpenFeature::singleton_mut().await;