From 86d613331451b249b7709b4ca86e0837e875f80e Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Mon, 12 Jan 2026 16:28:38 +0100 Subject: [PATCH 1/4] FLush heap file metadata page on each allocation --- engine/src/heap_file/mod.rs | 146 +++++++++++++++------------------- engine/src/heap_file/pages.rs | 19 ++--- 2 files changed, 70 insertions(+), 95 deletions(-) diff --git a/engine/src/heap_file/mod.rs b/engine/src/heap_file/mod.rs index f0e76b7..89841ad 100644 --- a/engine/src/heap_file/mod.rs +++ b/engine/src/heap_file/mod.rs @@ -18,11 +18,7 @@ use crate::heap_file::{ record::{RecordHandle, RecordPtr, RecordTag}, }; -use std::{ - collections::VecDeque, - mem, - sync::{Arc, atomic::Ordering}, -}; +use std::{collections::VecDeque, mem, sync::Arc}; use metadata::catalog::ColumnMetadata; use parking_lot::Mutex; @@ -40,6 +36,12 @@ use storage::{ paged_file::{PageId, PagedFileError}, }; +/// Helper enum to specify which metadata field to update +enum PageTypeForMetadata { + Record, + Overflow, +} + /// Struct used for describing update of single field. pub struct FieldUpdateDescriptor { column: ColumnMetadata, @@ -152,7 +154,7 @@ impl<'hf, const BUCKETS_COUNT: usize> Iterator for AllRecordsIterator<'hf, BUCKE /// For concurrent access by multiple threads, wrap the instance in `Arc`. pub struct HeapFile { file_key: FileKey, - metadata: Metadata, + metadata: Mutex, record_pages_fsm: FreeSpaceMap, overflow_pages_fsm: FreeSpaceMap, cache: Arc, @@ -180,7 +182,7 @@ impl HeapFile { /// Reads all [`Record`]s stored in heap file. pub fn all_records(&'_ self) -> AllRecordsIterator<'_, BUCKETS_COUNT> { - let first_page_id = *self.metadata.first_record_page.lock(); + let first_page_id = self.metadata.lock().first_record_page; AllRecordsIterator::new(self, first_page_id) } @@ -285,27 +287,14 @@ impl HeapFile { } /// Flushes [`HeapFile::metadata`] content to disk. - /// - /// If metadata is not dirty then this is no-op. - pub fn flush_metadata(&mut self) -> Result<(), HeapFileError> { - match self - .metadata - .dirty - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - { - Ok(_) => { - // Changed dirty from true to false, so metadata must be flushed - let key = self.file_page_ref(Self::METADATA_PAGE_ID); - let mut page = self.cache.pin_write(&key)?; - let repr = MetadataRepr::from(&self.metadata); - repr.write_to_page(&mut page); - Ok(()) - } - Err(_) => { - // Metadata was already clean - Ok(()) - } - } + pub fn flush_metadata(&self) -> Result<(), HeapFileError> { + let metadata = self.metadata.lock(); + let repr = MetadataRepr::from(&*metadata); + + let key = self.file_page_ref(Self::METADATA_PAGE_ID); + let mut page = self.cache.pin_write(&key)?; + repr.write_to_page(&mut page); + Ok(()) } /// Migrates all records to add a new column at the specified position with a default value. @@ -321,7 +310,7 @@ impl HeapFile { let mut chain = NoDroppingPageLockChain::empty(self)?; // Iterate through all record pages - let mut page_id = *self.metadata.first_record_page.lock(); + let mut page_id = self.metadata.lock().first_record_page; while page_id != RecordPageHeader::NO_NEXT_PAGE { chain.start_from_record_page(page_id)?; let record_page = chain.record_page(); @@ -363,7 +352,7 @@ impl HeapFile { let mut chain = NoDroppingPageLockChain::empty(self)?; // Iterate through all record pages - let mut page_id = *self.metadata.first_record_page.lock(); + let mut page_id = self.metadata.lock().first_record_page; while page_id != RecordPageHeader::NO_NEXT_PAGE { chain.start_from_record_page(page_id)?; let record_page = chain.record_page(); @@ -909,7 +898,7 @@ impl HeapFile { /// Generic helper for allocating a page and updating metadata fn allocate_page_with_metadata( &self, - metadata_lock: &Mutex, + page_type: PageTypeForMetadata, header_fn: F, ) -> Result<(PageId, HeapPage), HeapFileError> where @@ -919,10 +908,22 @@ impl HeapFile { let (page, page_id) = self.cache.allocate_page(&self.file_key)?; let old_first_page = { - let mut metadata_page_lock = metadata_lock.lock(); - let old = *metadata_page_lock; - *metadata_page_lock = page_id; - self.metadata.dirty.store(true, Ordering::Release); + let mut metadata = self.metadata.lock(); + let old = match page_type { + PageTypeForMetadata::Record => { + let old = metadata.first_record_page; + metadata.first_record_page = page_id; + old + } + PageTypeForMetadata::Overflow => { + let old = metadata.first_overflow_page; + metadata.first_overflow_page = page_id; + old + } + }; + drop(metadata); + // Flush metadata immediately + self.flush_metadata()?; old }; @@ -936,7 +937,7 @@ impl HeapFile { fn allocate_record_page( &self, ) -> Result<(PageId, HeapPage), HeapFileError> { - self.allocate_page_with_metadata(&self.metadata.first_record_page, |next_page| { + self.allocate_page_with_metadata(PageTypeForMetadata::Record, |next_page| { RecordPageHeader::new(next_page) }) } @@ -944,7 +945,7 @@ impl HeapFile { fn allocate_overflow_page( &self, ) -> Result<(PageId, HeapPage), HeapFileError> { - self.allocate_page_with_metadata(&self.metadata.first_overflow_page, |next_page| { + self.allocate_page_with_metadata(PageTypeForMetadata::Overflow, |next_page| { OverflowPageHeader::new(next_page) }) } @@ -1153,14 +1154,6 @@ impl HeapFile { } } -impl Drop for HeapFile { - fn drop(&mut self) { - if let Err(e) = self.flush_metadata() { - log::error!("failed to flush metadata while dropping HeapFile: {e}"); - } - } -} - /// Factory responsible for creating and loading existing [`HeapFile`]. pub struct HeapFileFactory { file_key: FileKey, @@ -1201,7 +1194,7 @@ impl HeapFileFactory { cache: self.cache, columns_metadata: self.columns_metadata, file_key: self.file_key, - metadata, + metadata: Mutex::new(metadata), record_pages_fsm, overflow_pages_fsm, }; @@ -1269,7 +1262,7 @@ mod tests { collections::HashSet, fs, ops::Deref, - sync::atomic::AtomicUsize, + sync::atomic::{AtomicUsize, Ordering}, thread, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -2083,8 +2076,9 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Verify metadata was loaded - assert_ne!(*heap_file.metadata.first_record_page.lock(), 0); - assert_ne!(*heap_file.metadata.first_overflow_page.lock(), 0); + let metadata = heap_file.metadata.lock(); + assert_ne!(metadata.first_record_page, 0); + assert_ne!(metadata.first_overflow_page, 0); } #[test] @@ -2106,7 +2100,7 @@ mod tests { // The actual error will occur when trying to read from invalid page IDs if let Ok(heap_file) = result { let invalid_ptr = RecordPtr { - page_id: *heap_file.metadata.first_record_page.lock(), + page_id: heap_file.metadata.lock().first_record_page, slot_id: 0, }; let read_result = heap_file.record(&invalid_ptr); @@ -2122,8 +2116,9 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Verify metadata was created with correct initial values - let first_record_page = *heap_file.metadata.first_record_page.lock(); - let first_overflow_page = *heap_file.metadata.first_overflow_page.lock(); + let metadata = heap_file.metadata.lock(); + let first_record_page = metadata.first_record_page; + let first_overflow_page = metadata.first_overflow_page; assert_eq!(first_record_page, 2); assert_eq!(first_overflow_page, 3); @@ -3364,9 +3359,7 @@ mod tests { assert_string(&large_string, &retrieved_record.fields[1]); // Assert metadata change and allocations were successful - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - - let new_first_overflow_page = *heap_file.metadata.first_overflow_page.lock(); + let new_first_overflow_page = heap_file.metadata.lock().first_overflow_page; assert_ne!(new_first_overflow_page, first_overflow_page_id); let mut overflow_pages_count = 0; @@ -3395,9 +3388,8 @@ mod tests { let heap_file = factory.create_heap_file().unwrap(); // Store initial metadata state - let initial_first_record_page = *heap_file.metadata.first_record_page.lock(); + let initial_first_record_page = heap_file.metadata.lock().first_record_page; assert_eq!(initial_first_record_page, first_record_page_id); - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); // Insert a large record that fills the first page let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize @@ -3429,10 +3421,8 @@ mod tests { assert_ne!(second_record_ptr.page_id, first_record_page_id); assert_eq!(second_record_ptr.slot_id, 0); - // Verify metadata was changed and is dirty - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - - let new_first_record_page = *heap_file.metadata.first_record_page.lock(); + // Verify metadata was changed + let new_first_record_page = heap_file.metadata.lock().first_record_page; assert_ne!(new_first_record_page, initial_first_record_page); assert_eq!(second_record_ptr.page_id, new_first_record_page); @@ -3512,16 +3502,17 @@ mod tests { // Metadata #[test] - fn heap_file_flush_metadata() { + fn heap_file_metadata_persisted_immediately_after_allocation() { let (cache, _, file_key) = setup_test_cache(); - setup_heap_file_structure(&cache, &file_key); + let (first_record_page_id, _) = setup_heap_file_structure(&cache, &file_key); // Create heap file let factory = create_test_heap_file_factory(cache.clone(), file_key.clone()); - let mut heap_file = factory.create_heap_file().unwrap(); + let heap_file = factory.create_heap_file().unwrap(); - // Initially metadata should not be dirty - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); + // Capture initial metadata values + let initial_first_record_page = heap_file.metadata.lock().first_record_page; + assert_eq!(initial_first_record_page, first_record_page_id); // Trigger a metadata change by allocating a new record page let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize @@ -3546,31 +3537,22 @@ mod tests { ]); heap_file.insert(small_record).unwrap(); - // Verify metadata is now dirty - assert!(heap_file.metadata.dirty.load(Ordering::Acquire)); - // Capture current metadata values - let first_record_page = *heap_file.metadata.first_record_page.lock(); - let first_overflow_page = *heap_file.metadata.first_overflow_page.lock(); + let metadata = heap_file.metadata.lock(); + let first_record_page = metadata.first_record_page; + let first_overflow_page = metadata.first_overflow_page; + drop(metadata); - // Flush metadata - heap_file.flush_metadata().unwrap(); + // Verify metadata was changed + assert_ne!(first_record_page, initial_first_record_page); - // Verify metadata is no longer dirty - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); - - // Read metadata directly from disk to verify it was written + // Verify metadata is already persisted to disk (immediately after the insert) let metadata_page_ref = heap_file.file_page_ref(HeapFile::<4>::METADATA_PAGE_ID); let metadata_page = cache.pin_read(&metadata_page_ref).unwrap(); let disk_metadata = MetadataRepr::load_from_page(&metadata_page).unwrap(); assert_eq!(disk_metadata.first_record_page, first_record_page); assert_eq!(disk_metadata.first_overflow_page, first_overflow_page); - - // Try flushing again (should be no-op since metadata is clean) - heap_file.flush_metadata().unwrap(); - - assert!(!heap_file.metadata.dirty.load(Ordering::Acquire)); } // HeapFile updating record diff --git a/engine/src/heap_file/pages.rs b/engine/src/heap_file/pages.rs index e2774b1..98c82d6 100644 --- a/engine/src/heap_file/pages.rs +++ b/engine/src/heap_file/pages.rs @@ -1,7 +1,4 @@ -use std::sync::atomic::AtomicBool; - use bytemuck::{Pod, Zeroable}; -use parking_lot::Mutex; use storage::{ cache::{PageRead, PageWrite}, paged_file::PageId, @@ -17,17 +14,15 @@ use crate::{ /// Metadata of [`HeapFile`]. It's stored using bare [`PagedFile`]. pub(super) struct Metadata { - pub first_record_page: Mutex, - pub first_overflow_page: Mutex, - pub dirty: AtomicBool, + pub first_record_page: PageId, + pub first_overflow_page: PageId, } impl From<&MetadataRepr> for Metadata { fn from(value: &MetadataRepr) -> Self { Metadata { - first_record_page: Mutex::new(value.first_record_page), - first_overflow_page: Mutex::new(value.first_overflow_page), - dirty: AtomicBool::new(false), + first_record_page: value.first_record_page, + first_overflow_page: value.first_overflow_page, } } } @@ -60,11 +55,9 @@ impl MetadataRepr { impl From<&Metadata> for MetadataRepr { fn from(value: &Metadata) -> Self { - let first_record_page = *value.first_record_page.lock(); - let first_overflow_page = *value.first_overflow_page.lock(); MetadataRepr { - first_record_page, - first_overflow_page, + first_record_page: value.first_record_page, + first_overflow_page: value.first_overflow_page, } } } From 924bffed57cc33c2cbe3e6ef3cc19c9a13acf1ac Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Mon, 12 Jan 2026 17:04:44 +0100 Subject: [PATCH 2/4] fix tempfile in server --- Cargo.lock | 1 + server/Cargo.toml | 3 +++ server/src/client_handler.rs | 10 +++------- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6742468..ae0df7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -879,6 +879,7 @@ dependencies = [ "rmp-serde", "serde_json", "storage", + "tempfile", "thiserror", "tokio", "tokio-util", diff --git a/server/Cargo.toml b/server/Cargo.toml index 98b425c..12412c7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -24,3 +24,6 @@ rmp-serde.workspace = true crossbeam.workspace = true tokio-util = "0.7" + +[dev-dependencies] +tempfile = "3.20.0" diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 2990b70..d7e3396 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -448,13 +448,9 @@ mod client_handler_tests { ) { let executors = Arc::new(DashMap::new()); - let temp_dir = std::env::temp_dir().join(format!( - "codb_test_{}", - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_nanos() - )); + let temp_dir = tempfile::TempDir::new() + .expect("Failed to create temp dir") + .keep(); let catalog_manager = Arc::new(RwLock::new( CatalogManager::with_path(temp_dir).expect("Failed to create catalog manager"), From 6c0a9e70fca680e0cc0c46eecfb0a4b0119152fc Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Mon, 12 Jan 2026 17:22:06 +0100 Subject: [PATCH 3/4] only flush after header was successfully changed --- engine/src/heap_file/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/engine/src/heap_file/mod.rs b/engine/src/heap_file/mod.rs index 89841ad..1f8bb56 100644 --- a/engine/src/heap_file/mod.rs +++ b/engine/src/heap_file/mod.rs @@ -909,7 +909,7 @@ impl HeapFile { let old_first_page = { let mut metadata = self.metadata.lock(); - let old = match page_type { + match page_type { PageTypeForMetadata::Record => { let old = metadata.first_record_page; metadata.first_record_page = page_id; @@ -920,17 +920,15 @@ impl HeapFile { metadata.first_overflow_page = page_id; old } - }; - drop(metadata); - // Flush metadata immediately - self.flush_metadata()?; - old + } }; let header = header_fn(old_first_page); let slotted_page = SlottedPage::initialize_with_header(page, header)?; let heap_page = HeapPage::new(slotted_page); + self.flush_metadata()?; + Ok((page_id, heap_page)) } From 75e275a2b8d6a3b1325e9b740e319ff3fbc53afc Mon Sep 17 00:00:00 2001 From: kTrzcinskii Date: Mon, 12 Jan 2026 17:51:30 +0100 Subject: [PATCH 4/4] fixen --- engine/src/heap_file/mod.rs | 25 ++++++++++++++----------- server/src/client_handler.rs | 7 +++---- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/engine/src/heap_file/mod.rs b/engine/src/heap_file/mod.rs index 1f8bb56..48d744e 100644 --- a/engine/src/heap_file/mod.rs +++ b/engine/src/heap_file/mod.rs @@ -908,18 +908,10 @@ impl HeapFile { let (page, page_id) = self.cache.allocate_page(&self.file_key)?; let old_first_page = { - let mut metadata = self.metadata.lock(); + let metadata = self.metadata.lock(); match page_type { - PageTypeForMetadata::Record => { - let old = metadata.first_record_page; - metadata.first_record_page = page_id; - old - } - PageTypeForMetadata::Overflow => { - let old = metadata.first_overflow_page; - metadata.first_overflow_page = page_id; - old - } + PageTypeForMetadata::Record => metadata.first_record_page, + PageTypeForMetadata::Overflow => metadata.first_overflow_page, } }; @@ -927,6 +919,17 @@ impl HeapFile { let slotted_page = SlottedPage::initialize_with_header(page, header)?; let heap_page = HeapPage::new(slotted_page); + let mut metadata = self.metadata.lock(); + match page_type { + PageTypeForMetadata::Record => { + metadata.first_record_page = page_id; + } + PageTypeForMetadata::Overflow => { + metadata.first_overflow_page = page_id; + } + } + drop(metadata); + self.flush_metadata()?; Ok((page_id, heap_page)) diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index d7e3396..6f7b184 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -448,12 +448,11 @@ mod client_handler_tests { ) { let executors = Arc::new(DashMap::new()); - let temp_dir = tempfile::TempDir::new() - .expect("Failed to create temp dir") - .keep(); + let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir"); let catalog_manager = Arc::new(RwLock::new( - CatalogManager::with_path(temp_dir).expect("Failed to create catalog manager"), + CatalogManager::with_path(temp_dir.path().to_path_buf()) + .expect("Failed to create catalog manager"), )); (executors, catalog_manager) }