Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ reqwest = { version = "0.12", default-features = false, features = ["blocking",
quick-xml = "0.39"
flate2 = "1"
zip = { version = "8", default-features = false, features = ["deflate"] }
tokio-stream = { version = "0.1", features = ["sync"] }
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ fn main() {
hostname,
admin_port: port,
mcp_guard: std::sync::Arc::new(std::sync::Mutex::new(web::McpGuard::new())),
idle_registry: std::sync::Arc::new(std::sync::Mutex::new(
std::collections::HashMap::new(),
)),
};

// Start fail2ban log watcher in a background thread
Expand Down
22 changes: 21 additions & 1 deletion src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,31 @@ use axum::response::Response;
use axum::routing::get_service;
use axum::Router;
use log::{debug, info, warn};
use std::collections::VecDeque;
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tower_http::services::ServeDir;

use crate::web::errors::status_response;

// ── IMAP IDLE session tracking ────────────────────────────────────────────────

/// Represents one active IMAP-IDLE (SSE) connection from the webmail client.
#[derive(Clone, Serialize)]
pub struct ImapIdleSession {
pub id: String,
pub account_id: i64,
pub username: String,
pub domain: String,
pub folder: String,
pub connected_at: String,
pub last_ping_at: String,
}

/// Shared in-memory registry of active IMAP IDLE sessions.
pub type ImapIdleRegistry = Arc<Mutex<HashMap<String, ImapIdleSession>>>;

// ── MCP rate-limit and anomaly-detection constants ────────────────────────────

/// Maximum number of MCP calls allowed per 60-second sliding window.
Expand Down Expand Up @@ -116,6 +134,8 @@ pub struct AppState {
pub admin_port: u16,
/// Shared rate-limiter and anomaly detector for the MCP endpoint.
pub mcp_guard: Arc<Mutex<McpGuard>>,
/// Registry of active webmail IMAP-IDLE (SSE) sessions.
pub idle_registry: ImapIdleRegistry,
}

impl AppState {
Expand Down
10 changes: 9 additions & 1 deletion src/web/routes/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ struct DashboardTemplate<'a> {
flash: Option<&'a str>,
hostname: &'a str,
stats: crate::db::Stats,
idle_session_count: usize,
}

pub async fn page(_auth: AuthAdmin, State(state): State<AppState>) -> Html<String> {
info!("[web] GET / — dashboard requested");
let stats = state.blocking_db(|db| db.get_stats()).await;

let idle_session_count = {
let reg = state.idle_registry.lock().unwrap();
reg.len()
};

debug!(
"[web] dashboard stats: domains={}, accounts={}, aliases={}, forwarding={}, tracked={}, opens={}, banned={}, webhooks={}, unsubs={}, dkim_ready={}",
"[web] dashboard stats: domains={}, accounts={}, aliases={}, forwarding={}, tracked={}, opens={}, banned={}, webhooks={}, unsubs={}, dkim_ready={}, idle_sessions={}",
stats.domain_count,
stats.account_count,
stats.alias_count,
Expand All @@ -32,13 +38,15 @@ pub async fn page(_auth: AuthAdmin, State(state): State<AppState>) -> Html<Strin
stats.webhook_count,
stats.unsubscribe_count,
stats.dkim_ready_count,
idle_session_count,
);

let tmpl = DashboardTemplate {
nav_active: "Dashboard",
flash: None,
hostname: &state.hostname,
stats,
idle_session_count,
};
Html(tmpl.render().unwrap())
}
37 changes: 37 additions & 0 deletions src/web/routes/imap_idle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use askama::Template;
use axum::{extract::State, response::Html};
use log::info;

use crate::web::auth::AuthAdmin;
use crate::web::AppState;
use crate::web::ImapIdleSession;

// ── Templates ──

#[derive(Template)]
#[template(path = "imap_idle/list.html")]
struct ImapIdleTemplate<'a> {
nav_active: &'a str,
flash: Option<&'a str>,
sessions: Vec<ImapIdleSession>,
}

// ── Handlers ──

pub async fn list(_auth: AuthAdmin, State(state): State<AppState>) -> Html<String> {
info!("[web] GET /imap-idle — listing active IDLE sessions");

let sessions: Vec<ImapIdleSession> = {
let reg = state.idle_registry.lock().unwrap();
let mut list: Vec<ImapIdleSession> = reg.values().cloned().collect();
list.sort_by(|a, b| a.connected_at.cmp(&b.connected_at));
list
};

let tmpl = ImapIdleTemplate {
nav_active: "IMAP IDLE",
flash: None,
sessions,
};
Html(tmpl.render().unwrap())
}
3 changes: 3 additions & 0 deletions src/web/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod domains;
pub mod fail2ban;
pub mod footer;
pub mod forwarding;
pub mod imap_idle;
pub mod mcp;
pub mod pixel;
pub mod queue;
Expand Down Expand Up @@ -94,6 +95,8 @@ pub fn auth_routes() -> Router<AppState> {
.route("/webmail/delete/:filename", post(webmail::delete_email))
.route("/webmail/compose", get(webmail::compose))
.route("/webmail/send", post(webmail::send_email))
.route("/webmail/idle", get(webmail::idle_stream))
.route("/imap-idle", get(imap_idle::list))
.route("/settings", get(settings::page))
.route("/settings/password", post(settings::change_password))
.route("/settings/2fa", get(settings::setup_2fa))
Expand Down
155 changes: 154 additions & 1 deletion src/web/routes/webmail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ use askama::Template;
use axum::{
extract::{Path, Query, State},
http::header,
response::{Html, IntoResponse, Redirect, Response},
response::{
sse::{Event, KeepAlive, Sse},
Html, IntoResponse, Redirect, Response,
},
Form,
};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine};
use log::{debug, error, info, warn};
use serde::Deserialize;
use std::convert::Infallible;
use std::time::Duration;
use tokio_stream::wrappers::ReceiverStream;

use crate::db::Account;
use crate::web::auth::AuthAdmin;
Expand Down Expand Up @@ -1496,6 +1502,153 @@ pub async fn send_email(
}
}

// ── IMAP IDLE (SSE) ──────────────────────────────────────────────────────────

#[derive(Deserialize)]
pub struct ImapIdleQuery {
pub account_id: i64,
#[serde(default)]
pub folder: String,
}

/// Count the number of messages in the `new/` subdirectory of a Maildir folder.
fn count_new_messages(maildir_base: &str, folder: &str) -> usize {
let root = folder_root(maildir_base, folder);
let new_dir = format!("{}/new", root);
std::fs::read_dir(&new_dir)
.map(|entries| entries.flatten().filter(|e| e.path().is_file()).count())
.unwrap_or(0)
}

/// Server-Sent Events endpoint that emits `mailbox` events whenever the number
/// of new messages in a Maildir folder changes. This is the webmail equivalent
/// of the IMAP IDLE command (RFC 2177).
///
/// Query parameters:
/// - `account_id` – ID of the account to watch.
/// - `folder` – Maildir subfolder name (empty = INBOX).
///
/// The connection is registered in `AppState::idle_registry` for admin visibility
/// and is automatically removed when the client disconnects.
pub async fn idle_stream(
_auth: AuthAdmin,
State(state): State<AppState>,
Query(query): Query<ImapIdleQuery>,
) -> impl IntoResponse {
let account_id = query.account_id;
let folder = if is_safe_folder(&query.folder) {
query.folder.clone()
} else {
String::new()
};

// Resolve account details
let acct = state
.blocking_db(move |db| db.get_account_with_domain(account_id))
.await;

let (username, domain) = match acct {
Some(ref a) => (
a.username.clone(),
a.domain_name.clone().unwrap_or_default(),
),
None => {
warn!("[idle] account id={} not found", account_id);
let (_, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(1);
let stream = ReceiverStream::new(rx);
return Sse::new(stream).keep_alive(KeepAlive::default());
}
};

if !is_safe_path_component(&domain) || !is_safe_path_component(&username) {
warn!(
"[idle] unsafe path component: domain={}, username={}",
domain, username
);
let (_, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(1);
let stream = ReceiverStream::new(rx);
return Sse::new(stream).keep_alive(KeepAlive::default());
}

let maildir_base = maildir_path(&domain, &username);
let session_id = uuid::Uuid::new_v4().to_string();
let now_ts = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC").to_string();

// Register the session
{
let mut reg = state.idle_registry.lock().unwrap();
reg.insert(
session_id.clone(),
crate::web::ImapIdleSession {
id: session_id.clone(),
account_id,
username: username.clone(),
domain: domain.clone(),
folder: if folder.is_empty() {
"INBOX".to_string()
} else {
folder.trim_start_matches('.').to_string()
},
connected_at: now_ts.clone(),
last_ping_at: now_ts,
},
);
}

info!(
"[idle] session {} opened for {}@{} folder={}",
session_id, username, domain, folder
);

let (tx, rx) = tokio::sync::mpsc::channel::<Result<Event, Infallible>>(16);
let registry = state.idle_registry.clone();
let sid = session_id.clone();

tokio::spawn(async move {
let mut last_count: Option<usize> = None;
let mut interval = tokio::time::interval(Duration::from_secs(5));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

loop {
interval.tick().await;

let count = count_new_messages(&maildir_base, &folder);

// Format timestamp before acquiring lock to minimise contention
let ping_ts = chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC").to_string();

// Update last_ping_at
{
let mut reg = registry.lock().unwrap();
if let Some(session) = reg.get_mut(&sid) {
session.last_ping_at = ping_ts;
}
}

let changed = last_count.map(|c| c != count).unwrap_or(true);
if changed {
last_count = Some(count);
let data = serde_json::json!({ "new_count": count }).to_string();
let event = Event::default().event("mailbox").data(data);
if tx.send(Ok(event)).await.is_err() {
// Receiver dropped – client disconnected
break;
}
}
}

// Deregister session
{
let mut reg = registry.lock().unwrap();
reg.remove(&sid);
}
info!("[idle] session {} closed", sid);
});

let stream = ReceiverStream::new(rx);
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(30)))
}

#[cfg(test)]
mod tests {
use super::{
Expand Down
4 changes: 4 additions & 0 deletions templates/config/dovecot.conf.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ service imap-login {
}
}

# IMAP IDLE (RFC 2177) — notify clients at most every 2 minutes that new mail
# has arrived so they can update without polling.
imap_idle_notify_interval = 2 mins

service pop3-login {
inet_listener pop3 {
address = 0.0.0.0
Expand Down
1 change: 1 addition & 0 deletions templates/dashboard.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ <h2>Live counters</h2>
<article><data value="{{ stats.webhook_count }}">{{ stats.webhook_count }}</data><strong>Webhooks</strong><small>Events dispatched</small></article>
<article><data value="{{ stats.unsubscribe_count }}">{{ stats.unsubscribe_count }}</data><strong>Unsubscribes</strong><small>Opt-out records</small></article>
<article><data value="{{ stats.banned_count }}">{{ stats.banned_count }}</data><strong>Banned IPs</strong><small>Active fail2ban bans</small></article>
<article><data value="{{ idle_session_count }}">{{ idle_session_count }}</data><strong>IMAP IDLE</strong><small>Live watchers</small></article>
</div>
</section>
{% endblock %}
Loading
Loading