diff --git a/src/storage/bitcask.rs b/src/storage/bitcask.rs index 3f44c3581..6be6ea203 100644 --- a/src/storage/bitcask.rs +++ b/src/storage/bitcask.rs @@ -1,8 +1,10 @@ +use std::collections::btree_map::Range; use std::collections::BTreeMap; use std::fs::File; use std::io::{BufReader, BufWriter, Read as _, Seek as _, SeekFrom, Write as _}; use std::ops::{Bound, RangeBounds}; use std::path::PathBuf; +use std::result::Result as StdResult; use fs4::fs_std::FileExt; use log::{error, info}; @@ -10,51 +12,64 @@ use log::{error, info}; use super::{Engine, Status}; use crate::error::Result; -/// A very simple variant of BitCask, itself a very simple log-structured -/// key-value engine used e.g. by the Riak database. It is not compatible with -/// BitCask databases generated by other implementations. See: +/// A very simple variant of BitCask, itself a simple log-structured key-value +/// engine used e.g. by the Riak database. It is not compatible with BitCask +/// databases generated by other implementations. See: /// https://riak.com/assets/bitcask-intro.pdf /// /// BitCask writes key-value pairs to an append-only log file, and keeps a -/// mapping of keys to file positions in memory. All live keys must fit in -/// memory. Deletes write a tombstone value to the log file. To remove old -/// garbage, logs can be compacted by writing new logs containing only live -/// data, skipping replaced values and tombstones. +/// mapping of keys to file offsets in memory. All live keys must fit in memory. +/// Deletes write a tombstone value to the log file. To remove old garbage +/// (deleted or replaced keys), logs can be compacted by writing new logs +/// containing only live data, dropping replaced values and tombstones. /// -/// This implementation makes several significant simplifications over -/// standard BitCask: +/// This implementation is significantly simpler than standard BitCask: /// -/// - Instead of writing multiple fixed-size log files, it uses a single +/// * Instead of writing multiple fixed-size log files, it uses a single /// append-only log file of arbitrary size. This increases the compaction -/// volume, since the entire log file must be rewritten on every compaction, -/// and can exceed the filesystem's file size limit, but ToyDB databases are -/// expected to be small. +/// volume, since the entire log file must be rewritten on every compaction. +/// It can also exceed the filesystem's file size limit. However, toyDB +/// databases are expected to be small. /// -/// - Compactions lock the database for reads and writes. This is ok since ToyDB +/// * Compactions lock the database for reads and writes. This is ok since toyDB /// only compacts during node startup and files are expected to be small. /// -/// - Hint files are not used, the log itself is scanned when opened to -/// build the keydir. Hint files only omit values, and ToyDB values are +/// * Hint files are not used, the log itself is scanned when opened to +/// build the keydir. Hint files only omit values, and toyDB values are /// expected to be small, so the hint files would be nearly as large as /// the compacted log files themselves. /// -/// - Log entries don't contain timestamps or checksums. +/// * Log entries don't contain timestamps or checksums. /// -/// The structure of a log entry is: +/// The structure of an encoded log entry is: /// -/// - Key length as big-endian u32. -/// - Value length as big-endian i32, or -1 for tombstones. -/// - Key as raw bytes (max 2 GB). -/// - Value as raw bytes (max 2 GB). +/// 1. Key length as big-endian u32 [4 bytes]. +/// 2. Value length as big-endian i32, or -1 for tombstones [4 bytes]. +/// 3. Key as raw bytes [<= 2 GB]. +/// 4. Value as raw bytes [<= 2 GB]. pub struct BitCask { - /// The active append-only log file. + /// The current append-only log file. log: Log, - /// Maps keys to a value position and length in the log file. + /// Maps keys to a value's offset and length in `log`. keydir: KeyDir, } -/// Maps keys to a value position and length in the log file. -type KeyDir = BTreeMap, (u64, u32)>; +/// Maps keys to a value's location in the log file. +type KeyDir = BTreeMap, ValueLocation>; + +/// The location of a value in the log file. +#[derive(Clone, Copy)] +struct ValueLocation { + offset: u64, + length: usize, +} + +impl ValueLocation { + /// Returns the file offset of the value location's end. + fn end(&self) -> u64 { + self.offset + self.length as u64 + } +} impl BitCask { /// Opens or creates a BitCask database in the given file. @@ -131,11 +146,8 @@ impl Engine for BitCask { } fn get(&mut self, key: &[u8]) -> Result>> { - if let Some((value_pos, value_len)) = self.keydir.get(key) { - Ok(Some(self.log.read_value(*value_pos, *value_len)?)) - } else { - Ok(None) - } + let Some(location) = self.keydir.get(key) else { return Ok(None) }; + self.log.read_value(*location).map(Some) } fn scan(&mut self, range: impl RangeBounds>) -> Self::ScanIterator<'_> { @@ -150,19 +162,15 @@ impl Engine for BitCask { } fn set(&mut self, key: &[u8], value: Vec) -> Result<()> { - let (pos, len) = self.log.write_entry(key, Some(&*value))?; - let value_len = value.len() as u32; - self.keydir.insert(key.to_vec(), (pos + len as u64 - value_len as u64, value_len)); + let value_location = self.log.write_entry(key, Some(&*value))?; + self.keydir.insert(key.to_vec(), value_location); Ok(()) } fn status(&mut self) -> Result { let keys = self.keydir.len() as u64; - let size = self - .keydir - .iter() - .map(|(key, (_, value_len))| key.len() as u64 + *value_len as u64) - .sum(); + let size = + self.keydir.iter().map(|(key, value_loc)| (key.len() + value_loc.length) as u64).sum(); let disk_size = self.log.file.metadata()?.len(); let live_disk_size = size + 8 * keys; // account for length prefixes Ok(Status { name: "bitcask".to_string(), keys, size, disk_size, live_disk_size }) @@ -170,14 +178,14 @@ impl Engine for BitCask { } pub struct ScanIterator<'a> { - inner: std::collections::btree_map::Range<'a, Vec, (u64, u32)>, + inner: Range<'a, Vec, ValueLocation>, log: &'a mut Log, } impl ScanIterator<'_> { - fn map(&mut self, item: (&Vec, &(u64, u32))) -> ::Item { - let (key, (value_pos, value_len)) = item; - Ok((key.clone(), self.log.read_value(*value_pos, *value_len)?)) + fn map(&mut self, item: (&Vec, &ValueLocation)) -> ::Item { + let (key, value_loc) = item; + Ok((key.clone(), self.log.read_value(*value_loc)?)) } } @@ -217,10 +225,10 @@ impl BitCask { let mut new_keydir = KeyDir::new(); let mut new_log = Log::new(path)?; new_log.file.set_len(0)?; // truncate file if it exists - for (key, (value_pos, value_len)) in self.keydir.iter() { - let value = self.log.read_value(*value_pos, *value_len)?; - let (pos, len) = new_log.write_entry(key, Some(&value))?; - new_keydir.insert(key.clone(), (pos + len as u64 - *value_len as u64, *value_len)); + for (key, value_loc) in self.keydir.iter() { + let value = self.log.read_value(*value_loc)?; + let value_loc = new_log.write_entry(key, Some(&value))?; + new_keydir.insert(key.clone(), value_loc); } Ok((new_log, new_keydir)) } @@ -275,88 +283,97 @@ impl Log { let mut keydir = KeyDir::new(); let file_len = self.file.metadata()?.len(); let mut r = BufReader::new(&mut self.file); - let mut pos = r.seek(SeekFrom::Start(0))?; + let mut offset = r.seek(SeekFrom::Start(0))?; - while pos < file_len { - // Read the next entry from the file, returning the key, value - // position, and value length or None for tombstones. - let result = || -> std::result::Result<(Vec, u64, Option), std::io::Error> { + while offset < file_len { + // Read the next entry from the file, returning the key and value + // location, or None for tombstones. + let result = || -> StdResult<(Vec, Option), std::io::Error> { + // Read key length: 4-byte u32. r.read_exact(&mut len_buf)?; let key_len = u32::from_be_bytes(len_buf); + + // Read value length: 4-byte i32, -1 for tombstones. r.read_exact(&mut len_buf)?; - let value_len_or_tombstone = match i32::from_be_bytes(len_buf) { - l if l >= 0 => Some(l as u32), - _ => None, // -1 for tombstones + let value_loc = match i32::from_be_bytes(len_buf) { + ..0 => None, // tombstone + len => Some(ValueLocation { + offset: offset + 8 + key_len as u64, + length: len as usize, + }), }; - let value_pos = pos + 4 + 4 + key_len as u64; + // Read the key. let mut key = vec![0; key_len as usize]; r.read_exact(&mut key)?; - if let Some(value_len) = value_len_or_tombstone { - if value_pos + value_len as u64 > file_len { + // Skip past the value. + if let Some(value_loc) = value_loc { + if value_loc.end() > file_len { return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "value extends beyond end of file", )); } - r.seek_relative(value_len as i64)?; // avoids discarding buffer + r.seek_relative(value_loc.length as i64)?; } - Ok((key, value_pos, value_len_or_tombstone)) + // Update the current file offset. + offset += 8 + key_len as u64 + value_loc.map_or(0, |v| v.length) as u64; + + Ok((key, value_loc)) }(); + // Update the keydir with the entry. match result { - // Populate the keydir with the entry, or remove it on tombstones. - Ok((key, value_pos, Some(value_len))) => { - keydir.insert(key, (value_pos, value_len)); - pos = value_pos + value_len as u64; - } - Ok((key, value_pos, None)) => { - keydir.remove(&key); - pos = value_pos; - } + Ok((key, Some(value_loc))) => keydir.insert(key, value_loc), + Ok((key, None)) => keydir.remove(&key), // If an incomplete entry was found at the end of the file, assume an // incomplete write and truncate the file. Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => { - error!("Found incomplete entry at offset {}, truncating file", pos); - self.file.set_len(pos)?; + error!("Found incomplete entry at offset {offset}, truncating file"); + self.file.set_len(offset)?; break; } Err(err) => return Err(err.into()), - } + }; } Ok(keydir) } - /// Reads a value from the log file. - fn read_value(&mut self, value_pos: u64, value_len: u32) -> Result> { - let mut value = vec![0; value_len as usize]; - self.file.seek(SeekFrom::Start(value_pos))?; + /// Reads a value from the log file at the given location. + fn read_value(&mut self, location: ValueLocation) -> Result> { + let mut value = vec![0; location.length]; + self.file.seek(SeekFrom::Start(location.offset))?; self.file.read_exact(&mut value)?; Ok(value) } /// Appends a key/value entry to the log file, using a None value for - /// tombstones. It returns the position and length of the entry. - fn write_entry(&mut self, key: &[u8], value: Option<&[u8]>) -> Result<(u64, u32)> { - let key_len = key.len() as u32; - let value_len = value.map_or(0, |v| v.len() as u32); - let value_len_or_tombstone = value.map_or(-1, |v| v.len() as i32); - let len = 4 + 4 + key_len + value_len; - - let pos = self.file.seek(SeekFrom::End(0))?; - let mut w = BufWriter::with_capacity(len as usize, &mut self.file); - w.write_all(&key_len.to_be_bytes())?; - w.write_all(&value_len_or_tombstone.to_be_bytes())?; + /// tombstones. It returns the location in the log of the value inside the + /// entry, for use with the `KeyDir`. + fn write_entry(&mut self, key: &[u8], value: Option<&[u8]>) -> Result { + let length = 8 + key.len() + value.map_or(0, |v| v.len()); + let offset = self.file.seek(SeekFrom::End(0))?; + let mut w = BufWriter::with_capacity(length, &mut self.file); + + // Key length: 4-byte u32. + w.write_all(&(key.len() as u32).to_be_bytes())?; + + // Value length: 4-byte i32, -1 for tombstones. + w.write_all(&value.map_or(-1, |v| v.len() as i32).to_be_bytes())?; + + // The actual key and value. w.write_all(key)?; - if let Some(value) = value { - w.write_all(value)?; - } + w.write_all(value.unwrap_or_default())?; w.flush()?; - Ok((pos, len)) + // Translate the entry location into a value location. + Ok(ValueLocation { + offset: offset + 8 + key.len() as u64, + length: value.map_or(0, |v| v.len()), + }) } } @@ -365,7 +382,6 @@ impl Log { mod tests { use std::error::Error as StdError; use std::fmt::Write as _; - use std::result::Result as StdResult; use tempfile::TempDir; use test_case::test_case; @@ -412,14 +428,14 @@ mod tests { let mut log = Log::new(path.clone())?; let mut ends = vec![]; - let (pos, len) = log.write_entry("deleted".as_bytes(), Some(&[1, 2, 3]))?; - ends.push(pos + len as u64); - let (pos, len) = log.write_entry("deleted".as_bytes(), None)?; - ends.push(pos + len as u64); - let (pos, len) = log.write_entry(&[], Some(&[]))?; - ends.push(pos + len as u64); - let (pos, len) = log.write_entry("key".as_bytes(), Some(&[1, 2, 3, 4, 5]))?; - ends.push(pos + len as u64); + let value_loc = log.write_entry("deleted".as_bytes(), Some(&[1, 2, 3]))?; + ends.push(value_loc.end()); + let value_loc = log.write_entry("deleted".as_bytes(), None)?; + ends.push(value_loc.end()); + let value_loc = log.write_entry(&[], Some(&[]))?; + ends.push(value_loc.end()); + let value_loc = log.write_entry("key".as_bytes(), Some(&[1, 2, 3, 4, 5]))?; + ends.push(value_loc.end()); drop(log); // Copy the file, and truncate it at each byte, then try to open it