Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct BasicAlertFields {

#[derive(Debug)]
pub struct Alerts {
pub alerts: RwLock<HashMap<Ulid, Box<dyn AlertTrait>>>,
pub alerts: RwLock<HashMap<String, HashMap<Ulid, Box<dyn AlertTrait>>>>,
pub sender: mpsc::Sender<AlertTask>,
}

Expand Down Expand Up @@ -288,7 +288,7 @@ pub struct AlertRequest {
}

impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
pub async fn into(self, tenant_id: Option<String>) -> Result<AlertConfig, AlertError> {
// Validate that other_fields doesn't contain reserved field names
let other_fields = if let Some(mut other_fields) = self.other_fields {
// Limit other_fields to maximum 10 fields
Expand Down Expand Up @@ -316,7 +316,7 @@ impl AlertRequest {

// Validate that all target IDs exist
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
TARGETS.get_target_by_id(id, &tenant_id).await?;
}
let datasets = resolve_stream_names(&self.query)?;

Expand Down Expand Up @@ -369,6 +369,7 @@ impl AlertRequest {
tags: self.tags,
last_triggered_at: None,
other_fields,
tenant_id,
};

Ok(config)
Expand Down Expand Up @@ -399,6 +400,7 @@ pub struct AlertConfig {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -711,6 +713,7 @@ pub struct DailyMTTRStats {
pub struct MTTRHistory {
/// Array of daily MTTR statistics
pub daily_stats: Vec<DailyMTTRStats>,
pub tenant_id: Option<String>
}

/// Query parameters for MTTR API endpoint
Expand Down Expand Up @@ -883,7 +886,7 @@ impl MetastoreObject for AlertConfig {
}

fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}
}

Expand All @@ -893,6 +896,6 @@ impl MetastoreObject for MTTRHistory {
}

fn get_object_path(&self) -> String {
mttr_json_path().to_string()
mttr_json_path(&self.tenant_id).to_string()
}
}
13 changes: 8 additions & 5 deletions src/alerts/alert_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
fn get_datasets(&self) -> &[String];
fn to_alert_config(&self) -> AlertConfig;
fn clone_box(&self) -> Box<dyn AlertTrait>;
fn get_tenant_id(&self) -> &Option<String>;
}

#[async_trait]
Expand All @@ -86,25 +87,27 @@ pub trait AlertManagerTrait: Send + Sync {
session: SessionKey,
tags: Vec<String>,
) -> Result<Vec<AlertConfig>, AlertError>;
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn get_alert_by_id(&self, id: Ulid, tenant_id: &Option<String>) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn update(&self, alert: &dyn AlertTrait);
async fn update_state(
&self,
alert_id: Ulid,
new_state: AlertState,
trigger_notif: Option<String>,
tenant_id: &Option<String>
) -> Result<(), AlertError>;
async fn update_notification_state(
&self,
alert_id: Ulid,
new_notification_state: NotificationState,
tenant_id: &Option<String>
) -> Result<(), AlertError>;
async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn get_state(&self, alert_id: Ulid) -> Result<AlertState, AlertError>;
async fn delete(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<(), AlertError>;
async fn get_state(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<AlertState, AlertError>;
async fn start_task(&self, alert: Box<dyn AlertTrait>) -> Result<(), AlertError>;
async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn list_tags(&self) -> Vec<String>;
async fn get_all_alerts(&self) -> HashMap<Ulid, Box<dyn AlertTrait>>;
async fn list_tags(&self, tenant_id: &Option<String>) -> Vec<String>;
async fn get_all_alerts(&self, tenant_id: &Option<String>) -> HashMap<Ulid, Box<dyn AlertTrait>>;
}

#[async_trait]
Expand Down
60 changes: 51 additions & 9 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ use crate::{
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
query::resolve_stream_names,
rbac::map::SessionKey,
rbac::{
map::{SessionKey, roles, users},
role::model::DefaultPrivilege,
},
storage::object_storage::alert_json_path,
utils::user_auth_for_query,
};
Expand Down Expand Up @@ -68,11 +71,12 @@ pub struct ThresholdAlert {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

impl MetastoreObject for ThresholdAlert {
fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}

fn get_object_id(&self) -> String {
Expand All @@ -84,7 +88,39 @@ impl MetastoreObject for ThresholdAlert {
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
let time_range = extract_time_range(&self.eval_config)?;
let query_result = execute_alert_query(self.get_query(), &time_range).await?;
let auth = if let Some(tenant) = &self.tenant_id
&& let Some(tenant_users) = users().get(tenant)
&& let Some(tenant_roles) = roles().get(tenant)
&& let Some(user) = tenant_users.iter().find_map(|(_, user)| {
let mut res = None;
for role in &user.roles {
if let Some(role) = tenant_roles.get(role)
&& role.contains(&DefaultPrivilege::Admin)
{
res = Some(user.clone());
break;
}
}
res
}) {
// fetch admin credentials for tenant
match user.ty {
crate::rbac::user::UserType::Native(basic) => {
// Create a protected user whose details can't be edited
// save that user's basic auth
// use that to send request
None
},
crate::rbac::user::UserType::OAuth(_) => {
tracing::warn!("admin user is oauth");
None
},
}
} else {
None
};
let query_result =
execute_alert_query(auth, self.get_query(), &time_range, &self.tenant_id).await?;
Comment on lines +91 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Auth credential extraction is incomplete - always returns None.

The complex logic to find an admin user for the tenant (lines 91-105) correctly identifies a user with admin privileges, but the credential extraction (lines 107-118) always returns None:

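  • Native user: binds the credentials as `basic` but never uses them, and returns None; the comments only sketch a planned approach (create a protected user, save its basic auth, use it to send the request)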
  • OAuth user: logs warning and returns None

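This means execute_alert_query will always receive None for auth_token, which may cause remote alert queries in Prism mode to fail authentication. Note that execute_remote_query also passes a hard-coded None to send_query_request instead of forwarding auth_token, so the token would still be dropped even once credential extraction is implemented.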

This appears to be work-in-progress. Would you like help implementing the credential extraction for Native users, or should this be tracked as a follow-up issue?


if query_result.is_simple_query {
// Handle simple queries
Expand Down Expand Up @@ -164,7 +200,7 @@ impl AlertTrait for ThresholdAlert {
"No tables found in query".into(),
));
}
create_streams_for_distributed(tables)
create_streams_for_distributed(tables, &self.tenant_id)
.await
.map_err(|_| AlertError::InvalidAlertQuery("Invalid tables".into()))?;

Expand All @@ -191,7 +227,7 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
Ok(())
}
Expand All @@ -217,12 +253,12 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);
PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -257,13 +293,13 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);

PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;

if let Some(trigger_notif) = trigger_notif
Expand Down Expand Up @@ -337,6 +373,10 @@ impl AlertTrait for ThresholdAlert {
&self.datasets
}

/// Returns a reference to this alert's `tenant_id` field (`None` when no tenant is set).
fn get_tenant_id(&self) -> &Option<String> {
&self.tenant_id
}

fn to_alert_config(&self) -> AlertConfig {
let clone = self.clone();
clone.into()
Expand Down Expand Up @@ -414,6 +454,7 @@ impl From<AlertConfig> for ThresholdAlert {
datasets: value.datasets,
last_triggered_at: value.last_triggered_at,
other_fields: value.other_fields,
tenant_id: value.tenant_id,
}
}
}
Expand All @@ -438,6 +479,7 @@ impl From<ThresholdAlert> for AlertConfig {
datasets: val.datasets,
last_triggered_at: val.last_triggered_at,
other_fields: val.other_fields,
tenant_id: val.tenant_id,
}
}
}
Expand Down
41 changes: 30 additions & 11 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result<TimeRange,

/// Execute the alert query based on the current mode and return structured group results
pub async fn execute_alert_query(
auth_token: Option<String>,
query: &str,
time_range: &TimeRange,
tenant_id: &Option<String>,
) -> Result<AlertQueryResult, AlertError> {
match PARSEABLE.options.mode {
Mode::All | Mode::Query => execute_local_query(query, time_range).await,
Mode::Prism => execute_remote_query(query, time_range).await,
Mode::All | Mode::Query => execute_local_query(query, time_range, tenant_id).await,
Mode::Prism => execute_remote_query(auth_token, query, time_range).await,
_ => Err(AlertError::CustomError(format!(
"Unsupported mode '{:?}' for alert evaluation",
PARSEABLE.options.mode
Expand All @@ -92,11 +94,12 @@ pub async fn execute_alert_query(
async fn execute_local_query(
query: &str,
time_range: &TimeRange,
tenant_id: &Option<String>,
) -> Result<AlertQueryResult, AlertError> {
let session_state = QUERY_SESSION.state();
let session_state = QUERY_SESSION.get_ctx().state();

let tables = resolve_stream_names(query)?;
create_streams_for_distributed(tables.clone())
create_streams_for_distributed(tables.clone(), tenant_id)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?;

Expand All @@ -107,7 +110,7 @@ async fn execute_local_query(
filter_tag: None,
};

let (records, _) = execute(query, false)
let (records, _) = execute(query, false, tenant_id)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to execute query: {err}")))?;

Expand All @@ -125,10 +128,11 @@ async fn execute_local_query(

/// Execute alert query remotely (Prism mode)
async fn execute_remote_query(
auth_token: Option<String>,
query: &str,
time_range: &TimeRange,
) -> Result<AlertQueryResult, AlertError> {
let session_state = QUERY_SESSION.state();
let session_state = QUERY_SESSION.get_ctx().state();
let raw_logical_plan = session_state.create_logical_plan(query).await?;

let query_request = Query {
Expand All @@ -141,7 +145,7 @@ async fn execute_remote_query(
filter_tags: None,
};

let (result_value, _) = send_query_request(&query_request)
let (result_value, _) = send_query_request(None,&query_request)
.await
.map_err(|err| AlertError::CustomError(format!("Failed to send query request: {err}")))?;

Expand Down Expand Up @@ -280,19 +284,34 @@ async fn update_alert_state(
// Now perform the state update
if let Some(msg) = message {
alerts
.update_state(*alert.get_id(), AlertState::Triggered, Some(msg))
.update_state(
*alert.get_id(),
AlertState::Triggered,
Some(msg),
alert.get_tenant_id(),
)
.await
} else if alerts
.get_state(*alert.get_id())
.get_state(*alert.get_id(), alert.get_tenant_id())
.await?
.eq(&AlertState::Triggered)
{
alerts
.update_state(*alert.get_id(), AlertState::NotTriggered, Some("".into()))
.update_state(
*alert.get_id(),
AlertState::NotTriggered,
Some("".into()),
alert.get_tenant_id(),
)
.await
} else {
alerts
.update_state(*alert.get_id(), AlertState::NotTriggered, None)
.update_state(
*alert.get_id(),
AlertState::NotTriggered,
None,
alert.get_tenant_id(),
)
.await
}
}
Expand Down
Loading
Loading