diff --git a/Cargo.lock b/Cargo.lock index 6742468..ae0df7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -879,6 +879,7 @@ dependencies = [ "rmp-serde", "serde_json", "storage", + "tempfile", "thiserror", "tokio", "tokio-util", diff --git a/engine/src/heap_file/mod.rs b/engine/src/heap_file/mod.rs index f0e76b7..48d744e 100644 --- a/engine/src/heap_file/mod.rs +++ b/engine/src/heap_file/mod.rs @@ -18,11 +18,7 @@ use crate::heap_file::{ record::{RecordHandle, RecordPtr, RecordTag}, }; -use std::{ - collections::VecDeque, - mem, - sync::{Arc, atomic::Ordering}, -}; +use std::{collections::VecDeque, mem, sync::Arc}; use metadata::catalog::ColumnMetadata; use parking_lot::Mutex; @@ -40,6 +36,12 @@ use storage::{ paged_file::{PageId, PagedFileError}, }; +/// Helper enum to specify which metadata field to update +enum PageTypeForMetadata { + Record, + Overflow, +} + /// Struct used for describing update of single field. pub struct FieldUpdateDescriptor { column: ColumnMetadata, @@ -152,7 +154,7 @@ impl<'hf, const BUCKETS_COUNT: usize> Iterator for AllRecordsIterator<'hf, BUCKE /// For concurrent access by multiple threads, wrap the instance in `Arc`. pub struct HeapFile { file_key: FileKey, - metadata: Metadata, + metadata: Mutex, record_pages_fsm: FreeSpaceMap, overflow_pages_fsm: FreeSpaceMap, cache: Arc, @@ -180,7 +182,7 @@ impl HeapFile { /// Reads all [`Record`]s stored in heap file. pub fn all_records(&'_ self) -> AllRecordsIterator<'_, BUCKETS_COUNT> { - let first_page_id = *self.metadata.first_record_page.lock(); + let first_page_id = self.metadata.lock().first_record_page; AllRecordsIterator::new(self, first_page_id) } @@ -285,27 +287,14 @@ impl HeapFile { } /// Flushes [`HeapFile::metadata`] content to disk. - /// - /// If metadata is not dirty then this is no-op. 
- pub fn flush_metadata(&mut self) -> Result<(), HeapFileError> { - match self - .metadata - .dirty - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => { - // Changed dirty from true to false, so metadata must be flushed - let key = self.file_page_ref(Self::METADATA_PAGE_ID); - let mut page = self.cache.pin_write(&key)?; - let repr = MetadataRepr::from(&self.metadata); - repr.write_to_page(&mut page); - Ok(()) - } - Err(_) => { - // Metadata was already clean - Ok(()) - } - } + pub fn flush_metadata(&self) -> Result<(), HeapFileError> { + let metadata = self.metadata.lock(); + let repr = MetadataRepr::from(&*metadata); + + let key = self.file_page_ref(Self::METADATA_PAGE_ID); + let mut page = self.cache.pin_write(&key)?; + repr.write_to_page(&mut page); + Ok(()) } /// Migrates all records to add a new column at the specified position with a default value. @@ -321,7 +310,7 @@ impl HeapFile { let mut chain = NoDroppingPageLockChain::empty(self)?; // Iterate through all record pages - let mut page_id = *self.metadata.first_record_page.lock(); + let mut page_id = self.metadata.lock().first_record_page; while page_id != RecordPageHeader::NO_NEXT_PAGE { chain.start_from_record_page(page_id)?; let record_page = chain.record_page(); @@ -363,7 +352,7 @@ impl HeapFile { let mut chain = NoDroppingPageLockChain::empty(self)?; // Iterate through all record pages - let mut page_id = *self.metadata.first_record_page.lock(); + let mut page_id = self.metadata.lock().first_record_page; while page_id != RecordPageHeader::NO_NEXT_PAGE { chain.start_from_record_page(page_id)?; let record_page = chain.record_page(); @@ -909,7 +898,7 @@ impl HeapFile { /// Generic helper for allocating a page and updating metadata fn allocate_page_with_metadata( &self, - metadata_lock: &Mutex, + page_type: PageTypeForMetadata, header_fn: F, ) -> Result<(PageId, HeapPage), HeapFileError> where @@ -919,24 +908,37 @@ impl HeapFile { let (page, page_id) = 
self.cache.allocate_page(&self.file_key)?; let old_first_page = { - let mut metadata_page_lock = metadata_lock.lock(); - let old = *metadata_page_lock; - *metadata_page_lock = page_id; - self.metadata.dirty.store(true, Ordering::Release); - old + let metadata = self.metadata.lock(); + match page_type { + PageTypeForMetadata::Record => metadata.first_record_page, + PageTypeForMetadata::Overflow => metadata.first_overflow_page, + } }; let header = header_fn(old_first_page); let slotted_page = SlottedPage::initialize_with_header(page, header)?; let heap_page = HeapPage::new(slotted_page); + let mut metadata = self.metadata.lock(); + match page_type { + PageTypeForMetadata::Record => { + metadata.first_record_page = page_id; + } + PageTypeForMetadata::Overflow => { + metadata.first_overflow_page = page_id; + } + } + drop(metadata); + + self.flush_metadata()?; + Ok((page_id, heap_page)) } fn allocate_record_page( &self, ) -> Result<(PageId, HeapPage), HeapFileError> { - self.allocate_page_with_metadata(&self.metadata.first_record_page, |next_page| { + self.allocate_page_with_metadata(PageTypeForMetadata::Record, |next_page| { RecordPageHeader::new(next_page) }) } @@ -944,7 +946,7 @@ impl HeapFile { fn allocate_overflow_page( &self, ) -> Result<(PageId, HeapPage), HeapFileError> { - self.allocate_page_with_metadata(&self.metadata.first_overflow_page, |next_page| { + self.allocate_page_with_metadata(PageTypeForMetadata::Overflow, |next_page| { OverflowPageHeader::new(next_page) }) } @@ -1153,14 +1155,6 @@ impl HeapFile { } } -impl Drop for HeapFile { - fn drop(&mut self) { - if let Err(e) = self.flush_metadata() { - log::error!("failed to flush metadata while dropping HeapFile: {e}"); - } - } -} - /// Factory responsible for creating and loading existing [`HeapFile`]. 
pub struct HeapFileFactory { file_key: FileKey, @@ -1201,7 +1195,7 @@ impl HeapFileFactory { cache: self.cache, columns_metadata: self.columns_metadata, file_key: self.file_key, - metadata, + metadata: Mutex::new(metadata), record_pages_fsm, overflow_pages_fsm, }; @@ -1269,7 +1263,7 @@ mod tests { collections::HashSet, fs, ops::Deref, - sync::atomic::AtomicUsize, + sync::atomic::{AtomicUsize, Ordering}, thread, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -2083,8 +2077,9 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Verify metadata was loaded - assert_ne!(*heap_file.metadata.first_record_page.lock(), 0); - assert_ne!(*heap_file.metadata.first_overflow_page.lock(), 0); + let metadata = heap_file.metadata.lock(); + assert_ne!(metadata.first_record_page, 0); + assert_ne!(metadata.first_overflow_page, 0); } #[test] @@ -2106,7 +2101,7 @@ mod tests { // The actual error will occur when trying to read from invalid page IDs if let Ok(heap_file) = result { let invalid_ptr = RecordPtr { - page_id: *heap_file.metadata.first_record_page.lock(), + page_id: heap_file.metadata.lock().first_record_page, slot_id: 0, }; let read_result = heap_file.record(&invalid_ptr); @@ -2122,8 +2117,9 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Verify metadata was created with correct initial values - let first_record_page = *heap_file.metadata.first_record_page.lock(); - let first_overflow_page = *heap_file.metadata.first_overflow_page.lock(); + let metadata = heap_file.metadata.lock(); + let first_record_page = metadata.first_record_page; + let first_overflow_page = metadata.first_overflow_page; assert_eq!(first_record_page, 2); assert_eq!(first_overflow_page, 3); @@ -3364,9 +3360,7 @@ mod tests { assert_string(&large_string, &retrieved_record.fields[1]); // Assert metadata change and allocations were successful - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - - let new_first_overflow_page = 
*heap_file.metadata.first_overflow_page.lock(); + let new_first_overflow_page = heap_file.metadata.lock().first_overflow_page; assert_ne!(new_first_overflow_page, first_overflow_page_id); let mut overflow_pages_count = 0; @@ -3395,9 +3389,8 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Store initial metadata state - let initial_first_record_page = *heap_file.metadata.first_record_page.lock(); + let initial_first_record_page = heap_file.metadata.lock().first_record_page; assert_eq!(initial_first_record_page, first_record_page_id); - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); // Insert a large record that fills the first page let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize @@ -3429,10 +3422,8 @@ mod tests { assert_ne!(second_record_ptr.page_id, first_record_page_id); assert_eq!(second_record_ptr.slot_id, 0); - // Verify metadata was changed and is dirty - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - - let new_first_record_page = *heap_file.metadata.first_record_page.lock(); + // Verify metadata was changed + let new_first_record_page = heap_file.metadata.lock().first_record_page; assert_ne!(new_first_record_page, initial_first_record_page); assert_eq!(second_record_ptr.page_id, new_first_record_page); @@ -3512,16 +3503,17 @@ mod tests { // Metadata #[test] - fn heap_file_flush_metadata() { + fn heap_file_metadata_persisted_immediately_after_allocation() { let (cache, _, file_key) = setup_test_cache(); - setup_heap_file_structure(&cache, &file_key); + let (first_record_page_id, _) = setup_heap_file_structure(&cache, &file_key); // Create heap file let factory = create_test_heap_file_factory(cache.clone(), file_key.clone()); - let mut heap_file = factory.create_heap_file().unwrap(); + let heap_file = factory.create_heap_file().unwrap(); - // Initially metadata should not be dirty - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); + // Capture initial metadata 
values + let initial_first_record_page = heap_file.metadata.lock().first_record_page; + assert_eq!(initial_first_record_page, first_record_page_id); // Trigger a metadata change by allocating a new record page let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize @@ -3546,31 +3538,22 @@ mod tests { ]); heap_file.insert(small_record).unwrap(); - // Verify metadata is now dirty - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - // Capture current metadata values - let first_record_page = *heap_file.metadata.first_record_page.lock(); - let first_overflow_page = *heap_file.metadata.first_overflow_page.lock(); - - // Flush metadata - heap_file.flush_metadata().unwrap(); + let metadata = heap_file.metadata.lock(); + let first_record_page = metadata.first_record_page; + let first_overflow_page = metadata.first_overflow_page; + drop(metadata); - // Verify metadata is no longer dirty - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); + // Verify metadata was changed + assert_ne!(first_record_page, initial_first_record_page); - // Read metadata directly from disk to verify it was written + // Verify metadata is already persisted to disk (immediately after the insert) let metadata_page_ref = heap_file.file_page_ref(HeapFile::<4>::METADATA_PAGE_ID); let metadata_page = cache.pin_read(&metadata_page_ref).unwrap(); let disk_metadata = MetadataRepr::load_from_page(&metadata_page).unwrap(); assert_eq!(disk_metadata.first_record_page, first_record_page); assert_eq!(disk_metadata.first_overflow_page, first_overflow_page); - - // Try flushing again (should be no-op since metadata is clean) - heap_file.flush_metadata().unwrap(); - - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); } // HeapFile updating record diff --git a/engine/src/heap_file/pages.rs b/engine/src/heap_file/pages.rs index e2774b1..98c82d6 100644 --- a/engine/src/heap_file/pages.rs +++ b/engine/src/heap_file/pages.rs @@ -1,7 +1,4 @@ -use 
std::sync::atomic::AtomicBool; - use bytemuck::{Pod, Zeroable}; -use parking_lot::Mutex; use storage::{ cache::{PageRead, PageWrite}, paged_file::PageId, @@ -17,17 +14,15 @@ use crate::{ /// Metadata of [`HeapFile`]. It's stored using bare [`PagedFile`]. pub(super) struct Metadata { - pub first_record_page: Mutex, - pub first_overflow_page: Mutex, - pub dirty: AtomicBool, + pub first_record_page: PageId, + pub first_overflow_page: PageId, } impl From<&MetadataRepr> for Metadata { fn from(value: &MetadataRepr) -> Self { Metadata { - first_record_page: Mutex::new(value.first_record_page), - first_overflow_page: Mutex::new(value.first_overflow_page), - dirty: AtomicBool::new(false), + first_record_page: value.first_record_page, + first_overflow_page: value.first_overflow_page, } } } @@ -60,11 +55,9 @@ impl MetadataRepr { impl From<&Metadata> for MetadataRepr { fn from(value: &Metadata) -> Self { - let first_record_page = *value.first_record_page.lock(); - let first_overflow_page = *value.first_overflow_page.lock(); MetadataRepr { - first_record_page, - first_overflow_page, + first_record_page: value.first_record_page, + first_overflow_page: value.first_overflow_page, } } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 98b425c..12412c7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,3 +24,6 @@ rmp-serde.workspace = true crossbeam.workspace = true tokio-util = "0.7" + +[dev-dependencies] +tempfile = "3.20.0" diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 2990b70..6f7b184 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -448,16 +448,11 @@ mod client_handler_tests { ) { let executors = Arc::new(DashMap::new()); - let temp_dir = std::env::temp_dir().join(format!( - "codb_test_{}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - )); + let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir"); let catalog_manager = 
Arc::new(RwLock::new( - CatalogManager::with_path(temp_dir).expect("Failed to create catalog manager"), + CatalogManager::with_path(temp_dir.keep()) + .expect("Failed to create catalog manager"), )); (executors, catalog_manager) }