diff --git a/Cargo.toml b/Cargo.toml index 15a4c14..a40ca96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,10 @@ path = "src/bin/bichonctl.rs" name = "bichon-admin" path = "src/bin/bichon_admin.rs" +[[bin]] +name = "bichonsync" +path = "src/bin/bichonsync.rs" + [features] default = [] diff --git a/src/bin/bichonsync.rs b/src/bin/bichonsync.rs new file mode 100644 index 0000000..a359c40 --- /dev/null +++ b/src/bin/bichonsync.rs @@ -0,0 +1,539 @@ +use bichon::modules::account::payload::MinimalAccount; +use bichon::modules::cli::BichonCtlConfig; +use bichon::modules::sync::{ + FetchEmlRequest, MailboxStatusEntry, RawEmailExport, SyncFolderResult, SyncVerifyResult, +}; +use clap::{Parser, Subcommand}; +use console::style; +use dialoguer::{theme::ColorfulTheme, Confirm, Input, Select}; +use reqwest::Client; +use std::fs; + +#[derive(Parser, Debug)] +#[command( + name = "bichonsync", + author = "rustmailer", + version = bichon::bichon_version!(), + about = "CLI tool for manual sync controls against the Bichon API" +)] +struct Cli { + /// Path to the configuration file + #[arg( + short, + long, + default_value = "config.toml", + value_name = "FILE", + help = "Sets a custom config file" + )] + config: std::path::PathBuf, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + /// List all accounts + ListAccounts, + /// List mailboxes for an account + ListMailboxes { + /// Account ID + #[arg(short, long)] + account_id: u64, + }, + /// Trigger a full sync for an account + Sync { + /// Account ID to sync + #[arg(short, long)] + account_id: u64, + }, + /// Sync a single folder/mailbox + SyncFolder { + /// Account ID + #[arg(short, long)] + account_id: u64, + /// Mailbox ID to sync + #[arg(short, long)] + mailbox_id: u64, + /// Export duplicate/missing messages as .eml files to a tmp directory + #[arg(long)] + export_dupes: bool, + }, + /// Verify sync completeness against the mail server + Verify { + /// Account ID to verify 
+ #[arg(short, long)] + account_id: u64, + }, +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let config = load_or_prompt_config(&cli.config); + let client = Client::new(); + + match cli.command { + Commands::ListAccounts => { + let url = format!("{}/api/v1/minimal-account-list", config.base_url); + match get_request::>(&client, &config, &url).await { + Ok(accounts) => { + if accounts.is_empty() { + println!("No accounts found."); + } else { + println!("\n {:<20} {}", "Account ID", "Email"); + println!(" {}", "-".repeat(50)); + for acc in &accounts { + println!(" {:<20} {}", acc.id, acc.email); + } + println!(); + } + } + Err(e) => eprintln!("{} {}", style("✘").red(), e), + } + } + Commands::ListMailboxes { account_id } => { + let url = format!( + "{}/api/v1/sync/mailbox-status/{}", + config.base_url, account_id + ); + match get_request::>(&client, &config, &url).await { + Ok(mailboxes) => { + if mailboxes.is_empty() { + println!("No mailboxes found for account {}.", account_id); + } else { + println!( + "\n {:<20} {:<30} {:>10} {:>10} {}", + "Mailbox ID", "Name", "Server", "Local", "Sync" + ); + println!(" {}", "-".repeat(80)); + for mb in &mailboxes { + let sync_icon = if mb.syncing { + style("ON").green() + } else { + style("--").dim() + }; + println!( + " {:<20} {:<30} {:>10} {:>10} {}", + mb.mailbox_id, mb.mailbox_name, mb.server_count, mb.local_count, sync_icon + ); + } + println!(); + } + } + Err(e) => eprintln!("{} {}", style("✘").red(), e), + } + } + Commands::Sync { account_id } => { + println!( + "{} Triggering sync for account {}...", + style("⟳").cyan(), + account_id + ); + let url = format!("{}/api/v1/sync/{}", config.base_url, account_id); + match post_request(&client, &config, &url).await { + Ok(_) => { + println!( + "{} Sync completed for account {}.", + style("✔").green(), + account_id + ); + print_mailbox_status(&client, &config, account_id).await; + } + Err(e) => eprintln!("{} {}", style("✘").red(), e), + } + } + 
Commands::SyncFolder { + account_id, + mailbox_id, + export_dupes, + } => { + println!( + "{} Syncing folder {} for account {}...", + style("⟳").cyan(), + mailbox_id, + account_id + ); + let url = format!( + "{}/api/v1/sync/{}/{}", + config.base_url, account_id, mailbox_id + ); + match post_request_json::(&client, &config, &url).await { + Ok(result) => { + print_sync_folder_result(&result); + if export_dupes && !result.missing_messages.is_empty() { + export_missing_emails( + &client, &config, account_id, mailbox_id, &result, + ) + .await; + } + print_mailbox_status(&client, &config, account_id).await; + } + Err(e) => eprintln!("{} {}", style("✘").red(), e), + } + } + Commands::Verify { account_id } => { + println!( + "{} Verifying sync completeness for account {}...", + style("⟳").cyan(), + account_id + ); + let url = format!("{}/api/v1/sync/verify/{}", config.base_url, account_id); + match get_request::(&client, &config, &url).await { + Ok(result) => print_verify_result(&result), + Err(e) => eprintln!("{} {}", style("✘").red(), e), + } + } + } +} + +fn load_or_prompt_config(config_path: &std::path::Path) -> BichonCtlConfig { + let theme = ColorfulTheme::default(); + + if config_path.exists() { + if let Ok(content) = fs::read_to_string(config_path) { + if let Ok(config) = toml::from_str::(&content) { + println!("{}", style("✔ Using existing configuration:").green()); + println!(" Base URL: {}", style(&config.base_url).yellow()); + return config; + } + } + } + + println!( + "\n{}", + style("Please enter Bichon service details:").bold() + ); + + let url: String = Input::with_theme(&theme) + .with_prompt("Bichon Base URL") + .default("http://localhost:15630".into()) + .interact_text() + .unwrap(); + + let token: String = Input::with_theme(&theme) + .with_prompt("API Token") + .interact_text() + .unwrap(); + + let conf = BichonCtlConfig { + base_url: url, + api_token: token, + }; + + if Confirm::with_theme(&theme) + .with_prompt("Save this configuration for future 
use?") + .default(true) + .interact() + .unwrap() + { + let toml_str = toml::to_string(&conf).unwrap(); + fs::write(config_path, toml_str).expect("Failed to save config file"); + println!("{}", style("Configuration saved.").green()); + } + conf +} + +async fn post_request( + client: &Client, + config: &BichonCtlConfig, + url: &str, +) -> Result<(), String> { + let response = client + .post(url) + .header("Authorization", format!("Bearer {}", config.api_token)) + .send() + .await + .map_err(|e| format!("Network error: {}", e))?; + + if response.status().is_success() { + Ok(()) + } else { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "No details".into()); + Err(format!("Server error ({}): {}", status, body)) + } +} + +async fn post_request_json( + client: &Client, + config: &BichonCtlConfig, + url: &str, +) -> Result { + let response = client + .post(url) + .header("Authorization", format!("Bearer {}", config.api_token)) + .send() + .await + .map_err(|e| format!("Network error: {}", e))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "No details".into()); + return Err(format!("Server error ({}): {}", status, body)); + } + + response + .json::() + .await + .map_err(|e| format!("Failed to parse response: {}", e)) +} + +async fn get_request( + client: &Client, + config: &BichonCtlConfig, + url: &str, +) -> Result { + let response = client + .get(url) + .header("Authorization", format!("Bearer {}", config.api_token)) + .send() + .await + .map_err(|e| format!("Network error: {}", e))?; + + if !response.status().is_success() { + let status = response.status(); + let body = response + .text() + .await + .unwrap_or_else(|_| "No details".into()); + return Err(format!("Server error ({}): {}", status, body)); + } + + response + .json::() + .await + .map_err(|e| format!("Failed to parse response: {}", e)) +} + +async fn 
/// Computes the CRC-32 checksum of `data` with the reflected polynomial
/// `0xEDB88320`, initial value `0xFFFFFFFF` and a final bit inversion.
///
/// Processes one bit at a time (no lookup table); the inputs here are short
/// subject lines, so simplicity wins over speed.
fn crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 1 {
                shifted ^ POLY
            } else {
                shifted
            }
        })
    });
    !register
}

/// Turns arbitrary text into a file-name fragment.
///
/// Only the first 40 characters are used; every character that is not
/// alphanumeric, `-` or `_` is replaced by `_`.
fn sanitize_filename(s: &str) -> String {
    let mut cleaned = String::new();
    for ch in s.chars().take(40) {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
Err(e) => { + eprintln!("{} Failed to parse EML response: {}", style("✘").red(), e); + return; + } + }, + Ok(resp) => { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + eprintln!("{} Server error ({}): {}", style("✘").red(), status, body); + return; + } + Err(e) => { + eprintln!("{} Network error: {}", style("✘").red(), e); + return; + } + }; + + let tmp_dir = std::env::temp_dir().join(format!("bichon_export_{}_{}", account_id, mailbox_id)); + if let Err(e) = fs::create_dir_all(&tmp_dir) { + eprintln!("{} Failed to create tmp dir: {}", style("✘").red(), e); + return; + } + + let subject_map: std::collections::HashMap = result + .missing_messages + .iter() + .map(|m| (m.uid, m.subject.as_str())) + .collect(); + + let mut saved = 0; + for export in &exports { + let subject = subject_map.get(&export.uid).copied().unwrap_or("unknown"); + let title_part = sanitize_filename(subject); + let checksum = crc32(subject.as_bytes()); + let filename = format!("{}_{:08x}_{}.eml", export.uid, checksum, title_part); + let path = tmp_dir.join(&filename); + + use base64::Engine; + match base64::engine::general_purpose::STANDARD.decode(&export.eml_base64) { + Ok(eml_bytes) => { + if let Err(e) = fs::write(&path, &eml_bytes) { + eprintln!(" {} Failed to write {}: {}", style("✘").red(), filename, e); + } else { + println!(" {} {}", style("✔").green(), filename); + saved += 1; + } + } + Err(e) => { + eprintln!(" {} Failed to decode UID {}: {}", style("✘").red(), export.uid, e); + } + } + } + + println!( + "\n{} Exported {} EML files to {}", + style("✔").green(), + saved, + tmp_dir.display() + ); +} + +fn print_sync_folder_result(result: &SyncFolderResult) { + println!( + "\n{} Folder sync completed: server={}, local_before={}, local_after={}, missing={}, fetched={}, new={}, dedup={}", + style("✔").green(), + result.server_count, + result.local_count_before, + result.local_count_after, + result.missing_count, + result.fetched, + result.new_messages, + 
result.dedup_count + ); + + if !result.missing_messages.is_empty() { + println!( + "\n {:<8} {:<28} {:<40} {}", + "UID", "Date", "Message-ID", "Subject" + ); + println!(" {}", "-".repeat(100)); + for msg in &result.missing_messages { + let subject_display = if msg.subject.chars().count() > 50 { + format!("{}...", msg.subject.chars().take(50).collect::()) + } else { + msg.subject.clone() + }; + println!( + " {:<8} {:<28} {:<40} {}", + msg.uid, msg.date, format!("<{}>", msg.message_id), subject_display + ); + } + } + + if result.dedup_count > 0 { + println!( + "\n {} {} messages share duplicate Message-IDs (dedup expected)", + style("!").yellow(), + result.dedup_count + ); + } +} + +fn print_verify_result(result: &SyncVerifyResult) { + let status = if result.is_complete { + style("COMPLETE").green().bold() + } else { + style("INCOMPLETE").red().bold() + }; + println!("\nAccount {}: {}", result.account_id, status); + + if !result.missing_folders.is_empty() { + println!( + "\n {} Missing folders (on server but not local):", + style("!").yellow() + ); + for folder in &result.missing_folders { + println!(" - {}", style(folder).yellow()); + } + } + + if !result.mailboxes.is_empty() { + println!("\n Mailbox details:"); + println!( + " {:<30} {:>8} {:>8} {:>8} {}", + "Name", "Remote", "Local", "Missing", "Status" + ); + println!(" {}", "-".repeat(78)); + for mb in &result.mailboxes { + let status_icon = if mb.is_complete { + style("✔").green() + } else { + style("✘").red() + }; + println!( + " {:<30} {:>8} {:>8} {:>8} {}", + mb.mailbox_name, + mb.remote_count, + mb.local_count, + mb.missing_count, + status_icon + ); + } + } + println!(); +} diff --git a/src/modules/cache/imap/sync/flow.rs b/src/modules/cache/imap/sync/flow.rs index 92ca8d3..d80b039 100644 --- a/src/modules/cache/imap/sync/flow.rs +++ b/src/modules/cache/imap/sync/flow.rs @@ -420,7 +420,7 @@ pub async fn reconcile_mailboxes( } //only check new emails and sync -async fn perform_incremental_sync( +pub async fn 
perform_incremental_sync( account: &AccountModel, local_mailbox: &MailBox, remote_mailbox: &MailBox, diff --git a/src/modules/duckdb/init.rs b/src/modules/duckdb/init.rs index 42cadd9..c971335 100644 --- a/src/modules/duckdb/init.rs +++ b/src/modules/duckdb/init.rs @@ -296,6 +296,22 @@ impl DuckDBManager { Ok(count) } + pub fn get_all_uids(&self, account_id: u64, mailbox_id: u64) -> BichonResult> { + let conn = self.conn()?; + let mut stmt = conn + .prepare("SELECT uid FROM envelopes WHERE account_id = ? AND mailbox_id = ?;") + .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::InternalError))?; + let uids: HashSet = stmt + .query_map(params![account_id, mailbox_id], |row| { + let uid: u64 = row.get(0)?; + Ok(uid as u32) + }) + .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::InternalError))? + .filter_map(|r| r.ok()) + .collect(); + Ok(uids) + } + pub fn get_max_uid(&self, account_id: u64, mailbox_id: u64) -> BichonResult> { let conn = self.conn()?; let max_uid: Option = conn diff --git a/src/modules/indexer/manager.rs b/src/modules/indexer/manager.rs index 2c044cb..d2f0da3 100644 --- a/src/modules/indexer/manager.rs +++ b/src/modules/indexer/manager.rs @@ -76,6 +76,7 @@ pub enum MetadataOp { pub enum DocumentOp { Document((u64, TantivyDocument)), + Flush(tokio::sync::oneshot::Sender<()>), Shutdown, } @@ -126,6 +127,9 @@ impl EnvelopeIndexManager { let _ = self.sender.send(MetadataOp::Record((eid, doc))).await; } + /// No-op flush — DuckDB commits are synchronous in drain_and_commit. + pub async fn flush(&self) {} + async fn drain_and_commit(&self, buffer: &mut HashMap)>) { if buffer.is_empty() { return; @@ -320,6 +324,23 @@ impl EnvelopeIndexManager { .map_err(|e| raise_error!(format!("{:?}", e), ErrorCode::InternalError))? 
} + pub async fn count_messages_in_mailbox( + &self, + account_id: u64, + mailbox_id: u64, + ) -> BichonResult { + self.num_messages_in_mailbox(account_id, mailbox_id).await + } + + /// Return all UIDs stored locally for a given mailbox. + pub fn get_all_uids( + &self, + account_id: u64, + mailbox_id: u64, + ) -> BichonResult> { + duckdb()?.get_all_uids(account_id, mailbox_id) + } + pub async fn get_dashboard_stats( &self, accounts: Option>, @@ -388,6 +409,10 @@ impl EmlIndexManager { EML_INDEX_MANAGER.drain_and_commit(&mut buffer).await; break; } + Some(DocumentOp::Flush(tx)) => { + EML_INDEX_MANAGER.drain_and_commit(&mut buffer).await; + let _ = tx.send(()); + } None => break, } } @@ -425,6 +450,13 @@ impl EmlIndexManager { let _ = self.sender.send(DocumentOp::Document((eid, doc))).await; } + /// Flush the write buffer and commit all pending documents to the index. + pub async fn flush(&self) { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self.sender.send(DocumentOp::Flush(tx)).await; + let _ = rx.await; + } + fn open_or_create_index(index_dir: &PathBuf) -> Index { let need_create = !index_dir.exists() || index_dir diff --git a/src/modules/mod.rs b/src/modules/mod.rs index ef52c75..b44bd27 100644 --- a/src/modules/mod.rs +++ b/src/modules/mod.rs @@ -36,6 +36,7 @@ pub mod message; pub mod oauth2; pub mod rest; pub mod settings; +pub mod sync; pub mod tasks; pub mod token; pub mod users; diff --git a/src/modules/rest/api/mod.rs b/src/modules/rest/api/mod.rs index 087fffa..93fcfa3 100644 --- a/src/modules/rest/api/mod.rs +++ b/src/modules/rest/api/mod.rs @@ -23,6 +23,7 @@ use mailbox::MailBoxApi; use message::MessageApi; use oauth2::OAuth2Api; use poem_openapi::{OpenApiService, Tags}; +use sync::SyncApi; use system::SystemApi; use crate::{ @@ -37,6 +38,7 @@ pub mod import; pub mod mailbox; pub mod message; pub mod oauth2; +pub mod sync; pub mod system; pub mod users; @@ -48,6 +50,7 @@ pub enum ApiTags { Mailbox, OAuth2, Message, + Sync, System, 
Import, Users, @@ -63,6 +66,7 @@ type RustMailOpenApi = ( MessageApi, ImportApi, UsersApi, + SyncApi, ); pub fn create_openapi_service() -> OpenApiService { @@ -77,6 +81,7 @@ pub fn create_openapi_service() -> OpenApiService { MessageApi, ImportApi, UsersApi, + SyncApi, ), "BichonApi", bichon_version!(), diff --git a/src/modules/rest/api/sync.rs b/src/modules/rest/api/sync.rs new file mode 100644 index 0000000..18d2050 --- /dev/null +++ b/src/modules/rest/api/sync.rs @@ -0,0 +1,139 @@ +// +// Copyright (c) 2025 rustmailer.com (https://rustmailer.com) +// +// This file is part of the Bichon Email Archiving Project +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::modules::common::auth::ClientContext; +use crate::modules::rest::api::ApiTags; +use crate::modules::rest::ApiResult; +use crate::modules::sync::{ + fetch_raw_emails, get_mailbox_status, sync_account_on_demand, sync_single_folder, + verify_sync_completeness, FetchEmlRequest, MailboxStatusEntry, RawEmailExport, + SyncFolderResult, SyncVerifyResult, +}; +use crate::modules::users::permissions::Permission; +use poem_openapi::param::Path; +use poem_openapi::payload::Json; +use poem_openapi::OpenApi; + +pub struct SyncApi; + +#[OpenApi(prefix_path = "/api/v1", tag = "ApiTags::Sync")] +impl SyncApi { + /// Trigger an on-demand sync for the given IMAP account. 
+ /// + /// This runs the same sync logic as the background task but + /// is triggered manually via the API. + #[oai( + path = "/sync/:account_id", + method = "post", + operation_id = "sync_account" + )] + async fn sync_account( + &self, + account_id: Path, + context: ClientContext, + ) -> ApiResult<()> { + let account_id = account_id.0; + context + .require_permission(Some(account_id), Permission::ACCOUNT_MANAGE) + .await?; + Ok(sync_account_on_demand(account_id).await?) + } + + /// Sync a single mailbox/folder for the given IMAP account. + #[oai( + path = "/sync/:account_id/:mailbox_id", + method = "post", + operation_id = "sync_folder" + )] + async fn sync_folder( + &self, + account_id: Path, + mailbox_id: Path, + context: ClientContext, + ) -> ApiResult> { + let account_id = account_id.0; + let mailbox_id = mailbox_id.0; + context + .require_permission(Some(account_id), Permission::ACCOUNT_MANAGE) + .await?; + Ok(Json(sync_single_folder(account_id, mailbox_id).await?)) + } + + /// Get mailbox status for an account (offline, no IMAP connection). + /// + /// Returns each mailbox with its server count (from last sync) + /// and the actual local indexed message count. + #[oai( + path = "/sync/mailbox-status/:account_id", + method = "get", + operation_id = "mailbox_status" + )] + async fn mailbox_status( + &self, + account_id: Path, + context: ClientContext, + ) -> ApiResult>> { + let account_id = account_id.0; + context + .require_permission(Some(account_id), Permission::ACCOUNT_READ_DETAILS) + .await?; + Ok(Json(get_mailbox_status(account_id).await?)) + } + + /// Verify sync completeness by comparing local data with the IMAP server. + /// + /// Returns per-mailbox counts (local vs remote) and lists any + /// folders present on the server but missing locally. 
+ #[oai( + path = "/sync/verify/:account_id", + method = "get", + operation_id = "verify_sync" + )] + async fn verify_sync( + &self, + account_id: Path, + context: ClientContext, + ) -> ApiResult> { + let account_id = account_id.0; + context + .require_permission(Some(account_id), Permission::ACCOUNT_READ_DETAILS) + .await?; + Ok(Json(verify_sync_completeness(account_id).await?)) + } + + /// Fetch raw EML content for specific UIDs from IMAP (without storing). + #[oai( + path = "/sync/:account_id/:mailbox_id/fetch-eml", + method = "post", + operation_id = "fetch_eml" + )] + async fn fetch_eml( + &self, + account_id: Path, + mailbox_id: Path, + body: Json, + context: ClientContext, + ) -> ApiResult>> { + let account_id = account_id.0; + let mailbox_id = mailbox_id.0; + context + .require_permission(Some(account_id), Permission::ACCOUNT_MANAGE) + .await?; + Ok(Json(fetch_raw_emails(account_id, mailbox_id, body.0.uids).await?)) + } +} diff --git a/src/modules/sync/mod.rs b/src/modules/sync/mod.rs new file mode 100644 index 0000000..7e3324e --- /dev/null +++ b/src/modules/sync/mod.rs @@ -0,0 +1,486 @@ +// +// Copyright (c) 2025 rustmailer.com (https://rustmailer.com) +// +// This file is part of the Bichon Email Archiving Project +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +use crate::modules::{ + account::migration::{AccountModel, AccountType}, + cache::imap::{ + find_intersecting_mailboxes, find_missing_mailboxes, + mailbox::MailBox, + sync::{execute_imap_sync, flow::{generate_uid_sequence_hashset, DEFAULT_BATCH_SIZE}}, + }, + error::{code::ErrorCode, BichonResult}, + imap::executor::ImapExecutor, + indexer::manager::{ENVELOPE_INDEX_MANAGER, EML_INDEX_MANAGER}, + mailbox::list::request_imap_all_mailbox_list, +}; +use crate::raise_error; +use base64::Engine; +use futures::TryStreamExt; +use poem_openapi::Object; +use serde::{Deserialize, Serialize}; +use tracing::info; + +/// Decode RFC 2047 encoded-word subjects (e.g. `=?utf-8?B?...?=`) to readable text. +fn decode_rfc2047(raw: &[u8]) -> String { + let s = String::from_utf8_lossy(raw); + // Try to decode =?charset?B?base64?= or =?charset?Q?quoted?= patterns + let mut result = String::new(); + let mut remaining = s.as_ref(); + while let Some(start) = remaining.find("=?") { + result.push_str(&remaining[..start]); + let after_start = &remaining[start + 2..]; + // Find charset?encoding?data?= + let parts: Vec<&str> = after_start.splitn(4, '?').collect(); + if parts.len() >= 3 && parts[2].ends_with("?=") || (parts.len() == 4 && parts[3].starts_with('=')) { + let encoding = parts[1]; + let data = if parts.len() == 4 { + // parts[2] is the data, parts[3] starts with '=' + parts[2] + } else { + parts[2].trim_end_matches("?=") + }; + let decoded = match encoding.to_uppercase().as_str() { + "B" => base64::engine::general_purpose::STANDARD + .decode(data) + .ok() + .and_then(|bytes| String::from_utf8(bytes).ok()), + "Q" => { + // Quoted-printable: _ = space, =XX = hex byte + let qp: Vec = data.as_bytes().iter().copied().fold( + (Vec::new(), false, 0u8), + |(mut acc, in_hex, hex_byte), b| { + if in_hex { + if hex_byte == 0 { + return (acc, true, b); + } + let hex_str = [hex_byte, b]; + if let Ok(val) = u8::from_str_radix( + &String::from_utf8_lossy(&hex_str), 16, + ) { + acc.push(val); + } + 
(acc, false, 0) + } else if b == b'=' { + (acc, true, 0) + } else if b == b'_' { + acc.push(b' '); + (acc, false, 0) + } else { + acc.push(b); + (acc, false, 0) + } + }, + ).0; + String::from_utf8(qp).ok() + } + _ => None, + }; + if let Some(text) = decoded { + result.push_str(&text); + } else { + // Fallback: keep original + result.push_str(&remaining[start..start + 2]); + remaining = after_start; + continue; + } + // Skip past the closing ?= + let end_marker = if parts.len() == 4 { + // start + 2 + charset? + encoding? + data?= + start + 2 + parts[0].len() + 1 + parts[1].len() + 1 + parts[2].len() + 2 + } else { + start + 2 + parts[0].len() + 1 + parts[1].len() + 1 + parts[2].len() + }; + remaining = if end_marker <= remaining.len() { + &remaining[end_marker..] + } else { + "" + }; + } else { + result.push_str("=?"); + remaining = after_start; + } + } + result.push_str(remaining); + if result.is_empty() { s.to_string() } else { result } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +pub struct MissingMessageInfo { + pub uid: u32, + pub date: String, + pub message_id: String, + pub subject: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +pub struct SyncFolderResult { + pub server_count: usize, + pub local_count_before: usize, + pub local_count_after: u64, + pub missing_count: usize, + pub fetched: usize, + pub new_messages: i64, + pub dedup_count: i64, + pub missing_messages: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +pub struct FetchEmlRequest { + pub uids: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +pub struct RawEmailExport { + pub uid: u32, + pub eml_base64: String, +} + +/// Fetch raw EML content from IMAP for the given UIDs (without storing). 
+pub async fn fetch_raw_emails( + account_id: u64, + mailbox_id: u64, + uids: Vec, +) -> BichonResult> { + let account = AccountModel::check_account_exists(account_id).await?; + if !matches!(account.account_type, AccountType::IMAP) { + return Err(raise_error!( + "Only IMAP accounts supported.".into(), + ErrorCode::InvalidParameter + )); + } + let local_mailbox = MailBox::get(mailbox_id).await?; + if local_mailbox.account_id != account_id { + return Err(raise_error!( + "Mailbox does not belong to this account.".into(), + ErrorCode::InvalidParameter + )); + } + + let encoded_name = local_mailbox.encoded_name(); + let mut session = ImapExecutor::create_connection(account_id).await?; + session + .examine(&encoded_name) + .await + .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))?; + + let uid_set: String = uids.iter().map(|u| u.to_string()).collect::>().join(","); + let mut results = Vec::new(); + { + let mut stream = session + .uid_fetch(&uid_set, "BODY.PEEK[]") + .await + .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))?; + + while let Some(fetch) = stream + .try_next() + .await + .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))? + { + let uid = fetch.uid.unwrap_or(0); + if let Some(body) = fetch.body() { + results.push(RawEmailExport { + uid, + eml_base64: base64::engine::general_purpose::STANDARD.encode(body), + }); + } + } + } + + session.logout().await.ok(); + Ok(results) +} + +/// Trigger a full sync for an IMAP account (on demand). +pub async fn sync_account_on_demand(account_id: u64) -> BichonResult<()> { + let account = AccountModel::check_account_exists(account_id).await?; + if !matches!(account.account_type, AccountType::IMAP) { + return Err(raise_error!( + "Sync is only supported for IMAP accounts.".into(), + ErrorCode::InvalidParameter + )); + } + if !account.enabled { + return Err(raise_error!( + "Account is disabled. 
Enable it before syncing.".into(),
+            ErrorCode::InvalidParameter
+        ));
+    }
+    execute_imap_sync(&account).await
+}
+
+/// Sync a single folder/mailbox for an IMAP account.
+///
+/// # Errors
+/// Returns `ErrorCode::InvalidParameter` when the account is not an IMAP account or when
+/// the mailbox does not belong to `account_id`. Errors from the account/mailbox lookups
+/// and from the sync itself are propagated unchanged.
+pub async fn sync_single_folder(
+    account_id: u64,
+    mailbox_id: u64,
+) -> BichonResult<SyncFolderResult> {
+    let account = AccountModel::check_account_exists(account_id).await?;
+    if !matches!(account.account_type, AccountType::IMAP) {
+        return Err(raise_error!(
+            "Sync is only supported for IMAP accounts.".into(),
+            ErrorCode::InvalidParameter
+        ));
+    }
+    let local_mailbox = MailBox::get(mailbox_id).await?;
+    // Guard against a mailbox id that exists but is owned by a different account.
+    if local_mailbox.account_id != account_id {
+        return Err(raise_error!(
+            "Mailbox does not belong to this account.".into(),
+            ErrorCode::InvalidParameter
+        ));
+    }
+    perform_single_folder_sync(&account, &local_mailbox).await
+}
+
+/// Connect to IMAP, compare server UIDs with local UIDs, fetch missing ones, flush indexes.
+///
+/// Steps, all on a single IMAP connection:
+/// 1. `UID SEARCH UID 1:*` to list the UIDs on the server.
+/// 2. Diff against the UIDs in the local envelope index.
+/// 3. For the missing UIDs, fetch envelope summaries (reported back in
+///    `missing_messages`) and then retrieve the messages in batches.
+/// 4. Flush both indexes and recount the mailbox.
+/// 5. `EXAMINE` the mailbox and persist `exists` / `unseen` / `uid_next` / `uid_validity`.
+///
+/// In the result, `new_messages` is the change in the local message count and
+/// `dedup_count` is the number of fetched UIDs that did not increase that count.
+///
+/// # Errors
+/// IMAP command failures are mapped to `ErrorCode::ImapCommandFailed`; index and storage
+/// errors are propagated.
+async fn perform_single_folder_sync(
+    account: &AccountModel,
+    local_mailbox: &MailBox,
+) -> BichonResult<SyncFolderResult> {
+    // Upper bound on UIDs per envelope FETCH so the command line stays well below the
+    // line-length limits IMAP servers enforce, no matter how large the backlog is.
+    const ENVELOPE_FETCH_CHUNK: usize = 500;
+
+    let encoded_name = local_mailbox.encoded_name();
+
+    // Get all UIDs on the server
+    let mut session = ImapExecutor::create_connection(account.id).await?;
+    let server_uids = ImapExecutor::uid_search(&mut session, &encoded_name, "UID 1:*").await?;
+
+    // Get all UIDs in the local index
+    let local_uids = ENVELOPE_INDEX_MANAGER.get_all_uids(account.id, local_mailbox.id)?;
+
+    // Compute missing UIDs (on server but not in local)
+    let mut missing: Vec<_> = server_uids.difference(&local_uids).copied().collect();
+    missing.sort_unstable();
+
+    let server_count = server_uids.len();
+    let before_count = local_uids.len();
+    let missing_count = missing.len();
+    info!(
+        "[account {}][mailbox {}] server={} UIDs, local={} UIDs, missing={}",
+        account.id, local_mailbox.name, server_count, before_count, missing_count
+    );
+
+    let mut missing_messages = Vec::with_capacity(missing_count);
+    let mut after_count = before_count as u64;
+    let mut new_messages: i64 = 0;
+    let mut dedup_count: i64 = 0;
+
+    if !missing.is_empty() {
+        // Fetch ENVELOPE info for missing UIDs, one bounded chunk per command.
+        for chunk in missing.chunks(ENVELOPE_FETCH_CHUNK) {
+            let uid_set: String = chunk
+                .iter()
+                .map(|u| u.to_string())
+                .collect::<Vec<_>>()
+                .join(",");
+
+            let mut envelope_stream = session
+                .uid_fetch(&uid_set, "(UID ENVELOPE INTERNALDATE)")
+                .await
+                .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))?;
+            while let Some(fetch) = envelope_stream
+                .try_next()
+                .await
+                .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))?
+            {
+                // 0 is never a valid IMAP UID, so it marks a response that carried no UID.
+                let uid = fetch.uid.unwrap_or(0);
+                if let Some(env) = fetch.envelope() {
+                    let subject = env.subject.as_ref()
+                        .map(|s| decode_rfc2047(s))
+                        .unwrap_or_default();
+                    // Strip the surrounding angle brackets from the Message-ID.
+                    let message_id = env.message_id.as_ref()
+                        .map(|s| {
+                            let raw = String::from_utf8_lossy(s).to_string();
+                            raw.trim_matches(|c| c == '<' || c == '>').to_string()
+                        })
+                        .unwrap_or_default();
+                    let date = env.date.as_ref()
+                        .map(|s| String::from_utf8_lossy(s).to_string())
+                        .unwrap_or_default();
+                    // Fall back to INTERNALDATE when envelope Date is missing
+                    let date = if date.is_empty() {
+                        fetch.internal_date()
+                            .map(|dt| dt.to_rfc2822())
+                            .unwrap_or_default()
+                    } else {
+                        date
+                    };
+                    missing_messages.push(MissingMessageInfo {
+                        uid,
+                        date,
+                        message_id,
+                        subject,
+                    });
+                }
+            }
+        }
+
+        let batch_size = account.sync_batch_size.unwrap_or(DEFAULT_BATCH_SIZE) as usize;
+        // `missing` is not needed afterwards (its length is already recorded), so it is
+        // handed over by value instead of being cloned.
+        let uid_batches = generate_uid_sequence_hashset(missing, batch_size, false);
+        for batch in &uid_batches {
+            ImapExecutor::uid_batch_retrieve_emails(
+                &mut session,
+                account.id,
+                local_mailbox.id,
+                batch,
+                &encoded_name,
+            )
+            .await?;
+        }
+
+        // Flush both indexes so counts are accurate immediately
+        ENVELOPE_INDEX_MANAGER.flush().await;
+        EML_INDEX_MANAGER.flush().await;
+
+        after_count = ENVELOPE_INDEX_MANAGER
+            .count_messages_in_mailbox(account.id, local_mailbox.id)
+            .await?;
+        new_messages = after_count as i64 - before_count as i64;
+        dedup_count = missing_count as i64 - new_messages;
+    }
+
+    // Read the latest server state on the connection that is already open, then log out
+    // before inspecting the result so the session is closed even when EXAMINE failed.
+    let examined = session.examine(&encoded_name).await;
+    session.logout().await.ok();
+    let mx = examined
+        .map_err(|e| raise_error!(format!("{:#?}", e), ErrorCode::ImapCommandFailed))?;
+
+    // Update mailbox metadata with latest server state
+    let mut updated_mailbox = local_mailbox.clone();
+    updated_mailbox.exists = mx.exists;
+    updated_mailbox.unseen = mx.unseen;
+    updated_mailbox.uid_next = mx.uid_next;
+    updated_mailbox.uid_validity = mx.uid_validity;
+    MailBox::batch_upsert(&[updated_mailbox]).await?;
+
+    Ok(SyncFolderResult {
+        server_count,
+        local_count_before: before_count,
+        local_count_after: after_count,
+        missing_count,
+        fetched: missing_count,
+        new_messages,
+        dedup_count,
+        missing_messages,
+    })
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Object)]
+pub struct MailboxStatusEntry {
+    pub mailbox_id: u64,
+    pub mailbox_name: String,
+    /// Message count reported by the IMAP server (from last sync)
+    pub server_count: u32,
+    /// Actual number of messages stored in the local index
+    pub local_count: u64,
+    /// Whether this mailbox is configured for syncing
+    pub syncing: bool,
+}
+
+/// Return local mailboxes with their indexed message counts.
+/// This is a fast, offline-only operation (no IMAP connection).
+///
+/// `server_count` in each entry is the `exists` value stored on the local mailbox record,
+/// and `syncing` is true when the mailbox name appears in the account's `sync_folders`
+/// (an unset `sync_folders` is treated as empty).
+///
+/// # Errors
+/// Propagates failures from the account lookup, the mailbox listing and the index count.
+pub async fn get_mailbox_status(account_id: u64) -> BichonResult<Vec<MailboxStatusEntry>> {
+    let account = AccountModel::check_account_exists(account_id).await?;
+    let sync_folders = account.sync_folders.unwrap_or_default();
+    let mailboxes = MailBox::list_all(account_id).await?;
+    let mut result = Vec::with_capacity(mailboxes.len());
+    for mb in &mailboxes {
+        // Count what is actually indexed locally for this mailbox.
+        let local_count = ENVELOPE_INDEX_MANAGER
+            .count_messages_in_mailbox(account_id, mb.id)
+            .await?;
+        result.push(MailboxStatusEntry {
+            mailbox_id: mb.id,
+            mailbox_name: mb.name.clone(),
+            server_count: mb.exists,
+            local_count,
+            syncing: sync_folders.contains(&mb.name),
+        });
+    }
+    Ok(result)
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Object)]
+pub struct MailboxVerifyResult {
+    pub mailbox_id: u64,
+    pub mailbox_name: String,
+    /// Number of messages on the remote IMAP server
+    pub remote_count: u32,
+    /// Number of messages in the local index
+    pub local_count: u64,
+    /// Number of missing messages (remote - local), 0 if local >= remote
+    pub missing_count: u64,
+    /// Whether the mailbox is fully synced
+    pub is_complete: bool,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, Object)]
+pub struct SyncVerifyResult {
+    pub account_id: u64,
+    /// Folders present on the server but not locally
+    pub missing_folders: Vec<String>,
+    /// Per-mailbox verification results
+    pub mailboxes: Vec<MailboxVerifyResult>,
+    /// Whether all mailboxes are fully synced and no folders are missing
+    pub is_complete: bool,
+}
+
+/// Verify sync completeness by comparing local state with the IMAP server.
+///
+/// Completeness is judged by counts only: for every mailbox present both locally and on
+/// the server, the server's `exists` value is compared with the number of messages in the
+/// local index. Individual UIDs are not compared here. Folders that exist on the server
+/// but not locally are reported in `missing_folders` and also make the result incomplete.
+///
+/// # Errors
+/// Returns `ErrorCode::InvalidParameter` for non-IMAP accounts; errors from the account
+/// lookup, the remote/local mailbox listings and the index count are propagated.
+pub async fn verify_sync_completeness(account_id: u64) -> BichonResult<SyncVerifyResult> {
+    let account = AccountModel::check_account_exists(account_id).await?;
+    if !matches!(account.account_type, AccountType::IMAP) {
+        return Err(raise_error!(
+            "Verify is only supported for IMAP accounts.".into(),
+            ErrorCode::InvalidParameter
+        ));
+    }
+
+    let remote_mailboxes = request_imap_all_mailbox_list(account_id).await?;
+    let local_mailboxes = MailBox::list_all(account_id).await?;
+
+    let missing_folders: Vec<String> =
+        find_missing_mailboxes(&local_mailboxes, &remote_mailboxes)
+            .into_iter()
+            .map(|m| m.name)
+            .collect();
+
+    let intersecting = find_intersecting_mailboxes(&local_mailboxes, &remote_mailboxes);
+
+    let mut mailbox_results = Vec::with_capacity(intersecting.len());
+
+    for (local_mb, remote_mb) in &intersecting {
+        let local_count = ENVELOPE_INDEX_MANAGER
+            .count_messages_in_mailbox(account_id, local_mb.id)
+            .await?;
+        let remote_count = remote_mb.exists;
+        // Saturating: a local surplus counts as "nothing missing", never a negative gap.
+        let missing = (remote_count as u64).saturating_sub(local_count);
+        mailbox_results.push(MailboxVerifyResult {
+            mailbox_id: local_mb.id,
+            mailbox_name: local_mb.name.clone(),
+            remote_count,
+            local_count,
+            missing_count: missing,
+            is_complete: missing == 0,
+        });
+    }
+
+    let is_complete = missing_folders.is_empty() && mailbox_results.iter().all(|r| r.is_complete);
+
+    info!(
+        "Sync verification for account {}: complete={}, missing_folders={}, mailboxes_checked={}",
+        account_id,
+        is_complete,
+        missing_folders.len(),
+        mailbox_results.len()
+    );
+
+    Ok(SyncVerifyResult {
+        account_id,
+        missing_folders,
+        mailboxes: mailbox_results,
+        is_complete,
+    })
+}