Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 9 additions & 14 deletions crates/superposition_provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,21 @@ readme = "README.md"
description = "Open feature provider for Superposition"

[dependencies]
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
aws-smithy-types = { version = "1.3.0" }
chrono = { workspace = true }
log = { workspace = true }
open-feature = "0.2.5"
reqwest = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }
aws-smithy-types = { version = "1.3.0" }
serde = { workspace = true }
serde_json = { workspace = true }
superposition_core = { workspace = true }

# OpenFeature Rust SDK
open-feature = "0.2.5"

# Generated Smithy Rust SDK for Superposition
superposition_sdk = { workspace = true, features = ["behavior-version-latest"] }

# Superposition types for proper type conversion
superposition_types = { workspace = true }
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
tokio-util = "0.7"
uuid = { workspace = true }


[lints]
Expand Down
163 changes: 90 additions & 73 deletions crates/superposition_provider/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use std::collections::HashMap;
use std::sync::Arc;

use log::{debug, error, info, warn};
pub use open_feature::{
provider::{ProviderMetadata, ProviderStatus, ResolutionDetails},
EvaluationContext,
};
use serde_json::Value;
use superposition_core::experiment::ExperimentGroups;
use superposition_core::{
Expand All @@ -10,26 +14,20 @@ use superposition_core::{
use superposition_types::{Config, DimensionInfo};
use tokio::join;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;

use crate::types::*;
use crate::utils::ConversionUtils;

pub use open_feature::{
provider::{ProviderMetadata, ProviderStatus, ResolutionDetails},
EvaluationContext,
};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CacConfig {
superposition_options: SuperpositionOptions,
options: ConfigurationOptions,
fallback_config: Option<serde_json::Map<String, Value>>,
cached_config: Arc<RwLock<Option<Config>>>,
last_updated: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
evaluation_cache: RwLock<HashMap<String, HashMap<String, Value>>>,
polling_task: RwLock<Option<JoinHandle<()>>>,
polling_task_cancellation_token: Arc<RwLock<Option<CancellationToken>>>,
}

impl CacConfig {
Expand All @@ -43,8 +41,7 @@ impl CacConfig {
options,
cached_config: Arc::new(RwLock::new(None)),
last_updated: Arc::new(RwLock::new(None)),
evaluation_cache: RwLock::new(HashMap::new()),
polling_task: RwLock::new(None),
polling_task_cancellation_token: Arc::new(RwLock::new(None)),
}
}

Expand Down Expand Up @@ -85,9 +82,10 @@ impl CacConfig {
polling_strategy.interval,
polling_strategy.timeout.unwrap_or(30)
);
let task = self.start_polling(polling_strategy.interval).await;
let mut polling_task = self.polling_task.write().await;
*polling_task = Some(task);
let task_token = self.start_polling(polling_strategy.interval).await;
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
*polling_task_cancellation_token = Some(task_token);
}
RefreshStrategy::OnDemand(on_demand_strategy) => {
info!(
Expand All @@ -102,28 +100,39 @@ impl CacConfig {
Ok(())
}

async fn start_polling(&self, interval: u64) -> JoinHandle<()> {
async fn start_polling(&self, interval: u64) -> CancellationToken {
let superposition_options = self.superposition_options.clone();
let cached_config = self.cached_config.clone();
let last_updated = self.last_updated.clone();
let cancellation_token = CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

tokio::spawn(async move {
loop {
match Self::get_config_static(&superposition_options).await {
Ok(config) => {
let mut cached = cached_config.write().await;
*cached = Some(config);
let mut updated = last_updated.write().await;
*updated = Some(chrono::Utc::now());
debug!("CAC config updated via polling");
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
info!("shutting down polling task gracefully");
},
_ = async {
loop {
match Self::get_config_static(&superposition_options).await {
Ok(config) => {
let mut cached = cached_config.write().await;
*cached = Some(config);
let mut updated = last_updated.write().await;
*updated = Some(chrono::Utc::now());
debug!("CAC config updated via polling");
}
Err(e) => {
error!("Polling error: {}", e);
}
}
sleep(Duration::from_secs(interval)).await;
}
Err(e) => {
error!("Polling error: {}", e);
}
}
sleep(Duration::from_secs(interval)).await;
} => {}
}
})
});

cancellation_token
}

pub async fn on_demand_config(&self, ttl: u64, use_stale: bool) -> Result<Config> {
Expand Down Expand Up @@ -247,31 +256,29 @@ impl CacConfig {

pub async fn close(&self) -> Result<()> {
// Stop polling task
let mut polling_task = self.polling_task.write().await;
if let Some(task) = polling_task.take() {
task.abort();
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
if let Some(task) = polling_task_cancellation_token.take() {
task.cancel();
}

// Clear caches
let mut cached_config = self.cached_config.write().await;
*cached_config = None;
let mut evaluation_cache = self.evaluation_cache.write().await;
evaluation_cache.clear();

Ok(())
}
}

/// Experimentation Configuration client
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ExperimentationConfig {
superposition_options: SuperpositionOptions,
options: ExperimentationOptions,
cached_experiments: Arc<RwLock<Option<Experiments>>>,
cached_experiment_groups: Arc<RwLock<Option<ExperimentGroups>>>,
last_updated: Arc<RwLock<Option<chrono::DateTime<chrono::Utc>>>>,
evaluation_cache: RwLock<HashMap<String, HashMap<String, Value>>>,
polling_task: RwLock<Option<JoinHandle<()>>>,
polling_task_cancellation_token: Arc<RwLock<Option<CancellationToken>>>,
}

impl ExperimentationConfig {
Expand All @@ -285,8 +292,7 @@ impl ExperimentationConfig {
cached_experiments: Arc::new(RwLock::new(None)),
cached_experiment_groups: Arc::new(RwLock::new(None)),
last_updated: Arc::new(RwLock::new(None)),
evaluation_cache: RwLock::new(HashMap::new()),
polling_task: RwLock::new(None),
polling_task_cancellation_token: Arc::new(RwLock::new(None)),
}
}

Expand Down Expand Up @@ -334,9 +340,10 @@ impl ExperimentationConfig {
"Using PollingStrategy for experiments: interval={}s",
polling_strategy.interval
);
let task = self.start_polling(polling_strategy.interval).await;
let mut polling_task = self.polling_task.write().await;
*polling_task = Some(task);
let task_token = self.start_polling(polling_strategy.interval).await;
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
*polling_task_cancellation_token = Some(task_token);
}
RefreshStrategy::OnDemand(on_demand_strategy) => {
info!(
Expand All @@ -349,41 +356,52 @@ impl ExperimentationConfig {
Ok(())
}

async fn start_polling(&self, interval: u64) -> JoinHandle<()> {
async fn start_polling(&self, interval: u64) -> CancellationToken {
let superposition_options = self.superposition_options.clone();
let cached_experiments = self.cached_experiments.clone();
let cached_experiment_groups = self.cached_experiment_groups.clone();
let last_updated = self.last_updated.clone();
let cancellation_token = CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

tokio::spawn(async move {
loop {
let (experiments_result, groups_result) = join!(
Self::get_experiments_static(&superposition_options),
Self::get_experiment_groups_static(&superposition_options)
);
match (experiments_result, groups_result) {
(Ok(Some(experiments)), Ok(Some(experiment_groups))) => {
let mut cached = cached_experiments.write().await;
*cached = Some(experiments);
let mut cached_groups = cached_experiment_groups.write().await;
*cached_groups = Some(experiment_groups);
let mut updated = last_updated.write().await;
*updated = Some(chrono::Utc::now());
debug!("Experiments and Experiment Groups updated via polling");
}
(Ok(None), Ok(None)) => {
warn!(
"No experiments or experiment groups returned from polling"
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
info!("shutting down polling task gracefully");
},
_ = async {
loop {
let (experiments_result, groups_result) = join!(
Self::get_experiments_static(&superposition_options),
Self::get_experiment_groups_static(&superposition_options)
);
match (experiments_result, groups_result) {
(Ok(Some(experiments)), Ok(Some(experiment_groups))) => {
let mut cached = cached_experiments.write().await;
*cached = Some(experiments);
let mut cached_groups = cached_experiment_groups.write().await;
*cached_groups = Some(experiment_groups);
let mut updated = last_updated.write().await;
*updated = Some(chrono::Utc::now());
debug!("Experiments and Experiment Groups updated via polling");
}
(Ok(None), Ok(None)) => {
warn!(
"No experiments or experiment groups returned from polling"
);
}
(Err(e), _) | (_, Err(e)) => {
error!("Polling error: {}", e);
}
_ => {}
}
sleep(Duration::from_secs(interval)).await;
}
(Err(e), _) | (_, Err(e)) => {
error!("Polling error: {}", e);
}
_ => {}
}
sleep(Duration::from_secs(interval)).await;
} => {}
}
})
});

cancellation_token
}

pub async fn on_demand_config(
Expand Down Expand Up @@ -550,16 +568,15 @@ impl ExperimentationConfig {

pub async fn close(&self) -> Result<()> {
// Stop polling task
let mut polling_task = self.polling_task.write().await;
if let Some(task) = polling_task.take() {
task.abort();
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
if let Some(token) = polling_task_cancellation_token.take() {
token.cancel();
}

// Clear caches
let mut cached_experiments = self.cached_experiments.write().await;
*cached_experiments = None;
let mut evaluation_cache = self.evaluation_cache.write().await;
evaluation_cache.clear();

Ok(())
}
Comment on lines 569 to 582
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

close() leaves cached_experiment_groups populated — inconsistent cleanup

cached_experiments is set to None (Line 578–579) but cached_experiment_groups is never cleared. After close(), any reader calling get_cached_experiment_groups() will still receive stale data, and get_applicable_variants() will see a None/Some mismatch that routes to the error branch.

🐛 Proposed fix
     // Clear caches
     let mut cached_experiments = self.cached_experiments.write().await;
     *cached_experiments = None;
+    let mut cached_experiment_groups = self.cached_experiment_groups.write().await;
+    *cached_experiment_groups = None;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn close(&self) -> Result<()> {
// Stop polling task
let mut polling_task = self.polling_task.write().await;
if let Some(task) = polling_task.take() {
task.abort();
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
if let Some(token) = polling_task_cancellation_token.take() {
token.cancel();
}
// Clear caches
let mut cached_experiments = self.cached_experiments.write().await;
*cached_experiments = None;
let mut evaluation_cache = self.evaluation_cache.write().await;
evaluation_cache.clear();
Ok(())
}
pub async fn close(&self) -> Result<()> {
// Stop polling task
let mut polling_task_cancellation_token =
self.polling_task_cancellation_token.write().await;
if let Some(token) = polling_task_cancellation_token.take() {
token.cancel();
}
// Clear caches
let mut cached_experiments = self.cached_experiments.write().await;
*cached_experiments = None;
let mut cached_experiment_groups = self.cached_experiment_groups.write().await;
*cached_experiment_groups = None;
Ok(())
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/superposition_provider/src/client.rs` around lines 569 - 582, In
close(), after clearing cached_experiments, also clear cached_experiment_groups
to avoid leaving stale data: acquire a write lock on
self.cached_experiment_groups (similar to how self.cached_experiments is
handled) and set it to None so that get_cached_experiment_groups() and
get_applicable_variants() see a consistent cleared state; ensure you follow the
same async locking pattern as used for polling_task_cancellation_token and
cached_experiments within the close() method.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai ignore here
supposed to be fixed in #881

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if PR `#881` exists and get its details
gh pr view 881 --json title,url,state,body

Length of output: 1123


@ayushjain17 Understood! I can see that PR #881 is open and will handle the provider implementation changes. I'll skip this comment for now since it'll be addressed there.

Expand Down
18 changes: 12 additions & 6 deletions crates/superposition_provider/src/provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use async_trait::async_trait;
use log::{error, info};
Expand All @@ -16,10 +19,10 @@ use crate::client::{CacConfig, ExperimentationConfig};
use crate::types::*;
use crate::utils::ConversionUtils;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SuperpositionProvider {
metadata: ProviderMetadata,
status: RwLock<ProviderStatus>,
status: Arc<RwLock<ProviderStatus>>,
cac_config: Option<CacConfig>,
exp_config: Option<ExperimentationConfig>,
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Expand Down Expand Up @@ -56,7 +59,7 @@ impl SuperpositionProvider {
metadata: ProviderMetadata {
name: "SuperpositionProvider".to_string(),
},
status: RwLock::new(ProviderStatus::NotReady),
status: Arc::new(RwLock::new(ProviderStatus::NotReady)),
cac_config: Some(cac_config),
exp_config,
}
Expand Down Expand Up @@ -195,8 +198,11 @@ impl FeatureProvider for SuperpositionProvider {
async fn initialize(&mut self, _context: &EvaluationContext) {
info!("Initializing SuperpositionProvider...");
{
let mut status = self.status.write().await;
*status = ProviderStatus::NotReady;
let status = self.status.read().await;
if *status == ProviderStatus::Ready {
info!("SuperpositionProvider is already initialized");
return;
}
}
if (self.init().await).is_err() {
let mut status = self.status.write().await;
Expand Down
Loading
Loading