Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions storage/src/compaction/compact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std::{
fs::remove_file,
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
};

use anyhow::{Ok, Result};

use crate::{
SSTableBuilder, SSTableIterator, Storage, common::iterator::StorageIterator,
iterators::merged_iterator::MergedIterator, lsm_storage::StorageState,
};
const COMPACT_INTERVAL: Duration = Duration::from_secs(60);

impl Storage {
pub fn trigger_compaction(&self) -> Result<()> {
let state = {
let guard = self.state.read().unwrap();
guard.clone()
};
let ssts_to_compact = state.l0_sstables.clone();
if ssts_to_compact.is_empty() {
return Ok(());
}

let mut iters = vec![];
for sst_id in ssts_to_compact.iter() {
let sstable = state.sstables[sst_id].clone();
let iter = SSTableIterator::create_and_seek_to_first(sstable).unwrap();
iters.push(iter);
}

let mut merged_iter = MergedIterator::new(iters);
let mut builder = SSTableBuilder::new(self.config.block_size);
while merged_iter.is_valid() {
builder.add(merged_iter.key(), merged_iter.value());
merged_iter.next()?;
}

let id = self.get_sst_id();
let table = builder.build(id, self.block_cache.clone(), self.sst_path(id))?;

let mut write_guard = self.state.write().unwrap();
let mut l0_sstables = write_guard.l0_sstables.clone();
let mut sstables = write_guard.sstables.clone();
let mut levels = write_guard.levels.clone();

for sst_id in ssts_to_compact.iter() {
sstables.remove(sst_id);
l0_sstables.retain(|&x| x != *sst_id);

remove_file(self.sst_path(*sst_id))?;
}

sstables.insert(id, Arc::new(table));
levels[0].1.insert(0, id);

*write_guard = Arc::new(StorageState {
memtable: write_guard.memtable.clone(),
frozen_memtables: write_guard.frozen_memtables.clone(),
l0_sstables,
sstables,
levels,
});

Ok(())
}

pub fn spawn_compacter(self: &Arc<Self>) -> JoinHandle<()> {
let this = self.clone();

thread::spawn(move || {
loop {
this.trigger_compaction().expect("sst compaction failed");
thread::sleep(COMPACT_INTERVAL);
}
})
}
}

#[cfg(test)]
mod tests {
use tempfile::tempdir;

use crate::{Config, lsm_util::get_entries, new};

#[test]
fn test_compaction() {
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};
let storage = new(config);

for (key, value) in get_entries() {
storage.put(key, value).unwrap();
}

// create 2 sstables
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

let initial_sst_count = storage.state.read().unwrap().l0_sstables.len();
storage.trigger_compaction().expect("ssts were compacted");

let curr_sst_count = storage.state.read().unwrap().l0_sstables.len();
let l1_entries = storage.state.read().unwrap().levels[0].1.clone();

assert_eq!(curr_sst_count, initial_sst_count - 2);
assert_eq!(l1_entries.len(), 1);
}
}
26 changes: 14 additions & 12 deletions storage/src/compaction/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{

use crate::{SSTableBuilder, Storage, lsm_storage::StorageState};

const FLUSH_INTERVAL: Duration = Duration::from_millis(50);

impl Storage {
pub(crate) fn flush_frozen_memtable(&self) -> Result<()> {
let mut sst_builder = SSTableBuilder::new(self.config.block_size);
Expand All @@ -26,12 +28,13 @@ impl Storage {
self.block_cache.clone(),
self.sst_path(memtable.id),
)?;
l0_sstables.push(memtable.id);
l0_sstables.insert(0, memtable.id);
sstables.insert(memtable.id, Arc::new(sst));

*guard = Arc::new(StorageState {
memtable: guard.memtable.clone(),
frozen_memtables: memtables,
levels: guard.levels.clone(),
l0_sstables,
sstables,
});
Expand All @@ -52,19 +55,18 @@ impl Storage {
Ok(())
}

fn sst_path(&self, id: usize) -> String {
pub fn sst_path(&self, id: usize) -> String {
format!("{}/sst/{}.sst", self.config.db_dir, id)
}
}

// TODO: suppot msg passing
pub fn spawn_flusher(storage: Arc<Storage>) -> JoinHandle<()> {
let this = storage.clone();
pub fn spawn_flusher(self: &Arc<Self>) -> JoinHandle<()> {
let this = self.clone();

thread::spawn(move || {
loop {
this.trigger_flush().expect("memtable to have been flushed");
thread::sleep(Duration::from_millis(50));
}
})
thread::spawn(move || {
loop {
this.trigger_flush().expect("memtable to have been flushed");
thread::sleep(FLUSH_INTERVAL);
}
})
}
}
1 change: 1 addition & 0 deletions storage/src/compaction/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod compact;
pub mod flush;
Empty file.
1 change: 1 addition & 0 deletions storage/src/iterators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod concat_iterator;
pub mod lsm_iterator;
pub mod merged_iterator;
pub mod two_merge_iterator;
78 changes: 72 additions & 6 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(crate) struct StorageState {
pub(crate) frozen_memtables: Vec<Arc<Memtable>>,
pub(crate) l0_sstables: Vec<usize>,
pub(crate) sstables: HashMap<usize, Arc<SSTable>>,
pub(crate) levels: Vec<(usize, Vec<usize>)>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -64,10 +65,11 @@ pub fn new(config: Config) -> Arc<Storage> {
state_lock: Mutex::new(()),
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache
state: RwLock::new(Arc::new(StorageState {
memtable: Arc::new(Memtable::new(sst_id)),
frozen_memtables: Vec::new(),
l0_sstables,
sstables,
levels: vec![(0, vec![])],
frozen_memtables: Vec::new(),
memtable: Arc::new(Memtable::new(sst_id)),
})),
})
}
Expand Down Expand Up @@ -189,22 +191,23 @@ impl Storage {
let mut frozen_memtables = guard.frozen_memtables.clone();
frozen_memtables.insert(0, memtable);

self.inc_sst_id();
let id = self.sst_id.load(SeqCst);
let id = self.get_sst_id();

*guard = Arc::new(StorageState {
memtable: Arc::new(Memtable::new(id)),
frozen_memtables,
l0_sstables: guard.l0_sstables.clone(),
sstables: guard.sstables.clone(),
levels: guard.levels.clone(),
});

drop(guard);
}
}

fn inc_sst_id(&self) -> usize {
self.sst_id.fetch_add(1, SeqCst)
pub(crate) fn get_sst_id(&self) -> usize {
self.sst_id.fetch_add(1, SeqCst);
self.sst_id.load(SeqCst)
}
}

Expand Down Expand Up @@ -404,4 +407,67 @@ mod tests {
assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]);
assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]);
}

#[test]
fn reads_the_latest_version_of_a_key() {
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};
let storage = new(config);

for (key, value) in get_entries() {
storage.put(key, value).unwrap();
}

// will create sstables with a, b, c, d, e & f
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

// new storage instance
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};

let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")];
let storage = new(config);
for (key, value) in new_entries {
let _ = storage.put(key, value);
}

// this will create an sst with a & e
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
let mut keys = vec![];
let mut values = vec![];

while iter.is_valid() {
let k = from_utf8(iter.key()).unwrap();
let v = from_utf8(iter.value()).unwrap();

keys.push(String::from(k));
values.push(String::from(v));

let _ = iter.next();
}

assert_eq!(keys, vec!["a", "b", "c", "d", "e", "f"]);
assert_eq!(values, vec!["20", "23", "3", "22", "21", "6"]);
}
}
58 changes: 28 additions & 30 deletions storage/src/lsm_util.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,37 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};

use anyhow::{Result, anyhow};
use anyhow::Result;
use std::{cmp::Reverse, collections::HashMap, fs, io, path::Path, sync::Arc, time::SystemTime};

use crate::{FileObject, SSTable, sst::BlockCache};

pub(crate) fn load_sstables(
path: &Path,
block_cache: Arc<BlockCache>,
) -> Result<(Vec<usize>, HashMap<usize, Arc<SSTable>>)> {
type LoadedSstables = (Vec<usize>, HashMap<usize, Arc<SSTable>>);

fn read_dir_sorted<P: AsRef<Path>>(path: P) -> io::Result<Vec<fs::DirEntry>> {
let mut entries: Vec<fs::DirEntry> = fs::read_dir(path)?.filter_map(Result::ok).collect();
entries.sort_by_key(|entry| {
Reverse(
entry
.metadata()
.and_then(|m| m.modified())
.unwrap_or(SystemTime::UNIX_EPOCH),
)
});
Ok(entries)
}

pub(crate) fn load_sstables(path: &Path, block_cache: Arc<BlockCache>) -> Result<LoadedSstables> {
let mut l0_sstables = vec![];
let mut sstables = HashMap::new();

for entry in fs::read_dir(path.join("sst")).unwrap() {
match entry {
Ok(dir_entry) => {
let sst_path = dir_entry.path();

let split_path = sst_path
.file_name()
.unwrap()
.to_str()
.unwrap()
.split(".")
.collect::<Vec<&str>>();

let sst_id = split_path.first().unwrap().parse().unwrap();
let file = FileObject::open(sst_path.as_path()).expect("failed to open file");
let sst = SSTable::open(sst_id, block_cache.clone(), file)
.expect("failed to open sstable");

l0_sstables.push(sst.id);
sstables.insert(sst.id, Arc::new(sst));
}
Err(err) => return Err(anyhow!("{:?}", err)),
}
for entry in read_dir_sorted(path.join("sst")).unwrap() {
let sst_path = entry.path();
let filename = sst_path.file_name().unwrap().to_str().unwrap();
let sst_id = filename.rsplit_once(".").unwrap().0.parse().unwrap();

let file = FileObject::open(sst_path.as_path()).expect("failed to open file");
let sst = SSTable::open(sst_id, block_cache.clone(), file).expect("failed to open sstable");

l0_sstables.push(sst.id);
sstables.insert(sst.id, Arc::new(sst));
}

anyhow::Ok((l0_sstables, sstables))
Expand Down
Loading