diff --git a/fluid-server/src/archive.rs b/fluid-server/src/archive.rs index 2281a015..39656897 100644 --- a/fluid-server/src/archive.rs +++ b/fluid-server/src/archive.rs @@ -1,99 +1,112 @@ -// fluid-server/src/archive.rs -// Transaction Archival Job (Issue #238) - -use aws_config::BehaviorVersion; -use aws_sdk_s3::Client as S3Client; -use chrono::{DateTime, Duration, Utc}; -use sqlx::{PgPool, query, query_as}; -use tracing::{info, error}; - -const BATCH_SIZE: i64 = 1000; -const OLDER_THAN_DAYS: i64 = 730; // 2 years - -#[derive(sqlx::FromRow, serde::Serialize)] -struct Transaction { - id: String, - tx_hash: Option, - inner_tx_hash: String, - tenant_id: Option, - status: String, - cost_stroops: i64, - category: String, - chain: String, - created_at: DateTime, -} - -pub async fn run_archival_job(pool: &PgPool) -> Result<(), Box> { - info!("Starting transaction archival job"); - - let cutoff_date = Utc::now() - Duration::days(OLDER_THAN_DAYS); - info!("Archiving transactions older than: {}", cutoff_date); - - // Configure AWS S3 - let config = aws_config::load_defaults(BehaviorVersion::latest()).await; - let s3_client = S3Client::new(&config); - let bucket_name = std::env::var("S3_ARCHIVE_BUCKET") - .expect("S3_ARCHIVE_BUCKET environment variable not set"); - let prefix = std::env::var("S3_ARCHIVE_PREFIX").unwrap_or_else(|_| "transactions/".to_string()); - - let mut total_archived = 0; - let mut batch_num = 0; - - loop { - let transactions = query_as::<_, Transaction>( - r#" - SELECT id, "txHash" as tx_hash, "innerTxHash" as inner_tx_hash, - "tenantId" as tenant_id, status, "costStroops" as cost_stroops, - category, chain, "createdAt" as created_at - FROM "Transaction" - WHERE "createdAt" < $1 - ORDER BY "createdAt" ASC - LIMIT $2 - "#, - ) - .bind(cutoff_date) - .bind(BATCH_SIZE) - .fetch_all(pool) - .await?; - - if transactions.is_empty() { - break; - } - - // Convert to JSON Lines format - let json_lines: Vec = transactions - .iter() - .map(|tx| serde_json::to_string(tx).unwrap()) - .collect(); - let body = json_lines.join("\n"); - - // Upload to S3 - let key = format!("{}{}_{}.jsonl", prefix, Utc::now().format("%Y%m%d_%H%M%S"), batch_num); - - s3_client - .put_object() - .bucket(&bucket_name) - .key(&key) - .body(body.into()) - .content_type("application/x-ndjson") - .send() - .await?; - - // Delete archived transactions - let ids: Vec = transactions.iter().map(|tx| tx.id.clone()).collect(); - for id in ids { - query(r#"DELETE FROM "Transaction" WHERE id = $1"#) - .bind(id) - .execute(pool) - .await?; - } - - total_archived += transactions.len(); - batch_num += 1; - info!("Batch {}: Archived {} transactions to s3://{}/{}", - batch_num, transactions.len(), bucket_name, key); - } - - info!("Archival job completed. Total archived: {} transactions", total_archived); - Ok(()) -} \ No newline at end of file +// fluid-server/src/archive.rs +// Transaction Archival Job (Issue #238) + +use aws_config::BehaviorVersion; +use aws_sdk_s3::Client as S3Client; +use chrono::{DateTime, Duration, Utc}; +use sqlx::{query, query_as, PgPool}; +use tracing::{error, info}; + +const BATCH_SIZE: i64 = 1000; +const OLDER_THAN_DAYS: i64 = 730; // 2 years + +#[derive(sqlx::FromRow, serde::Serialize)] +struct Transaction { + id: String, + tx_hash: Option, + inner_tx_hash: String, + tenant_id: Option, + status: String, + cost_stroops: i64, + category: String, + chain: String, + created_at: DateTime, +} + +pub async fn run_archival_job(pool: &PgPool) -> Result<(), Box> { + info!("Starting transaction archival job"); + + let cutoff_date = Utc::now() - Duration::days(OLDER_THAN_DAYS); + info!("Archiving transactions older than: {}", cutoff_date); + + // Configure AWS S3 + let config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let s3_client = S3Client::new(&config); + let bucket_name = + std::env::var("S3_ARCHIVE_BUCKET").expect("S3_ARCHIVE_BUCKET environment variable not set"); + let prefix = std::env::var("S3_ARCHIVE_PREFIX").unwrap_or_else(|_| "transactions/".to_string()); + + let mut total_archived = 0; + let mut batch_num = 0; + + loop { + let transactions = query_as::<_, Transaction>( + r#" + SELECT id, "txHash" as tx_hash, "innerTxHash" as inner_tx_hash, + "tenantId" as tenant_id, status, "costStroops" as cost_stroops, + category, chain, "createdAt" as created_at + FROM "Transaction" + WHERE "createdAt" < $1 + ORDER BY "createdAt" ASC + LIMIT $2 + "#, + ) + .bind(cutoff_date) + .bind(BATCH_SIZE) + .fetch_all(pool) + .await?; + + if transactions.is_empty() { + break; + } + + // Convert to JSON Lines format + let json_lines: Vec = transactions + .iter() + .map(|tx| serde_json::to_string(tx).unwrap()) + .collect(); + let body = json_lines.join("\n"); + + // Upload to S3 + let key = format!( + "{}{}_{}.jsonl", + prefix, + Utc::now().format("%Y%m%d_%H%M%S"), + batch_num + ); + + s3_client + .put_object() + .bucket(&bucket_name) + .key(&key) + .body(aws_sdk_s3::primitives::ByteStream::from(body.into_bytes())) + .content_type("application/x-ndjson") + .send() + .await?; + + // Delete archived transactions + let ids: Vec = transactions.iter().map(|tx| tx.id.clone()).collect(); + for id in ids { + query(r#"DELETE FROM "Transaction" WHERE id = $1"#) + .bind(id) + .execute(pool) + .await?; + } + + total_archived += transactions.len(); + batch_num += 1; + info!( + "Batch {}: Archived {} transactions to s3://{}/{}", + batch_num, + transactions.len(), + bucket_name, + key + ); + } + + info!( + "Archival job completed. Total archived: {} transactions", + total_archived + ); + Ok(()) +} diff --git a/fluid-server/src/fee_calculator.rs b/fluid-server/src/fee_calculator.rs new file mode 100644 index 00000000..63b0d97a --- /dev/null +++ b/fluid-server/src/fee_calculator.rs @@ -0,0 +1,298 @@ +/// Congestion Fee Calculator Module (Issue #712) +/// +/// Decouples the fee calculation engine from the core API server so it can be +/// updated, tested, and reasoned about independently. +/// +/// The module exposes a pure `calculate_fee` function plus a `FeeCalculator` +/// struct that can be configured and reused across the signing pipeline. +use serde::{Deserialize, Serialize}; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/// Input parameters for a fee calculation. +#[derive(Clone, Debug, PartialEq)] +pub struct FeeInput { + /// Base fee per operation in stroops (e.g. 100). + pub base_fee: i64, + /// Multiplier applied to the raw fee (e.g. 2.0 for high congestion). + pub multiplier: f64, + /// Number of operations in the inner transaction. + pub operation_count: usize, +} + +/// Result of a fee calculation. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct FeeResult { + /// Per-operation fee after applying the multiplier (stroops). + pub per_op_fee: i64, + /// Total fee for the fee-bump envelope (stroops). + /// This is `per_op_fee * (operation_count + 1)` — the +1 accounts for the + /// fee-bump operation itself. + pub total_fee: i64, + /// Congestion level inferred from the multiplier. + pub congestion_level: CongestionLevel, +} + +/// Congestion level classification. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CongestionLevel { + Low, + Medium, + High, +} + +impl CongestionLevel { + /// Classify a multiplier into a congestion level. + /// + /// - `< 1.5` → Low + /// - `1.5..3` → Medium + /// - `>= 3` → High + pub fn from_multiplier(multiplier: f64) -> Self { + if multiplier < 1.5 { + Self::Low + } else if multiplier < 3.0 { + Self::Medium + } else { + Self::High + } + } +} + +// --------------------------------------------------------------------------- +// Pure calculation function +// --------------------------------------------------------------------------- + +/// Calculate the fee-bump fee for a given set of inputs. +/// +/// Returns an error string when inputs are invalid (e.g. non-positive base fee +/// or negative operation count). +pub fn calculate_fee(input: &FeeInput) -> Result { + if input.base_fee <= 0 { + return Err(format!("base_fee must be positive, got {}", input.base_fee)); + } + if !input.multiplier.is_finite() || input.multiplier <= 0.0 { + return Err(format!( + "multiplier must be a positive finite number, got {}", + input.multiplier + )); + } + + let op_count = input.operation_count as i64; + // Per-operation fee: ceil(base_fee * multiplier) + let per_op_fee = (input.base_fee as f64 * input.multiplier).ceil() as i64; + // Total fee covers all inner ops + the fee-bump op itself + let total_fee = per_op_fee * (op_count + 1); + + Ok(FeeResult { + per_op_fee, + total_fee, + congestion_level: CongestionLevel::from_multiplier(input.multiplier), + }) +} + +// --------------------------------------------------------------------------- +// Stateful calculator (wraps config for reuse) +// --------------------------------------------------------------------------- + +/// A reusable fee calculator that holds base configuration. +#[derive(Clone, Debug)] +pub struct FeeCalculator { + base_fee: i64, + multiplier: f64, +} + +impl FeeCalculator { + /// Create a new calculator. + /// + /// # Panics + /// Panics in debug builds if `base_fee <= 0` or `multiplier <= 0`. + pub fn new(base_fee: i64, multiplier: f64) -> Self { + debug_assert!(base_fee > 0, "base_fee must be positive"); + debug_assert!(multiplier > 0.0, "multiplier must be positive"); + Self { + base_fee, + multiplier, + } + } + + /// Update the multiplier (e.g. after a congestion poll). + pub fn set_multiplier(&mut self, multiplier: f64) { + self.multiplier = multiplier; + } + + /// Calculate the fee for a transaction with `operation_count` operations. + pub fn calculate(&self, operation_count: usize) -> Result { + calculate_fee(&FeeInput { + base_fee: self.base_fee, + multiplier: self.multiplier, + operation_count, + }) + } + + pub fn base_fee(&self) -> i64 { + self.base_fee + } + + pub fn multiplier(&self) -> f64 { + self.multiplier + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn basic_fee_calculation() { + let result = calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 2.0, + operation_count: 1, + }) + .unwrap(); + + // per_op_fee = ceil(100 * 2.0) = 200 + // total_fee = 200 * (1 + 1) = 400 + assert_eq!(result.per_op_fee, 200); + assert_eq!(result.total_fee, 400); + assert_eq!(result.congestion_level, CongestionLevel::Medium); + } + + #[test] + fn fee_calculation_with_multiple_operations() { + let result = calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 1.0, + operation_count: 5, + }) + .unwrap(); + + assert_eq!(result.per_op_fee, 100); + assert_eq!(result.total_fee, 600); // 100 * (5 + 1) + assert_eq!(result.congestion_level, CongestionLevel::Low); + } + + #[test] + fn fee_calculation_rounds_up() { + // 100 * 1.5 = 150.0 exactly → no rounding needed + let result = calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 1.5, + operation_count: 1, + }) + .unwrap(); + assert_eq!(result.per_op_fee, 150); + + // 100 * 1.333 = 133.3 → ceil → 134 + let result2 = calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 1.333, + operation_count: 1, + }) + .unwrap(); + assert_eq!(result2.per_op_fee, 134); + } + + #[test] + fn rejects_non_positive_base_fee() { + assert!(calculate_fee(&FeeInput { + base_fee: 0, + multiplier: 1.0, + operation_count: 1, + }) + .is_err()); + + assert!(calculate_fee(&FeeInput { + base_fee: -1, + multiplier: 1.0, + operation_count: 1, + }) + .is_err()); + } + + #[test] + fn rejects_invalid_multiplier() { + assert!(calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 0.0, + operation_count: 1, + }) + .is_err()); + + assert!(calculate_fee(&FeeInput { + base_fee: 100, + multiplier: f64::NAN, + operation_count: 1, + }) + .is_err()); + + assert!(calculate_fee(&FeeInput { + base_fee: 100, + multiplier: f64::INFINITY, + operation_count: 1, + }) + .is_err()); + } + + #[test] + fn zero_operations_still_charges_fee_bump_op() { + let result = calculate_fee(&FeeInput { + base_fee: 100, + multiplier: 2.0, + operation_count: 0, + }) + .unwrap(); + // total = 200 * (0 + 1) = 200 + assert_eq!(result.total_fee, 200); + } + + #[test] + fn congestion_level_classification() { + assert_eq!(CongestionLevel::from_multiplier(1.0), CongestionLevel::Low); + assert_eq!(CongestionLevel::from_multiplier(1.49), CongestionLevel::Low); + assert_eq!( + CongestionLevel::from_multiplier(1.5), + CongestionLevel::Medium + ); + assert_eq!( + CongestionLevel::from_multiplier(2.9), + CongestionLevel::Medium + ); + assert_eq!(CongestionLevel::from_multiplier(3.0), CongestionLevel::High); + assert_eq!( + CongestionLevel::from_multiplier(10.0), + CongestionLevel::High + ); + } + + #[test] + fn fee_calculator_struct() { + let mut calc = FeeCalculator::new(100, 2.0); + let result = calc.calculate(3).unwrap(); + assert_eq!(result.per_op_fee, 200); + assert_eq!(result.total_fee, 800); // 200 * (3 + 1) + + calc.set_multiplier(1.0); + let result2 = calc.calculate(3).unwrap(); + assert_eq!(result2.per_op_fee, 100); + assert_eq!(result2.total_fee, 400); + } + + #[test] + fn fee_result_is_serializable() { + let result = FeeResult { + per_op_fee: 200, + total_fee: 400, + congestion_level: CongestionLevel::Medium, + }; + let json = serde_json::to_string(&result).unwrap(); + assert!(json.contains("\"congestion_level\":\"medium\"")); + } +} diff --git a/fluid-server/src/fee_calculator_mod.rs b/fluid-server/src/fee_calculator_mod.rs new file mode 100644 index 00000000..0f39406d --- /dev/null +++ b/fluid-server/src/fee_calculator_mod.rs @@ -0,0 +1,2 @@ +/// Re-export the fee calculator for use in the binary. +pub use fluid_server::fee_calculator::*; diff --git a/fluid-server/src/lib.rs b/fluid-server/src/lib.rs index 5027323a..2d4d3564 100644 --- a/fluid-server/src/lib.rs +++ b/fluid-server/src/lib.rs @@ -1,10 +1,17 @@ use std::fmt; use std::str::FromStr; use std::sync::{Mutex, OnceLock}; +pub mod archive; mod blocklist; +pub mod fee_calculator; mod heuristics; -pub mod archive; pub mod memory_leak_profiling; +#[cfg(not(target_arch = "wasm32"))] +pub mod mock_horizon; +#[cfg(not(target_arch = "wasm32"))] +pub mod profiling; +#[cfg(not(target_arch = "wasm32"))] +pub mod tracing; use blocklist::Blocklist; use heuristics::RequestTracker; @@ -27,17 +34,16 @@ fn now() -> u64 { } } - use ed25519_dalek::{Signer, SigningKey}; use sha2::{Digest, Sha256}; use stellar_strkey::ed25519::{PrivateKey, PublicKey}; use stellar_strkey::Strkey; -use subtle::ConstantTimeEq; use stellar_xdr::curr::{ DecoratedSignature, Hash, Limits, MuxedAccount, Preconditions, ReadXdr, Signature, SignatureHint, Transaction, TransactionEnvelope, TransactionExt, TransactionSignaturePayload, TransactionSignaturePayloadTaggedTransaction, TransactionV0, Uint256, VecM, WriteXdr, }; +use subtle::ConstantTimeEq; use wasm_bindgen::prelude::*; // These modules are primarily used by the native server binary. @@ -156,7 +162,9 @@ impl fmt::Display for SigningError { "transaction already contains the maximum of 20 signatures" ), Self::AccountBlocked(message) => write!(f, "account is blocked: {message}"), - Self::SuspiciousActivity(message) => write!(f, "suspicious activity detected: {message}"), + Self::SuspiciousActivity(message) => { + write!(f, "suspicious activity detected: {message}") + } } } } @@ -685,7 +693,10 @@ mod tests { let huge = "A".repeat(MAX_XDR_BYTES + 1); let err = check_xdr_size(&huge).unwrap_err(); let msg = err.to_string(); - assert!(msg.contains("exceeds the maximum allowed size"), "msg: {msg}"); + assert!( + msg.contains("exceeds the maximum allowed size"), + "msg: {msg}" + ); } #[test] diff --git a/fluid-server/src/main.rs b/fluid-server/src/main.rs index bfe2a634..0839b5e7 100644 --- a/fluid-server/src/main.rs +++ b/fluid-server/src/main.rs @@ -1,8 +1,9 @@ +mod benchmarks; mod config; mod contract_cache; mod db; -mod benchmarks; mod error; +mod fee_calculator_mod; mod horizon; mod logging; mod metrics; @@ -10,6 +11,7 @@ mod notifications; mod profiling; mod state; mod stellar; +mod tracing_ctx; pub use fluid_server::xdr; mod ai_query; use axum::{ @@ -22,11 +24,11 @@ use axum::{ routing::{get, post}, Json, Router, }; -use std::{net::SocketAddr, sync::Arc, time::Instant}; use serde::{Deserialize, Serialize}; +use std::{net::SocketAddr, sync::Arc, time::Instant}; use tracing::{error, info}; -use ai_query::{handle_ai_query, QueryRequest, QueryFilters}; +use ai_query::{handle_ai_query, QueryFilters, QueryRequest}; use config::load_config; use contract_cache::{ ContractCacheStatsResponse, FootprintSchema, UpsertDefinitionRequest, UpsertFootprintRequest, @@ -36,9 +38,8 @@ use error::AppError; use fluid_server::archive::run_archival_job; use fluid_server::grpc::serve_grpc; use horizon::HorizonNodeStatus; -use notifications::{Backend, NotificationEngine, WebhookBackend}; -use serde::{Deserialize, Serialize}; use logging::init_logging_from_env; +use notifications::{Backend, NotificationEngine, WebhookBackend}; use sqlx::postgres::PgPool; use state::{ iso_now, utc_day_start_ms, ApiKeyConfig, AppState, HealthFeePayer, RateLimitEntry, @@ -250,14 +251,15 @@ async fn run() -> Result<(), AppError> { if let Some(pool) = db_pool.clone() { tokio::spawn(async move { info!("Starting transaction archival job..."); - + // Run once on startup if let Err(e) = run_archival_job(&pool).await { error!("Initial archival job failed: {}", e); } - + // Then every 30 days (30 * 24 * 3600 seconds) - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30 * 24 * 3600)); + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(30 * 24 * 3600)); loop { interval.tick().await; info!("Running monthly transaction archival job..."); @@ -293,8 +295,7 @@ async fn run() -> Result<(), AppError> { { let cache = Arc::clone(&state.contract_cache); tokio::spawn(async move { - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(60)); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); interval.tick().await; loop { interval.tick().await; @@ -342,25 +343,30 @@ async fn run() -> Result<(), AppError> { info!("Starting Fluid Rust services"); info!("Fluid server (Rust) listening on {http_addr}"); - let listener = tokio::net::TcpListener::bind(http_addr).await.map_err(|error| { - AppError::new( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - "INTERNAL_ERROR", - format!("Failed to bind TCP listener: {error}"), - ) - })?; + let listener = tokio::net::TcpListener::bind(http_addr) + .await + .map_err(|error| { + AppError::new( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + format!("Failed to bind TCP listener: {error}"), + ) + })?; tokio::try_join!( async { - axum::serve(listener, app.into_make_service_with_connect_info::()) - .await - .map_err(|error| { - AppError::new( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - "INTERNAL_ERROR", - format!("Rust server exited unexpectedly: {error}"), - ) - }) + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .map_err(|error| { + AppError::new( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + "INTERNAL_ERROR", + format!("Rust server exited unexpectedly: {error}"), + ) + }) }, async { serve_grpc(grpc_addr).await.map_err(|error| { @@ -443,8 +449,7 @@ async fn metrics(State(state): State) -> impl IntoResponse { async fn add_transaction( State(state): State, Json(request): Json, -) --> Result, AppError> { +) -> Result, AppError> { let status = request.status.unwrap_or_else(|| "pending".to_string()); let now = iso_now(); @@ -484,6 +489,15 @@ async fn fee_bump( state.metrics.inc_total_transactions(); let started_at = Instant::now(); + // Distributed tracing: extract or create a span context for this request. + let span_ctx = tracing_ctx::extract_span_context(&headers); + tracing_ctx::record_trace_ids(&span_ctx); + info!( + trace_id = %span_ctx.trace_id.to_hex(), + span_id = %span_ctx.span_id.to_hex(), + "fee-bump request received" + ); + let api_key = extract_api_key(&headers)?; let api_key_config = find_api_key(&api_key)?; let (ip_limit, api_limit) = if state.config.disable_rate_limits { @@ -522,7 +536,13 @@ async fn fee_bump( match result { Ok(fee_bump_res) => { - let response = Json(fee_bump_res).into_response(); + let mut response = Json(fee_bump_res).into_response(); + // Propagate trace context to the caller. + if let Ok(val) = axum::http::HeaderValue::from_str(&span_ctx.to_traceparent()) { + response + .headers_mut() + .insert(axum::http::HeaderName::from_static("traceparent"), val); + } Ok(with_limit_headers(response, &ip_limit, &api_limit)) } Err(err) => { @@ -606,8 +626,8 @@ async fn process_fee_bump_request( // Validate operation count before acquiring a signer lease to avoid // holding a resource slot while performing a cheap structural check. { - use stellar_xdr::curr::{Limits, ReadXdr, TransactionEnvelope}; use base64::{engine::general_purpose::STANDARD, Engine as _}; + use stellar_xdr::curr::{Limits, ReadXdr, TransactionEnvelope}; let op_count: Option = STANDARD .decode(xdr.trim()) @@ -672,7 +692,9 @@ async fn process_fee_bump_request( .map(|record| record.fee_stroops) .sum(); - if !state.config.disable_rate_limits && current_spend + result.fee_amount > api_key_config.daily_quota_stroops { + if !state.config.disable_rate_limits + && current_spend + result.fee_amount > api_key_config.daily_quota_stroops + { signer_lease.release().await; return Err(AppError::new( axum::http::StatusCode::FORBIDDEN, @@ -775,7 +797,10 @@ async fn upsert_contract_definition( wasm_bytes: body.wasm_bytes, cached_at_ms: state::now_ms(), }; - state.contract_cache.set_definition(definition.clone()).await; + state + .contract_cache + .set_definition(definition.clone()) + .await; Ok(Json(definition)) } @@ -800,9 +825,7 @@ async fn upsert_contract_footprint( Ok(Json(footprint)) } -async fn contract_cache_stats( - State(state): State, -) -> Json { +async fn contract_cache_stats(State(state): State) -> Json { Json(ContractCacheStatsResponse { definition_count: state.contract_cache.definition_count().await, footprint_count: state.contract_cache.footprint_count().await, @@ -902,8 +925,7 @@ async fn check_api_key_rate_limit( "RATE_LIMITED", format!( "API key rate limit exceeded for {} ({}).", - api_key.name, - api_key.tier + api_key.name, api_key.tier ), )); } @@ -968,4 +990,4 @@ const DASHBOARD_HTML: &str = r#" }); -"#; \ No newline at end of file +"#; diff --git a/fluid-server/src/memory_leak_profiling.rs b/fluid-server/src/memory_leak_profiling.rs index f412de8a..172b4d17 100644 --- a/fluid-server/src/memory_leak_profiling.rs +++ b/fluid-server/src/memory_leak_profiling.rs @@ -58,11 +58,7 @@ mod memory_leak_profiling { /// A warmup phase of `warmup` iterations is executed first so that /// one-time initialisation allocations (lazy statics, thread-locals, etc.) /// are excluded from the measurement window. - fn measure( - warmup: usize, - iterations: usize, - mut op: F, - ) -> (usize, usize) { + fn measure(warmup: usize, iterations: usize, mut op: F) -> (usize, usize) { // Warmup – not measured for _ in 0..warmup { op(); diff --git a/fluid-server/src/mock_horizon.rs b/fluid-server/src/mock_horizon.rs new file mode 100644 index 00000000..6545b356 --- /dev/null +++ b/fluid-server/src/mock_horizon.rs @@ -0,0 +1,635 @@ +/// Local Mock Horizon API Server (Issue #720) +/// +/// An in-process HTTP server that simulates the Stellar Horizon API for +/// integration tests. Supports: +/// +/// - Happy-path responses (submit success, account fetch, fee stats) +/// - Slow responses (configurable latency injection) +/// - Network failures (connection reset, TCP stall) +/// - Bad HTTP statuses (429, 500, 502, 503, 504) +/// - Horizon-specific error extras (tx_bad_seq, tx_insufficient_fee, tx_failed) +/// - Per-path scenario overrides +/// - Request capture for assertion in tests +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; + +use axum::{ + body::Body, + extract::State, + http::{Request, StatusCode}, + response::{IntoResponse, Response}, + routing::any, + Router, +}; +use serde_json::json; +use tokio::{net::TcpListener, sync::oneshot, time::sleep}; + +// --------------------------------------------------------------------------- +// Public types +// --------------------------------------------------------------------------- + +/// Error scenario to simulate. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum HorizonScenario { + /// Return a successful response. + Success, + /// Return HTTP 429 with a `Retry-After: 1` header. + RateLimit, + /// Return HTTP 500. + InternalError, + /// Return HTTP 502. + BadGateway, + /// Return HTTP 503. + ServiceUnavailable, + /// Delay the response by `ms` milliseconds, then return 504. + GatewayTimeout { delay_ms: u64 }, + /// Close the TCP connection immediately without sending a response. + ConnectionReset, + /// Return HTTP 400 with Horizon `tx_bad_seq` extras. + TxBadSeq, + /// Return HTTP 400 with Horizon `tx_insufficient_fee` extras. + TxInsufficientFee, + /// Return HTTP 400 with Horizon `tx_failed` extras. + TxFailed, +} + +/// Configuration for the mock server. +#[derive(Clone, Default)] +pub struct MockHorizonConfig { + /// Default scenario for all paths not listed in `path_scenarios`. + pub default_scenario: Option, + /// Latency added to every response (ms). + pub latency_ms: u64, + /// Per-path overrides. Key is the request path, e.g. `"/transactions"`. + pub path_scenarios: HashMap, + /// After this many requests the scenario resets to `Success`. + /// `0` means never reset. + pub fail_count: usize, +} + +/// A captured inbound request. +#[derive(Clone, Debug)] +pub struct CapturedRequest { + pub method: String, + pub path: String, + pub body: String, +} + +// --------------------------------------------------------------------------- +// Server state (shared across handlers) +// --------------------------------------------------------------------------- + +#[derive(Clone)] +struct ServerState { + config: Arc>, + requests: Arc>>, + request_count: Arc, +} + +// --------------------------------------------------------------------------- +// MockHorizonServer +// --------------------------------------------------------------------------- + +/// A local mock Horizon server. +pub struct MockHorizonServer { + state: ServerState, + shutdown_tx: Option>, + addr: Option, +} + +impl MockHorizonServer { + /// Create a new server (not yet started). + pub fn new() -> Self { + Self { + state: ServerState { + config: Arc::new(Mutex::new(MockHorizonConfig::default())), + requests: Arc::new(Mutex::new(Vec::new())), + request_count: Arc::new(AtomicUsize::new(0)), + }, + shutdown_tx: None, + addr: None, + } + } + + /// Start the server on a random port. Returns the bound URL. + pub async fn start(&mut self) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + self.addr = Some(addr); + + let state = self.state.clone(); + let app = Router::new() + .route("/{*path}", any(handle_request)) + .route("/", any(handle_request)) + .with_state(state); + + let (tx, rx) = oneshot::channel::<()>(); + self.shutdown_tx = Some(tx); + + tokio::spawn(async move { + axum::serve(listener, app) + .with_graceful_shutdown(async { + rx.await.ok(); + }) + .await + .ok(); + }); + + format!("http://{addr}") + } + + /// Stop the server. + pub async fn stop(&mut self) { + if let Some(tx) = self.shutdown_tx.take() { + let _ = tx.send(()); + } + } + + /// Reconfigure the server at runtime. + pub fn configure(&self, config: MockHorizonConfig) { + *self.state.config.lock().unwrap() = config; + self.state.request_count.store(0, Ordering::SeqCst); + } + + /// Return all captured requests and clear the buffer. + pub fn drain_requests(&self) -> Vec { + self.state.requests.lock().unwrap().drain(..).collect() + } + + /// Return the number of requests received so far. + pub fn request_count(&self) -> usize { + self.state.request_count.load(Ordering::SeqCst) + } + + /// Convenience: set a single default scenario. + pub fn set_scenario(&self, scenario: HorizonScenario) { + let mut cfg = self.state.config.lock().unwrap(); + cfg.default_scenario = Some(scenario); + cfg.fail_count = 0; + drop(cfg); + self.state.request_count.store(0, Ordering::SeqCst); + } + + /// Convenience: reset to success. + pub fn reset(&self) { + let mut cfg = self.state.config.lock().unwrap(); + *cfg = MockHorizonConfig::default(); + drop(cfg); + self.state.request_count.store(0, Ordering::SeqCst); + self.state.requests.lock().unwrap().clear(); + } +} + +impl Default for MockHorizonServer { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// Request handler +// --------------------------------------------------------------------------- + +async fn handle_request(State(state): State, req: Request) -> Response { + let path = req.uri().path().to_string(); + let method = req.method().to_string(); + let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) + .await + .unwrap_or_default(); + let body = String::from_utf8_lossy(&body_bytes).to_string(); + + // Capture the request. + state.requests.lock().unwrap().push(CapturedRequest { + method: method.clone(), + path: path.clone(), + body: body.clone(), + }); + + let count = state.request_count.fetch_add(1, Ordering::SeqCst) + 1; + + // Determine the scenario. + let (scenario, latency_ms) = { + let cfg = state.config.lock().unwrap(); + let fail_count = cfg.fail_count; + + // If fail_count is set and we've exceeded it, succeed. + let effective_scenario = if fail_count > 0 && count > fail_count { + HorizonScenario::Success + } else { + cfg.path_scenarios + .get(&path) + .cloned() + .or_else(|| cfg.default_scenario.clone()) + .unwrap_or(HorizonScenario::Success) + }; + + (effective_scenario, cfg.latency_ms) + }; + + // Apply latency. + if latency_ms > 0 { + sleep(Duration::from_millis(latency_ms)).await; + } + + build_response(&path, scenario).await +} + +async fn build_response(path: &str, scenario: HorizonScenario) -> Response { + match scenario { + HorizonScenario::Success => success_response(path), + + HorizonScenario::RateLimit => ( + StatusCode::TOO_MANY_REQUESTS, + [("retry-after", "1"), ("content-type", "application/json")], + json!({"type": "https://stellar.org/horizon-errors/rate_limit_exceeded", "title": "Rate Limit Exceeded", "status": 429}).to_string(), + ) + .into_response(), + + HorizonScenario::InternalError => ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"type": "https://stellar.org/horizon-errors/server_error", "title": "Internal Server Error", "status": 500}).to_string(), + ) + .into_response(), + + HorizonScenario::BadGateway => ( + StatusCode::BAD_GATEWAY, + "Bad Gateway", + ) + .into_response(), + + HorizonScenario::ServiceUnavailable => ( + StatusCode::SERVICE_UNAVAILABLE, + json!({"type": "https://stellar.org/horizon-errors/service_unavailable", "title": "Service Unavailable", "status": 503}).to_string(), + ) + .into_response(), + + HorizonScenario::GatewayTimeout { delay_ms } => { + sleep(Duration::from_millis(delay_ms)).await; + (StatusCode::GATEWAY_TIMEOUT, "Gateway Timeout").into_response() + } + + HorizonScenario::ConnectionReset => { + // Return an empty body with a connection-close header to simulate + // a reset. True TCP-level resets require raw socket manipulation + // which is not practical in an in-process test server; this is the + // closest approximation available via HTTP. + ( + StatusCode::INTERNAL_SERVER_ERROR, + [("connection", "close")], + "", + ) + .into_response() + } + + HorizonScenario::TxBadSeq => ( + StatusCode::BAD_REQUEST, + json!({ + "type": "https://stellar.org/horizon-errors/transaction_failed", + "title": "Transaction Failed", + "status": 400, + "extras": { + "result_codes": { + "transaction": "tx_bad_seq" + } + } + }) + .to_string(), + ) + .into_response(), + + HorizonScenario::TxInsufficientFee => ( + StatusCode::BAD_REQUEST, + json!({ + "type": "https://stellar.org/horizon-errors/transaction_failed", + "title": "Transaction Failed", + "status": 400, + "extras": { + "result_codes": { + "transaction": "tx_insufficient_fee" + } + } + }) + .to_string(), + ) + .into_response(), + + HorizonScenario::TxFailed => ( + StatusCode::BAD_REQUEST, + json!({ + "type": "https://stellar.org/horizon-errors/transaction_failed", + "title": "Transaction Failed", + "status": 400, + "extras": { + "result_codes": { + "transaction": "tx_failed", + "operations": ["op_bad_auth"] + } + } + }) + .to_string(), + ) + .into_response(), + } +} + +fn success_response(path: &str) -> Response { + let body = match path { + "/fee_stats" => json!({ + "last_ledger": "123456", + "last_ledger_base_fee": "100", + "ledger_capacity_usage": "0.5", + "fee_charged": { + "max": "200", + "min": "100", + "mode": "100", + "p10": "100", + "p20": "100", + "p30": "100", + "p40": "100", + "p50": "100", + "p60": "100", + "p70": "150", + "p80": "200", + "p90": "200", + "p95": "200", + "p99": "200" + }, + "max_fee": { + "max": "10000", + "min": "100", + "mode": "100", + "p10": "100", + "p20": "100", + "p30": "100", + "p40": "100", + "p50": "100", + "p60": "100", + "p70": "150", + "p80": "200", + "p90": "200", + "p95": "200", + "p99": "200" + } + }), + p if p.starts_with("/accounts/") => json!({ + "id": "GABC", + "sequence": "1234567890", + "balances": [{"asset_type": "native", "balance": "100.0000000"}] + }), + "/transactions" => json!({ + "hash": "abc123def456abc123def456abc123def456abc123def456abc123def456abc1", + "successful": true, + "ledger": 123456 + }), + _ => json!({"status": "ok"}), + }; + + ( + StatusCode::OK, + [("content-type", "application/json")], + body.to_string(), + ) + .into_response() +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + async fn get(url: &str) -> reqwest::Response { + reqwest::Client::new().get(url).send().await.unwrap() + } + + async fn post(url: &str, body: &str) -> reqwest::Response { + reqwest::Client::new() + .post(url) + .header("content-type", "application/json") + .body(body.to_string()) + .send() + .await + .unwrap() + } + + #[tokio::test] + async fn happy_path_fee_stats() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + + let resp = get(&format!("{base}/fee_stats")).await; + assert_eq!(resp.status(), 200); + let json: serde_json::Value = resp.json().await.unwrap(); + assert!(json.get("fee_charged").is_some()); + + server.stop().await; + } + + #[tokio::test] + async fn happy_path_submit_transaction() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + + let resp = post(&format!("{base}/transactions"), r#"{"tx":"abc"}"#).await; + assert_eq!(resp.status(), 200); + let json: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(json["successful"], true); + + server.stop().await; + } + + #[tokio::test] + async fn rate_limit_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::RateLimit); + + let resp = post(&format!("{base}/transactions"), "{}").await; + assert_eq!(resp.status(), 429); + assert_eq!(resp.headers().get("retry-after").unwrap(), "1"); + + server.stop().await; + } + + #[tokio::test] + async fn internal_error_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::InternalError); + + let resp = get(&format!("{base}/fee_stats")).await; + assert_eq!(resp.status(), 500); + + server.stop().await; + } + + #[tokio::test] + async fn service_unavailable_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::ServiceUnavailable); + + let resp = get(&format!("{base}/fee_stats")).await; + assert_eq!(resp.status(), 503); + + server.stop().await; + } + + #[tokio::test] + async fn tx_bad_seq_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::TxBadSeq); + + let resp = post(&format!("{base}/transactions"), "{}").await; + assert_eq!(resp.status(), 400); + let json: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(json["extras"]["result_codes"]["transaction"], "tx_bad_seq"); + + server.stop().await; + } + + #[tokio::test] + async fn tx_insufficient_fee_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::TxInsufficientFee); + + let resp = post(&format!("{base}/transactions"), "{}").await; + assert_eq!(resp.status(), 400); + let json: serde_json::Value = resp.json().await.unwrap(); + assert_eq!( + json["extras"]["result_codes"]["transaction"], + "tx_insufficient_fee" + ); + + server.stop().await; + } + + #[tokio::test] + async fn tx_failed_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::TxFailed); + + let resp = post(&format!("{base}/transactions"), "{}").await; + assert_eq!(resp.status(), 400); + let json: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(json["extras"]["result_codes"]["transaction"], "tx_failed"); + + server.stop().await; + } + + #[tokio::test] + async fn fail_count_resets_to_success() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.configure(MockHorizonConfig { + default_scenario: Some(HorizonScenario::InternalError), + fail_count: 2, + ..Default::default() + }); + + // First two requests fail. + assert_eq!(get(&format!("{base}/fee_stats")).await.status(), 500); + assert_eq!(get(&format!("{base}/fee_stats")).await.status(), 500); + // Third request succeeds (fail_count exceeded). + assert_eq!(get(&format!("{base}/fee_stats")).await.status(), 200); + + server.stop().await; + } + + #[tokio::test] + async fn per_path_scenario_override() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + + let mut path_scenarios = HashMap::new(); + path_scenarios.insert("/transactions".to_string(), HorizonScenario::RateLimit); + + server.configure(MockHorizonConfig { + default_scenario: Some(HorizonScenario::Success), + path_scenarios, + ..Default::default() + }); + + // /fee_stats uses default (success) + assert_eq!(get(&format!("{base}/fee_stats")).await.status(), 200); + // /transactions uses per-path override (rate limit) + assert_eq!( + post(&format!("{base}/transactions"), "{}").await.status(), + 429 + ); + + server.stop().await; + } + + #[tokio::test] + async fn request_capture() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + + post(&format!("{base}/transactions"), r#"{"tx":"hello"}"#).await; + + let captured = server.drain_requests(); + assert_eq!(captured.len(), 1); + assert_eq!(captured[0].path, "/transactions"); + assert_eq!(captured[0].method, "POST"); + assert!(captured[0].body.contains("hello")); + + // drain clears the buffer + assert!(server.drain_requests().is_empty()); + + server.stop().await; + } + + #[tokio::test] + async fn latency_injection() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.configure(MockHorizonConfig { + latency_ms: 50, + ..Default::default() + }); + + let start = std::time::Instant::now(); + get(&format!("{base}/fee_stats")).await; + assert!(start.elapsed().as_millis() >= 50); + + server.stop().await; + } + + #[tokio::test] + async fn bad_gateway_scenario() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + server.set_scenario(HorizonScenario::BadGateway); + + let resp = get(&format!("{base}/fee_stats")).await; + assert_eq!(resp.status(), 502); + + server.stop().await; + } + + #[tokio::test] + async fn request_count_increments() { + let mut server = MockHorizonServer::new(); + let base = server.start().await; + + assert_eq!(server.request_count(), 0); + get(&format!("{base}/fee_stats")).await; + get(&format!("{base}/fee_stats")).await; + assert_eq!(server.request_count(), 2); + + server.stop().await; + } +} diff --git a/fluid-server/src/tracing.rs b/fluid-server/src/tracing.rs new file mode 100644 index 00000000..dcee636f --- /dev/null +++ b/fluid-server/src/tracing.rs @@ -0,0 +1,308 @@ +/// Distributed tracing integration (Issue #709) +/// +/// Provides OpenTelemetry-compatible span context propagation and structured +/// trace IDs that flow through the fee-bump pipeline: inbound HTTP request → +/// fee calculation → Horizon submission → response. +/// +/// The implementation is intentionally zero-dependency on the full OTEL SDK +/// (which would pull in a large dependency tree incompatible with the WASM +/// target). Instead we: +/// 1. Generate a W3C-compatible `traceparent` header value. +/// 2. Attach trace/span IDs to every `tracing` span so they appear in logs. +/// 3. Expose helpers for extracting/injecting context across HTTP boundaries. +use std::fmt; + +use uuid::Uuid; + +// --------------------------------------------------------------------------- +// Trace / Span ID types +// --------------------------------------------------------------------------- + +/// A 128-bit trace identifier (W3C traceparent format). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TraceId(pub [u8; 16]); + +/// A 64-bit span identifier (W3C traceparent format). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SpanId(pub [u8; 8]); + +impl TraceId { + /// Generate a new random trace ID. + pub fn new() -> Self { + let id = Uuid::new_v4(); + Self(*id.as_bytes()) + } + + /// Parse from a 32-character lowercase hex string. + pub fn from_hex(s: &str) -> Option { + if s.len() != 32 { + return None; + } + let mut bytes = [0u8; 16]; + for (i, chunk) in s.as_bytes().chunks(2).enumerate() { + let hi = hex_nibble(chunk[0])?; + let lo = hex_nibble(chunk[1])?; + bytes[i] = (hi << 4) | lo; + } + Some(Self(bytes)) + } + + pub fn to_hex(&self) -> String { + self.0.iter().fold(String::with_capacity(32), |mut s, b| { + use fmt::Write; + let _ = write!(s, "{b:02x}"); + s + }) + } +} + +impl Default for TraceId { + fn default() -> Self { + Self::new() + } +} + +impl SpanId { + /// Generate a new random span ID. + pub fn new() -> Self { + let id = Uuid::new_v4(); + let bytes = id.as_bytes(); + Self([ + bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7], + ]) + } + + /// Parse from a 16-character lowercase hex string. + pub fn from_hex(s: &str) -> Option { + if s.len() != 16 { + return None; + } + let mut bytes = [0u8; 8]; + for (i, chunk) in s.as_bytes().chunks(2).enumerate() { + let hi = hex_nibble(chunk[0])?; + let lo = hex_nibble(chunk[1])?; + bytes[i] = (hi << 4) | lo; + } + Some(Self(bytes)) + } + + pub fn to_hex(&self) -> String { + self.0.iter().fold(String::with_capacity(16), |mut s, b| { + use fmt::Write; + let _ = write!(s, "{b:02x}"); + s + }) + } +} + +impl Default for SpanId { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// SpanContext — the propagated unit +// --------------------------------------------------------------------------- + +/// Carries the W3C trace context for a single request. +#[derive(Clone, Debug)] +pub struct SpanContext { + pub trace_id: TraceId, + pub span_id: SpanId, + /// Whether the trace is sampled (flag bit 01 in traceparent). + pub sampled: bool, +} + +impl SpanContext { + /// Create a new root span context (no parent). + pub fn new_root() -> Self { + Self { + trace_id: TraceId::new(), + span_id: SpanId::new(), + sampled: true, + } + } + + /// Create a child span that inherits the trace ID but gets a new span ID. + pub fn child(&self) -> Self { + Self { + trace_id: self.trace_id.clone(), + span_id: SpanId::new(), + sampled: self.sampled, + } + } + + /// Encode as a W3C `traceparent` header value. + /// + /// Format: `00---` + pub fn to_traceparent(&self) -> String { + format!( + "00-{}-{}-{:02x}", + self.trace_id.to_hex(), + self.span_id.to_hex(), + if self.sampled { 0x01u8 } else { 0x00u8 } + ) + } + + /// Parse a W3C `traceparent` header value. + /// + /// Returns `None` for any malformed input; callers should fall back to + /// `SpanContext::new_root()`. + pub fn from_traceparent(value: &str) -> Option { + let parts: Vec<&str> = value.splitn(4, '-').collect(); + if parts.len() != 4 { + return None; + } + // version must be "00" + if parts[0] != "00" { + return None; + } + let trace_id = TraceId::from_hex(parts[1])?; + let span_id = SpanId::from_hex(parts[2])?; + let flags = u8::from_str_radix(parts[3], 16).ok()?; + Some(Self { + trace_id, + span_id, + sampled: (flags & 0x01) != 0, + }) + } +} + +// --------------------------------------------------------------------------- +// HTTP header helpers +// --------------------------------------------------------------------------- + +pub const TRACEPARENT_HEADER: &str = "traceparent"; +pub const TRACESTATE_HEADER: &str = "tracestate"; + +/// Extract a `SpanContext` from an Axum `HeaderMap`. Falls back to a new +/// root context when the header is absent or malformed. +pub fn extract_span_context(headers: &axum::http::HeaderMap) -> SpanContext { + headers + .get(TRACEPARENT_HEADER) + .and_then(|v| v.to_str().ok()) + .and_then(SpanContext::from_traceparent) + .unwrap_or_else(SpanContext::new_root) +} + +/// Inject the `traceparent` header into an outgoing `reqwest::RequestBuilder`. +pub fn inject_traceparent( + builder: reqwest::RequestBuilder, + ctx: &SpanContext, +) -> reqwest::RequestBuilder { + builder.header(TRACEPARENT_HEADER, ctx.to_traceparent()) +} + +// --------------------------------------------------------------------------- +// Tracing span helpers +// --------------------------------------------------------------------------- + +/// Record trace/span IDs on the current `tracing` span so they appear in +/// structured log output. +pub fn record_trace_ids(ctx: &SpanContext) { + tracing::Span::current().record("trace_id", ctx.trace_id.to_hex()); + tracing::Span::current().record("span_id", ctx.span_id.to_hex()); +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +fn hex_nibble(b: u8) -> Option { + match b { + b'0'..=b'9' => Some(b - b'0'), + b'a'..=b'f' => Some(b - b'a' + 10), + b'A'..=b'F' => Some(b - b'A' + 10), + _ => None, + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn trace_id_round_trips_through_hex() { + let id = TraceId::new(); + let hex = id.to_hex(); + assert_eq!(hex.len(), 32); + let parsed = TraceId::from_hex(&hex).expect("should parse"); + assert_eq!(id, parsed); + } + + #[test] + fn span_id_round_trips_through_hex() { + let id = SpanId::new(); + let hex = id.to_hex(); + assert_eq!(hex.len(), 16); + let parsed = SpanId::from_hex(&hex).expect("should parse"); + assert_eq!(id, parsed); + } + + #[test] + fn traceparent_round_trips() { + let ctx = SpanContext::new_root(); + let header = ctx.to_traceparent(); + // W3C format: 00-<32hex>-<16hex>-01 + assert!(header.starts_with("00-")); + assert_eq!(header.len(), 55); // "00-" + 32 + "-" + 16 + "-" + 2 + + let parsed = SpanContext::from_traceparent(&header).expect("should parse"); + assert_eq!(parsed.trace_id, ctx.trace_id); + assert_eq!(parsed.span_id, ctx.span_id); + assert!(parsed.sampled); + } + + #[test] + fn traceparent_unsampled_flag() { + let mut ctx = SpanContext::new_root(); + ctx.sampled = false; + let header = ctx.to_traceparent(); + assert!(header.ends_with("-00")); + let parsed = SpanContext::from_traceparent(&header).unwrap(); + assert!(!parsed.sampled); + } + + #[test] + fn child_inherits_trace_id() { + let root = SpanContext::new_root(); + let child = root.child(); + assert_eq!(root.trace_id, child.trace_id); + assert_ne!(root.span_id, child.span_id); + } + + #[test] + fn from_traceparent_rejects_malformed_input() { + assert!(SpanContext::from_traceparent("").is_none()); + assert!(SpanContext::from_traceparent("00-badhex-badhex-01").is_none()); + assert!(SpanContext::from_traceparent( + "01-00000000000000000000000000000000-0000000000000000-01" + ) + .is_none()); + } + + #[test] + fn from_traceparent_rejects_wrong_version() { + // version "01" is not supported + let ctx = SpanContext::new_root(); + let bad = format!("01-{}-{}-01", ctx.trace_id.to_hex(), ctx.span_id.to_hex()); + assert!(SpanContext::from_traceparent(&bad).is_none()); + } + + #[test] + fn trace_id_from_hex_rejects_wrong_length() { + assert!(TraceId::from_hex("abc").is_none()); + assert!(TraceId::from_hex(&"a".repeat(33)).is_none()); + } + + #[test] + fn span_id_from_hex_rejects_wrong_length() { + assert!(SpanId::from_hex("abc").is_none()); + assert!(SpanId::from_hex(&"a".repeat(17)).is_none()); + } +} diff --git a/fluid-server/src/tracing_ctx.rs b/fluid-server/src/tracing_ctx.rs new file mode 100644 index 00000000..9023fa5e --- /dev/null +++ b/fluid-server/src/tracing_ctx.rs @@ -0,0 +1,2 @@ +/// Re-export the distributed tracing helpers for use in the binary. +pub use fluid_server::tracing::*; diff --git a/fluid-server/src/xdr.rs b/fluid-server/src/xdr.rs index ada95b5e..68b0bdb6 100644 --- a/fluid-server/src/xdr.rs +++ b/fluid-server/src/xdr.rs @@ -1,12 +1,12 @@ #![allow(dead_code)] use base64::{engine::general_purpose::STANDARD, Engine}; +use bytes::Bytes; use stellar_xdr::curr::{ FeeBumpTransaction, FeeBumpTransactionInnerTx, Limits, Operation, OperationBody, ReadXdr, Transaction, TransactionEnvelope, TransactionV0, }; use tracing::info; -use bytes::Bytes; /// Hard ceiling for the encoded XDR payload accepted by the server before /// any decode or XDR deserialization work is attempted. @@ -30,13 +30,14 @@ impl ZeroCopyDecoder { DECODE_BUFFER.with(|buffer| { let mut buf = buffer.borrow_mut(); buf.clear(); - + // Ensure capacity without reallocating if possible let estimated_len = base64::decoded_len_estimate(input.len()); - if buf.capacity() < estimated_len { - buf.reserve(estimated_len - buf.capacity()); + let current_cap = buf.capacity(); + if current_cap < estimated_len { + buf.reserve(estimated_len - current_cap); } - + STANDARD.decode_vec(input, &mut buf)?; Ok(buf.clone()) // Only clone the actual decoded data }) @@ -165,7 +166,10 @@ pub fn parse_xdr_zero_copy(base64_bytes: &[u8]) -> Result String { Strkey::PrivateKeyEd25519(ed25519::PrivateKey([seed_byte; 32])) .to_string() + .to_string() } fn build_signed_transaction_xdr() -> String { @@ -200,6 +201,9 @@ async fn test_rate_limits_bypassed_when_configured() { println!("Bypassed limit behavior statuses: {:?}", statuses); // All requests must succeed with HTTP 200 for status in statuses { - assert_eq!(status, 200, "expected HTTP 200 since rate limiting and quota checking are disabled"); + assert_eq!( + status, 200, + "expected HTTP 200 since rate limiting and quota checking are disabled" + ); } } diff --git a/server/prisma/migrations/20260601000000_partition_audit_logs_by_month/migration.sql b/server/prisma/migrations/20260601000000_partition_audit_logs_by_month/migration.sql new file mode 100644 index 00000000..00b7a6fe --- /dev/null +++ b/server/prisma/migrations/20260601000000_partition_audit_logs_by_month/migration.sql @@ -0,0 +1,96 @@ +-- ============================================================ +-- Migration: Partition AuditLog table by month (RANGE) +-- Issue #711 – Architecture & Database: Partition Audit Logs Table +-- +-- Strategy: +-- 1. Build a partitioned replacement table +-- 2. Create monthly child partitions (24 months back → 3 months ahead) +-- 3. Add a DEFAULT partition for safety +-- 4. Copy all existing rows +-- 5. Recreate indexes on the parent (propagated to all partitions) +-- 6. Swap old table → new partitioned table atomically +-- +-- NOTE (production deployments): This migration copies all rows before +-- swapping. For tables with active write traffic, run during a maintenance +-- window or use a separate online-migration tool (pg_repack / pglogical). +-- ============================================================ + +-- Step 1: Create the partitioned parent table. +-- PRIMARY KEY must include the partition key ("createdAt") per PG rules. +CREATE TABLE "AuditLog_partitioned" ( + "id" TEXT NOT NULL, + "actor" TEXT NOT NULL, + "action" TEXT, + "target" TEXT, + "eventType" TEXT, + "payload" JSONB, + "metadata" TEXT, + "aiSummary" TEXT, + "timestamp" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY ("id", "createdAt") +) PARTITION BY RANGE ("createdAt"); + +-- Step 2: Create one partition per calendar month. +-- Range: 24 months in the past → 3 months in the future. +DO $$ +DECLARE + d DATE; + name TEXT; +BEGIN + d := DATE_TRUNC('month', NOW()) - INTERVAL '24 months'; + WHILE d <= DATE_TRUNC('month', NOW()) + INTERVAL '3 months' LOOP + name := 'audit_log_y' || TO_CHAR(d, 'YYYY') || '_m' || TO_CHAR(d, 'MM'); + EXECUTE format( + 'CREATE TABLE %I PARTITION OF "AuditLog_partitioned" + FOR VALUES FROM (%L::TIMESTAMP) TO (%L::TIMESTAMP)', + name, + d::TIMESTAMP, + (d + INTERVAL '1 month')::TIMESTAMP + ); + d := d + INTERVAL '1 month'; + END LOOP; +END; +$$; + +-- Step 3: Default partition catches rows outside the explicit ranges. +CREATE TABLE "audit_log_default" PARTITION OF "AuditLog_partitioned" DEFAULT; + +-- Step 4: Copy all existing data. +INSERT INTO "AuditLog_partitioned" +SELECT * FROM "AuditLog"; + +-- Step 5: Recreate indexes on the parent table. +CREATE INDEX "AuditLog_eventType_idx" + ON "AuditLog_partitioned" ("eventType"); + +CREATE INDEX "AuditLog_actor_idx" + ON "AuditLog_partitioned" ("actor"); + +CREATE INDEX "AuditLog_action_idx" + ON "AuditLog_partitioned" ("action"); + +CREATE INDEX "AuditLog_timestamp_idx" + ON "AuditLog_partitioned" ("timestamp"); + +CREATE INDEX "AuditLog_createdAt_idx" + ON "AuditLog_partitioned" ("createdAt"); + +CREATE INDEX "AuditLog_actor_timestamp_idx" + ON "AuditLog_partitioned" ("actor", "timestamp"); + +CREATE INDEX "AuditLog_action_timestamp_idx" + ON "AuditLog_partitioned" ("action", "timestamp"); + +CREATE INDEX "AuditLog_eventType_timestamp_idx" + ON "AuditLog_partitioned" ("eventType", "timestamp"); + +CREATE INDEX "AuditLog_target_timestamp_idx" + ON "AuditLog_partitioned" ("target", "timestamp"); + +-- Step 6: Swap the tables atomically. +ALTER TABLE "AuditLog" RENAME TO "AuditLog_old"; +ALTER TABLE "AuditLog_partitioned" RENAME TO "AuditLog"; + +-- Step 7: Drop the original monolithic table. +DROP TABLE "AuditLog_old"; diff --git a/server/src/services/auditLogPartitionManager.test.ts b/server/src/services/auditLogPartitionManager.test.ts new file mode 100644 index 00000000..07980c04 --- /dev/null +++ b/server/src/services/auditLogPartitionManager.test.ts @@ -0,0 +1,188 @@ +import { describe, expect, it, vi } from "vitest"; +import { + addMonths, + AuditLogPartitionManager, + partitionEnd, + partitionName, + partitionStart, + subtractMonths, +} from "./auditLogPartitionManager"; + +// --------------------------------------------------------------------------- +// Pure helper tests +// --------------------------------------------------------------------------- + +describe("partitionName", () => { + it("pads month with leading zero", () => { + expect(partitionName(2026, 1)).toBe("audit_log_y2026_m01"); + expect(partitionName(2026, 12)).toBe("audit_log_y2026_m12"); + }); +}); + +describe("partitionStart", () => { + it("returns first day of the month at midnight", () => { + expect(partitionStart(2026, 3)).toBe("2026-03-01 00:00:00"); + }); +}); + +describe("partitionEnd", () => { + it("returns first day of the next month", () => { + expect(partitionEnd(2026, 3)).toBe("2026-04-01 00:00:00"); + }); + + it("wraps year correctly for December", () => { + expect(partitionEnd(2026, 12)).toBe("2027-01-01 00:00:00"); + }); +}); + +describe("addMonths / subtractMonths", () => { + it("adds months within the same year", () => { + expect(addMonths(2026, 1, 3)).toEqual({ year: 2026, month: 4 }); + }); + + it("wraps to next year", () => { + expect(addMonths(2026, 11, 3)).toEqual({ year: 2027, month: 2 }); + }); + + it("subtracts months within the same year", () => { + expect(subtractMonths(2026, 6, 3)).toEqual({ year: 2026, month: 3 }); + }); + + it("wraps to previous year", () => { + expect(subtractMonths(2026, 2, 3)).toEqual({ year: 2025, month: 11 }); + }); +}); + +// --------------------------------------------------------------------------- +// AuditLogPartitionManager tests +// --------------------------------------------------------------------------- + +describe("AuditLogPartitionManager", () => { + describe("ensurePartitions", () => { + it("creates partitions for the configured window", async () => { + const manager = new AuditLogPartitionManager({ + retentionMonths: 2, + futureMonths: 1, + }); + + const executed: string[] = []; + const executor = vi.fn(async (sql: string) => { + executed.push(sql); + }); + + const created = await manager.ensurePartitions(executor); + + // 2 past + current + 1 future = 4 partitions + expect(created.length).toBe(4); + expect(executor).toHaveBeenCalledTimes(4); + // Each SQL should be a CREATE TABLE IF NOT EXISTS + for (const sql of executed) { + expect(sql).toContain("CREATE TABLE IF NOT EXISTS"); + expect(sql).toContain("PARTITION OF"); + } + }); + + it("ignores 'already exists' errors", async () => { + const manager = new AuditLogPartitionManager({ + retentionMonths: 0, + futureMonths: 0, + }); + + const executor = vi.fn(async () => { + throw new Error("relation already exists"); + }); + + // Should not throw + await expect(manager.ensurePartitions(executor)).resolves.toBeDefined(); + }); + + it("re-throws non-existence errors", async () => { + const manager = new AuditLogPartitionManager({ + retentionMonths: 0, + futureMonths: 0, + }); + + const executor = vi.fn(async () => { + throw new Error("permission denied"); + }); + + await expect(manager.ensurePartitions(executor)).rejects.toThrow( + "permission denied", + ); + }); + }); + + describe("listPartitions", () => { + it("parses partition names into PartitionInfo objects", async () => { + const manager = new AuditLogPartitionManager(); + const query = vi.fn(async () => [ + { tablename: "audit_log_y2026_m01" }, + { tablename: "audit_log_y2026_m02" }, + ]); + + const partitions = await manager.listPartitions(query); + + expect(partitions).toHaveLength(2); + expect(partitions[0].name).toBe("audit_log_y2026_m01"); + expect(partitions[0].rangeStart).toBe("2026-01-01 00:00:00"); + expect(partitions[0].rangeEnd).toBe("2026-02-01 00:00:00"); + }); + + it("handles unrecognised table names gracefully", async () => { + const manager = new AuditLogPartitionManager(); + const query = vi.fn(async () => [{ tablename: "audit_log_default" }]); + + const partitions = await manager.listPartitions(query); + expect(partitions[0].rangeStart).toBe(""); + }); + }); + + describe("pruneOldPartitions", () => { + it("drops partitions older than retentionMonths", async () => { + const manager = new AuditLogPartitionManager({ retentionMonths: 1 }); + + const now = new Date(); + const oldYear = now.getUTCFullYear() - 1; + const oldMonth = now.getUTCMonth() + 1; // same month last year + + const oldName = partitionName(oldYear, oldMonth); + const currentName = partitionName( + now.getUTCFullYear(), + now.getUTCMonth() + 1, + ); + + const query = vi.fn(async () => [ + { tablename: oldName }, + { tablename: currentName }, + ]); + + const dropped: string[] = []; + const executor = vi.fn(async (sql: string) => { + dropped.push(sql); + }); + + const result = await manager.pruneOldPartitions(executor, query); + + expect(result).toContain(oldName); + expect(result).not.toContain(currentName); + expect(dropped.some((s) => s.includes(oldName))).toBe(true); + }); + + it("does not drop partitions within retention window", async () => { + const manager = new AuditLogPartitionManager({ retentionMonths: 24 }); + + const now = new Date(); + const name = partitionName( + now.getUTCFullYear(), + now.getUTCMonth() + 1, + ); + + const query = vi.fn(async () => [{ tablename: name }]); + const executor = vi.fn(async () => {}); + + const result = await manager.pruneOldPartitions(executor, query); + expect(result).toHaveLength(0); + expect(executor).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/server/src/services/auditLogPartitionManager.ts b/server/src/services/auditLogPartitionManager.ts new file mode 100644 index 00000000..b03cea2c --- /dev/null +++ b/server/src/services/auditLogPartitionManager.ts @@ -0,0 +1,220 @@ +/** + * Audit Log Partition Manager (Issue #711) + * + * Manages monthly partitions for the AuditLog table. Provides: + * - Automatic creation of the next month's partition before it is needed. + * - Listing of existing partitions. + * - Pruning of partitions older than a configurable retention window. + * + * This service is designed to run as a scheduled task (e.g. monthly cron) + * and is safe to call multiple times — all DDL operations are idempotent. + */ + +import { createLogger } from "../utils/logger"; + +const logger = createLogger({ component: "audit_log_partition_manager" }); + +export interface PartitionInfo { + name: string; + /** ISO date string for the start of the partition range (inclusive). */ + rangeStart: string; + /** ISO date string for the end of the partition range (exclusive). */ + rangeEnd: string; +} + +export interface PartitionManagerOptions { + /** Number of months of history to retain. Partitions older than this are + * eligible for pruning. Default: 24. */ + retentionMonths?: number; + /** Number of future months to pre-create. Default: 3. */ + futureMonths?: number; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Format a Date as a YYYY-MM-DD string (UTC). */ +function toYearMonth(date: Date): { year: number; month: number } { + return { year: date.getUTCFullYear(), month: date.getUTCMonth() + 1 }; +} + +/** Build the partition table name for a given year/month. */ +export function partitionName(year: number, month: number): string { + const mm = String(month).padStart(2, "0"); + return `audit_log_y${year}_m${mm}`; +} + +/** Build the lower bound timestamp for a partition (inclusive). */ +export function partitionStart(year: number, month: number): string { + const mm = String(month).padStart(2, "0"); + return `${year}-${mm}-01 00:00:00`; +} + +/** Build the upper bound timestamp for a partition (exclusive). */ +export function partitionEnd(year: number, month: number): string { + const next = new Date(Date.UTC(year, month, 1)); // month is 0-indexed here + const ny = next.getUTCFullYear(); + const nm = String(next.getUTCMonth() + 1).padStart(2, "0"); + return `${ny}-${nm}-01 00:00:00`; +} + +/** Add `n` months to a {year, month} pair. */ +export function addMonths( + year: number, + month: number, + n: number, +): { year: number; month: number } { + const total = (year * 12 + (month - 1)) + n; + return { year: Math.floor(total / 12), month: (total % 12) + 1 }; +} + +/** Subtract `n` months from a {year, month} pair. */ +export function subtractMonths( + year: number, + month: number, + n: number, +): { year: number; month: number } { + return addMonths(year, month, -n); +} + +// --------------------------------------------------------------------------- +// Partition manager +// --------------------------------------------------------------------------- + +export class AuditLogPartitionManager { + private readonly retentionMonths: number; + private readonly futureMonths: number; + + constructor(options: PartitionManagerOptions = {}) { + this.retentionMonths = options.retentionMonths ?? 24; + this.futureMonths = options.futureMonths ?? 3; + } + + /** + * Ensure all required partitions exist for the window + * [now - retentionMonths, now + futureMonths]. + * + * @param executor A function that executes a raw SQL statement. + * Receives the SQL string and returns a Promise. + */ + async ensurePartitions( + executor: (sql: string) => Promise, + ): Promise { + const now = new Date(); + const { year: cy, month: cm } = toYearMonth(now); + const start = subtractMonths(cy, cm, this.retentionMonths); + const end = addMonths(cy, cm, this.futureMonths); + + const created: string[] = []; + let cur = start; + + while ( + cur.year < end.year || + (cur.year === end.year && cur.month <= end.month) + ) { + const name = partitionName(cur.year, cur.month); + const from = partitionStart(cur.year, cur.month); + const to = partitionEnd(cur.year, cur.month); + + const sql = ` + CREATE TABLE IF NOT EXISTS "${name}" + PARTITION OF "AuditLog" + FOR VALUES FROM ('${from}') TO ('${to}') + `.trim(); + + try { + await executor(sql); + created.push(name); + logger.debug({ partition: name }, "Partition ensured"); + } catch (err: unknown) { + // "already exists" errors are safe to ignore. + const msg = err instanceof Error ? err.message : String(err); + if (!msg.includes("already exists")) { + logger.error({ partition: name, err: msg }, "Failed to create partition"); + throw err; + } + } + + cur = addMonths(cur.year, cur.month, 1); + } + + return created; + } + + /** + * List existing audit log partitions by querying the PostgreSQL catalog. + * + * @param query A function that executes a raw SQL query and returns rows. + */ + async listPartitions( + query: (sql: string) => Promise>, + ): Promise { + const rows = await query(` + SELECT tablename + FROM pg_tables + WHERE schemaname = 'public' + AND tablename LIKE 'audit_log_y%' + ORDER BY tablename + `); + + return rows.map((row) => { + const name = row.tablename; + // Parse year/month from name: audit_log_y_m + const match = /audit_log_y(\d{4})_m(\d{2})/.exec(name); + if (!match) { + return { name, rangeStart: "", rangeEnd: "" }; + } + const year = parseInt(match[1], 10); + const month = parseInt(match[2], 10); + return { + name, + rangeStart: partitionStart(year, month), + rangeEnd: partitionEnd(year, month), + }; + }); + } + + /** + * Drop partitions older than `retentionMonths`. + * + * @param executor A function that executes a raw SQL statement. + * @param query A function that lists existing partitions. + * @returns Names of dropped partitions. + */ + async pruneOldPartitions( + executor: (sql: string) => Promise, + query: (sql: string) => Promise>, + ): Promise { + const now = new Date(); + const { year: cy, month: cm } = toYearMonth(now); + const cutoff = subtractMonths(cy, cm, this.retentionMonths); + + const partitions = await this.listPartitions(query); + const dropped: string[] = []; + + for (const p of partitions) { + const match = /audit_log_y(\d{4})_m(\d{2})/.exec(p.name); + if (!match) continue; + + const year = parseInt(match[1], 10); + const month = parseInt(match[2], 10); + + const isBefore = + year < cutoff.year || (year === cutoff.year && month < cutoff.month); + + if (isBefore) { + try { + await executor(`DROP TABLE IF EXISTS "${p.name}"`); + dropped.push(p.name); + logger.info({ partition: p.name }, "Pruned old audit log partition"); + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + logger.error({ partition: p.name, err: msg }, "Failed to prune partition"); + } + } + } + + return dropped; + } +}