diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs new file mode 100644 index 0000000..a0e2d1b --- /dev/null +++ b/storage/src/compaction/compact.rs @@ -0,0 +1,120 @@ +use std::{ + fs::remove_file, + sync::Arc, + thread::{self, JoinHandle}, + time::Duration, +}; + +use anyhow::{Ok, Result}; + +use crate::{ + SSTableBuilder, SSTableIterator, Storage, common::iterator::StorageIterator, + iterators::merged_iterator::MergedIterator, lsm_storage::StorageState, +}; +const COMPACT_INTERVAL: Duration = Duration::from_secs(60); + +impl Storage { + pub fn trigger_compaction(&self) -> Result<()> { + let state = { + let guard = self.state.read().unwrap(); + guard.clone() + }; + let ssts_to_compact = state.l0_sstables.clone(); + if ssts_to_compact.is_empty() { + return Ok(()); + } + + let mut iters = vec![]; + for sst_id in ssts_to_compact.iter() { + let sstable = state.sstables[sst_id].clone(); + let iter = SSTableIterator::create_and_seek_to_first(sstable).unwrap(); + iters.push(iter); + } + + let mut merged_iter = MergedIterator::new(iters); + let mut builder = SSTableBuilder::new(self.config.block_size); + while merged_iter.is_valid() { + builder.add(merged_iter.key(), merged_iter.value()); + merged_iter.next()?; + } + + let id = self.get_sst_id(); + let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?; + + let mut write_guard = self.state.write().unwrap(); + let mut l0_sstables = write_guard.l0_sstables.clone(); + let mut sstables = write_guard.sstables.clone(); + let mut levels = write_guard.levels.clone(); + + for sst_id in ssts_to_compact.iter() { + sstables.remove(sst_id); + l0_sstables.retain(|&x| x != *sst_id); + + remove_file(self.sst_path(*sst_id))?; + } + + sstables.insert(id, Arc::new(table)); + levels[0].1.insert(0, id); + + *write_guard = Arc::new(StorageState { + memtable: write_guard.memtable.clone(), + frozen_memtables: write_guard.frozen_memtables.clone(), + l0_sstables, + sstables, + levels, + }); + + Ok(()) + } + + pub fn spawn_compacter(self: &Arc) -> JoinHandle<()> { + let this = self.clone(); + + thread::spawn(move || { + loop { + this.trigger_compaction().expect("sst compaction failed"); + thread::sleep(COMPACT_INTERVAL); + } + }) + } +} + +#[cfg(test)] +mod tests { + use tempfile::tempdir; + + use crate::{Config, lsm_util::get_entries, new}; + + #[test] + fn test_compaction() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // create 2 sstables + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let initial_sst_count = storage.state.read().unwrap().l0_sstables.len(); + storage.trigger_compaction().expect("ssts were compacted"); + + let curr_sst_count = storage.state.read().unwrap().l0_sstables.len(); + let l1_entries = storage.state.read().unwrap().levels[0].1.clone(); + + assert_eq!(curr_sst_count, initial_sst_count - 2); + assert_eq!(l1_entries.len(), 1); + } +} diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs index 79ed1ad..acd6a7b 100644 --- a/storage/src/compaction/flush.rs +++ b/storage/src/compaction/flush.rs @@ -7,6 +7,8 @@ use std::{ use crate::{SSTableBuilder, Storage, lsm_storage::StorageState}; +const FLUSH_INTERVAL: Duration = Duration::from_millis(50); + impl Storage { pub(crate) fn flush_frozen_memtable(&self) -> Result<()> { let mut sst_builder = SSTableBuilder::new(self.config.block_size); @@ -26,12 +28,13 @@ impl Storage { self.block_cache.clone(), self.sst_path(memtable.id), )?; - l0_sstables.push(memtable.id); + l0_sstables.insert(0, memtable.id); sstables.insert(memtable.id, Arc::new(sst)); *guard = Arc::new(StorageState { memtable: guard.memtable.clone(), frozen_memtables: memtables, + levels: guard.levels.clone(), l0_sstables, sstables, }); @@ -52,19 +55,18 @@ impl Storage { Ok(()) } - fn sst_path(&self, id: usize) -> String { + pub fn sst_path(&self, id: usize) -> String { format!("{}/sst/{}.sst", self.config.db_dir, id) } -} -// TODO: suppot msg passing -pub fn spawn_flusher(storage: Arc) -> JoinHandle<()> { - let this = storage.clone(); + pub fn spawn_flusher(self: &Arc) -> JoinHandle<()> { + let this = self.clone(); - thread::spawn(move || { - loop { - this.trigger_flush().expect("memtable to have been flushed"); - thread::sleep(Duration::from_millis(50)); - } - }) + thread::spawn(move || { + loop { + this.trigger_flush().expect("memtable to have been flushed"); + thread::sleep(FLUSH_INTERVAL); + } + }) + } } diff --git a/storage/src/compaction/mod.rs b/storage/src/compaction/mod.rs index 73fe11f..8e59afc 100644 --- a/storage/src/compaction/mod.rs +++ b/storage/src/compaction/mod.rs @@ -1 +1,2 @@ +pub mod compact; pub mod flush; diff --git a/storage/src/iterators/concat_iterator.rs b/storage/src/iterators/concat_iterator.rs new file mode 100644 index 0000000..e69de29 diff --git a/storage/src/iterators/mod.rs b/storage/src/iterators/mod.rs index 8ec2758..85d0fa8 100644 --- a/storage/src/iterators/mod.rs +++ b/storage/src/iterators/mod.rs @@ -1,3 +1,4 @@ +pub mod concat_iterator; pub mod lsm_iterator; pub mod merged_iterator; pub mod two_merge_iterator; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 9bfb275..fe177cd 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -37,6 +37,7 @@ pub(crate) struct StorageState { pub(crate) frozen_memtables: Vec>, pub(crate) l0_sstables: Vec, pub(crate) sstables: HashMap>, + pub(crate) levels: Vec<(usize, Vec)>, } #[derive(Debug)] @@ -64,10 +65,11 @@ pub fn new(config: Config) -> Arc { state_lock: Mutex::new(()), block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache state: RwLock::new(Arc::new(StorageState { - memtable: Arc::new(Memtable::new(sst_id)), - frozen_memtables: Vec::new(), l0_sstables, sstables, + levels: vec![(0, vec![])], + frozen_memtables: Vec::new(), + memtable: Arc::new(Memtable::new(sst_id)), })), }) } @@ -189,22 +191,23 @@ impl Storage { let mut frozen_memtables = guard.frozen_memtables.clone(); frozen_memtables.insert(0, memtable); - self.inc_sst_id(); - let id = self.sst_id.load(SeqCst); + let id = self.get_sst_id(); *guard = Arc::new(StorageState { memtable: Arc::new(Memtable::new(id)), frozen_memtables, l0_sstables: guard.l0_sstables.clone(), sstables: guard.sstables.clone(), + levels: guard.levels.clone(), }); drop(guard); } } - fn inc_sst_id(&self) -> usize { - self.sst_id.fetch_add(1, SeqCst) + pub(crate) fn get_sst_id(&self) -> usize { + self.sst_id.fetch_add(1, SeqCst); + self.sst_id.load(SeqCst) } } @@ -404,4 +407,67 @@ mod tests { assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); } + + #[test] + fn reads_the_latest_version_of_a_key() { + let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + let storage = new(config); + + for (key, value) in get_entries() { + storage.put(key, value).unwrap(); + } + + // will create sstables with a, b, c, d, e & f + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + // new storage instance + let config = Config { + sst_size: 4, + block_size: 32, + db_dir: db_dir.clone(), + num_memtable_limit: 5, + }; + + let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; + let storage = new(config); + for (key, value) in new_entries { + let _ = storage.put(key, value); + } + + // this will create an sst with a & e + storage + .flush_frozen_memtable() + .expect("memtable to have been frozen"); + + let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(); + let mut keys = vec![]; + let mut values = vec![]; + + while iter.is_valid() { + let k = from_utf8(iter.key()).unwrap(); + let v = from_utf8(iter.value()).unwrap(); + + keys.push(String::from(k)); + values.push(String::from(v)); + + let _ = iter.next(); + } + + assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]); + assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]); + } } diff --git a/storage/src/lsm_util.rs b/storage/src/lsm_util.rs index d6507e9..bf70e64 100644 --- a/storage/src/lsm_util.rs +++ b/storage/src/lsm_util.rs @@ -1,39 +1,37 @@ -use std::{collections::HashMap, fs, path::Path, sync::Arc}; - -use anyhow::{Result, anyhow}; +use anyhow::Result; +use std::{cmp::Reverse, collections::HashMap, fs, io, path::Path, sync::Arc, time::SystemTime}; use crate::{FileObject, SSTable, sst::BlockCache}; -pub(crate) fn load_sstables( - path: &Path, - block_cache: Arc, -) -> Result<(Vec, HashMap>)> { +type LoadedSstables = (Vec, HashMap>); + +fn read_dir_sorted>(path: P) -> io::Result> { + let mut entries: Vec = fs::read_dir(path)?.filter_map(Result::ok).collect(); + entries.sort_by_key(|entry| { + Reverse( + entry + .metadata() + .and_then(|m| m.modified()) + .unwrap_or(SystemTime::UNIX_EPOCH), + ) + }); + Ok(entries) +} + +pub(crate) fn load_sstables(path: &Path, block_cache: Arc) -> Result { let mut l0_sstables = vec![]; let mut sstables = HashMap::new(); - for entry in fs::read_dir(path.join("sst")).unwrap() { - match entry { - Ok(dir_entry) => { - let sst_path = dir_entry.path(); - - let split_path = sst_path - .file_name() - .unwrap() - .to_str() - .unwrap() - .split(".") - .collect::>(); - - let sst_id = split_path.first().unwrap().parse().unwrap(); - let file = FileObject::open(sst_path.as_path()).expect("failed to open file"); - let sst = SSTable::open(sst_id, block_cache.clone(), file) - .expect("failed to open sstable"); - - l0_sstables.push(sst.id); - sstables.insert(sst.id, Arc::new(sst)); - } - Err(err) => return Err(anyhow!("{:?}", err)), - } + for entry in read_dir_sorted(path.join("sst")).unwrap() { + let sst_path = entry.path(); + let filename = sst_path.file_name().unwrap().to_str().unwrap(); + let sst_id = filename.rsplit_once(".").unwrap().0.parse().unwrap(); + + let file = FileObject::open(sst_path.as_path()).expect("failed to open file"); + let sst = SSTable::open(sst_id, block_cache.clone(), file).expect("failed to open sstable"); + + l0_sstables.push(sst.id); + sstables.insert(sst.id, Arc::new(sst)); } anyhow::Ok((l0_sstables, sstables))