From 9affc18a5d7ada667f0045fa2b6c354c36395949 Mon Sep 17 00:00:00 2001 From: jobala Date: Wed, 5 Nov 2025 06:52:26 +0300 Subject: [PATCH 1/2] add wal implementation --- storage/src/compaction/compact.rs | 2 +- storage/src/lib.rs | 1 + storage/src/lsm_storage.rs | 5 +-- storage/src/wal.rs | 68 +++++++++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 5 deletions(-) create mode 100644 storage/src/wal.rs diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs index e886648..da4cd5b 100644 --- a/storage/src/compaction/compact.rs +++ b/storage/src/compaction/compact.rs @@ -78,7 +78,7 @@ impl Storage { let state_lock = self.state_lock.lock().unwrap(); self.manifest - .add_record(&state_lock, Compaction(compact_sst_id)); + .add_record(&state_lock, Compaction(compact_sst_id))?; Ok(()) } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 3eb61ce..d0d9574 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -5,6 +5,7 @@ pub mod lsm_storage; mod lsm_util; mod manifest; pub mod memtable; +mod wal; mod block; mod sst; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index d48963b..3718f14 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -70,10 +70,7 @@ pub fn new(config: Config) -> Arc { let (l0_sst_ids, l1_sst_ids, sstables) = load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables"); - let sst_id = match ([l0_sst_ids.clone(), l1_sst_ids.clone()].concat()) - .iter() - .max() - { + let sst_id = match ([&l0_sst_ids[..], &l1_sst_ids[..]].concat()).iter().max() { Some(id) => id + 1, None => 0, }; diff --git a/storage/src/wal.rs b/storage/src/wal.rs new file mode 100644 index 0000000..3ce9e63 --- /dev/null +++ b/storage/src/wal.rs @@ -0,0 +1,68 @@ +use anyhow::{Ok, Result}; +use bytes::{Buf, BufMut, Bytes}; +use crossbeam_skiplist::SkipMap; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Read, Write}; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +pub struct Wal { + file: Arc>>, +} + +impl Wal { + pub fn create(path: impl AsRef) -> Result { + let mut open_opts = OpenOptions::new(); + open_opts.read(true).write(true).create(true); + let mut file = open_opts.open(path)?; + + let buf_writer = BufWriter::new(file); + Ok(Wal { + file: Arc::new(Mutex::new(buf_writer)), + }) + } + + pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { + let mut open_opts = OpenOptions::new(); + open_opts.read(true).write(true).create(true); + let mut file = open_opts.open(path)?; + + let mut buf = vec![]; + file.read_to_end(&mut buf)?; + let mut buf_ptr = buf.as_slice(); + + while buf_ptr.has_remaining() { + let len = buf_ptr.get_u16(); + let key = &buf_ptr[..len as usize]; + buf_ptr.advance(len as usize); + + let len = buf_ptr.get_u16(); + let value = &buf_ptr[..len as usize]; + buf_ptr.advance(len as usize); + + skiplist.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); + } + + let buf_writer = BufWriter::new(file); + Ok(Wal { + file: Arc::new(Mutex::new(buf_writer)), + }) + } + + pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + let mut file = self.file.lock().unwrap(); + let mut buf = vec![]; + buf.put_u16(key.len() as u16); + buf.extend(key); + buf.put_u16(value.len() as u16); + buf.extend(value); + + file.write_all(&buf)?; + Ok(()) + } + + pub fn sync(&self) -> Result<()> { + let mut guard = self.file.lock().unwrap(); + Ok(guard.flush()?) + } +} From b3595dcd81a57ca82f507f6cf4b8f33f334dfded Mon Sep 17 00:00:00 2001 From: jobala Date: Thu, 6 Nov 2025 05:55:08 +0300 Subject: [PATCH 2/2] tests wal recovery --- storage/src/lsm_storage.rs | 5 ++--- storage/src/wal.rs | 41 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 3718f14..473edb5 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -51,14 +51,13 @@ pub struct Config { } pub fn new(config: Config) -> Arc { + let block_cache = Arc::new(BlockCache::new(4096)); let db_dir = Path::new(&config.db_dir); - let manifest_file = db_dir.join("manifest"); create_db_dir(db_dir); - let block_cache = Arc::new(BlockCache::new(4096)); let manifest; let mut manifest_records: Vec = vec![]; - + let manifest_file = db_dir.join("manifest"); match Manifest::recover(&manifest_file) { Ok((man, manifest_recs)) => { manifest = man; diff --git a/storage/src/wal.rs b/storage/src/wal.rs index 3ce9e63..ebf8a19 100644 --- a/storage/src/wal.rs +++ b/storage/src/wal.rs @@ -62,7 +62,44 @@ impl Wal { } pub fn sync(&self) -> Result<()> { - let mut guard = self.file.lock().unwrap(); - Ok(guard.flush()?) + let mut file = self.file.lock().unwrap(); + file.flush()?; + file.get_mut().sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crossbeam_skiplist::SkipMap; + use tempfile::tempdir; + + use crate::{lsm_util::get_entries, wal::Wal}; + + #[test] + fn test_wal_recovery() { + let temp_dir = tempdir().unwrap(); + let wal_path = temp_dir.path().join("wal"); + let wal = Wal::create(wal_path.as_path()).unwrap(); + + let entries = get_entries(); + for (key, value) in &entries { + wal.put(key, value).unwrap(); + } + wal.sync().unwrap(); + + let memtable = SkipMap::new(); + let _ = Wal::recover(wal_path.as_path(), &memtable).unwrap(); + let mut res = vec![]; + + for entry in memtable.iter() { + res.push((entry.key().to_vec(), entry.value().to_vec())); + } + assert_eq!(memtable.len(), res.len()); + let res_slices: Vec<(&[u8], &[u8])> = res + .iter() + .map(|(k, v)| (k.as_slice(), v.as_slice())) + .collect(); + assert_eq!(res_slices, entries); } }