From d5cd98d56103dfa36caed9bdeaf906c1931db5a5 Mon Sep 17 00:00:00 2001 From: Saurav Suman Date: Sun, 26 Apr 2026 21:07:28 +0530 Subject: [PATCH] feat: added copyToSemantics --- crates/context_aware_config/src/api.rs | 1 + .../context_aware_config/src/api/copy_to.rs | 3 + .../src/api/copy_to/handlers.rs | 525 ++++++++++++++++++ .../context_aware_config/src/api/dimension.rs | 5 +- .../src/api/dimension/handlers.rs | 333 ++--------- .../src/api/dimension/operations.rs | 312 +++++++++++ crates/frontend/src/api.rs | 28 + crates/frontend/src/pages/dimensions.rs | 446 +++++++++++++-- .../service_utils/src/middlewares/auth_z.rs | 13 + crates/superposition/src/main.rs | 5 + crates/superposition_types/src/api.rs | 1 + crates/superposition_types/src/api/copy_to.rs | 54 ++ 12 files changed, 1362 insertions(+), 364 deletions(-) create mode 100644 crates/context_aware_config/src/api/copy_to.rs create mode 100644 crates/context_aware_config/src/api/copy_to/handlers.rs create mode 100644 crates/context_aware_config/src/api/dimension/operations.rs create mode 100644 crates/superposition_types/src/api/copy_to.rs diff --git a/crates/context_aware_config/src/api.rs b/crates/context_aware_config/src/api.rs index 12f6ed459..1fa241157 100644 --- a/crates/context_aware_config/src/api.rs +++ b/crates/context_aware_config/src/api.rs @@ -1,6 +1,7 @@ pub mod audit_log; pub mod config; pub mod context; +pub mod copy_to; pub mod default_config; pub mod dimension; pub mod functions; diff --git a/crates/context_aware_config/src/api/copy_to.rs b/crates/context_aware_config/src/api/copy_to.rs new file mode 100644 index 000000000..3e01a0caf --- /dev/null +++ b/crates/context_aware_config/src/api/copy_to.rs @@ -0,0 +1,3 @@ +mod handlers; + +pub use handlers::endpoints; diff --git a/crates/context_aware_config/src/api/copy_to/handlers.rs b/crates/context_aware_config/src/api/copy_to/handlers.rs new file mode 100644 index 000000000..a2cef79b2 --- /dev/null +++ b/crates/context_aware_config/src/api/copy_to/handlers.rs @@ -0,0 +1,525 @@ +use std::collections::HashSet; + +use actix_web::{ + HttpResponse, Scope, post, + web::{Data, Json}, +}; +use diesel::{ + Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, + SelectableHelper, +}; +use service_utils::{ + helpers::{WebhookData, execute_webhook_call, parse_config_tags}, + middlewares::auth_z::{Action as AuthZAction, AuthZDomain, AuthZHandler}, + service::types::{ + AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext, + }, +}; +use superposition_derives::{authorized, declare_resource}; +use superposition_macros::{ + bad_argument, db_error, forbidden, not_found, unexpected_error, +}; +use superposition_types::{ + DBConnection, InternalUserContext, Resource, User, + api::{ + copy_to::{ + CopyEntityType, CopyResultStatus, CopyToRequest, CopyToResponse, + CopyToResult, RowSelectionMode, + }, + dimension::{CreateRequest, DimensionName, UpdateRequest}, + webhook::Action, + }, + database::{ + models::{Workspace, cac::Dimension, others::WebhookEvent}, + schema::dimensions, + superposition_schema::superposition::workspaces, + }, + result as superposition, +}; + +use crate::{ + api::dimension::{ + operations::{create_dimension_entry, update_dimension_entry}, + utils::get_dimensions_data, + }, + helpers::{add_config_version, put_config_in_redis, validate_change_reason}, +}; + +declare_resource!(Dimension); + +struct CopyToAuthZActionCreate; + +impl service_utils::middlewares::auth_z::Action for CopyToAuthZActionCreate { + fn get() -> String { + "create".to_string() + } + + fn resource() -> Resource { + Resource::Dimension + } +} + +struct CopyToAuthZActionUpdate; + +impl service_utils::middlewares::auth_z::Action for CopyToAuthZActionUpdate { + fn get() -> String { + "update".to_string() + } + + fn resource() -> Resource { + Resource::Dimension + } +} + +pub fn endpoints() -> Scope { + Scope::new("").service(copy_to_handler) +} + +#[derive(Clone)] +enum CopyOperation { + Create, + Update, +} + +impl CopyOperation { + fn webhook_action(&self) -> Action { + match self { + Self::Create => Action::Create, + Self::Update => Action::Update, + } + } +} + +fn get_target_workspace_context( + conn: &mut DBConnection, + source_workspace_context: &WorkspaceContext, + target_workspace_name: &str, +) -> superposition::Result { + let workspace = workspaces::dsl::workspaces + .filter( + workspaces::dsl::organisation_id + .eq(&source_workspace_context.organisation_id.0), + ) + .filter(workspaces::dsl::workspace_name.eq(target_workspace_name)) + .select(Workspace::as_select()) + .get_result::(conn) + .optional() + .map_err(|err| { + log::error!("failed to fetch target workspace with error: {err}"); + db_error!(err) + })? + .ok_or_else(|| not_found!("Target workspace not found"))?; + + Ok(WorkspaceContext { + workspace_id: service_utils::service::types::WorkspaceId( + workspace.workspace_name.clone(), + ), + organisation_id: source_workspace_context.organisation_id.clone(), + schema_name: service_utils::service::types::SchemaName( + workspace.workspace_schema_name.clone(), + ), + settings: workspace, + }) +} + +fn dedupe_selected_rows(selected_rows: Vec) -> Vec { + let mut seen = HashSet::new(); + selected_rows + .into_iter() + .filter(|row| seen.insert(row.clone())) + .collect() +} + +fn get_source_dimensions( + conn: &mut DBConnection, + workspace_context: &WorkspaceContext, + selection_mode: &RowSelectionMode, + selected_rows: &[String], +) -> superposition::Result> { + let source_dimensions = match selection_mode { + RowSelectionMode::All => { + get_dimensions_data(conn, &workspace_context.schema_name)? + } + RowSelectionMode::Selected => { + let mut query = dimensions::dsl::dimensions + .schema_name(&workspace_context.schema_name) + .order(dimensions::dsl::position.asc()) + .into_boxed(); + query = query.filter(dimensions::dsl::dimension.eq_any(selected_rows)); + query.load::(conn).map_err(|err| { + log::error!("failed to fetch source dimensions with error: {err}"); + db_error!(err) + })? + } + }; + + if matches!(selection_mode, RowSelectionMode::Selected) { + let found = source_dimensions + .iter() + .map(|dimension| dimension.dimension.clone()) + .collect::>(); + let missing = selected_rows + .iter() + .filter(|row| !found.contains(*row)) + .cloned() + .collect::>(); + + if !missing.is_empty() { + return Err(bad_argument!( + "Selected rows not found in source workspace: {}", + missing.join(", ") + )); + } + } + + Ok(source_dimensions) +} + +async fn authorize_copy( + auth_z: &service_utils::middlewares::auth_z::AuthZ, + source_dimensions: &[Dimension], + existing_target_dimensions: &HashSet, + skip_existing: bool, +) -> superposition::Result<()> { + for dimension in source_dimensions { + if existing_target_dimensions.contains(&dimension.dimension) { + if skip_existing { + continue; + } + + auth_z + .action_authorized( + &CopyToAuthZActionUpdate::get(), + &[&dimension.dimension], + ) + .await?; + } else { + auth_z + .action_authorized( + &CopyToAuthZActionCreate::get(), + &[&dimension.dimension], + ) + .await?; + } + } + + Ok(()) +} + +async fn authorize_copy_for_target_workspace( + auth_z_handler: &AuthZHandler, + target_workspace_context: &WorkspaceContext, + user: &User, + internal_user: &InternalUserContext, + source_dimensions: &[Dimension], + existing_target_dimensions: &HashSet, + skip_existing: bool, +) -> superposition::Result<()> { + if **internal_user { + return Ok(()); + } + + let target_domain = AuthZDomain::new(target_workspace_context.schema_name.0.clone()); + + for dimension in source_dimensions { + let action = if existing_target_dimensions.contains(&dimension.dimension) { + if skip_existing { + continue; + } + CopyToAuthZActionUpdate::get() + } else { + CopyToAuthZActionCreate::get() + }; + + let is_allowed = auth_z_handler + .is_allowed( + &target_domain, + user, + &Resource::Dimension, + &action, + Some(&[&dimension.dimension]), + ) + .await + .map_err(|err| { + unexpected_error!( + "Error checking target workspace authorization: {}", + err + ) + })?; + + if !is_allowed { + return Err(forbidden!( + "You are not authorized to copy dimensions into the target workspace." + )); + } + } + + Ok(()) +} + +#[authorized(action = "update")] +#[post("")] +async fn copy_to_handler( + workspace_context: WorkspaceContext, + state: Data, + custom_headers: CustomHeaders, + req: Json, + user: User, + internal_user: InternalUserContext, + auth_z_handler: AuthZHandler, + db_conn: DbConnection, +) -> superposition::Result { + let DbConnection(mut conn) = db_conn; + let req = req.into_inner(); + + if req.entity_type != CopyEntityType::Dimensions { + return Err(bad_argument!("Unsupported entity_type")); + } + + if req.target_workspace == workspace_context.workspace_id.0 { + return Err(bad_argument!( + "Source and target workspaces must be different" + )); + } + + let selected_rows = dedupe_selected_rows(req.selected_rows); + if matches!(req.selection_mode, RowSelectionMode::Selected) + && selected_rows.is_empty() + { + return Err(bad_argument!( + "selected_rows must be provided when selection_mode is selected" + )); + } + + let source_dimensions = get_source_dimensions( + &mut conn, + &workspace_context, + &req.selection_mode, + &selected_rows, + )?; + let target_workspace_context = get_target_workspace_context( + &mut conn, + &workspace_context, + &req.target_workspace, + )?; + + validate_change_reason( + &target_workspace_context, + &req.change_reason, + &mut conn, + &state.master_encryption_key, + ) + .await?; + + let target_dimension_names = dimensions::dsl::dimensions + .select(dimensions::dsl::dimension) + .schema_name(&target_workspace_context.schema_name) + .load::(&mut conn) + .map_err(|err| { + log::error!("failed to fetch target dimension names with error: {err}"); + db_error!(err) + })? + .into_iter() + .collect::>(); + + authorize_copy( + &_auth_z, + &source_dimensions, + &target_dimension_names, + req.skip_existing, + ) + .await?; + authorize_copy_for_target_workspace( + &auth_z_handler, + &target_workspace_context, + &user, + &internal_user, + &source_dimensions, + &target_dimension_names, + req.skip_existing, + ) + .await?; + + let tags = parse_config_tags(custom_headers.config_tags)?; + let initial_selected_create_position = u32::try_from(target_dimension_names.len()) + .map_err(|_| { + bad_argument!("Too many target dimensions to append selected rows") + })?; + let (results, actions, config_version) = conn + .transaction::<_, superposition::AppError, _>(|transaction_conn| { + let mut results = Vec::new(); + let mut actions = Vec::new(); + let mut next_selected_create_position = initial_selected_create_position; + + for source_dimension in &source_dimensions { + if target_dimension_names.contains(&source_dimension.dimension) { + if req.skip_existing { + results.push(CopyToResult { + row_identifier: source_dimension.dimension.clone(), + status: CopyResultStatus::Skipped, + message: Some(String::from( + "Dimension already exists in target workspace", + )), + }); + continue; + } + + update_dimension_entry( + transaction_conn, + &target_workspace_context, + &user, + source_dimension.dimension.clone(), + UpdateRequest { + position: match req.selection_mode { + RowSelectionMode::All => Some(source_dimension.position), + RowSelectionMode::Selected => None, + }, + schema: Some(source_dimension.schema.clone()), + value_validation_function_name: Some( + source_dimension.value_validation_function_name.clone(), + ), + value_compute_function_name: Some( + source_dimension.value_compute_function_name.clone(), + ), + description: Some(source_dimension.description.clone()), + change_reason: req.change_reason.clone(), + }, + )?; + actions.push(CopyOperation::Update); + results.push(CopyToResult { + row_identifier: source_dimension.dimension.clone(), + status: CopyResultStatus::Copied, + message: Some(String::from( + "Dimension updated in target workspace", + )), + }); + continue; + } + + let create_position = match req.selection_mode { + RowSelectionMode::All => source_dimension.position, + RowSelectionMode::Selected => { + let append_position = next_selected_create_position.into(); + next_selected_create_position += 1; + append_position + } + }; + + create_dimension_entry( + transaction_conn, + &target_workspace_context, + &user, + CreateRequest { + dimension: DimensionName::try_from( + source_dimension.dimension.clone(), + ) + .map_err(|err| bad_argument!(err))?, + position: create_position, + schema: source_dimension.schema.clone(), + value_validation_function_name: source_dimension + .value_validation_function_name + .clone(), + description: source_dimension.description.clone(), + change_reason: req.change_reason.clone(), + value_compute_function_name: source_dimension + .value_compute_function_name + .clone(), + dimension_type: source_dimension.dimension_type.clone(), + }, + )?; + actions.push(CopyOperation::Create); + results.push(CopyToResult { + row_identifier: source_dimension.dimension.clone(), + status: CopyResultStatus::Copied, + message: Some(String::from("Dimension created in target workspace")), + }); + } + + let config_version = if actions.is_empty() { + None + } else { + Some(add_config_version( + &state, + tags.clone(), + req.change_reason.clone().into(), + transaction_conn, + &target_workspace_context.schema_name, + )?) + }; + + Ok((results, actions, config_version)) + })?; + + if let Some(ref config_version) = config_version { + let _ = put_config_in_redis( + config_version, + &state, + &target_workspace_context.schema_name, + &mut conn, + ) + .await; + } + + let response = CopyToResponse { + entity_type: req.entity_type, + source_workspace: workspace_context.workspace_id.0.clone(), + target_workspace: req.target_workspace, + requested_count: source_dimensions.len(), + copied_count: results + .iter() + .filter(|result| result.status == CopyResultStatus::Copied) + .count(), + skipped_count: results + .iter() + .filter(|result| result.status == CopyResultStatus::Skipped) + .count(), + failed_count: 0, + results, + }; + + let webhook_status = if let Some(ref config_version) = config_version { + let webhook_action = if actions.len() == 1 { + actions[0].clone().webhook_action() + } else { + Action::Batch( + actions + .iter() + .map(|action| action.clone().webhook_action()) + .collect(), + ) + }; + execute_webhook_call( + WebhookData { + payload: &response, + resource: Resource::Dimension, + action: webhook_action, + event: WebhookEvent::ConfigChanged, + config_version_opt: Some(config_version.id.to_string()), + }, + &target_workspace_context, + &state, + &mut conn, + ) + .await + } else { + true + }; + + let mut http_resp = if webhook_status { + HttpResponse::Ok() + } else { + HttpResponse::build( + actix_web::http::StatusCode::from_u16(512) + .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR), + ) + }; + + if let Some(config_version) = config_version { + http_resp.insert_header(( + AppHeader::XConfigVersion.to_string(), + config_version.id.to_string(), + )); + } + + Ok(http_resp.json(response)) +} diff --git a/crates/context_aware_config/src/api/dimension.rs b/crates/context_aware_config/src/api/dimension.rs index 2f0b305b7..a998ff6d1 100644 --- a/crates/context_aware_config/src/api/dimension.rs +++ b/crates/context_aware_config/src/api/dimension.rs @@ -1,5 +1,6 @@ mod handlers; -mod utils; -mod validations; +pub(crate) mod operations; +pub(crate) mod utils; +pub(crate) mod validations; pub use handlers::endpoints; pub use utils::get_dimensions_data; diff --git a/crates/context_aware_config/src/api/dimension/handlers.rs b/crates/context_aware_config/src/api/dimension/handlers.rs index 8e064005b..a9ef9343c 100644 --- a/crates/context_aware_config/src/api/dimension/handlers.rs +++ b/crates/context_aware_config/src/api/dimension/handlers.rs @@ -7,16 +7,14 @@ use diesel::{ Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper, }; -use serde_json::Value; use service_utils::{ helpers::{WebhookData, execute_webhook_call, parse_config_tags}, service::types::{ AppHeader, AppState, CustomHeaders, DbConnection, WorkspaceContext, }, }; -use superposition_core::validations::validate_schema; use superposition_derives::{authorized, declare_resource}; -use superposition_macros::{bad_argument, db_error, not_found, unexpected_error}; +use superposition_macros::{bad_argument, not_found, unexpected_error}; use superposition_types::{ PaginatedResponse, Resource, User, api::{ @@ -29,7 +27,7 @@ use superposition_types::{ database::{ models::{ Description, - cac::{DependencyGraph, Dimension, DimensionType}, + cac::{Dimension, DimensionType}, others::WebhookEvent, }, schema::dimensions::{self, dsl::*}, @@ -37,20 +35,13 @@ use superposition_types::{ result as superposition, }; -use crate::api::dimension::validations::allow_primitive_types; use crate::helpers::put_config_in_redis; use crate::{ api::dimension::{ - utils::{ - create_connections_with_dependents, get_dimension_usage_context_ids, - remove_connections_with_dependents, - }, - validations::{ - does_dimension_exist_for_cohorting, validate_cohort_position, - validate_cohort_schema, validate_dimension_position, - validate_position_wrt_dependency, validate_validation_function, - validate_value_compute_function, + operations::{ + create_dimension_entry, is_mandatory_dimension, update_dimension_entry, }, + utils::{get_dimension_usage_context_ids, remove_connections_with_dependents}, }, helpers::{add_config_version, validate_change_reason}, }; @@ -78,7 +69,7 @@ async fn create_handler( ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let create_req = req.into_inner(); - let schema_value = Value::from(&create_req.schema); + let create_change_reason = create_req.change_reason.clone(); let tags = parse_config_tags(custom_headers.config_tags)?; validate_change_reason( @@ -89,159 +80,25 @@ async fn create_handler( ) .await?; - let num_rows = dimensions - .count() - .schema_name(&workspace_context.schema_name) - .get_result::(&mut conn) - .map_err(|err| { - log::error!("failed to fetch number of dimension with error: {}", err); - db_error!(err) - })?; - - validate_dimension_position( - create_req.dimension.clone(), - create_req.position, - num_rows, - )?; - - match create_req.dimension_type { - DimensionType::Regular {} => { - allow_primitive_types(&create_req.schema)?; - validate_schema(&schema_value).map_err(|e| { - superposition::AppError::ValidationError(format!( - "JSON Schema's schema is broken - this is unexpected {}", - e.join("") - )) - })?; - } - DimensionType::RemoteCohort(ref cohort_based_on) => { - allow_primitive_types(&create_req.schema)?; - validate_schema(&schema_value).map_err(|e| { - superposition::AppError::ValidationError(format!( - "JSON Schema's schema is broken - this is unexpected {}", - e.join("") - )) - })?; - let based_on_dimension = does_dimension_exist_for_cohorting( - cohort_based_on, - &workspace_context.schema_name, - &mut conn, - )?; - validate_cohort_position(&create_req.position, &based_on_dimension, true)?; - } - DimensionType::LocalCohort(ref cohort_based_on) => { - let based_on_dimension = validate_cohort_schema( - &schema_value, - cohort_based_on, - &workspace_context.schema_name, - &mut conn, - )?; - validate_cohort_position(&create_req.position, &based_on_dimension, true)?; - } - } - - validate_validation_function( - &create_req.value_validation_function_name, - &mut conn, - &workspace_context.schema_name, - )?; - - validate_value_compute_function( - &create_req.dimension_type, - &create_req.value_compute_function_name, - &mut conn, - &workspace_context.schema_name, - )?; - - let dimension_data = Dimension { - dimension: create_req.dimension.into(), - position: create_req.position, - schema: create_req.schema, - created_by: user.get_email(), - created_at: Utc::now(), - value_validation_function_name: create_req.value_validation_function_name.clone(), - last_modified_at: Utc::now(), - last_modified_by: user.get_email(), - description: create_req.description, - change_reason: create_req.change_reason, - dependency_graph: DependencyGraph::default(), - value_compute_function_name: create_req.value_compute_function_name, - dimension_type: create_req.dimension_type, - }; - let (inserted_dimension, is_mandatory, config_version) = conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { - diesel::update(dimensions::table) - .filter(dimensions::position.ge(dimension_data.position)) - .set(( - last_modified_at.eq(Utc::now()), - last_modified_by.eq(user.get_email()), - dimensions::position.eq(dimensions::position + 1), - )) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .execute(transaction_conn)?; - - match dimension_data.dimension_type { - DimensionType::LocalCohort(ref cohort_based_on) - | DimensionType::RemoteCohort(ref cohort_based_on) => { - // Update dependency graphs of all dimensions that - // depend on the cohort_based_on dimension as well as - // the cohorted dimension itself - create_connections_with_dependents( - cohort_based_on, - &dimension_data.dimension, - &user.get_email(), - &workspace_context.schema_name, - transaction_conn, - )? - } - DimensionType::Regular {} => (), - } - - let insert_resp = diesel::insert_into(dimensions::table) - .values(&dimension_data) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .get_result(transaction_conn); - - match insert_resp { - Ok(inserted_dimension) => { - let is_mandatory = workspace_context - .settings - .mandatory_dimensions - .clone() - .unwrap_or_default() - .contains(&inserted_dimension.dimension); + let inserted_dimension = create_dimension_entry( + transaction_conn, + &workspace_context, + &user, + create_req, + )?; + let is_mandatory = + is_mandatory_dimension(&workspace_context, &inserted_dimension.dimension); - let config_version = add_config_version( - &state, - tags, - dimension_data.change_reason.into(), - transaction_conn, - &workspace_context.schema_name, - )?; - Ok((inserted_dimension, is_mandatory, config_version)) - } - Err(diesel::result::Error::DatabaseError( - diesel::result::DatabaseErrorKind::ForeignKeyViolation, - e, - )) => { - let fun_name = create_req.value_validation_function_name.clone(); - log::error!("{fun_name:?} function not found with error: {e:?}"); - Err(bad_argument!( - "Function {} doesn't exists", - Into::>::into( - create_req.value_validation_function_name.clone() - ) - .unwrap_or_default() - )) - } - Err(e) => { - log::error!("Dimension create failed with error: {e}"); - Err(db_error!(e)) - } - } + let config_version = add_config_version( + &state, + tags, + create_change_reason.into(), + transaction_conn, + &workspace_context.schema_name, + )?; + Ok((inserted_dimension, is_mandatory, config_version)) })?; let _ = put_config_in_redis( @@ -316,7 +173,6 @@ async fn update_handler( db_conn: DbConnection, ) -> superposition::Result { let name: String = path.clone().into(); - use dimensions::dsl; let DbConnection(mut conn) = db_conn; let tags = parse_config_tags(custom_headers.config_tags)?; let update_req = req.into_inner(); @@ -329,147 +185,20 @@ async fn update_handler( ) .await?; - let dimension_data: Dimension = dimensions::dsl::dimensions - .filter(dimensions::dimension.eq(name.clone())) - .schema_name(&workspace_context.schema_name) - .get_result::(&mut conn)?; - - let num_rows = dimensions - .count() - .schema_name(&workspace_context.schema_name) - .get_result::(&mut conn) - .map_err(|err| { - log::error!("failed to fetch number of dimension with error: {}", err); - db_error!(err) - })?; - - if let Some(ref new_schema) = update_req.schema { - let schema_value = Value::from(new_schema); - match dimension_data.dimension_type { - DimensionType::Regular {} | DimensionType::RemoteCohort(_) => { - allow_primitive_types(new_schema)?; - validate_schema(&schema_value).map_err(|e| { - superposition::AppError::ValidationError(format!( - "JSON Schema's schema is broken - this is unexpected {}", - e.join("") - )) - })?; - } - DimensionType::LocalCohort(ref cohort_based_on) => { - validate_cohort_schema( - &schema_value, - cohort_based_on, - &workspace_context.schema_name, - &mut conn, - )?; - } - } - } - - if let Some(ref new_position) = update_req.position { - match dimension_data.dimension_type { - DimensionType::Regular {} => (), - DimensionType::RemoteCohort(ref cohort_based_on) - | DimensionType::LocalCohort(ref cohort_based_on) => { - let based_on_dimension = does_dimension_exist_for_cohorting( - cohort_based_on, - &workspace_context.schema_name, - &mut conn, - )?; - validate_cohort_position(new_position, &based_on_dimension, false)?; - } - } - } - - if let Some(ref fn_name) = update_req.value_validation_function_name { - validate_validation_function(fn_name, &mut conn, &workspace_context.schema_name)?; - } - - if let Some(ref value_compute_function_name_) = update_req.value_compute_function_name - { - validate_value_compute_function( - &dimension_data.dimension_type, - value_compute_function_name_, - &mut conn, - &workspace_context.schema_name, - )?; - } - let update_change_reason = update_req.change_reason.clone(); let (result, is_mandatory, config_version) = conn .transaction::<_, superposition::AppError, _>(|transaction_conn| { - if let Some(position_val) = update_req.position { - let new_position = position_val; - validate_dimension_position( - path.into_inner(), - position_val, - num_rows - 1, - )?; - validate_position_wrt_dependency( - &name, - &position_val, - transaction_conn, - &workspace_context.schema_name, - )?; - let previous_position = dimension_data.position; - - diesel::update(dimensions) - .filter(dsl::dimension.eq(&name)) - .set(( - dsl::last_modified_at.eq(Utc::now()), - dsl::last_modified_by.eq(user.get_email()), - dimensions::position.eq((num_rows + 100) as i32), - )) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .get_result::(transaction_conn)?; - - if previous_position < new_position { - diesel::update(dsl::dimensions) - .filter(dimensions::position.gt(previous_position)) - .filter(dimensions::position.le(&new_position)) - .set(( - dsl::last_modified_at.eq(Utc::now()), - dsl::last_modified_by.eq(user.get_email()), - dimensions::position.eq(dimensions::position - 1), - )) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .execute(transaction_conn)? - } else { - diesel::update(dsl::dimensions) - .filter(dimensions::position.lt(previous_position)) - .filter(dimensions::position.ge(&new_position)) - .set(( - dsl::last_modified_at.eq(Utc::now()), - dsl::last_modified_by.eq(user.get_email()), - dimensions::position.eq(dimensions::position + 1), - )) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .execute(transaction_conn)? - }; - } + let result = update_dimension_entry( + transaction_conn, + &workspace_context, + &user, + name.clone(), + update_req.clone(), + )?; - let result = diesel::update(dimensions) - .filter(dsl::dimension.eq(name)) - .set(( - update_req, - dimensions::last_modified_at.eq(Utc::now()), - dimensions::last_modified_by.eq(user.get_email()), - )) - .returning(Dimension::as_returning()) - .schema_name(&workspace_context.schema_name) - .get_result::(transaction_conn) - .map_err(|err| db_error!(err))?; - - let is_mandatory = workspace_context - .settings - .mandatory_dimensions - .clone() - .unwrap_or_default() - .contains(&result.dimension); + let is_mandatory = + is_mandatory_dimension(&workspace_context, &result.dimension); let config_version = add_config_version( &state, diff --git a/crates/context_aware_config/src/api/dimension/operations.rs b/crates/context_aware_config/src/api/dimension/operations.rs new file mode 100644 index 000000000..352b173d2 --- /dev/null +++ b/crates/context_aware_config/src/api/dimension/operations.rs @@ -0,0 +1,312 @@ +use chrono::Utc; +use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; +use serde_json::Value; +use service_utils::service::types::WorkspaceContext; +use superposition_core::validations::validate_schema; +use superposition_macros::{bad_argument, db_error, validation_error}; +use superposition_types::{ + DBConnection, User, + api::dimension::{CreateRequest, DimensionName, UpdateRequest}, + database::{ + models::cac::{DependencyGraph, Dimension, DimensionType}, + schema::dimensions::{self, dsl::*}, + }, + result as superposition, +}; + +use crate::api::dimension::{ + utils::create_connections_with_dependents, + validations::{ + allow_primitive_types, does_dimension_exist_for_cohorting, + validate_cohort_position, validate_cohort_schema, validate_dimension_position, + validate_position_wrt_dependency, validate_validation_function, + validate_value_compute_function, + }, +}; + +pub fn is_mandatory_dimension( + workspace_context: &WorkspaceContext, + dimension_name: &str, +) -> bool { + workspace_context + .settings + .mandatory_dimensions + .clone() + .unwrap_or_default() + .contains(&dimension_name.to_string()) +} + +pub fn create_dimension_entry( + conn: &mut DBConnection, + workspace_context: &WorkspaceContext, + user: &User, + create_req: CreateRequest, +) -> superposition::Result { + let schema_value = Value::from(&create_req.schema); + let num_rows = dimensions + .count() + .schema_name(&workspace_context.schema_name) + .get_result::(conn) + .map_err(|err| { + log::error!("failed to fetch number of dimension with error: {}", err); + db_error!(err) + })?; + + validate_dimension_position( + create_req.dimension.clone(), + create_req.position, + num_rows, + )?; + + match create_req.dimension_type { + DimensionType::Regular {} => { + allow_primitive_types(&create_req.schema)?; + validate_schema(&schema_value).map_err(|err| { + validation_error!( + "JSON Schema's schema is broken - this is unexpected {}", + err.join("") + ) + })?; + } + DimensionType::RemoteCohort(ref cohort_based_on) => { + allow_primitive_types(&create_req.schema)?; + validate_schema(&schema_value).map_err(|err| { + validation_error!( + "JSON Schema's schema is broken - this is unexpected {}", + err.join("") + ) + })?; + let based_on_dimension = does_dimension_exist_for_cohorting( + cohort_based_on, + &workspace_context.schema_name, + conn, + )?; + validate_cohort_position(&create_req.position, &based_on_dimension, true)?; + } + DimensionType::LocalCohort(ref cohort_based_on) => { + let based_on_dimension = validate_cohort_schema( + &schema_value, + cohort_based_on, + &workspace_context.schema_name, + conn, + )?; + validate_cohort_position(&create_req.position, &based_on_dimension, true)?; + } + } + + validate_validation_function( + &create_req.value_validation_function_name, + conn, + &workspace_context.schema_name, + )?; + + validate_value_compute_function( + &create_req.dimension_type, + &create_req.value_compute_function_name, + conn, + &workspace_context.schema_name, + )?; + + let dimension_data = Dimension { + dimension: create_req.dimension.clone().into(), + position: create_req.position, + schema: create_req.schema, + created_by: user.get_email(), + created_at: Utc::now(), + value_validation_function_name: create_req.value_validation_function_name.clone(), + last_modified_at: Utc::now(), + last_modified_by: user.get_email(), + description: create_req.description, + change_reason: create_req.change_reason, + dependency_graph: DependencyGraph::default(), + value_compute_function_name: create_req.value_compute_function_name, + dimension_type: create_req.dimension_type, + }; + + diesel::update(dimensions::table) + .filter(dimensions::position.ge(dimension_data.position)) + .set(( + last_modified_at.eq(Utc::now()), + last_modified_by.eq(user.get_email()), + dimensions::position.eq(dimensions::position + 1), + )) + .schema_name(&workspace_context.schema_name) + .execute(conn) + .map_err(|err| { + log::error!("failed to shift dimensions with error: {err}"); + db_error!(err) + })?; + + match dimension_data.dimension_type { + DimensionType::LocalCohort(ref cohort_based_on) + | DimensionType::RemoteCohort(ref cohort_based_on) => { + create_connections_with_dependents( + cohort_based_on, + &dimension_data.dimension, + &user.get_email(), + &workspace_context.schema_name, + conn, + )? + } + DimensionType::Regular {} => (), + } + + diesel::insert_into(dimensions::table) + .values(&dimension_data) + .returning(Dimension::as_returning()) + .schema_name(&workspace_context.schema_name) + .get_result(conn) + .map_err(|err| match err { + diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::ForeignKeyViolation, + _, + ) => bad_argument!( + "Function {} doesn't exists", + create_req + .value_validation_function_name + .unwrap_or_default() + ), + _ => { + log::error!("Dimension create failed with error: {err}"); + db_error!(err) + } + }) +} + +pub fn update_dimension_entry( + conn: &mut DBConnection, + workspace_context: &WorkspaceContext, + user: &User, + name: String, + update_req: UpdateRequest, +) -> superposition::Result { + use dimensions::dsl; + + let dimension_data: Dimension = dimensions::dsl::dimensions + .filter(dimensions::dimension.eq(name.clone())) + .schema_name(&workspace_context.schema_name) + .get_result::(conn)?; + + let num_rows = dimensions + .count() + .schema_name(&workspace_context.schema_name) + .get_result::(conn) + .map_err(|err| { + log::error!("failed to fetch number of dimension with error: {}", err); + db_error!(err) + })?; + + if let Some(ref new_schema) = update_req.schema { + let schema_value = Value::from(new_schema); + match dimension_data.dimension_type { + DimensionType::Regular {} | DimensionType::RemoteCohort(_) => { + allow_primitive_types(new_schema)?; + validate_schema(&schema_value).map_err(|err| { + validation_error!( + "JSON Schema's schema is broken - this is unexpected {}", + err.join("") + ) + })?; + } + DimensionType::LocalCohort(ref cohort_based_on) => { + validate_cohort_schema( + &schema_value, + cohort_based_on, + &workspace_context.schema_name, + conn, + )?; + } + } + } + + if let Some(ref new_position) = update_req.position { + match dimension_data.dimension_type { + DimensionType::Regular {} => (), + DimensionType::RemoteCohort(ref cohort_based_on) + | DimensionType::LocalCohort(ref cohort_based_on) => { + let based_on_dimension = does_dimension_exist_for_cohorting( + cohort_based_on, + &workspace_context.schema_name, + conn, + )?; + validate_cohort_position(new_position, &based_on_dimension, false)?; + } + } + } + + if let Some(ref fn_name) = update_req.value_validation_function_name { + validate_validation_function(fn_name, conn, &workspace_context.schema_name)?; + } + + if let Some(ref value_compute_function_name_) = update_req.value_compute_function_name + { + validate_value_compute_function( + &dimension_data.dimension_type, + value_compute_function_name_, + conn, + &workspace_context.schema_name, + )?; + } + + if let Some(position_val) = update_req.position { + let new_position = position_val; + validate_dimension_position( + DimensionName::try_from(name.clone()).map_err(|err| bad_argument!(err))?, + position_val, + num_rows - 1, + )?; + validate_position_wrt_dependency( + &name, + &position_val, + conn, + &workspace_context.schema_name, + )?; + let previous_position = dimension_data.position; + + diesel::update(dimensions) + .filter(dsl::dimension.eq(&name)) + .set(( + dsl::last_modified_at.eq(Utc::now()), + dsl::last_modified_by.eq(user.get_email()), + dimensions::position.eq((num_rows + 100) as i32), + )) + .schema_name(&workspace_context.schema_name) + .execute(conn)?; + + if previous_position < new_position { + diesel::update(dsl::dimensions) + .filter(dimensions::position.gt(previous_position)) + .filter(dimensions::position.le(&new_position)) + .set(( + dsl::last_modified_at.eq(Utc::now()), + dsl::last_modified_by.eq(user.get_email()), + dimensions::position.eq(dimensions::position - 1), + )) + .schema_name(&workspace_context.schema_name) + .execute(conn)?; + } else { + diesel::update(dsl::dimensions) + .filter(dimensions::position.lt(previous_position)) + .filter(dimensions::position.ge(&new_position)) + .set(( + dsl::last_modified_at.eq(Utc::now()), + dsl::last_modified_by.eq(user.get_email()), + dimensions::position.eq(dimensions::position + 1), + )) + .schema_name(&workspace_context.schema_name) + .execute(conn)?; + }; + } + + diesel::update(dimensions) + .filter(dsl::dimension.eq(name)) + .set(( + update_req, + dimensions::last_modified_at.eq(Utc::now()), + dimensions::last_modified_by.eq(user.get_email()), + )) + .returning(Dimension::as_returning()) + .schema_name(&workspace_context.schema_name) + .get_result::(conn) + .map_err(|err| db_error!(err)) +} diff --git a/crates/frontend/src/api.rs b/crates/frontend/src/api.rs index 8257a7a21..92cc84055 100644 --- a/crates/frontend/src/api.rs +++ b/crates/frontend/src/api.rs @@ -446,6 +446,34 @@ pub mod dimensions { } } +pub mod copy_to { + use superposition_types::api::copy_to::{CopyToRequest, CopyToResponse}; + + use super::*; + + pub async fn copy( + payload: CopyToRequest, + workspace: &str, + org_id: &str, + ) -> Result { + let host = use_host_server(); + let url = format!("{host}/copy-to"); + + let response = request( + url, + reqwest::Method::POST, + Some(payload), + construct_request_headers(&[ + ("x-workspace", workspace), + ("x-org-id", org_id), + ])?, + ) + .await?; + + parse_json_response(response).await + } +} + pub async fn delete_context( context_id: String, workspace: &str, diff --git a/crates/frontend/src/pages/dimensions.rs b/crates/frontend/src/pages/dimensions.rs index 5b8032805..7d216c8f1 100644 --- a/crates/frontend/src/pages/dimensions.rs +++ b/crates/frontend/src/pages/dimensions.rs @@ -1,10 +1,17 @@ +use std::collections::HashSet; + use leptos::*; use leptos_router::A; use serde_json::{Map, Value, json}; -use superposition_types::custom_query::{CustomQuery, PaginationParams, Query}; use superposition_types::database::models::cac::DimensionType; +use superposition_types::{ + api::copy_to::{CopyEntityType, CopyToRequest, RowSelectionMode}, + custom_query::{CustomQuery, PaginationParams, Query}, + database::models::ChangeReason, +}; -use crate::api::dimensions; +use crate::api::{copy_to, dimensions, workspaces}; +use crate::components::alert::AlertType; use crate::components::button::ButtonAnchor; use crate::components::datetime::DatetimeStr; use crate::components::skeleton::Skeleton; @@ -18,6 +25,7 @@ use crate::components::{ types::{Column, TablePaginationProps}, }, }; +use crate::providers::alert_provider::enqueue_alert; use crate::query_updater::use_signal_from_query; use crate::types::{DimensionTypeOptions, OrganisationId, Workspace}; @@ -42,53 +50,186 @@ pub fn Dimensions() -> impl IntoView { pagination_params_rws.update(|f| f.page = Some(page)); }); - let table_columns = StoredValue::new(vec![ - Column::new( - "dimension".to_string(), - false, - move |dimension_name: &str, _row: &Map| { - let dimension_name = dimension_name.to_string(); + let workspaces_resource = create_blocking_resource( + move || org.get().0, + |org_id| async move { + workspaces::list(&PaginationParams::all_entries(), &org_id) + .await + .unwrap_or_default() + }, + ); + + let selected_dimensions_rws = create_rw_signal::>(HashSet::new()); + let copy_all_rws = create_rw_signal(true); + let show_copy_panel_rws = create_rw_signal(false); + let target_workspace_rws = create_rw_signal(String::new()); + let skip_existing_rws = create_rw_signal(true); + let change_reason_rws = create_rw_signal(String::new()); + let copy_inprogress_rws = create_rw_signal(false); + let selected_segment_class = "btn join-item min-h-12 flex-1 !border-purple-300 !bg-purple-50 !text-purple-800 hover:!border-purple-400 hover:!bg-purple-100"; + let idle_segment_class = "btn join-item min-h-12 flex-1 !border-slate-200 !bg-white !text-slate-700 hover:!border-slate-300 hover:!bg-slate-50"; + + let table_columns = create_memo(move |_| { + let selected_dimensions_rws = selected_dimensions_rws; + let copy_all_rws = copy_all_rws; + let show_copy_panel_rws = show_copy_panel_rws; + vec![ + Column::new( + "dimension".to_string(), + false, + move |dimension_name: &str, _row: &Map| { + let dimension_name = dimension_name.to_string(); + let checked_dimension_name = dimension_name.clone(); + let changed_dimension_name = dimension_name.clone(); + let link_dimension_name = dimension_name.clone(); + view! { + + } + .into_view() + }, + ColumnSortable::No, + Expandable::Disabled, + |_| { + view! { "Dimension" }.into_view() + }, + ), + Column::default("position".to_string()), + Column::default_with_cell_formatter( + "dimension_type".to_string(), + move |dimension_type: &str, _| { + let dim_type = serde_json::from_str::(dimension_type) + .unwrap_or_default(); + let dimension_type = DimensionTypeOptions::from(&dim_type); + view! { + {dimension_type.to_string()} + } + .into_view() + }, + ), + Column::default_with_cell_formatter("created_at".to_string(), |value, _| { view! { - - {dimension_name} - + } - }, - ColumnSortable::No, - Expandable::Disabled, - default_column_formatter, - ), - Column::default("position".to_string()), - Column::default_with_cell_formatter( - "dimension_type".to_string(), - move |dimension_type: &str, _| { - let dim_type = serde_json::from_str::(dimension_type) - .unwrap_or_default(); - let dimension_type = DimensionTypeOptions::from(&dim_type); - view! { - {dimension_type.to_string()} + }), + Column::new( + "last_modified_at".to_string(), + false, + |value, _| { + view! { + + } + }, + ColumnSortable::No, + Expandable::Enabled(100), + |_| default_column_formatter("Modified At"), + ), + ] + }); + + let on_copy = Callback::new(move |_| { + if copy_inprogress_rws.get_untracked() { + return; + } + + let target_workspace = target_workspace_rws.get_untracked(); + if target_workspace.is_empty() { + enqueue_alert( + String::from("Please select a target workspace."), + AlertType::Error, + 5000, + ); + return; + } + + let change_reason = + match ChangeReason::try_from(change_reason_rws.get_untracked()) { + Ok(change_reason) => change_reason, + Err(err) => { + enqueue_alert(err, AlertType::Error, 5000); + return; } - .into_view() - }, - ), - Column::default_with_cell_formatter("created_at".to_string(), |value, _| { - view! { - - } - }), - Column::new( - "last_modified_at".to_string(), - false, - |value, _| { - view! { - + }; + + let selection_mode = if copy_all_rws.get_untracked() { + RowSelectionMode::All + } else { + RowSelectionMode::Selected + }; + let selected_dimensions = selected_dimensions_rws.get_untracked(); + if matches!(selection_mode, RowSelectionMode::Selected) + && selected_dimensions.is_empty() + { + enqueue_alert( + String::from("Select at least one dimension to copy."), + AlertType::Error, + 5000, + ); + return; + } + let mut selected_rows = selected_dimensions.into_iter().collect::>(); + selected_rows.sort_unstable(); + + copy_inprogress_rws.set(true); + let workspace = workspace.get_untracked().0; + let org_id = org.get_untracked().0; + let skip_existing = skip_existing_rws.get_untracked(); + spawn_local(async move { + let payload = CopyToRequest { + entity_type: CopyEntityType::Dimensions, + target_workspace, + selection_mode, + selected_rows, + skip_existing, + change_reason, + }; + + let result = copy_to::copy(payload, &workspace, &org_id).await; + copy_inprogress_rws.set(false); + match result { + Ok(response) => { + enqueue_alert( + format!( + "Copied {} dimension(s), skipped {}.", + response.copied_count, response.skipped_count + ), + AlertType::Success, + 5000, + ); } - }, - ColumnSortable::No, - Expandable::Enabled(100), - |_| default_column_formatter("Modified At"), - ), - ]); + Err(err) => enqueue_alert(err, AlertType::Error, 5000), + } + }); + }); view! { impl IntoView { .iter() .map(|ele| json!(ele).as_object().unwrap().clone()) .collect::>>(); + let available_workspaces = workspaces_resource + .get() + .unwrap_or_default() + .data + .into_iter() + .filter(|workspace_resp| workspace_resp.workspace_name != workspace.get().0) + .collect::>(); let pagination_params = pagination_params_rws.get(); let pagination_props = TablePaginationProps { enabled: true, @@ -110,27 +258,205 @@ pub fn Dimensions() -> impl IntoView { on_page_change: handle_page_change, }; view! { -
-
- - -
+
+
+ +
+ + +
+
+ +
+
+
+
+

"Copy To Workspace"

+

+ "Copy all dimensions by default, or switch to selected rows." +

+
+
+ {move || format!( + "Selected rows: {}", + selected_dimensions_rws.get().len() + )} +
+
+
+ + +
+ "Selection mode" +
+ + +
+
+
+ "Existing target rows" +
+ + +
+
+
+
+ +
+
+
+
diff --git a/crates/service_utils/src/middlewares/auth_z.rs b/crates/service_utils/src/middlewares/auth_z.rs index fc0d8566b..1f7913749 100644 --- a/crates/service_utils/src/middlewares/auth_z.rs +++ b/crates/service_utils/src/middlewares/auth_z.rs @@ -187,6 +187,19 @@ impl AuthZHandler { }; Self(ap) } + + pub async fn is_allowed( + &self, + domain: &AuthZDomain, + user: &User, + resource: &Resource, + action: &str, + attributes: Option<&[&String]>, + ) -> Result { + self.0 + .is_allowed(domain, user, resource, action, attributes) + .await + } } impl FromRequest for AuthZHandler { diff --git a/crates/superposition/src/main.rs b/crates/superposition/src/main.rs index 44d32f407..1eefcb31a 100644 --- a/crates/superposition/src/main.rs +++ b/crates/superposition/src/main.rs @@ -256,6 +256,11 @@ impl ScopeExt for Scope { .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) .service(config::endpoints()), ) + .service( + scope("/copy-to") + .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) + .service(copy_to::endpoints()), + ) .service( scope("/audit") .wrap(OrgWorkspaceMiddlewareFactory::new(true, true)) diff --git a/crates/superposition_types/src/api.rs b/crates/superposition_types/src/api.rs index 73b158a34..4e509ea71 100644 --- a/crates/superposition_types/src/api.rs +++ b/crates/superposition_types/src/api.rs @@ -2,6 +2,7 @@ pub mod audit_log; pub mod authz; pub mod config; pub mod context; +pub mod copy_to; pub mod default_config; pub mod dimension; #[cfg(feature = "experimentation")] diff --git a/crates/superposition_types/src/api/copy_to.rs b/crates/superposition_types/src/api/copy_to.rs new file mode 100644 index 000000000..a719388d7 --- /dev/null +++ b/crates/superposition_types/src/api/copy_to.rs @@ -0,0 +1,54 @@ +use serde::{Deserialize, Serialize}; + +use crate::database::models::ChangeReason; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum CopyEntityType { + Dimensions, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum RowSelectionMode { + All, + Selected, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum CopyResultStatus { + Copied, + Skipped, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CopyToRequest { + pub entity_type: CopyEntityType, + pub target_workspace: String, + pub selection_mode: RowSelectionMode, + #[serde(default)] + pub selected_rows: Vec, + #[serde(default)] + pub skip_existing: bool, + pub change_reason: ChangeReason, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CopyToResult { + pub row_identifier: String, + pub status: CopyResultStatus, + pub message: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CopyToResponse { + pub entity_type: CopyEntityType, + pub source_workspace: String, + pub target_workspace: String, + pub requested_count: usize, + pub copied_count: usize, + pub skipped_count: usize, + pub failed_count: usize, + pub results: Vec, +}