Skip to content
Merged

wal #27

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion storage/src/compaction/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Storage {

let state_lock = self.state_lock.lock().unwrap();
self.manifest
.add_record(&state_lock, Compaction(compact_sst_id));
.add_record(&state_lock, Compaction(compact_sst_id))?;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod lsm_storage;
mod lsm_util;
mod manifest;
pub mod memtable;
mod wal;

mod block;
mod sst;
Expand Down
10 changes: 3 additions & 7 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@ pub struct Config {
}

pub fn new(config: Config) -> Arc<Storage> {
let block_cache = Arc::new(BlockCache::new(4096));
let db_dir = Path::new(&config.db_dir);
let manifest_file = db_dir.join("manifest");
create_db_dir(db_dir);
let block_cache = Arc::new(BlockCache::new(4096));

let manifest;
let mut manifest_records: Vec<ManifestRecord> = vec![];

let manifest_file = db_dir.join("manifest");
match Manifest::recover(&manifest_file) {
Ok((man, manifest_recs)) => {
manifest = man;
Expand All @@ -70,10 +69,7 @@ pub fn new(config: Config) -> Arc<Storage> {
let (l0_sst_ids, l1_sst_ids, sstables) =
load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables");

let sst_id = match ([l0_sst_ids.clone(), l1_sst_ids.clone()].concat())
.iter()
.max()
{
let sst_id = match ([&l0_sst_ids[..], &l1_sst_ids[..]].concat()).iter().max() {
Some(id) => id + 1,
None => 0,
};
Expand Down
105 changes: 105 additions & 0 deletions storage/src/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use anyhow::{Ok, Result};
use bytes::{Buf, BufMut, Bytes};
use crossbeam_skiplist::SkipMap;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Read, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};

pub struct Wal {
file: Arc<Mutex<BufWriter<File>>>,
}

impl Wal {
pub fn create(path: impl AsRef<Path>) -> Result<Self> {
let mut open_opts = OpenOptions::new();
open_opts.read(true).write(true).create(true);
let mut file = open_opts.open(path)?;

let buf_writer = BufWriter::new(file);
Ok(Wal {
file: Arc::new(Mutex::new(buf_writer)),
})
}

pub fn recover(path: impl AsRef<Path>, skiplist: &SkipMap<Bytes, Bytes>) -> Result<Self> {
let mut open_opts = OpenOptions::new();
open_opts.read(true).write(true).create(true);
let mut file = open_opts.open(path)?;

let mut buf = vec![];
file.read_to_end(&mut buf)?;
let mut buf_ptr = buf.as_slice();

while buf_ptr.has_remaining() {
let len = buf_ptr.get_u16();
let key = &buf_ptr[..len as usize];
buf_ptr.advance(len as usize);

let len = buf_ptr.get_u16();
let value = &buf_ptr[..len as usize];
buf_ptr.advance(len as usize);

skiplist.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value));
}

let buf_writer = BufWriter::new(file);
Ok(Wal {
file: Arc::new(Mutex::new(buf_writer)),
})
}

pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
let mut file = self.file.lock().unwrap();
let mut buf = vec![];
buf.put_u16(key.len() as u16);
buf.extend(key);
buf.put_u16(value.len() as u16);
buf.extend(value);

file.write_all(&buf)?;
Ok(())
}

pub fn sync(&self) -> Result<()> {
let mut file = self.file.lock().unwrap();
file.flush()?;
file.get_mut().sync_all()?;
Ok(())
}
}

#[cfg(test)]
mod test {
use crossbeam_skiplist::SkipMap;
use tempfile::tempdir;

use crate::{lsm_util::get_entries, wal::Wal};

#[test]
fn test_wal_recovery() {
let temp_dir = tempdir().unwrap();
let wal_path = temp_dir.path().join("wal");
let wal = Wal::create(wal_path.as_path()).unwrap();

let entries = get_entries();
for (key, value) in &entries {
wal.put(key, value).unwrap();
}
wal.sync().unwrap();

let memtable = SkipMap::new();
let _ = Wal::recover(wal_path.as_path(), &memtable).unwrap();
let mut res = vec![];

for entry in memtable.iter() {
res.push((entry.key().to_vec(), entry.value().to_vec()));
}
assert_eq!(memtable.len(), res.len());
let res_slices: Vec<(&[u8], &[u8])> = res
.iter()
.map(|(k, v)| (k.as_slice(), v.as_slice()))
.collect();
assert_eq!(res_slices, entries);
}
}
Loading