From a59d0bfa5e6b3328d5b9efa997ab26d57bede603 Mon Sep 17 00:00:00 2001 From: Greg Shear Date: Wed, 29 Apr 2026 16:01:08 -0400 Subject: [PATCH] agent: extract email infrastructure into agent::email module Move EmailSender trait, Sender enum, and ResendSender out of agent::alerts::notifier into a standalone agent::email module so non-alert emails (e.g. invite emails) can reuse the same sending infrastructure. - New agent::email module with EmailSender trait, Sender, ResendSender, and Arc impl - Extract EMAIL_WRAPPER_TEMPLATE constant and register_email_wrapper() from notifications crate so non-alert emails can share Estuary branding - Construct Sender at top level in main.rs wrapped in Arc, ready to be shared across alert notifications and future invite email code - alerts::notifier re-exports EmailSender and Sender to preserve the existing public API --- crates/agent/src/alerts/notifier.rs | 110 +--------------------------- crates/agent/src/email.rs | 110 ++++++++++++++++++++++++++++ crates/agent/src/lib.rs | 1 + crates/agent/src/main.rs | 50 +++++++------ crates/notifications/src/lib.rs | 26 ++++--- 5 files changed, 154 insertions(+), 143 deletions(-) create mode 100644 crates/agent/src/email.rs diff --git a/crates/agent/src/alerts/notifier.rs b/crates/agent/src/alerts/notifier.rs index 69840d57b1e..ae2f0da96ae 100644 --- a/crates/agent/src/alerts/notifier.rs +++ b/crates/agent/src/alerts/notifier.rs @@ -4,6 +4,8 @@ use chrono::{DateTime, Utc}; use control_plane_api::alerts::{Alert, fetch_alert_by_id}; use notifications::Renderer; +pub use crate::email::{EmailSender, Sender}; + #[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct NotifierState { pub(crate) fired_completed: Option>, @@ -87,114 +89,6 @@ impl AlertNotifications { } } -pub trait EmailSender: std::fmt::Debug + Send + Sync + 'static { - fn send<'s>( - &'s self, - email: notifications::NotificationEmail, - ) -> impl std::future::Future> + Send + 's; -} - -/// Sends emails using the resend -#[derive(Debug)] -pub struct ResendSender { - from_address: String, - reply_to_address: String, - resend_client: resend_rs::Resend, - retry_options: resend_rs::rate_limit::RetryOptions, -} - -impl ResendSender { - async fn send(&self, notification: notifications::NotificationEmail) -> anyhow::Result<()> { - let notifications::NotificationEmail { - idempotency_key, - recipient: notifications::Recipient { email, .. }, - subject, - body, - } = notification; - - let Self { - from_address, - reply_to_address, - resend_client, - retry_options, - } = self; - - let resend_req = - resend_rs::types::CreateEmailBaseOptions::new(from_address, [email.as_str()], subject) - .with_reply(reply_to_address.as_str()) - .with_html(body.as_str()) - .with_idempotency_key(idempotency_key.as_str()); - - // Note on retries: We don't technically need to handle retries here, as - // we could instead return and just schedule ourselves to run again. - // It's common for many alerts to fire more or less simultaneously, and - // the resend rate limit is only 9 req/s. So my current thinking is that - // it's better to handle retries here, so that we don't end up having a - // bunch of other notifier tasks run and then have this one hit another - // rate limit error when we retry it. Better to retry each notification - // until it succeeds. If we exhaust the number of retries, we'll return - // an error and back off somewhat longer. - let response = resend_rs::rate_limit::send_with_retry_opts( - || async { resend_client.emails.send(resend_req.clone()).await }, - retry_options, - ) - .await - .context("calling resend API")?; - - tracing::debug!(%idempotency_key, to = %email, email_id = %response.id, "successfully sent alert email"); - - Ok(()) - } -} - -#[derive(Debug)] -pub enum Sender { - Disabled, - Resend(ResendSender), -} - -impl Sender { - pub fn resend( - api_key: &str, - from_address: String, - reply_to_address: String, - http_client: reqwest::Client, - ) -> Sender { - let resend_client = resend_rs::Resend::with_client(api_key, http_client); - let inner = ResendSender { - from_address, - reply_to_address, - resend_client, - retry_options: resend_rs::rate_limit::RetryOptions { - duration_ms: 150, - jitter_range_ms: 0..1000, - max_retries: 5, - }, - }; - Sender::Resend(inner) - } -} - -impl EmailSender for Sender { - async fn send<'s>( - &'s self, - notification: notifications::NotificationEmail, - ) -> anyhow::Result<()> { - match self { - Sender::Disabled => { - tracing::warn!( - to = %notification.recipient.email, - subject = %notification.subject, - idempotency_key = %notification.idempotency_key, - "skipping sending alert email (disabled)" - ); - return Ok(()); - } - Sender::Resend(resend) => resend.send(notification).await, - } - } -} - pub enum AlertOutcome { AwaitResolution, ResolvedSent, diff --git a/crates/agent/src/email.rs b/crates/agent/src/email.rs new file mode 100644 index 00000000000..f3e18f0b1b3 --- /dev/null +++ b/crates/agent/src/email.rs @@ -0,0 +1,110 @@ +/// Sends emails using the Resend API with retry logic for rate limiting. +#[derive(Debug)] +pub struct ResendSender { + from_address: String, + reply_to_address: String, + resend_client: resend_rs::Resend, + retry_options: resend_rs::rate_limit::RetryOptions, +} + +impl ResendSender { + pub async fn send(&self, notification: notifications::NotificationEmail) -> anyhow::Result<()> { + let notifications::NotificationEmail { + idempotency_key, + recipient: notifications::Recipient { email, .. }, + subject, + body, + } = notification; + + let Self { + from_address, + reply_to_address, + resend_client, + retry_options, + } = self; + + let resend_req = + resend_rs::types::CreateEmailBaseOptions::new(from_address, [email.as_str()], subject) + .with_reply(reply_to_address.as_str()) + .with_html(body.as_str()) + .with_idempotency_key(idempotency_key.as_str()); + + let response = resend_rs::rate_limit::send_with_retry_opts( + || async { resend_client.emails.send(resend_req.clone()).await }, + retry_options, + ) + .await + .context("calling resend API")?; + + tracing::debug!(%idempotency_key, to = %email, email_id = %response.id, "successfully sent email"); + + Ok(()) + } +} + +#[derive(Debug)] +pub enum Sender { + Disabled, + Resend(ResendSender), +} + +impl Sender { + pub fn resend( + api_key: &str, + from_address: String, + reply_to_address: String, + http_client: reqwest::Client, + ) -> Sender { + let resend_client = resend_rs::Resend::with_client(api_key, http_client); + let inner = ResendSender { + from_address, + reply_to_address, + resend_client, + retry_options: resend_rs::rate_limit::RetryOptions { + duration_ms: 150, + jitter_range_ms: 0..1000, + max_retries: 5, + }, + }; + Sender::Resend(inner) + } +} + +pub trait EmailSender: std::fmt::Debug + Send + Sync + 'static { + fn send<'s>( + &'s self, + email: notifications::NotificationEmail, + ) -> impl std::future::Future> + Send + 's; +} + +impl EmailSender for Sender { + async fn send<'s>( + &'s self, + notification: notifications::NotificationEmail, + ) -> anyhow::Result<()> { + match self { + Sender::Disabled => { + tracing::warn!( + to = %notification.recipient.email, + subject = %notification.subject, + idempotency_key = %notification.idempotency_key, + "skipping sending email (disabled)" + ); + return Ok(()); + } + Sender::Resend(resend) => resend.send(notification).await, + } + } +} + +use anyhow::Context; +use std::sync::Arc; + +impl EmailSender for Arc { + async fn send<'s>( + &'s self, + email: notifications::NotificationEmail, + ) -> anyhow::Result<()> { + (**self).send(email).await + } +} diff --git a/crates/agent/src/lib.rs b/crates/agent/src/lib.rs index 0352fba3cf2..6d88477fbd9 100644 --- a/crates/agent/src/lib.rs +++ b/crates/agent/src/lib.rs @@ -1,6 +1,7 @@ pub mod alerts; pub(crate) mod connector_tags; pub mod controllers; +pub mod email; pub(crate) mod controlplane; mod directives; mod discovers; diff --git a/crates/agent/src/main.rs b/crates/agent/src/main.rs index 13ae34ee4bf..403c64e7558 100644 --- a/crates/agent/src/main.rs +++ b/crates/agent/src/main.rs @@ -352,31 +352,33 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> { args.data_movement_alert_interval, )); + let sender = if let Some(api_key) = &args.resend_api_key { + let from_email = args + .email_from_address + .clone() + .expect("missing email-from-address"); + let reply_to_email = args + .email_reply_to_address + .clone() + .expect("missing email-reply-to-address"); + tracing::info!(%from_email, %reply_to_email, "Email sending is enabled"); + agent::email::Sender::resend( + api_key, + from_email, + reply_to_email, + new_http_client()?, + ) + } else { + tracing::warn!("Email sending is disabled"); + agent::email::Sender::Disabled + }; + let sender = std::sync::Arc::new(sender); + if args.serve_alert_notifications { - let sender = if let Some(api_key) = &args.resend_api_key { - // These two are required if api-key is provided, so clap should have ensured they are present - let from_email = args - .email_from_address - .clone() - .expect("missing email-from-address"); - let reply_to_email = args - .email_reply_to_address - .clone() - .expect("missing email-reply-to-address"); - tracing::info!(%from_email, %reply_to_email, "Sending of alert emails is enabled"); - agent::alerts::Sender::resend( - api_key, - from_email, - reply_to_email, - new_http_client()?, - ) - } else { - // Hopefully this is a local env - tracing::warn!("Sending of alert emails is disabled"); - agent::alerts::Sender::Disabled - }; - let alert_notifications = - agent::alerts::AlertNotifications::new(&args.dashboard_base_url, sender)?; + let alert_notifications = agent::alerts::AlertNotifications::new( + &args.dashboard_base_url, + sender.clone(), + )?; automations_server = automations_server.register(alert_notifications); } diff --git a/crates/notifications/src/lib.rs b/crates/notifications/src/lib.rs index 6d4d4f5e9cf..f92dbca56ec 100644 --- a/crates/notifications/src/lib.rs +++ b/crates/notifications/src/lib.rs @@ -212,15 +212,9 @@ impl Renderer { } } -fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> anyhow::Result<()> { - // Common email wrapper template that's used by all alert types, so we have consistent styling. - // Note that this html was generated by an LLM, from the original `mjml` template in the legacy - // alerts edge function. That legacy template used the `mjml-browser` library to render html - // with styling that matches our UI. This LLM-translation doesn't match that _perfectly_, - // but was considered good enough to allow getting rid of `mjml`. - registry.register_template_string( - "email_wrapper", - r#" +/// Branded HTML wrapper template shared by all Estuary emails (alerts, invites, etc.). +/// Expects `full_name` (optional) and a `body_template_name` partial to inject content. +pub const EMAIL_WRAPPER_TEMPLATE: &str = r#" @@ -269,8 +263,18 @@ fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> a -"#, - ).context("registering email_wrapper template")?; +"#; + +/// Registers the shared email wrapper template. Call this before registering +/// any body templates that use `{{> (lookup this "body_template_name")}}`. +pub fn register_email_wrapper(registry: &mut handlebars::Handlebars) -> anyhow::Result<()> { + registry + .register_template_string("email_wrapper", EMAIL_WRAPPER_TEMPLATE) + .context("registering email_wrapper template") +} + +fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> anyhow::Result<()> { + register_email_wrapper(registry)?; // Helper partial for rendering a catalog name identifier registry