From f19381888aee382b58d0157be00fe1e830a6274c Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sat, 17 Jan 2026 12:20:35 +0100 Subject: [PATCH 1/6] start server automatically --- tester/README.md | 40 +++++++---- tester/src/main.rs | 172 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 168 insertions(+), 44 deletions(-) diff --git a/tester/README.md b/tester/README.md index 80ed74c..850e777 100644 --- a/tester/README.md +++ b/tester/README.md @@ -1,21 +1,37 @@ # Tester -This crate is a small test client used only for running end-to-end and performance tests against a running coDB server. +This crate is a test client used for running end-to-end and performance tests against a coDB server. -Prerequisites -- Ensure the coDB server is already running before using the tester (the tester connects to the server). +## Overview -Quick run -- Build & run the tester (example): +The tester automatically manages the database server lifecycle - it starts the server before tests and stops it after tests complete. + +## Prerequisites + +- Build the server binary first: + ```bash + cargo build --release --bin server + ``` + +## Quick Run + +All commands require the `--server-path` argument pointing to the server executable: ```bash -cargo run -p tester -- concurrent-reads-index --runs 1 --threads 8 --records 1000 --bound-size 10 +# Performance test example +cargo run -p tester -- --server-path ./target/release/server concurrent-reads-index --runs 1 --threads 8 --records 1000 --bound-size 10 + +# E2E test example +cargo run -p tester -- --server-path ./target/release/server e2e-select + +# Run all E2E tests +cargo run -p tester -- --server-path ./target/release/server e2e-all ``` -Performance tests -- Performance tests live in [tester/src/performance](tester/src/performance). +## Test Types + +### Performance Tests +Performance tests live in [tester/src/performance](tester/src/performance) and measure concurrency and throughput. -Adding a new performance test -- Add a new file in [tester/src/performance](tester/src/performance) implementing the `Suite` trait (see existing tests for examples). -- Export the module in [tester/src/performance/mod.rs](tester/src/performance/mod.rs) with `pub mod your_test;`. -- Wire a CLI subcommand and runner in [tester/src/main.rs](tester/src/main.rs) so you can execute the test from the command line. +### E2E Tests +E2E tests live in [tester/src/e2e](tester/src/e2e) and verify end-to-end functionality. diff --git a/tester/src/main.rs b/tester/src/main.rs index 73eeab1..f761904 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -1,4 +1,6 @@ use std::io; +use std::process::{Child, Command as StdCommand, Stdio}; +use std::thread; use std::time::Duration; use clap::{Parser, Subcommand}; @@ -28,6 +30,10 @@ mod suite; #[command(name = "tester")] #[command(about = "coDB tester client for e2e & performance tests", long_about = None)] struct Cli { + /// Path to the server executable + #[arg(long)] + server_path: String, + #[command(subcommand)] command: Command, } @@ -168,7 +174,54 @@ enum TesterError { ServerError { message: String }, } +/// Helper struct to manage the database server process +struct ServerProcess { + child: Child, +} + +impl ServerProcess { + /// Start the database server as a child process + fn start(server_path: &str) -> Result { + println!("Starting database server..."); + + // Start the server process + let child = StdCommand::new(server_path) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn()?; + + // Give the server some time to start up + thread::sleep(Duration::from_secs(1)); + + println!("Database server started (PID: {})", child.id()); + + Ok(ServerProcess { child }) + } + + /// Stop the database server gracefully + fn stop(mut self) -> Result<(), TesterError> { + println!("Stopping database server (PID: {})...", self.child.id()); + + // Wait for the process to finish + self.child.kill()?; + self.child.wait()?; + + println!("Database server stopped"); + + Ok(()) + } +} + +impl Drop for ServerProcess { + fn drop(&mut self) { + // Ensure the process is killed even if stop() wasn't called + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + async fn concurrent_inserts( + server_path: &str, runs: u32, threads: usize, records_per_thread: usize, @@ -194,13 +247,16 @@ async fn concurrent_inserts( }; for _ in 0..runs { + let server = ServerProcess::start(server_path)?; let result = ConcurrentInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); + server.stop()?; } Ok(test_results) } async fn concurrent_reads( + server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -226,13 +282,16 @@ async fn concurrent_reads( }; for _ in 0..runs { + let server = ServerProcess::start(server_path)?; let result = ReadMany::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); + server.stop()?; } Ok(test_results) } async fn concurrent_reads_and_inserts( + server_path: &str, runs: u32, readers: usize, writers: usize, @@ -260,13 +319,16 @@ async fn concurrent_reads_and_inserts( }; for _ in 0..runs { + let server = ServerProcess::start(server_path)?; let result = ConcurrentReadsAndInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); + server.stop()?; } Ok(test_results) } async fn concurrent_reads_index( + server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -294,13 +356,16 @@ async fn concurrent_reads_index( }; for _ in 0..runs { + let server = ServerProcess::start(server_path)?; let result = ReadByIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); + server.stop()?; } Ok(test_results) } async fn concurrent_reads_non_index( + server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -328,13 +393,17 @@ async fn concurrent_reads_non_index( }; for _ in 0..runs { + let server = ServerProcess::start(server_path)?; let result = ReadByNonIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); + server.stop()?; } Ok(test_results) } -async fn e2e_select() -> Result<(), TesterError> { +async fn e2e_select(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "E2E_SELECT_TEST".to_string(); let table_name = "TEST_TABLE".to_string(); @@ -364,10 +433,14 @@ async fn e2e_select() -> Result<(), TesterError> { println!("E2E SELECT test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_insert() -> Result<(), TesterError> { +async fn e2e_insert(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "E2E_INSERT_TEST".to_string(); let table_name = "INSERT_TEST_TABLE".to_string(); @@ -390,10 +463,14 @@ async fn e2e_insert() -> Result<(), TesterError> { println!("E2E INSERT test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_update() -> Result<(), TesterError> { +async fn e2e_update(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "UPDATE_E2E_DB".to_string(); let table_name = "UPDATE_E2E_TABLE".to_string(); @@ -422,10 +499,14 @@ async fn e2e_update() -> Result<(), TesterError> { println!("E2E UPDATE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_delete() -> Result<(), TesterError> { +async fn e2e_delete(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "DELETE_E2E_DB".to_string(); let table_name = "DELETE_E2E_TABLE".to_string(); const NUM_RECORDS: usize = 5000; @@ -453,10 +534,14 @@ async fn e2e_delete() -> Result<(), TesterError> { println!("E2E DELETE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_create_table() -> Result<(), TesterError> { +async fn e2e_create_table(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "CREATE_TABLE_E2E_DB".to_string(); let setup = create_table::Setup { @@ -476,10 +561,14 @@ async fn e2e_create_table() -> Result<(), TesterError> { println!("E2E CREATE TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_truncate_table() -> Result<(), TesterError> { +async fn e2e_truncate_table(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "TRUNCATE_TABLE_E2E_DB".to_string(); let setup = truncate_table::Setup { @@ -499,10 +588,14 @@ async fn e2e_truncate_table() -> Result<(), TesterError> { println!("E2E TRUNCATE TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_drop_table() -> Result<(), TesterError> { +async fn e2e_drop_table(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "DROP_TABLE_E2E_DB".to_string(); let setup = drop_table::Setup { @@ -522,10 +615,14 @@ async fn e2e_drop_table() -> Result<(), TesterError> { println!("E2E DROP TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_alter_table() -> Result<(), TesterError> { +async fn e2e_alter_table(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + let db_name = "ALTER_TABLE_E2E_DB".to_string(); let setup = alter_table::Setup { @@ -545,17 +642,19 @@ async fn e2e_alter_table() -> Result<(), TesterError> { println!("E2E ALTER TABLE test completed successfully!"); println!("Tests passed: {}", result.tests_passed); + server.stop()?; + Ok(()) } -async fn e2e_all() -> Result<(), TesterError> { +async fn e2e_all(server_path: &str) -> Result<(), TesterError> { println!("\n========================================"); println!("Running ALL E2E Tests"); println!("========================================\n"); // Run CREATE TABLE tests println!("[1/8] Running CREATE TABLE E2E tests..."); - match e2e_create_table().await { + match e2e_create_table(server_path).await { Ok(()) => { println!("✓ CREATE TABLE E2E tests passed\n"); } @@ -567,7 +666,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run INSERT tests println!("[2/8] Running INSERT E2E tests..."); - match e2e_insert().await { + match e2e_insert(server_path).await { Ok(()) => { println!("✓ INSERT E2E tests passed\n"); } @@ -579,7 +678,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run SELECT tests println!("[3/8] Running SELECT E2E tests..."); - match e2e_select().await { + match e2e_select(server_path).await { Ok(()) => { println!("✓ SELECT E2E tests passed\n"); } @@ -591,7 +690,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run UPDATE tests println!("[4/8] Running UPDATE E2E tests..."); - match e2e_update().await { + match e2e_update(server_path).await { Ok(()) => { println!("✓ UPDATE E2E tests passed\n"); } @@ -603,7 +702,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run DELETE tests println!("[5/8] Running DELETE E2E tests..."); - match e2e_delete().await { + match e2e_delete(server_path).await { Ok(()) => { println!("✓ DELETE E2E tests passed\n"); } @@ -615,7 +714,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run TRUNCATE TABLE tests println!("[6/8] Running TRUNCATE TABLE E2E tests..."); - match e2e_truncate_table().await { + match e2e_truncate_table(server_path).await { Ok(()) => { println!("✓ TRUNCATE TABLE E2E tests passed\n"); } @@ -627,7 +726,7 @@ async fn e2e_all() -> Result<(), TesterError> { // Run DROP TABLE tests println!("[7/8] Running DROP TABLE E2E tests..."); - match e2e_drop_table().await { + match e2e_drop_table(server_path).await { Ok(()) => { println!("✓ DROP TABLE E2E tests passed\n"); } @@ -638,7 +737,7 @@ async fn e2e_all() -> Result<(), TesterError> { } // Run ALTER TABLE tests println!("[8/8] Running ALTER TABLE E2E tests..."); - match e2e_alter_table().await { + match e2e_alter_table(server_path).await { Ok(()) => { println!("✓ ALTER TABLE E2E tests passed\n"); } @@ -666,7 +765,7 @@ async fn main() -> Result<(), TesterError> { threads, records, } => { - let test_results = concurrent_inserts(runs, threads, records).await?; + let test_results = concurrent_inserts(&cli.server_path, runs, threads, records).await?; report_stats("concurrent-inserts", &test_results); Ok(()) } @@ -675,7 +774,7 @@ async fn main() -> Result<(), TesterError> { threads, records, } => { - let test_results = concurrent_reads(runs, threads, records).await?; + let test_results = concurrent_reads(&cli.server_path, runs, threads, records).await?; report_stats("concurrent-reads", &test_results); Ok(()) } @@ -685,7 +784,9 @@ async fn main() -> Result<(), TesterError> { records, bound_size, } => { - let test_results = concurrent_reads_index(runs, threads, records, bound_size).await?; + let test_results = + concurrent_reads_index(&cli.server_path, runs, threads, records, bound_size) + .await?; report_stats("concurrent-reads-index", &test_results); Ok(()) } @@ -696,7 +797,8 @@ async fn main() -> Result<(), TesterError> { bound_size, } => { let test_results = - concurrent_reads_non_index(runs, threads, records, bound_size).await?; + concurrent_reads_non_index(&cli.server_path, runs, threads, records, bound_size) + .await?; report_stats("concurrent-reads-non-index", &test_results); Ok(()) } @@ -706,45 +808,51 @@ async fn main() -> Result<(), TesterError> { writers, records_per_writer, } => { - let test_results = - concurrent_reads_and_inserts(runs, readers, writers, records_per_writer).await?; + let test_results = concurrent_reads_and_inserts( + &cli.server_path, + runs, + readers, + writers, + records_per_writer, + ) + .await?; report_stats("concurrent-reads-and-inserts", &test_results); Ok(()) } Command::E2eSelect => { - e2e_select().await?; + e2e_select(&cli.server_path).await?; Ok(()) } Command::E2eInsert => { - e2e_insert().await?; + e2e_insert(&cli.server_path).await?; Ok(()) } Command::E2eUpdate => { - e2e_update().await?; + e2e_update(&cli.server_path).await?; Ok(()) } Command::E2eDelete => { - e2e_delete().await?; + e2e_delete(&cli.server_path).await?; Ok(()) } Command::E2eCreateTable => { - e2e_create_table().await?; + e2e_create_table(&cli.server_path).await?; Ok(()) } Command::E2eTruncateTable => { - e2e_truncate_table().await?; + e2e_truncate_table(&cli.server_path).await?; Ok(()) } Command::E2eDropTable => { - e2e_drop_table().await?; + e2e_drop_table(&cli.server_path).await?; Ok(()) } Command::E2eAlterTable => { - e2e_alter_table().await?; + e2e_alter_table(&cli.server_path).await?; Ok(()) } Command::E2eAll => { - e2e_all().await?; + e2e_all(&cli.server_path).await?; Ok(()) } } From e5c1750d8a1948cb86d97c80095e5cad57e4fe7a Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sat, 17 Jan 2026 13:12:35 +0100 Subject: [PATCH 2/6] wal recovery test --- tester/src/e2e/mod.rs | 1 + tester/src/e2e/wal_recovery.rs | 341 +++++++++++++++++++++++++++++++++ tester/src/main.rs | 106 +++++++--- 3 files changed, 425 insertions(+), 23 deletions(-) create mode 100644 tester/src/e2e/wal_recovery.rs diff --git a/tester/src/e2e/mod.rs b/tester/src/e2e/mod.rs index 56c5140..8c53694 100644 --- a/tester/src/e2e/mod.rs +++ b/tester/src/e2e/mod.rs @@ -7,3 +7,4 @@ pub mod response_helpers; pub mod select; pub mod truncate_table; pub mod update; +pub mod wal_recovery; diff --git a/tester/src/e2e/wal_recovery.rs b/tester/src/e2e/wal_recovery.rs new file mode 100644 index 0000000..58859ca --- /dev/null +++ b/tester/src/e2e/wal_recovery.rs @@ -0,0 +1,341 @@ +use std::time::Duration; +use std::{cell::RefCell, thread}; + +use log::{error, info}; +use protocol::{ColumnType, Request, StatementType}; + +use crate::{ + ServerProcess, TesterError, + suite::{E2ETestResult, Suite, default_client}, +}; + +use super::response_helpers::{ + extract_bool, extract_f32, extract_f64, extract_i32, extract_i64, extract_string, + validate_field_count, validate_non_select_statement, validate_select_query, +}; + +/// Test record structure for WAL recovery tests +#[derive(Debug, Clone)] +pub struct WalTestRecord { + pub id: i32, + pub big_id: i64, + pub price: f32, + pub precise_price: f64, + pub active: bool, + pub birth_date: String, + pub last_login: String, + pub name: String, +} + +impl WalTestRecord { + /// Generate test records + pub fn generate(num_records: usize) -> Vec { + (0..num_records) + .map(|i| WalTestRecord { + id: i as i32, + big_id: (i as i64) * 1000, + price: (i as f32) * 1.5, + precise_price: (i as f64) * 2.75, + active: i % 2 == 0, + birth_date: format!("2024-01-{:02}", (i % 28) + 1), + last_login: format!("2024-01-{:02}T12:00:00", (i % 28) + 1), + name: format!("User_{}", i), + }) + .collect() + } + + /// Validate that a protocol Record matches this test record + pub fn validate_record(&self, record: &protocol::Record) -> Result<(), TesterError> { + validate_field_count(record, 8)?; + + let id = extract_i32(record, 0)?; + let big_id = extract_i64(record, 1)?; + let price = extract_f32(record, 2)?; + let precise_price = extract_f64(record, 3)?; + let active = extract_bool(record, 4)?; + let name = extract_string(record, 7)?; + + if id != self.id { + return Err(TesterError::ServerError { + message: format!("ID mismatch: expected {}, got {}", self.id, id), + }); + } + if big_id != self.big_id { + return Err(TesterError::ServerError { + message: format!("big_id mismatch: expected {}, got {}", self.big_id, big_id), + }); + } + if (price - self.price).abs() > 0.01 { + return Err(TesterError::ServerError { + message: format!("price mismatch: expected {}, got {}", self.price, price), + }); + } + if (precise_price - self.precise_price).abs() > 0.001 { + return Err(TesterError::ServerError { + message: format!( + "precise_price mismatch: expected {}, got {}", + self.precise_price, precise_price + ), + }); + } + if active != self.active { + return Err(TesterError::ServerError { + message: format!("active mismatch: expected {}, got {}", self.active, active), + }); + } + if name != self.name { + return Err(TesterError::ServerError { + message: format!("name mismatch: expected '{}', got '{}'", self.name, name), + }); + } + + Ok(()) + } +} + +pub struct WalRecoveryE2ETest; + +pub struct Setup { + pub database_name: String, + pub table_name: String, +} + +pub struct Test { + pub database_name: String, + pub table_name: String, + pub test_data: Vec, + pub server_path: String, + pub server: ServerProcess, +} + +pub struct Cleanup { + pub database_name: String, +} + +impl Suite for WalRecoveryE2ETest { + type SetupArgs = Setup; + + async fn setup(args: &Self::SetupArgs) -> Result<(), TesterError> { + info!("Creating database '{}'...", args.database_name); + let mut client = default_client().await?; + + client + .execute_and_wait(Request::CreateDatabase { + database_name: args.database_name.clone(), + }) + .await?; + + info!("✓ Database created"); + + // Create table with all data types + let create_table_sql = format!( + "CREATE TABLE {} (\ + id INT32 PRIMARY_KEY, \ + big_id INT64, \ + price FLOAT32, \ + precise_price FLOAT64, \ + active BOOL, \ + birth_date DATE, \ + last_login DATETIME, \ + name STRING\ + );", + args.table_name + ); + + info!("Creating table..."); + client + .send_request(&Request::Query { + database_name: Some(args.database_name.clone()), + sql: create_table_sql, + }) + .await?; + + validate_non_select_statement(&mut client, 0, StatementType::CreateTable).await?; + info!("✓ Table created"); + + Ok(()) + } + + type TestArgs = RefCell; + + async fn run(args: &Self::TestArgs) -> Result { + let mut tests_passed = 0; + + info!("\n=== Test 1: WAL recovery after SIGKILL ==="); + + // Clone values to avoid holding RefCell borrow across await + let (database_name, table_name, test_data) = { + let borrowed = args.borrow(); + ( + borrowed.database_name.clone(), + borrowed.table_name.clone(), + borrowed.test_data.clone(), + ) + }; + + // Insert records + if let Err(e) = insert_records(&database_name, &table_name, &test_data).await { + error!("Insert failed: {:?}", e); + return Err(e); + } + info!("✓ All {} records inserted", args.borrow().test_data.len()); + + // Wait for WAL flush + info!("Waiting 500ms for WAL flush..."); + thread::sleep(Duration::from_millis(500)); + info!("✓ WAL should be flushed"); + + // Kill the current server + args.borrow_mut().server.stop()?; + info!("✓ Server killed"); + + // Restart server + info!("Restarting server for WAL recovery..."); + let mut server = ServerProcess::start(&args.borrow().server_path)?; + + // Wait for WAL recovery to complete + info!("Waiting 3 seconds for WAL recovery..."); + thread::sleep(Duration::from_secs(3)); + info!("✓ WAL recovery period complete"); + + // Verify all records are present after recovery + if let Err(e) = verify_records_after_recovery(&database_name, &table_name, &test_data).await + { + error!("Verification failed: {:?}", e); + return Err(e); + } + info!( + "✓ All {} records verified after recovery", + args.borrow().test_data.len() + ); + tests_passed += 1; + + server.stop()?; + + Ok(E2ETestResult { tests_passed }) + } + + type CleanupArgs = Cleanup; + + async fn cleanup(args: &Self::CleanupArgs) -> Result<(), TesterError> { + info!("Deleting database '{}'...", args.database_name); + let mut client = default_client().await?; + + client + .execute_and_wait(Request::DeleteDatabase { + database_name: args.database_name.clone(), + }) + .await?; + + info!("✓ Database deleted"); + + Ok(()) + } +} + +/// Test WAL recovery: Insert records +async fn insert_records( + db_name: &str, + table_name: &str, + test_data: &[WalTestRecord], +) -> Result<(), TesterError> { + let mut client = default_client().await?; + + info!("Inserting {} records...", test_data.len()); + + // Insert all records + for record in test_data { + let insert_sql = format!( + "INSERT INTO {} (id, big_id, price, precise_price, active, birth_date, last_login, name) \ + VALUES ({}, {}, {:.1}, {:.2}, {}, '{}', '{}', '{}');", + table_name, + record.id, + record.big_id, + record.price, + record.precise_price, + record.active, + record.birth_date, + record.last_login, + record.name + ); + + client + .send_request(&Request::Query { + database_name: Some(db_name.to_string()), + sql: insert_sql, + }) + .await?; + + validate_non_select_statement(&mut client, 1, StatementType::Insert).await?; + } + + info!("✓ All {} records inserted", test_data.len()); + + Ok(()) +} + +/// Verify all records after server restart +pub async fn verify_records_after_recovery( + database_name: &str, + table_name: &str, + expected_records: &[WalTestRecord], +) -> Result<(), TesterError> { + info!( + "Verifying {} records after recovery...", + expected_records.len() + ); + + let mut client = default_client().await?; + + // Select all records ordered by id + let sql = format!("SELECT * FROM {} ORDER BY id;", table_name); + client + .send_request(&Request::Query { + database_name: Some(database_name.to_string()), + sql, + }) + .await?; + + let expected_columns = vec![ + ("id", ColumnType::I32), + ("big_id", ColumnType::I64), + ("price", ColumnType::F32), + ("precise_price", ColumnType::F64), + ("active", ColumnType::Bool), + ("birth_date", ColumnType::Date), + ("last_login", ColumnType::DateTime), + ("name", ColumnType::String), + ]; + + let records = validate_select_query(&mut client, &expected_columns).await?; + + // Verify count + if records.len() != expected_records.len() { + error!( + "Record count mismatch: expected {}, got {}", + expected_records.len(), + records.len() + ); + return Err(TesterError::ServerError { + message: format!( + "Record count mismatch after recovery: expected {}, got {}", + expected_records.len(), + records.len() + ), + }); + } + + info!("✓ Record count matches: {}", records.len()); + + // Verify each record + for (i, (expected, actual)) in expected_records.iter().zip(records.iter()).enumerate() { + if let Err(e) = expected.validate_record(actual) { + error!("Record {} validation failed: {:?}", i, e); + return Err(e); + } + } + + info!("✓ All {} records verified successfully", records.len()); + + Ok(()) +} diff --git a/tester/src/main.rs b/tester/src/main.rs index f761904..97d10ee 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::io; use std::process::{Child, Command as StdCommand, Stdio}; use std::thread; @@ -14,6 +15,7 @@ use crate::e2e::insert::{self, InsertE2ETest}; use crate::e2e::select::{self, SelectE2ETest}; use crate::e2e::truncate_table::{self, TruncateTableE2ETest}; use crate::e2e::update::{self, UpdateE2ETest}; +use crate::e2e::wal_recovery::{self, WalRecoveryE2ETest}; use crate::performance::concurrent_inserts::{self, ConcurrentInserts}; use crate::performance::concurrent_reads::{self, ReadMany}; use crate::performance::concurrent_reads_and_inserts::{self, ConcurrentReadsAndInserts}; @@ -152,7 +154,10 @@ enum Command { /// E2E test for ALTER TABLE statements with comprehensive validation E2eAlterTable, - /// Run all E2E tests (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, TRUNCATE TABLE, DROP TABLE, and ALTER TABLE) + /// E2E test for WAL recovery after SIGKILL + E2eWalRecovery, + + /// Run all E2E tests (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, TRUNCATE TABLE, DROP TABLE, ALTER TABLE, and WAL RECOVERY) E2eAll, } @@ -199,7 +204,7 @@ impl ServerProcess { } /// Stop the database server gracefully - fn stop(mut self) -> Result<(), TesterError> { + fn stop(&mut self) -> Result<(), TesterError> { println!("Stopping database server (PID: {})...", self.child.id()); // Wait for the process to finish @@ -247,7 +252,7 @@ async fn concurrent_inserts( }; for _ in 0..runs { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let result = ConcurrentInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); server.stop()?; @@ -282,7 +287,7 @@ async fn concurrent_reads( }; for _ in 0..runs { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let result = ReadMany::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); server.stop()?; @@ -319,7 +324,7 @@ async fn concurrent_reads_and_inserts( }; for _ in 0..runs { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let result = ConcurrentReadsAndInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); server.stop()?; @@ -356,7 +361,7 @@ async fn concurrent_reads_index( }; for _ in 0..runs { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let result = ReadByIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); server.stop()?; @@ -393,7 +398,7 @@ async fn concurrent_reads_non_index( }; for _ in 0..runs { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let result = ReadByNonIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); server.stop()?; @@ -402,7 +407,7 @@ async fn concurrent_reads_non_index( } async fn e2e_select(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "E2E_SELECT_TEST".to_string(); let table_name = "TEST_TABLE".to_string(); @@ -439,7 +444,7 @@ async fn e2e_select(server_path: &str) -> Result<(), TesterError> { } async fn e2e_insert(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "E2E_INSERT_TEST".to_string(); let table_name = "INSERT_TEST_TABLE".to_string(); @@ -469,7 +474,7 @@ async fn e2e_insert(server_path: &str) -> Result<(), TesterError> { } async fn e2e_update(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "UPDATE_E2E_DB".to_string(); let table_name = "UPDATE_E2E_TABLE".to_string(); @@ -505,7 +510,7 @@ async fn e2e_update(server_path: &str) -> Result<(), TesterError> { } async fn e2e_delete(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "DELETE_E2E_DB".to_string(); let table_name = "DELETE_E2E_TABLE".to_string(); @@ -540,7 +545,7 @@ async fn e2e_delete(server_path: &str) -> Result<(), TesterError> { } async fn e2e_create_table(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "CREATE_TABLE_E2E_DB".to_string(); @@ -567,7 +572,7 @@ async fn e2e_create_table(server_path: &str) -> Result<(), TesterError> { } async fn e2e_truncate_table(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "TRUNCATE_TABLE_E2E_DB".to_string(); @@ -594,7 +599,7 @@ async fn e2e_truncate_table(server_path: &str) -> Result<(), TesterError> { } async fn e2e_drop_table(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "DROP_TABLE_E2E_DB".to_string(); @@ -621,7 +626,7 @@ async fn e2e_drop_table(server_path: &str) -> Result<(), TesterError> { } async fn e2e_alter_table(server_path: &str) -> Result<(), TesterError> { - let server = ServerProcess::start(server_path)?; + let mut server = ServerProcess::start(server_path)?; let db_name = "ALTER_TABLE_E2E_DB".to_string(); @@ -647,13 +652,51 @@ async fn e2e_alter_table(server_path: &str) -> Result<(), TesterError> { Ok(()) } +async fn e2e_wal_recovery(server_path: &str) -> Result<(), TesterError> { + let server = ServerProcess::start(server_path)?; + + let db_name = "WAL_RECOVERY_E2E_DB".to_string(); + let table_name = "WAL_TEST_TABLE".to_string(); + + const NUM_RECORDS: usize = 10000; + + let setup = wal_recovery::Setup { + database_name: db_name.clone(), + table_name: table_name.clone(), + }; + + // Generate test data + let test_data = wal_recovery::WalTestRecord::generate(NUM_RECORDS); + + let test = wal_recovery::Test { + database_name: db_name.clone(), + table_name: table_name.clone(), + test_data: test_data.clone(), + server_path: server_path.to_string(), + server, + }; + + let cleanup = wal_recovery::Cleanup { + database_name: db_name.clone(), + }; + + let result = WalRecoveryE2ETest::run_suite(&setup, &RefCell::new(test), &cleanup).await?; + + println!("E2E WAL RECOVERY test completed successfully!"); + println!("Tests passed: {}", result.tests_passed); + + // Server is stopped inside wal recovery test, no need to stop it here. + + Ok(()) +} + async fn e2e_all(server_path: &str) -> Result<(), TesterError> { println!("\n========================================"); println!("Running ALL E2E Tests"); println!("========================================\n"); // Run CREATE TABLE tests - println!("[1/8] Running CREATE TABLE E2E tests..."); + println!("[1/9] Running CREATE TABLE E2E tests..."); match e2e_create_table(server_path).await { Ok(()) => { println!("✓ CREATE TABLE E2E tests passed\n"); @@ -665,7 +708,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run INSERT tests - println!("[2/8] Running INSERT E2E tests..."); + println!("[2/9] Running INSERT E2E tests..."); match e2e_insert(server_path).await { Ok(()) => { println!("✓ INSERT E2E tests passed\n"); @@ -677,7 +720,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run SELECT tests - println!("[3/8] Running SELECT E2E tests..."); + println!("[3/9] Running SELECT E2E tests..."); match e2e_select(server_path).await { Ok(()) => { println!("✓ SELECT E2E tests passed\n"); @@ -689,7 +732,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run UPDATE tests - println!("[4/8] Running UPDATE E2E tests..."); + println!("[4/9] Running UPDATE E2E tests..."); match e2e_update(server_path).await { Ok(()) => { println!("✓ UPDATE E2E tests passed\n"); @@ -701,7 +744,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run DELETE tests - println!("[5/8] Running DELETE E2E tests..."); + println!("[5/9] Running DELETE E2E tests..."); match e2e_delete(server_path).await { Ok(()) => { println!("✓ DELETE E2E tests passed\n"); @@ -713,7 +756,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run TRUNCATE TABLE tests - println!("[6/8] Running TRUNCATE TABLE E2E tests..."); + println!("[6/9] Running TRUNCATE TABLE E2E tests..."); match e2e_truncate_table(server_path).await { Ok(()) => { println!("✓ TRUNCATE TABLE E2E tests passed\n"); @@ -725,7 +768,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } // Run DROP TABLE tests - println!("[7/8] Running DROP TABLE E2E tests..."); + println!("[7/9] Running DROP TABLE E2E tests..."); match e2e_drop_table(server_path).await { Ok(()) => { println!("✓ DROP TABLE E2E tests passed\n"); @@ -736,7 +779,7 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { } } // Run ALTER TABLE tests - println!("[8/8] Running ALTER TABLE E2E tests..."); + println!("[8/9] Running ALTER TABLE E2E tests..."); match e2e_alter_table(server_path).await { Ok(()) => { println!("✓ ALTER TABLE E2E tests passed\n"); @@ -746,6 +789,19 @@ async fn e2e_all(server_path: &str) -> Result<(), TesterError> { return Err(e); } } + + // Run WAL RECOVERY tests + println!("[9/9] Running WAL RECOVERY E2E tests..."); + match e2e_wal_recovery(server_path).await { + Ok(()) => { + println!("✓ WAL RECOVERY E2E tests passed\n"); + } + Err(e) => { + println!("✗ WAL RECOVERY E2E tests failed: {:?}\n", e); + return Err(e); + } + } + println!("========================================"); println!("All E2E Tests Completed Successfully!"); println!("========================================"); @@ -851,6 +907,10 @@ async fn main() -> Result<(), TesterError> { e2e_alter_table(&cli.server_path).await?; Ok(()) } + Command::E2eWalRecovery => { + e2e_wal_recovery(&cli.server_path).await?; + Ok(()) + } Command::E2eAll => { e2e_all(&cli.server_path).await?; Ok(()) From 5f46ef613b4d999c9d2550dadf73a9fda7142f1d Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sat, 17 Jan 2026 13:24:44 +0100 Subject: [PATCH 3/6] fix cleanup in wal test --- tester/src/e2e/wal_recovery.rs | 24 +++++++++++------------- tester/src/main.rs | 6 +----- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/tester/src/e2e/wal_recovery.rs b/tester/src/e2e/wal_recovery.rs index 58859ca..47522ce 100644 --- a/tester/src/e2e/wal_recovery.rs +++ b/tester/src/e2e/wal_recovery.rs @@ -108,10 +108,6 @@ pub struct Test { pub server: ServerProcess, } -pub struct Cleanup { - pub database_name: String, -} - impl Suite for WalRecoveryE2ETest { type SetupArgs = Setup; @@ -210,25 +206,27 @@ impl Suite for WalRecoveryE2ETest { ); tests_passed += 1; - server.stop()?; - - Ok(E2ETestResult { tests_passed }) - } - - type CleanupArgs = Cleanup; + // We need to do cleanup here because we can't pass it to cleanup function - async fn cleanup(args: &Self::CleanupArgs) -> Result<(), TesterError> { - info!("Deleting database '{}'...", args.database_name); + info!("Deleting database '{}'...", &database_name); let mut client = default_client().await?; client .execute_and_wait(Request::DeleteDatabase { - database_name: args.database_name.clone(), + database_name: database_name.clone(), }) .await?; info!("✓ Database deleted"); + server.stop()?; + + Ok(E2ETestResult { tests_passed }) + } + + type CleanupArgs = (); + + async fn cleanup(_: &Self::CleanupArgs) -> Result<(), TesterError> { Ok(()) } } diff --git a/tester/src/main.rs b/tester/src/main.rs index 97d10ee..71b3c3c 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -676,11 +676,7 @@ async fn e2e_wal_recovery(server_path: &str) -> Result<(), TesterError> { server, }; - let cleanup = wal_recovery::Cleanup { - database_name: db_name.clone(), - }; - - let result = WalRecoveryE2ETest::run_suite(&setup, &RefCell::new(test), &cleanup).await?; + let result = WalRecoveryE2ETest::run_suite(&setup, &RefCell::new(test), &()).await?; println!("E2E WAL RECOVERY test completed successfully!"); println!("Tests passed: {}", result.tests_passed); From 09b7dfc1ea91459c299c92ee2d48db450b7b0740 Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sat, 17 Jan 2026 13:35:14 +0100 Subject: [PATCH 4/6] fix marking diff in cache --- storage/src/cache.rs | 2 +- storage/src/write_ahead_log.rs | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/storage/src/cache.rs b/storage/src/cache.rs index 7ef37b9..cde546e 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -480,7 +480,7 @@ impl Cache { // - allocate page // the newly allocated page will discard previous changes applied from wal. let mut pinned_write_page = self.pin_write(&id)?; - pinned_write_page.mark_diff(0, PAGE_SIZE as _); + pinned_write_page.mark_diff(0, USABLE_PAGE_SIZE as _); Ok((pinned_write_page, page_id)) } diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index fc99e4c..9b45a5f 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -391,6 +391,8 @@ pub(crate) struct SinglePageOperation { } impl SinglePageOperation { + // Used in tests + #[allow(dead_code)] pub(crate) fn new(file_page_ref: FilePageRef, diff: PageDiff) -> Self { Self { file_page_ref, @@ -930,7 +932,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); assert_eq!(handle.wal_client.flushed_lsn(), 0); @@ -954,7 +956,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client @@ -1008,7 +1010,7 @@ mod tests { // First session: write records and checkpoint { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), PageDiff::default()); @@ -1030,7 +1032,7 @@ mod tests { // Second session: recover { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Should only have records after checkpoint (LSN 4 and 5) assert_eq!(handle.redo_records.len(), 2); @@ -1050,7 +1052,7 @@ mod tests { // First session { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); handle .wal_client .write_single(make_test_file_page_ref(1), PageDiff::default()); @@ -1068,7 +1070,7 @@ mod tests { // Second session { let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // flushed_lsn should be set to last_lsn from recovery assert_eq!(handle.wal_client.flushed_lsn(), 3); @@ -1098,7 +1100,7 @@ mod tests { let dir = tempdir().unwrap(); let log_path = dir.path(); let (handle, mut bg_handle) = - spawn_wal(&log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); + spawn_wal(log_path, FLUSH_INTERVAL_MS, MAX_UNFLUSHED_RECORDS).unwrap(); // Simulate how Cache would share WalClient via Arc let shared = SharedWalClient::new(handle.wal_client); From 4a28040f074a1c5d0ead8c6939a2c07eae4198e4 Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sat, 17 Jan 2026 13:41:33 +0100 Subject: [PATCH 5/6] automatically start server only in e2e --- tester/README.md | 28 ++++++--- tester/src/main.rs | 140 ++++++++++++++++++++++++--------------------- 2 files changed, 96 insertions(+), 72 deletions(-) diff --git a/tester/README.md b/tester/README.md index 850e777..1b95cb3 100644 --- a/tester/README.md +++ b/tester/README.md @@ -4,7 +4,8 @@ This crate is a test client used for running end-to-end and performance tests ag ## Overview -The tester automatically manages the database server lifecycle - it starts the server before tests and stops it after tests complete. +- **E2E Tests**: Automatically manage the database server lifecycle - start the server before tests and stop it after tests complete. +- **Performance Tests**: Require a manually started server for more accurate benchmarking. ## Prerequisites @@ -15,17 +16,28 @@ The tester automatically manages the database server lifecycle - it starts the s ## Quick Run -All commands require the `--server-path` argument pointing to the server executable: +### E2E Tests -```bash -# Performance test example -cargo run -p tester -- --server-path ./target/release/server concurrent-reads-index --runs 1 --threads 8 --records 1000 --bound-size 10 +E2E tests require the `--server-path` argument - the server is started/stopped automatically: -# E2E test example -cargo run -p tester -- --server-path ./target/release/server e2e-select +```bash +# Run a specific E2E test +cargo run -p tester -- e2e-select --server-path ./target/release/server # Run all E2E tests -cargo run -p tester -- --server-path ./target/release/server e2e-all +cargo run -p tester -- e2e-all --server-path ./target/release/server +``` + +### Performance Tests + +Performance tests require a **manually started server**: + +```bash +# Terminal 1: Start the server +./target/release/server + +# Terminal 2: Run performance tests (no --server-path needed) +cargo run -p tester -- concurrent-reads-index --runs 1 --threads 8 --records 1000 --bound-size 10 ``` ## Test Types diff --git a/tester/src/main.rs b/tester/src/main.rs index 71b3c3c..8140429 100644 --- a/tester/src/main.rs +++ b/tester/src/main.rs @@ -32,10 +32,6 @@ mod suite; #[command(name = "tester")] #[command(about = "coDB tester client for e2e & performance tests", long_about = None)] struct Cli { - /// Path to the server executable - #[arg(long)] - server_path: String, - #[command(subcommand)] command: Command, } @@ -131,34 +127,74 @@ enum Command { }, /// E2E test for SELECT statements with comprehensive validation - E2eSelect, + E2eSelect { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for INSERT statements with comprehensive validation - E2eInsert, + E2eInsert { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for UPDATE statements with comprehensive validation - E2eUpdate, + E2eUpdate { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for DELETE statements with comprehensive validation - E2eDelete, + E2eDelete { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for CREATE TABLE statements with comprehensive validation - E2eCreateTable, + E2eCreateTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for TRUNCATE TABLE statements with comprehensive validation - E2eTruncateTable, + E2eTruncateTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for DROP TABLE statements with comprehensive validation - E2eDropTable, + E2eDropTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for ALTER TABLE statements with comprehensive validation - E2eAlterTable, + E2eAlterTable { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// E2E test for WAL recovery after SIGKILL - E2eWalRecovery, + E2eWalRecovery { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, /// Run all E2E tests (SELECT, INSERT, UPDATE, DELETE, CREATE TABLE, TRUNCATE TABLE, DROP TABLE, ALTER TABLE, and WAL RECOVERY) - E2eAll, + E2eAll { + /// Path to the server executable + #[arg(long)] + server_path: String, + }, } #[derive(Debug, Error)] @@ -226,7 +262,6 @@ impl Drop for ServerProcess { } async fn concurrent_inserts( - server_path: &str, runs: u32, threads: usize, records_per_thread: usize, @@ -252,16 +287,13 @@ async fn concurrent_inserts( }; for _ in 0..runs { - let mut server = ServerProcess::start(server_path)?; let result = ConcurrentInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); - server.stop()?; } Ok(test_results) } async fn concurrent_reads( - server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -287,16 +319,13 @@ async fn concurrent_reads( }; for _ in 0..runs { - let mut server = ServerProcess::start(server_path)?; let result = ReadMany::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); - server.stop()?; } Ok(test_results) } async fn concurrent_reads_and_inserts( - server_path: &str, runs: u32, readers: usize, writers: usize, @@ -324,16 +353,13 @@ async fn concurrent_reads_and_inserts( }; for _ in 0..runs { - let mut server = ServerProcess::start(server_path)?; let result = ConcurrentReadsAndInserts::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); - server.stop()?; } Ok(test_results) } async fn concurrent_reads_index( - server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -361,16 +387,13 @@ async fn concurrent_reads_index( }; for _ in 0..runs { - let mut server = ServerProcess::start(server_path)?; let result = ReadByIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); - server.stop()?; } Ok(test_results) } async fn concurrent_reads_non_index( - server_path: &str, runs: u32, threads: usize, records_to_insert: usize, @@ -398,10 +421,8 @@ async fn concurrent_reads_non_index( }; for _ in 0..runs { - let mut server = ServerProcess::start(server_path)?; let result = ReadByNonIndex::run_suite(&setup, &test, &cleanup).await?; test_results.push(result); - server.stop()?; } Ok(test_results) } @@ -817,7 +838,7 @@ async fn main() -> Result<(), TesterError> { threads, records, } => { - let test_results = concurrent_inserts(&cli.server_path, runs, threads, records).await?; + let test_results = concurrent_inserts(runs, threads, records).await?; report_stats("concurrent-inserts", &test_results); Ok(()) } @@ -826,7 +847,7 @@ async fn main() -> Result<(), TesterError> { threads, records, } => { - let test_results = concurrent_reads(&cli.server_path, runs, threads, records).await?; + let test_results = concurrent_reads(runs, threads, records).await?; report_stats("concurrent-reads", &test_results); Ok(()) } @@ -836,9 +857,7 @@ async fn main() -> Result<(), TesterError> { records, bound_size, } => { - let test_results = - concurrent_reads_index(&cli.server_path, runs, threads, records, bound_size) - .await?; + let test_results = concurrent_reads_index(runs, threads, records, bound_size).await?; report_stats("concurrent-reads-index", &test_results); Ok(()) } @@ -849,8 +868,7 @@ async fn main() -> Result<(), TesterError> { bound_size, } => { let test_results = - concurrent_reads_non_index(&cli.server_path, runs, threads, records, bound_size) - .await?; + concurrent_reads_non_index(runs, threads, records, bound_size).await?; report_stats("concurrent-reads-non-index", &test_results); Ok(()) } @@ -860,55 +878,49 @@ async fn main() -> Result<(), TesterError> { writers, records_per_writer, } => { - let test_results = concurrent_reads_and_inserts( - &cli.server_path, - runs, - readers, - writers, - records_per_writer, - ) - .await?; + let test_results = + concurrent_reads_and_inserts(runs, readers, writers, records_per_writer).await?; report_stats("concurrent-reads-and-inserts", &test_results); Ok(()) } - Command::E2eSelect => { - e2e_select(&cli.server_path).await?; + Command::E2eSelect { server_path } => { + e2e_select(&server_path).await?; Ok(()) } - Command::E2eInsert => { - e2e_insert(&cli.server_path).await?; + Command::E2eInsert { server_path } => { + e2e_insert(&server_path).await?; Ok(()) } - Command::E2eUpdate => { - e2e_update(&cli.server_path).await?; + Command::E2eUpdate { server_path } => { + e2e_update(&server_path).await?; Ok(()) } - Command::E2eDelete => { - e2e_delete(&cli.server_path).await?; + Command::E2eDelete { server_path } => { + e2e_delete(&server_path).await?; Ok(()) } - Command::E2eCreateTable => { - e2e_create_table(&cli.server_path).await?; + Command::E2eCreateTable { server_path } => { + e2e_create_table(&server_path).await?; Ok(()) } - Command::E2eTruncateTable => { - e2e_truncate_table(&cli.server_path).await?; + Command::E2eTruncateTable { server_path } => { + e2e_truncate_table(&server_path).await?; Ok(()) } - Command::E2eDropTable => { - e2e_drop_table(&cli.server_path).await?; + Command::E2eDropTable { server_path } => { + e2e_drop_table(&server_path).await?; Ok(()) } - Command::E2eAlterTable => { - e2e_alter_table(&cli.server_path).await?; + Command::E2eAlterTable { server_path } => { + e2e_alter_table(&server_path).await?; Ok(()) } - Command::E2eWalRecovery => { - e2e_wal_recovery(&cli.server_path).await?; + Command::E2eWalRecovery { server_path } => { + e2e_wal_recovery(&server_path).await?; Ok(()) } - Command::E2eAll => { - e2e_all(&cli.server_path).await?; + Command::E2eAll { server_path } => { + e2e_all(&server_path).await?; Ok(()) } } From 26f6ab3480502788bd5af5ff9ebced269255a42d Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Sun, 18 Jan 2026 10:30:02 +0100 Subject: [PATCH 6/6] fixen --- server/src/client_handler.rs | 2 -- storage/src/write_ahead_log.rs | 7 +------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 21fa888..cc7e69d 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -337,9 +337,7 @@ mod client_handler_tests { use crate::client_handler::{ClientError, ClientHandler}; use crate::protocol_handler::{BinaryProtocolHandler, TextProtocolHandler}; use crate::server::ExecutorWithWorkers; - use crate::workers_container::WorkersContainer; use dashmap::DashMap; - use executor::Executor; use metadata::catalog_manager::CatalogManager; use parking_lot::RwLock; use protocol::{Request, Response, StatementType}; diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 9b45a5f..7ec9438 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -391,8 +391,6 @@ pub(crate) struct SinglePageOperation { } impl SinglePageOperation { - // Used in tests - #[allow(dead_code)] pub(crate) fn new(file_page_ref: FilePageRef, diff: PageDiff) -> Self { Self { file_page_ref, @@ -592,10 +590,7 @@ impl WalClient { let (send, recv) = channel::bounded(1); let ops = ops .into_iter() - .map(|(file_page_ref, diff)| SinglePageOperation { - file_page_ref, - diff, - }) + .map(|(file_page_ref, diff)| SinglePageOperation::new(file_page_ref, diff)) .collect(); let record = WalRecord { data: WalRecordData::MultiPageOperation(ops),