diff --git a/executor/src/lib.rs b/executor/src/lib.rs index abc11aa..dfea8b5 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -62,11 +62,12 @@ impl Executor { } /// Creates new [`Executor`] for database at `database_path` and initializes background threads used by its components. - /// Returns executor and handles to the background threads. + /// Returns executor and handles to the background threads (the WAL worker handle is returned + /// separately from the other background worker handles). pub fn with_background_workers( database_path: impl AsRef, catalog: Catalog, - ) -> Result<(Self, Vec), ExecutorError> { + ) -> Result<(Self, Vec, BackgroundWorkerHandle), ExecutorError> { let (files, files_background_worker) = FilesManager::with_background_cleaner( database_path.as_ref(), consts::FILES_MANAGER_CLEANUP_INTERVAL, @@ -92,12 +93,8 @@ impl Executor { cache, catalog, }; - let workers = vec![ - cache_background_worker, - files_background_worker, - wal_background_worker, - ]; - Ok((executor, workers)) + let workers = vec![cache_background_worker, files_background_worker]; + Ok((executor, workers, wal_background_worker)) } /// Parses `query` and returns iterator over results for each statement in the `query`. @@ -1383,11 +1380,12 @@ mod tests { let (catalog, temp_dir) = create_catalog(); let db_path = temp_dir.path().join("test_db"); - let (executor, mut workers) = Executor::with_background_workers(db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(db_path, catalog) + .expect("with_background_workers should succeed"); - // We expect three background workers (cache and files manager and wal) + // We expect two background workers (cache and files manager); the WAL worker is separate - assert_eq!(workers.len(), 3); + assert_eq!(workers.len(), 2); // Shutdown and join all workers to ensure threads are started and can be stopped. 
while let Some(mut handle) = workers.pop() { @@ -1395,6 +1393,9 @@ mod tests { handle.join().expect("join should succeed"); } + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); + // Basic sanity check that executor was created and holds structures let _c = executor.catalog.read(); } @@ -3801,8 +3802,9 @@ mod tests { fn test_select_index_scan_1000_records_with_wal() { let (catalog, temp_dir) = create_catalog(); let db_path = temp_dir.path().join("test_db"); - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should succeed"); // Create table execute_single( @@ -3894,6 +3896,8 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } #[test] @@ -4802,8 +4806,9 @@ mod tests { { // Create table, insert many records, then delete all of them - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should succeed"); // Create table with data execute_single( @@ -4836,14 +4841,18 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } { // Restart and attempt redo // This should NOT panic - it should gracefully handle missing pages let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap(); - let (executor, mut workers) = 
Executor::with_background_workers(&db_path, catalog) - .expect("Recovery with redo should handle deleted pages gracefully"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("Recovery with redo should handle deleted pages gracefully"); let result = execute_single(&executor, "SELECT * FROM users;"); // We call it just to trigger wal redo @@ -4854,6 +4863,9 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } } @@ -4868,8 +4880,9 @@ mod tests { { // Create table, insert data, then drop it - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should succeed"); // Create and populate table execute_single( @@ -4903,14 +4916,18 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } { // Restart, attempt redo, then create new table with same name // This should NOT panic when trying to redo records for deleted pages let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap(); - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("Recovery should handle dropped tables gracefully"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("Recovery should handle dropped tables gracefully"); // Create new table with same name but different schema execute_single( @@ -4934,6 +4951,8 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); 
handle.join().expect("join should succeed"); } + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } } @@ -4948,8 +4967,9 @@ mod tests { { // Create table, insert data, then truncate - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should succeed"); // Create and populate table execute_single( @@ -4982,14 +5002,18 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } { // Restart, attempt redo, then insert new data // This should NOT panic when trying to redo records for deallocated pages let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap(); - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("Recovery should handle truncated tables gracefully"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("Recovery should handle truncated tables gracefully"); // Insert new data into the truncated table execute_single( @@ -5011,6 +5035,9 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } } @@ -5024,8 +5051,9 @@ mod tests { { // Complex sequence of operations - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should 
succeed"); // Create first table execute_single( @@ -5069,14 +5097,18 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } { // Restart and attempt redo // This should NOT panic despite complex history of deletions let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap(); - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("Recovery should handle complex table operations gracefully"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("Recovery should handle complex table operations gracefully"); // Verify final state let result = execute_single(&executor, "SELECT * FROM temp1;"); @@ -5092,6 +5124,9 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } } @@ -5106,8 +5141,9 @@ mod tests { { // Create table, insert records, truncate, then insert one new record - let (executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("with_background_workers should succeed"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("with_background_workers should succeed"); // Create table execute_single( @@ -5156,14 +5192,18 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } { // Restart and verify WAL redo only applies the new record // NOT the old 4 records that existed before truncate let catalog = Catalog::new(temp_dir.path(), "test_db").unwrap(); - let 
(executor, mut workers) = Executor::with_background_workers(&db_path, catalog) - .expect("Recovery should handle page reallocation correctly"); + let (executor, mut workers, mut wal_worker) = + Executor::with_background_workers(&db_path, catalog) + .expect("Recovery should handle page reallocation correctly"); // Verify table has exactly 1 record (the one inserted after truncate) let result = execute_single(&executor, "SELECT * FROM items;"); @@ -5187,6 +5227,9 @@ mod tests { handle.shutdown().expect("shutdown should succeed"); handle.join().expect("join should succeed"); } + + wal_worker.shutdown().expect("shutdown should succeed"); + wal_worker.join().expect("join should succeed"); } } } diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 2990b70..1579944 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -1,5 +1,6 @@ use crate::protocol_handler::{ProtocolHandler, ReadResult}; use crate::protocol_mappings::IntoProtocol; +use crate::server::ExecutorWithWorkers; use crate::workers_container::WorkersContainer; use dashmap::{DashMap, Entry}; use engine::record::Record as EngineRecord; @@ -20,10 +21,9 @@ where P: ProtocolHandler, { current_database: Option, - executors: Arc>>, + executors: Arc>>, catalog_manager: Arc>, protocol_handler: P, - workers: Arc, shutdown: CancellationToken, } @@ -78,10 +78,9 @@ where const ROWS_CHUNK_SIZE: usize = 10000; pub(crate) fn new( - executors: Arc>>, + executors: Arc>>, catalog_manager: Arc>, protocol_handler: P, - workers: Arc, shutdown: CancellationToken, ) -> Self { Self { @@ -89,7 +88,6 @@ where current_database: None, executors, catalog_manager, - workers, shutdown, } } @@ -146,7 +144,9 @@ where self.catalog_manager .write() .delete_database(&database_name)?; - self.executors.remove(&database_name); + if let Some((_, executor_with_workers)) = self.executors.remove(&database_name) { + drop(executor_with_workers); + } if self.current_database.as_deref() == 
Some(&database_name) { self.current_database = None; } @@ -201,7 +201,7 @@ where mpsc::channel::(Self::CHANNEL_BUFFER_CAPACITY); let handle = tokio::task::spawn_blocking(move || { - for result in executor.execute(&sql) { + for result in executor.executor().execute(&sql) { if sender.blocking_send(result).is_err() { break; } @@ -222,7 +222,7 @@ where fn get_or_create_executor( &self, database: impl AsRef + Into + Clone, - ) -> Result, ClientError> { + ) -> Result, ClientError> { let database_key = database.clone().into(); match self.executors.entry(database_key) { @@ -236,15 +236,16 @@ where }; let db_directory_path = main_path.join(database.as_ref()); - let (executor, background_workers) = + let (executor, background_workers, wal_worker) = Executor::with_background_workers(db_directory_path, catalog) .map_err(ClientError::ExecutorError)?; + let worker_container = WorkersContainer::new(background_workers.into_iter()); + let executor = + ExecutorWithWorkers::new(executor, worker_container, Some(wal_worker)); let executor = Arc::new(executor); vacant_entry.insert(executor.clone()); - self.workers.add_many(background_workers.into_iter()); - Ok(executor) } } @@ -335,6 +336,7 @@ where mod client_handler_tests { use crate::client_handler::{ClientError, ClientHandler}; use crate::protocol_handler::{BinaryProtocolHandler, TextProtocolHandler}; + use crate::server::ExecutorWithWorkers; use crate::workers_container::WorkersContainer; use dashmap::DashMap; use executor::Executor; @@ -443,7 +445,7 @@ mod client_handler_tests { /// Helper function to create test server components async fn setup_test_server() -> ( - Arc>>, + Arc>>, Arc>, ) { let executors = Arc::new(DashMap::new()); @@ -479,7 +481,6 @@ mod client_handler_tests { executors.clone(), catalog_manager.clone(), text_handler, - Arc::new(WorkersContainer::new()), CancellationToken::new(), ); tokio::spawn(async move { @@ -509,7 +510,6 @@ mod client_handler_tests { executors.clone(), catalog_manager.clone(), binary_handler, 
- Arc::new(WorkersContainer::new()), CancellationToken::new(), ); tokio::spawn(async move { diff --git a/server/src/server.rs b/server/src/server.rs index 40ad9ec..ea037d0 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -9,8 +9,10 @@ use executor::Executor; use log::{error, info}; use metadata::catalog_manager::{CatalogManager, CatalogManagerError}; use parking_lot::RwLock; +use std::mem::ManuallyDrop; use std::net::SocketAddr; use std::sync::Arc; +use storage::background_worker::BackgroundWorkerHandle; use thiserror::Error; use tokio::net::{TcpListener, TcpStream}; use tokio_util::sync::CancellationToken; @@ -27,18 +29,76 @@ pub(crate) struct Server { binary_addr: SocketAddr, text_addr: SocketAddr, catalog_manager: Arc>, - executors: Arc>>, - background_workers: Arc, + executors: Arc>>, tasks: Arc, shutdown: CancellationToken, } +pub(crate) struct ExecutorWithWorkers { + executor: ManuallyDrop, + workers: WorkersContainer, + wal_worker: Option, +} + +impl ExecutorWithWorkers { + pub(crate) fn new( + executor: Executor, + workers: WorkersContainer, + wal_worker: Option, + ) -> Self { + Self { + executor: ManuallyDrop::new(executor), + workers, + wal_worker, + } + } + + pub(crate) fn shutdown_workers(&self) { + self.workers.shutdown() + } + + pub(crate) fn shutdown_wal_worker(&mut self) { + let mut wal_worker = match self.wal_worker.take() { + Some(worker) => worker, + None => return, + }; + + if let Err(e) = wal_worker.shutdown() { + error!("Failed to stop WAL background worker: {e}"); + } + + if let Err(e) = wal_worker.join() { + error!("Failed to join background worker: {e}"); + } + } + + pub(crate) fn executor(&self) -> &Executor { + &self.executor + } +} + +impl Drop for ExecutorWithWorkers { + fn drop(&mut self) { + // 1. Shutdown workers + self.shutdown_workers(); + + // 2. Explicitly drop executor BEFORE shutting down WAL worker + // SAFETY: This is the Drop implementation, so this is the last time + // we access executor. 
The ManuallyDrop prevents automatic drop. + unsafe { + ManuallyDrop::drop(&mut self.executor); + } + + // 3. Shutdown WAL worker + self.shutdown_wal_worker(); + } +} + struct ListenerContext { addr: SocketAddr, - executors: Arc>>, + executors: Arc>>, catalog_manager: Arc>, tasks: Arc, - workers: Arc, shutdown: CancellationToken, } @@ -47,9 +107,8 @@ impl Server { Ok(Self { binary_addr, text_addr, - catalog_manager: Arc::new(RwLock::new(CatalogManager::new()?)), executors: Arc::new(DashMap::new()), - background_workers: Arc::new(WorkersContainer::new()), + catalog_manager: Arc::new(RwLock::new(CatalogManager::new()?)), tasks: Arc::new(TasksContainer::new()), shutdown: CancellationToken::new(), }) @@ -58,11 +117,10 @@ impl Server { pub(crate) async fn run_loop(&self) -> Result<(), ServerError> { self.start_listener( self.text_listener_context(), - |socket, executors, manager, tasks, workers, shutdown| { + |socket, executors, manager, tasks, shutdown| { let handle = tokio::spawn(async move { let text_handler = TextProtocolHandler::from(socket); - let handler = - ClientHandler::new(executors, manager, text_handler, workers, shutdown); + let handler = ClientHandler::new(executors, manager, text_handler, shutdown); handler.run().await; }); tasks.add(handle); @@ -72,11 +130,10 @@ impl Server { self.start_listener( self.binary_listener_context(), - |socket, executors, manager, tasks, workers, shutdown| { + |socket, executors, manager, tasks, shutdown| { let handle = tokio::spawn(async move { let binary_handler = BinaryProtocolHandler::from(socket); - let handler = - ClientHandler::new(executors, manager, binary_handler, workers, shutdown); + let handler = ClientHandler::new(executors, manager, binary_handler, shutdown); handler.run().await; }); tasks.add(handle); @@ -93,9 +150,6 @@ impl Server { self.shutdown.cancel(); self.tasks.join_all().await; - // Stop background workers - self.background_workers.shutdown(); - Ok(()) } @@ -107,10 +161,9 @@ impl Server { where F: Fn( 
TcpStream, - Arc>>, + Arc>>, Arc>, Arc, - Arc, CancellationToken, ) + Send + Sync @@ -143,7 +196,7 @@ impl Server { } let child_shutdown = context.shutdown.child_token(); - handler(socket, context.executors.clone(), context.catalog_manager.clone(), context.tasks.clone(), context.workers.clone(), child_shutdown); + handler(socket, context.executors.clone(), context.catalog_manager.clone(), context.tasks.clone(), child_shutdown); } } } @@ -160,7 +213,6 @@ impl Server { executors: self.executors.clone(), catalog_manager: self.catalog_manager.clone(), tasks: self.tasks.clone(), - workers: self.background_workers.clone(), shutdown: self.shutdown.child_token(), } } diff --git a/server/src/workers_container.rs b/server/src/workers_container.rs index 6cdd229..777914b 100644 --- a/server/src/workers_container.rs +++ b/server/src/workers_container.rs @@ -7,20 +7,12 @@ pub(crate) struct WorkersContainer { } impl WorkersContainer { - pub fn new() -> Self { - WorkersContainer { - workers: SegQueue::new(), - } - } - - pub fn add(&self, handle: BackgroundWorkerHandle) { - self.workers.push(handle); - } - - pub fn add_many(&self, handles: impl Iterator) { + pub fn new(handles: impl Iterator) -> Self { + let workers = SegQueue::new(); for handle in handles { - self.add(handle); + workers.push(handle); } + WorkersContainer { workers } } pub fn shutdown(&self) { diff --git a/storage/src/cache.rs b/storage/src/cache.rs index 55e7c72..7ef37b9 100644 --- a/storage/src/cache.rs +++ b/storage/src/cache.rs @@ -734,6 +734,26 @@ impl Cache { } let page = frame.read(); + + if let Some(wal_client) = &self.wal_client { + let flushed_lsn = wal_client.flushed_lsn(); + + let page_lsn = get_page_lsn(&page); + + // WAL record of this page wasn't yet flushed to disk - we need to do it before flushing page. 
+ if page_lsn > flushed_lsn && !wal_client.flush() { + warn!( + "Cache: failed to flush WAL before flushing page '{:?}' to disk.", + frame.file_page_ref + ); + + // We need to re-insert the frame back to cache as we failed to flush it. + drop(page); + self.frames.insert(frame.file_page_ref.clone(), frame); + return Ok(()); + } + } + file_lock.write_page(frame.file_page_ref.page_id, *page)?; frame.dirty.store(false, Ordering::Release); diff --git a/storage/src/write_ahead_log.rs b/storage/src/write_ahead_log.rs index 5e84c97..fc99e4c 100644 --- a/storage/src/write_ahead_log.rs +++ b/storage/src/write_ahead_log.rs @@ -4,7 +4,7 @@ use crate::page_diff::PageDiff; use crate::paged_file::{AtomicLsn, Lsn}; use crossbeam::channel; use crossbeam::select; -use log::error; +use log::{error, info}; use std::fs::File; use std::io::{BufReader, BufWriter, Read, Write}; use std::path::{Path, PathBuf}; @@ -148,8 +148,7 @@ impl WalManager { } } Err(_) => { - // Channel closed, shutdown gracefully - self.shutdown_gracefully(); + info!("Requests channel closed. Waiting for shutdown signal..."); break; } } @@ -166,6 +165,9 @@ impl WalManager { } } } + + let _ = self.shutdown.recv(); + self.shutdown_gracefully(); } fn shutdown_gracefully(&mut self) {