diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 21fa888..cc7e69d 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -337,9 +337,7 @@ mod client_handler_tests { use crate::client_handler::{ClientError, ClientHandler}; use crate::protocol_handler::{BinaryProtocolHandler, TextProtocolHandler}; use crate::server::ExecutorWithWorkers; - use crate::workers_container::WorkersContainer; use dashmap::DashMap; - use executor::Executor; use metadata::catalog_manager::CatalogManager; use parking_lot::RwLock; use protocol::{Request, Response, StatementType}; diff --git a/storage/src/cache.rs b/storage/src/cache.rs index 7ef37b9..cde546e 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -480,7 +480,7 @@ impl Cache { // - allocate page // the newly allocated page will discard previous changes applied from wal. let mut pinned_write_page = self.pin_write(&id)?; - pinned_write_page.mark_diff(0, PAGE_SIZE as _); + pinned_write_page.mark_diff(0, USABLE_PAGE_SIZE as _); Ok((pinned_write_page, page_id)) } diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index fc99e4c..7ec9438 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -590,10 +590,7 @@ impl WalClient { let (send, recv) = channel::bounded(1); let ops = ops .into_iter() - .map(|(file_page_ref, diff)| SinglePageOperation { - file_page_ref, - diff, - }) + .map(|(file_page_ref, diff)| SinglePageOperation::new(file_page_ref, diff)) .collect(); let record = WalRecord { data: WalRecordData::MultiPageOperation(ops), @@ -930,7 +927,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); assert_eq!(handle.wal_client.flushed_lsn(), 0); @@ -954,7 +951,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client @@ -1008,7 +1005,7 @@ mod tests { // First session: write records and checkpoint { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), PageDiff::default()); @@ -1030,7 +1027,7 @@ mod tests { // Second session: recover { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Should only have records after checkpoint (LSN 4 and 5) assert_eq!(handle.redo_records.len(), 2); @@ -1050,7 +1047,7 @@ mod tests { // First session { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), PageDiff::default()); @@ -1068,7 +1065,7 @@ mod tests { // Second session { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // flushed_lsn should be set to last_lsn from recovery assert_eq!(handle.wal_client.flushed_lsn(), 3); @@ -1098,7 +1095,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Simulate how Cache would share WalClient via Arc let shared = SharedWalClient::new(handle.wal_client); diff --git a/tester/README.md b/tester/README.md index 80ed74c..1b95cb3 100644 --- a/tester/README.md +++ b/tester/README.md @@ -1,21 +1,49 @@ # Tester -This crate is a small test client used only for running end-to-end and performance tests against a running coDB server. +This crate is a test client used for running end-to-end and performance tests against a coDB server. -Prerequisites -- Ensure the coDB server is already running before using the tester (the tester connects to the server). +## Overview -Quick run -- Build & run the tester (example): +- **E2E Tests**: Automatically manage the database server lifecycle - start the server before tests and stop it after tests complete. +- **Performance Tests**: Require a manually started server for more accurate benchmarking. + +## Prerequisites + +- Build the server binary first: + ```bash + cargo build --release --bin server + ``` + +## Quick Run + +### E2E Tests + +E2E tests require the `--server-path` argument - the server is started/stopped automatically: ```bash +# Run a specific E2E test +cargo run -p tester -- e2e-select --server-path ./target/release/server + +# Run all E2E tests +cargo run -p tester -- e2e-all --server-path ./target/release/server +``` + +### Performance Tests + +Performance tests require a **manually started server**: + +```bash +# Terminal 1: Start the server +./target/release/server + +# Terminal 2: Run performance tests (no --server-path needed) cargo run -p tester -- concurrent-reads-index --runs 1 --threads 8 --records 1000 --bound-size 10 ``` -Performance tests -- Performance tests live in [tester/src/performance](tester/src/performance). +## Test Types + +### Performance Tests +Performance tests live in [tester/src/performance](tester/src/performance) and measure concurrency and throughput. -Adding a new performance test -- Add a new file in [tester/src/performance](tester/src/performance) implementing the `Suite` trait (see existing tests for examples). -- Export the module in [tester/src/performance/mod.rs](tester/src/performance/mod.rs) with `pub mod your_test;`. -- Wire a CLI subcommand and runner in [tester/src/main.rs](tester/src/main.rs) so you can execute the test from the command line. +### E2E Tests +E2E tests live in [tester/src/e2e](tester/src/e2e) and verify end-to-end functionality. diff --git a/tester/src/e2e/mod.rs b/tester/src/e2e/mod.rs index 56c5140..8c53694 100644 --- a/tester/src/e2e/mod.rs +++ b/tester/src/e2e/mod.rs @@ -7,3 +7,4 @@ pub mod response_helpers; pub mod select; pub mod truncate_table; pub mod update; +pub mod wal_recovery; diff --git a/tester/src/e2e/wal_recovery.rs b/tester/src/e2e/wal_recovery.rs new file mode 100644 index 0000000..47522ce --- /dev/null +++ b/tester/src/e2e/wal_recovery.rs @@ -0,0 +1,339 @@ +use std::time::Duration; +use std::{cell::RefCell, thread}; + +use log::{error, info}; +use protocol::{ColumnType, Request, StatementType}; + +use crate::{ + ServerProcess, TesterError, + suite::{E2ETestResult, Suite, default_client}, +}; + +use super::response_helpers::{ + extract_bool, extract_f32, extract_f64, extract_i32, extract_i64, extract_string, + validate_field_count, validate_non_select_statement, validate_select_query, +}; + +/// Test record structure for WAL recovery tests +#[derive(Debug, Clone)] +pub struct WalTestRecord { + pub id: i32, + pub big_id: i64, + pub price: f32, + pub precise_price: f64, + pub active: bool, + pub birth_date: String, + pub last_login: String, + pub name: String, +} + +impl WalTestRecord { + /// Generate test records + pub fn generate(num_records: usize) -> Vec { + (0..num_records) + .map(|i| WalTestRecord { + id: i as i32, + big_id: (i as i64) * 1000, + price: (i as f32) * 1.5, + precise_price: (i as f64) * 2.75, + active: i % 2 == 0, + birth_date: format!("2024-01-{:02}", (i % 28) + 1), + last_login: format!("2024-01-{:02}T12:00:00", (i % 28) + 1), + name: format!("User_{}", i), + }) + .collect() + } + + /// Validate that a protocol Record matches this test record + pub fn validate_record(&self, record: &protocol::Record) -> Result<(), TesterError> { + validate_field_count(record, 8)?; + + let id = extract_i32(record, 0)?; + let big_id = extract_i64(record, 1)?; + let price = extract_f32(record, 2)?; + let precise_price = extract_f64(record, 3)?; + let active = extract_bool(record, 4)?; + let name = extract_string(record, 7)?; + + if id != self.id { + return Err(TesterError::ServerError { + message: format!("ID mismatch: expected {}, got {}", self.id, id), + }); + } + if big_id != self.big_id { + return Err(TesterError::ServerError { + message: format!("big_id mismatch: expected {}, got {}", self.big_id, big_id), + }); + } + if (price - self.price).abs() > 0.01 { + return Err(TesterError::ServerError { + message: format!("price mismatch: expected {}, got {}", self.price, price), + }); + } + if (precise_price - self.precise_price).abs() > 0.001 { + return Err(TesterError::ServerError { + message: format!( + "precise_price mismatch: expected {}, got {}", + self.precise_price, precise_price + ), + }); + } + if active != self.active { + return Err(TesterError::ServerError { + message: format!("active mismatch: expected {}, got {}", self.active, active), + }); + } + if name != self.name { + return Err(TesterError::ServerError { + message: format!("name mismatch: expected '{}', got '{}'", self.name, name), + }); + } + + Ok(()) + } +} + +pub struct WalRecoveryE2ETest; + +pub struct Setup { + pub database_name: String, + pub table_name: String, +} + +pub struct Test { + pub database_name: String, + pub table_name: String, + pub test_data: Vec, + pub server_path: String, + pub server: ServerProcess, +} + +impl Suite for WalRecoveryE2ETest { + type SetupArgs = Setup; + + async fn setup(args: &Self::SetupArgs) -> Result<(), TesterError> { + info!("Creating database '{}'...", args.database_name); + let mut client = default_client().await?; + + client + .execute_and_wait(Request::CreateDatabase { + database_name: args.database_name.clone(), + }) + .await?; + + info!("✓ Database created"); + + // Create table with all data types + let create_table_sql = format!( + "CREATE TABLE {} (\ + id INT32 PRIMARY_KEY, \ + big_id INT64, \ + price FLOAT32, \ + precise_price FLOAT64, \ + active BOOL, \ + birth_date DATE, \ + last_login DATETIME, \ + name STRING\ + );", + args.table_name + ); + + info!("Creating table..."); + client + .send_request(&Request::Query { + database_name: Some(args.database_name.clone()), + sql: create_table_sql, + }) + .await?; + + validate_non_select_statement(&mut client, 0, StatementType::CreateTable).await?; + info!("✓ Table created"); + + Ok(()) + } + + type TestArgs = RefCell; + + async fn run(args: &Self::TestArgs) -> Result { + let mut tests_passed = 0; + + info!("\n=== Test 1: WAL recovery after SIGKILL ==="); + + // Clone values to avoid holding RefCell borrow across await + let (database_name, table_name, test_data) = { + let borrowed = args.borrow(); + ( + borrowed.database_name.clone(), + borrowed.table_name.clone(), + borrowed.test_data.clone(), + ) + }; + + // Insert records + if let Err(e) = insert_records(&database_name, &table_name, &test_data).await { + error!("Insert failed: {:?}", e); + return Err(e); + } + info!("✓ All {} records inserted", args.borrow().test_data.len()); + + // Wait for WAL flush + info!("Waiting 500ms for WAL flush..."); + thread::sleep(Duration::from_millis(500)); + info!("✓ WAL should be flushed"); + + // Kill the current server + args.borrow_mut().server.stop()?; + info!("✓ Server killed"); + + // Restart server + info!("Restarting server for WAL recovery..."); + let mut server = ServerProcess::start(&args.borrow().server_path)?; + + // Wait for WAL recovery to complete + info!("Waiting 3 seconds for WAL recovery..."); + thread::sleep(Duration::from_secs(3)); + info!("✓ WAL recovery period complete"); + + // Verify all records are present after recovery + if let Err(e) = verify_records_after_recovery(&database_name, &table_name, &test_data).await + { + error!("Verification failed: {:?}", e); + return Err(e); + } + info!( + "✓ All {} records verified after recovery", + args.borrow().test_data.len() + ); + tests_passed += 1; + + // We need to do cleanup here because we can't pass it to cleanup function + + info!("Deleting database '{}'...", &database_name); + let mut client = default_client().await?; + + client + .execute_and_wait(Request::DeleteDatabase { + database_name: database_name.clone(), + }) + .await?; + + info!("✓ Database deleted"); + + server.stop()?; + + Ok(E2ETestResult { tests_passed }) + } + + type CleanupArgs = (); + + async fn cleanup(_: &Self::CleanupArgs) -> Result<(), TesterError> { + Ok(()) + } +} + +/// Test WAL recovery: Insert records +async fn insert_records( + db_name: &str, + table_name: &str, + test_data: &[WalTestRecord], +) -> Result<(), TesterError> { + let mut client = default_client().await?; + + info!("Inserting {} records...", test_data.len()); + + // Insert all records + for record in test_data { + let insert_sql = format!( + "INSERT INTO {} (id, big_id, price, precise_price, active, birth_date, last_login, name) \ + VALUES ({}, {}, {:.1}, {:.2}, {}, '{}', '{}', '{}');", + table_name, + record.id, + record.big_id, + record.price, + record.precise_price, + record.active, + record.birth_date, + record.last_login, + record.name + ); + + client + .send_request(&Request::Query { + database_name: Some(db_name.to_string()), + sql: insert_sql, + }) + .await?; + + validate_non_select_statement(&mut client, 1, StatementType::Insert).await?; + } + + info!("✓ All {} records inserted", test_data.len()); + + Ok(()) +} + +/// Verify all records after server restart +pub async fn verify_records_after_recovery( + database_name: &str, + table_name: &str, + expected_records: &[WalTestRecord], +) -> Result<(), TesterError> { + info!( + "Verifying {} records after recovery...", + expected_records.len() + ); + + let mut client = default_client().await?; + + // Select all records ordered by id + let sql = format!("SELECT * FROM {} ORDER BY id;", table_name); + client + .send_request(&Request::Query { + database_name: Some(database_name.to_string()), + sql, + }) + .await?; + + let expected_columns = vec![ + ("id", ColumnType::I32), + ("big_id", ColumnType::I64), + ("price", ColumnType::F32), + ("precise_price", ColumnType::F64), + ("active", ColumnType::Bool), + ("birth_date", ColumnType::Date), + ("last_login", ColumnType::DateTime), + ("name", ColumnType::String), + ]; + + let records = validate_select_query(&mut client, &expected_columns).await?; + + // Verify count + if records.len() != expected_records.len() { + error!( + "Record count mismatch: expected {}, got {}", + expected_records.len(), + records.len() + ); + return Err(TesterError::ServerError { + message: format!( + "Record count mismatch after recovery: expected {}, got {}", + expected_records.len(), + records.len() + ), + }); + } + + info!("✓ Record count matches: {}", records.len()); + + // Verify each record + for (i, (expected, actual)) in expected_records.iter().zip(records.iter()).enumerate() { + if let Err(e) = expected.validate_record(actual) { + error!("Record {} validation failed: {:?}", i, e); + return Err(e); + } + } + + info!("✓ All {} records verified successfully", records.len()); + + Ok(()) +} diff --git a/tester/src/main.rs b/tester/src/main.rs index 73eeab1..8140429 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -1,4 +1,7 @@ +use std::cell::RefCell; use std::io; +use std::process::{Child, Command as StdCommand, Stdio}; +use std::thread; use std::time::Duration; use clap::{Parser, Subcommand}; @@ -12,6 +15,7 @@ use crate::e2e::insert::{self, InsertE2ETest}; use crate::e2e::select::{self, SelectE2ETest}; use crate::e2e::truncate_table::{self, TruncateTableE2ETest}; use crate::e2e::update::{self, UpdateE2ETest}; +use crate::e2e::wal_recovery::{self, WalRecoveryE2ETest}; use crate::performance::concurrent_inserts::{self, ConcurrentInserts}; use crate::performance::concurrent_reads::{self, ReadMany}; use crate::performance::concurrent_reads_and_inserts::{self, ConcurrentReadsAndInserts}; @@ -123,31 +127,74 @@ enum Command { }, /// E2E test for SELECT statements with comprehensive validation - E2eSelect, + E2eSelect { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for INSERT statements with comprehensive validation - E2eInsert, + E2eInsert { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for UPDATE statements with comprehensive validation - E2eUpdate, + E2eUpdate { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for DELETE statements with comprehensive validation - E2eDelete, + E2eDelete { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for CREATE TABLE statements with comprehensive validation - E2eCreateTable, + E2eCreateTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for TRUNCATE TABLE statements with comprehensive validation - E2eTruncateTable, + E2eTruncateTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for DROP TABLE statements with comprehensive validation - E2eDropTable, + E2eDropTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for ALTER TABLE statements with comprehensive validation - E2eAlterTable, + E2eAlterTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, + + /// E2E test for WAL recovery after SIGKILL + E2eWalRecovery { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, - /// Run all E2E tests (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, TRUNCATE TABLE, DROP TABLE, and ALTER TABLE) - E2eAll, + /// Run all E2E tests (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, TRUNCATE TABLE, DROP TABLE, ALTER TABLE, and WAL RECOVERY) + E2eAll { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, } #[derive(Debug, Error)] @@ -168,6 +215,52 @@ enum TesterError { ServerError { message: String }, } +/// Helper struct to manage the database server process +struct ServerProcess { + child: Child, +} + +impl ServerProcess { + /// Start the database server as a child process + fn start(server_path: &str) -> Result { + println!("Starting database server..."); + + // Start the server process + let child = StdCommand::new(server_path) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()?; + + // Give the server some time to start up + thread::sleep(Duration::from_secs(1)); + + println!("Database server started (PID: {})", child.id()); + + Ok(ServerProcess { child }) + } + + /// Stop the database server gracefully + fn stop(&mut self) -> Result<(), TesterError> { + println!("Stopping database server (PID: {})...", self.child.id()); + + // Wait for the process to finish + self.child.kill()?; + self.child.wait()?; + + println!("Database server stopped"); + + Ok(()) + } +} + +impl Drop for ServerProcess { + fn drop(&mut self) { + // Ensure the process is killed even if stop() wasn't called + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + async fn concurrent_inserts( runs: u32, threads: usize, @@ -334,7 +427,9 @@ async fn concurrent_reads_non_index( Ok(test_results) } -async fn e2e_select() -> Result<(), TesterError> { +async fn e2e_select(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "E2E_SELECT_TEST".to_string(); let table_name = "TEST_TABLE".to_string(); @@ -364,10 +459,14 @@ async fn e2e_select() -> Result<(), TesterError> { println!("E2E SELECT test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_insert() -> Result<(), TesterError> { +async fn e2e_insert(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "E2E_INSERT_TEST".to_string(); let table_name = "INSERT_TEST_TABLE".to_string(); @@ -390,10 +489,14 @@ async fn e2e_insert() -> Result<(), TesterError> { println!("E2E INSERT test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_update() -> Result<(), TesterError> { +async fn e2e_update(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "UPDATE_E2E_DB".to_string(); let table_name = "UPDATE_E2E_TABLE".to_string(); @@ -422,10 +525,14 @@ async fn e2e_update() -> Result<(), TesterError> { println!("E2E UPDATE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_delete() -> Result<(), TesterError> { +async fn e2e_delete(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "DELETE_E2E_DB".to_string(); let table_name = "DELETE_E2E_TABLE".to_string(); const NUM_RECORDS: usize = 5000; @@ -453,10 +560,14 @@ async fn e2e_delete() -> Result<(), TesterError> { println!("E2E DELETE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_create_table() -> Result<(), TesterError> { +async fn e2e_create_table(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "CREATE_TABLE_E2E_DB".to_string(); let setup = create_table::Setup { @@ -476,10 +587,14 @@ async fn e2e_create_table() -> Result<(), TesterError> { println!("E2E CREATE TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_truncate_table() -> Result<(), TesterError> { +async fn e2e_truncate_table(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "TRUNCATE_TABLE_E2E_DB".to_string(); let setup = truncate_table::Setup { @@ -499,10 +614,14 @@ async fn e2e_truncate_table() -> Result<(), TesterError> { println!("E2E TRUNCATE TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_drop_table() -> Result<(), TesterError> { +async fn e2e_drop_table(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "DROP_TABLE_E2E_DB".to_string(); let setup = drop_table::Setup { @@ -522,10 +641,14 @@ async fn e2e_drop_table() -> Result<(), TesterError> { println!("E2E DROP TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_alter_table() -> Result<(), TesterError> { +async fn e2e_alter_table(server_path: &str) -> Result<(), TesterError> { + let mut server = ServerProcess::start(server_path)?; + let db_name = "ALTER_TABLE_E2E_DB".to_string(); let setup = alter_table::Setup { @@ -545,17 +668,53 @@ async fn e2e_alter_table() -> Result<(), TesterError> { println!("E2E ALTER TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_all() -> Result<(), TesterError> { +async fn e2e_wal_recovery(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + + let db_name = "WAL_RECOVERY_E2E_DB".to_string(); + let table_name = "WAL_TEST_TABLE".to_string(); + + const NUM_RECORDS: usize = 10000; + + let setup = wal_recovery::Setup { + database_name: db_name.clone(), + table_name: table_name.clone(), + }; + + // Generate test data + let test_data = wal_recovery::WalTestRecord::generate(NUM_RECORDS); + + let test = wal_recovery::Test { + database_name: db_name.clone(), + table_name: table_name.clone(), + test_data: test_data.clone(), + server_path: server_path.to_string(), + server, + }; + + let result = WalRecoveryE2ETest::run_suite(&setup, &RefCell::new(test), &()).await?; + + println!("E2E WAL RECOVERY test completed successfully!"); + println!("Tests passed: {}", result.tests_passed); + + // Server is stopped inside wal recovery test, no need to stop it here. + + Ok(()) +} + +async fn e2e_all(server_path: &str) -> Result<(), TesterError> { println!("\n========================================"); println!("Running ALL E2E Tests"); println!("========================================\n"); // Run CREATE TABLE tests - println!("[1/8] Running CREATE TABLE E2E tests..."); - match e2e_create_table().await { + println!("[1/9] Running CREATE TABLE E2E tests..."); + match e2e_create_table(server_path).await { Ok(()) => { println!("✓ CREATE TABLE E2E tests passed\n"); } @@ -566,8 +725,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run INSERT tests - println!("[2/8] Running INSERT E2E tests..."); - match e2e_insert().await { + println!("[2/9] Running INSERT E2E tests..."); + match e2e_insert(server_path).await { Ok(()) => { println!("✓ INSERT E2E tests passed\n"); } @@ -578,8 +737,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run SELECT tests - println!("[3/8] Running SELECT E2E tests..."); - match e2e_select().await { + println!("[3/9] Running SELECT E2E tests..."); + match e2e_select(server_path).await { Ok(()) => { println!("✓ SELECT E2E tests passed\n"); } @@ -590,8 +749,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run UPDATE tests - println!("[4/8] Running UPDATE E2E tests..."); - match e2e_update().await { + println!("[4/9] Running UPDATE E2E tests..."); + match e2e_update(server_path).await { Ok(()) => { println!("✓ UPDATE E2E tests passed\n"); } @@ -602,8 +761,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run DELETE tests - println!("[5/8] Running DELETE E2E tests..."); - match e2e_delete().await { + println!("[5/9] Running DELETE E2E tests..."); + match e2e_delete(server_path).await { Ok(()) => { println!("✓ DELETE E2E tests passed\n"); } @@ -614,8 +773,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run TRUNCATE TABLE tests - println!("[6/8] Running TRUNCATE TABLE E2E tests..."); - match e2e_truncate_table().await { + println!("[6/9] Running TRUNCATE TABLE E2E tests..."); + match e2e_truncate_table(server_path).await { Ok(()) => { println!("✓ TRUNCATE TABLE E2E tests passed\n"); } @@ -626,8 +785,8 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run DROP TABLE tests - println!("[7/8] Running DROP TABLE E2E tests..."); - match e2e_drop_table().await { + println!("[7/9] Running DROP TABLE E2E tests..."); + match e2e_drop_table(server_path).await { Ok(()) => { println!("✓ DROP TABLE E2E tests passed\n"); } @@ -637,8 +796,8 @@ async fn e2e_all() -> Result<(), TesterError> { } } // Run ALTER TABLE tests - println!("[8/8] Running ALTER TABLE E2E tests..."); - match e2e_alter_table().await { + println!("[8/9] Running ALTER TABLE E2E tests..."); + match e2e_alter_table(server_path).await { Ok(()) => { println!("✓ ALTER TABLE E2E tests passed\n"); } @@ -647,6 +806,19 @@ async fn e2e_all() -> Result<(), TesterError> { return Err(e); } } + + // Run WAL RECOVERY tests + println!("[9/9] Running WAL RECOVERY E2E tests..."); + match e2e_wal_recovery(server_path).await { + Ok(()) => { + println!("✓ WAL RECOVERY E2E tests passed\n"); + } + Err(e) => { + println!("✗ WAL RECOVERY E2E tests failed: {:?}\n", e); + return Err(e); + } + } + println!("========================================"); println!("All E2E Tests Completed Successfully!"); println!("========================================"); @@ -711,40 +883,44 @@ async fn main() -> Result<(), TesterError> { report_stats("concurrent-reads-and-inserts", &test_results); Ok(()) } - Command::E2eSelect => { - e2e_select().await?; + Command::E2eSelect { server_path } => { + e2e_select(&server_path).await?; + Ok(()) + } + Command::E2eInsert { server_path } => { + e2e_insert(&server_path).await?; Ok(()) } - Command::E2eInsert => { - e2e_insert().await?; + Command::E2eUpdate { server_path } => { + e2e_update(&server_path).await?; Ok(()) } - Command::E2eUpdate => { - e2e_update().await?; + Command::E2eDelete { server_path } => { + e2e_delete(&server_path).await?; Ok(()) } - Command::E2eDelete => { - e2e_delete().await?; + Command::E2eCreateTable { server_path } => { + e2e_create_table(&server_path).await?; Ok(()) } - Command::E2eCreateTable => { - e2e_create_table().await?; + Command::E2eTruncateTable { server_path } => { + e2e_truncate_table(&server_path).await?; Ok(()) } - Command::E2eTruncateTable => { - e2e_truncate_table().await?; + Command::E2eDropTable { server_path } => { + e2e_drop_table(&server_path).await?; Ok(()) } - Command::E2eDropTable => { - e2e_drop_table().await?; + Command::E2eAlterTable { server_path } => { + e2e_alter_table(&server_path).await?; Ok(()) } - Command::E2eAlterTable => { - e2e_alter_table().await?; + Command::E2eWalRecovery { server_path } => { + e2e_wal_recovery(&server_path).await?; Ok(()) } - Command::E2eAll => { - e2e_all().await?; + Command::E2eAll { server_path } => { + e2e_all(&server_path).await?; Ok(()) } }