Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

149 changes: 66 additions & 83 deletions engine/src/heap_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ use crate::heap_file::{
record::{RecordHandle, RecordPtr, RecordTag},
};

use std::{
collections::VecDeque,
mem,
sync::{Arc, atomic::Ordering},
};
use std::{collections::VecDeque, mem, sync::Arc};

use metadata::catalog::ColumnMetadata;
use parking_lot::Mutex;
Expand All @@ -40,6 +36,12 @@ use storage::{
paged_file::{PageId, PagedFileError},
};

/// Helper enum to specify which metadata field to update when a new page
/// is allocated: the head of the record-page chain vs the head of the
/// overflow-page chain (see `Metadata::first_record_page` /
/// `Metadata::first_overflow_page`).
enum PageTypeForMetadata {
    // Update `first_record_page` in the heap-file metadata.
    Record,
    // Update `first_overflow_page` in the heap-file metadata.
    Overflow,
}

/// Struct used for describing update of single field.
pub struct FieldUpdateDescriptor {
column: ColumnMetadata,
Expand Down Expand Up @@ -152,7 +154,7 @@ impl<'hf, const BUCKETS_COUNT: usize> Iterator for AllRecordsIterator<'hf, BUCKE
/// For concurrent access by multiple threads, wrap the instance in `Arc<HeapFile>`.
pub struct HeapFile<const BUCKETS_COUNT: usize> {
file_key: FileKey,
metadata: Metadata,
metadata: Mutex<Metadata>,
record_pages_fsm: FreeSpaceMap<BUCKETS_COUNT, RecordPageHeader>,
overflow_pages_fsm: FreeSpaceMap<BUCKETS_COUNT, OverflowPageHeader>,
cache: Arc<Cache>,
Expand Down Expand Up @@ -180,7 +182,7 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {

/// Reads all [`Record`]s stored in heap file.
pub fn all_records(&'_ self) -> AllRecordsIterator<'_, BUCKETS_COUNT> {
let first_page_id = *self.metadata.first_record_page.lock();
let first_page_id = self.metadata.lock().first_record_page;
AllRecordsIterator::new(self, first_page_id)
}

Expand Down Expand Up @@ -285,27 +287,14 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
}

/// Flushes [`HeapFile::metadata`] content to disk.
///
/// If metadata is not dirty then this is no-op.
pub fn flush_metadata(&mut self) -> Result<(), HeapFileError> {
match self
.metadata
.dirty
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => {
// Changed dirty from true to false, so metadata must be flushed
let key = self.file_page_ref(Self::METADATA_PAGE_ID);
let mut page = self.cache.pin_write(&key)?;
let repr = MetadataRepr::from(&self.metadata);
repr.write_to_page(&mut page);
Ok(())
}
Err(_) => {
// Metadata was already clean
Ok(())
}
}
/// Serializes the in-memory metadata and writes it to the on-disk
/// metadata page (`Self::METADATA_PAGE_ID`).
///
/// # Errors
///
/// Returns [`HeapFileError`] if pinning the metadata page for writing fails.
///
/// NOTE(review): the `metadata` mutex guard stays alive until the end of the
/// function, so the cache pin and page write below happen while the metadata
/// lock is held — presumably intentional, to serialize concurrent flushes and
/// prevent a newer snapshot being overwritten by an older one. Confirm no
/// caller pins pages while holding this lock in the opposite order (deadlock
/// risk).
pub fn flush_metadata(&self) -> Result<(), HeapFileError> {
    // Snapshot the current metadata into its on-disk representation.
    let metadata = self.metadata.lock();
    let repr = MetadataRepr::from(&*metadata);

    // Pin the dedicated metadata page for writing and persist the snapshot.
    let key = self.file_page_ref(Self::METADATA_PAGE_ID);
    let mut page = self.cache.pin_write(&key)?;
    repr.write_to_page(&mut page);
    Ok(())
}

/// Migrates all records to add a new column at the specified position with a default value.
Expand All @@ -321,7 +310,7 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
let mut chain = NoDroppingPageLockChain::empty(self)?;

// Iterate through all record pages
let mut page_id = *self.metadata.first_record_page.lock();
let mut page_id = self.metadata.lock().first_record_page;
while page_id != RecordPageHeader::NO_NEXT_PAGE {
chain.start_from_record_page(page_id)?;
let record_page = chain.record_page();
Expand Down Expand Up @@ -363,7 +352,7 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
let mut chain = NoDroppingPageLockChain::empty(self)?;

// Iterate through all record pages
let mut page_id = *self.metadata.first_record_page.lock();
let mut page_id = self.metadata.lock().first_record_page;
while page_id != RecordPageHeader::NO_NEXT_PAGE {
chain.start_from_record_page(page_id)?;
let record_page = chain.record_page();
Expand Down Expand Up @@ -909,7 +898,7 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
/// Generic helper for allocating a page and updating metadata
fn allocate_page_with_metadata<H, F>(
&self,
metadata_lock: &Mutex<PageId>,
page_type: PageTypeForMetadata,
header_fn: F,
) -> Result<(PageId, HeapPage<PinnedWritePage, H>), HeapFileError>
where
Expand All @@ -919,32 +908,45 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
let (page, page_id) = self.cache.allocate_page(&self.file_key)?;

let old_first_page = {
let mut metadata_page_lock = metadata_lock.lock();
let old = *metadata_page_lock;
*metadata_page_lock = page_id;
self.metadata.dirty.store(true, Ordering::Release);
old
let metadata = self.metadata.lock();
match page_type {
PageTypeForMetadata::Record => metadata.first_record_page,
PageTypeForMetadata::Overflow => metadata.first_overflow_page,
}
};

let header = header_fn(old_first_page);
let slotted_page = SlottedPage::initialize_with_header(page, header)?;
let heap_page = HeapPage::new(slotted_page);

let mut metadata = self.metadata.lock();
match page_type {
PageTypeForMetadata::Record => {
metadata.first_record_page = page_id;
}
PageTypeForMetadata::Overflow => {
metadata.first_overflow_page = page_id;
}
}
drop(metadata);

self.flush_metadata()?;

Ok((page_id, heap_page))
}

fn allocate_record_page(
&self,
) -> Result<(PageId, HeapPage<PinnedWritePage, RecordPageHeader>), HeapFileError> {
self.allocate_page_with_metadata(&self.metadata.first_record_page, |next_page| {
self.allocate_page_with_metadata(PageTypeForMetadata::Record, |next_page| {
RecordPageHeader::new(next_page)
})
}

fn allocate_overflow_page(
&self,
) -> Result<(PageId, HeapPage<PinnedWritePage, OverflowPageHeader>), HeapFileError> {
self.allocate_page_with_metadata(&self.metadata.first_overflow_page, |next_page| {
self.allocate_page_with_metadata(PageTypeForMetadata::Overflow, |next_page| {
OverflowPageHeader::new(next_page)
})
}
Expand Down Expand Up @@ -1153,14 +1155,6 @@ impl<const BUCKETS_COUNT: usize> HeapFile<BUCKETS_COUNT> {
}
}

impl<const BUCKETS_COUNT: usize> Drop for HeapFile<BUCKETS_COUNT> {
fn drop(&mut self) {
if let Err(e) = self.flush_metadata() {
log::error!("failed to flush metadata while dropping HeapFile: {e}");
}
}
}

/// Factory responsible for creating and loading existing [`HeapFile`].
pub struct HeapFileFactory<const BUCKETS_COUNT: usize> {
file_key: FileKey,
Expand Down Expand Up @@ -1201,7 +1195,7 @@ impl<const BUCKETS_COUNT: usize> HeapFileFactory<BUCKETS_COUNT> {
cache: self.cache,
columns_metadata: self.columns_metadata,
file_key: self.file_key,
metadata,
metadata: Mutex::new(metadata),
record_pages_fsm,
overflow_pages_fsm,
};
Expand Down Expand Up @@ -1269,7 +1263,7 @@ mod tests {
collections::HashSet,
fs,
ops::Deref,
sync::atomic::AtomicUsize,
sync::atomic::{AtomicUsize, Ordering},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
};
Expand Down Expand Up @@ -2083,8 +2077,9 @@ mod tests {
let heap_file = factory.create_heap_file().unwrap();

// Verify metadata was loaded
assert_ne!(*heap_file.metadata.first_record_page.lock(), 0);
assert_ne!(*heap_file.metadata.first_overflow_page.lock(), 0);
let metadata = heap_file.metadata.lock();
assert_ne!(metadata.first_record_page, 0);
assert_ne!(metadata.first_overflow_page, 0);
}

#[test]
Expand All @@ -2106,7 +2101,7 @@ mod tests {
// The actual error will occur when trying to read from invalid page IDs
if let Ok(heap_file) = result {
let invalid_ptr = RecordPtr {
page_id: *heap_file.metadata.first_record_page.lock(),
page_id: heap_file.metadata.lock().first_record_page,
slot_id: 0,
};
let read_result = heap_file.record(&invalid_ptr);
Expand All @@ -2122,8 +2117,9 @@ mod tests {
let heap_file = factory.create_heap_file().unwrap();

// Verify metadata was created with correct initial values
let first_record_page = *heap_file.metadata.first_record_page.lock();
let first_overflow_page = *heap_file.metadata.first_overflow_page.lock();
let metadata = heap_file.metadata.lock();
let first_record_page = metadata.first_record_page;
let first_overflow_page = metadata.first_overflow_page;
assert_eq!(first_record_page, 2);
assert_eq!(first_overflow_page, 3);

Expand Down Expand Up @@ -3364,9 +3360,7 @@ mod tests {
assert_string(&large_string, &retrieved_record.fields[1]);

// Assert metadata change and allocations were successful
assert!(heap_file.metadata.dirty.load(Ordering::Acquire));

let new_first_overflow_page = *heap_file.metadata.first_overflow_page.lock();
let new_first_overflow_page = heap_file.metadata.lock().first_overflow_page;
assert_ne!(new_first_overflow_page, first_overflow_page_id);

let mut overflow_pages_count = 0;
Expand Down Expand Up @@ -3395,9 +3389,8 @@ mod tests {
let heap_file = factory.create_heap_file().unwrap();

// Store initial metadata state
let initial_first_record_page = *heap_file.metadata.first_record_page.lock();
let initial_first_record_page = heap_file.metadata.lock().first_record_page;
assert_eq!(initial_first_record_page, first_record_page_id);
assert!(!heap_file.metadata.dirty.load(Ordering::Acquire));

// Insert a large record that fills the first page
let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize
Expand Down Expand Up @@ -3429,10 +3422,8 @@ mod tests {
assert_ne!(second_record_ptr.page_id, first_record_page_id);
assert_eq!(second_record_ptr.slot_id, 0);

// Verify metadata was changed and is dirty
assert!(heap_file.metadata.dirty.load(Ordering::Acquire));

let new_first_record_page = *heap_file.metadata.first_record_page.lock();
// Verify metadata was changed
let new_first_record_page = heap_file.metadata.lock().first_record_page;
assert_ne!(new_first_record_page, initial_first_record_page);
assert_eq!(second_record_ptr.page_id, new_first_record_page);

Expand Down Expand Up @@ -3512,16 +3503,17 @@ mod tests {
// Metadata

#[test]
fn heap_file_flush_metadata() {
fn heap_file_metadata_persisted_immediately_after_allocation() {
let (cache, _, file_key) = setup_test_cache();
setup_heap_file_structure(&cache, &file_key);
let (first_record_page_id, _) = setup_heap_file_structure(&cache, &file_key);

// Create heap file
let factory = create_test_heap_file_factory(cache.clone(), file_key.clone());
let mut heap_file = factory.create_heap_file().unwrap();
let heap_file = factory.create_heap_file().unwrap();

// Initially metadata should not be dirty
assert!(!heap_file.metadata.dirty.load(Ordering::Acquire));
// Capture initial metadata values
let initial_first_record_page = heap_file.metadata.lock().first_record_page;
assert_eq!(initial_first_record_page, first_record_page_id);

// Trigger a metadata change by allocating a new record page
let max_single_page_size = SlottedPage::<(), RecordPageHeader>::MAX_FREE_SPACE as usize
Expand All @@ -3546,31 +3538,22 @@ mod tests {
]);
heap_file.insert(small_record).unwrap();

// Verify metadata is now dirty
assert!(heap_file.metadata.dirty.load(Ordering::Acquire));

// Capture current metadata values
let first_record_page = *heap_file.metadata.first_record_page.lock();
let first_overflow_page = *heap_file.metadata.first_overflow_page.lock();

// Flush metadata
heap_file.flush_metadata().unwrap();
let metadata = heap_file.metadata.lock();
let first_record_page = metadata.first_record_page;
let first_overflow_page = metadata.first_overflow_page;
drop(metadata);

// Verify metadata is no longer dirty
assert!(!heap_file.metadata.dirty.load(Ordering::Acquire));
// Verify metadata was changed
assert_ne!(first_record_page, initial_first_record_page);

// Read metadata directly from disk to verify it was written
// Verify metadata is already persisted to disk (immediately after the insert)
let metadata_page_ref = heap_file.file_page_ref(HeapFile::<4>::METADATA_PAGE_ID);
let metadata_page = cache.pin_read(&metadata_page_ref).unwrap();
let disk_metadata = MetadataRepr::load_from_page(&metadata_page).unwrap();

assert_eq!(disk_metadata.first_record_page, first_record_page);
assert_eq!(disk_metadata.first_overflow_page, first_overflow_page);

// Try flushing again (should be no-op since metadata is clean)
heap_file.flush_metadata().unwrap();

assert!(!heap_file.metadata.dirty.load(Ordering::Acquire));
}

// HeapFile updating record
Expand Down
19 changes: 6 additions & 13 deletions engine/src/heap_file/pages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use std::sync::atomic::AtomicBool;

use bytemuck::{Pod, Zeroable};
use parking_lot::Mutex;
use storage::{
cache::{PageRead, PageWrite},
paged_file::PageId,
Expand All @@ -17,17 +14,15 @@ use crate::{

/// Metadata of [`HeapFile`]. It's stored using bare [`PagedFile`].
pub(super) struct Metadata {
pub first_record_page: Mutex<PageId>,
pub first_overflow_page: Mutex<PageId>,
pub dirty: AtomicBool,
pub first_record_page: PageId,
pub first_overflow_page: PageId,
}

impl From<&MetadataRepr> for Metadata {
fn from(value: &MetadataRepr) -> Self {
Metadata {
first_record_page: Mutex::new(value.first_record_page),
first_overflow_page: Mutex::new(value.first_overflow_page),
dirty: AtomicBool::new(false),
first_record_page: value.first_record_page,
first_overflow_page: value.first_overflow_page,
}
}
}
Expand Down Expand Up @@ -60,11 +55,9 @@ impl MetadataRepr {

impl From<&Metadata> for MetadataRepr {
fn from(value: &Metadata) -> Self {
let first_record_page = *value.first_record_page.lock();
let first_overflow_page = *value.first_overflow_page.lock();
MetadataRepr {
first_record_page,
first_overflow_page,
first_record_page: value.first_record_page,
first_overflow_page: value.first_overflow_page,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ rmp-serde.workspace = true
crossbeam.workspace = true

tokio-util = "0.7"

[dev-dependencies]
tempfile = "3.20.0"
Loading
Loading