Skip to content

Commit

Permalink
storage: clean up Bitcask engine
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Feb 6, 2025
1 parent 000fe3a commit be78237
Showing 1 changed file with 116 additions and 100 deletions.
216 changes: 116 additions & 100 deletions src/storage/bitcask.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,75 @@
use std::collections::btree_map::Range;
use std::collections::BTreeMap;
use std::fs::File;
use std::io::{BufReader, BufWriter, Read as _, Seek as _, SeekFrom, Write as _};
use std::ops::{Bound, RangeBounds};
use std::path::PathBuf;
use std::result::Result as StdResult;

use fs4::fs_std::FileExt;
use log::{error, info};

use super::{Engine, Status};
use crate::error::Result;

/// A very simple variant of BitCask, itself a very simple log-structured
/// key-value engine used e.g. by the Riak database. It is not compatible with
/// BitCask databases generated by other implementations. See:
/// A very simple variant of BitCask, itself a simple log-structured key-value
/// engine used e.g. by the Riak database. It is not compatible with BitCask
/// databases generated by other implementations. See:
/// https://riak.com/assets/bitcask-intro.pdf
///
/// BitCask writes key-value pairs to an append-only log file, and keeps a
/// mapping of keys to file positions in memory. All live keys must fit in
/// memory. Deletes write a tombstone value to the log file. To remove old
/// garbage, logs can be compacted by writing new logs containing only live
/// data, skipping replaced values and tombstones.
/// mapping of keys to file offsets in memory. All live keys must fit in memory.
/// Deletes write a tombstone value to the log file. To remove old garbage
/// (deleted or replaced keys), logs can be compacted by writing new logs
/// containing only live data, dropping replaced values and tombstones.
///
/// This implementation makes several significant simplifications over
/// standard BitCask:
/// This implementation is significantly simpler than standard BitCask:
///
/// - Instead of writing multiple fixed-size log files, it uses a single
/// * Instead of writing multiple fixed-size log files, it uses a single
/// append-only log file of arbitrary size. This increases the compaction
/// volume, since the entire log file must be rewritten on every compaction,
/// and can exceed the filesystem's file size limit, but ToyDB databases are
/// expected to be small.
/// volume, since the entire log file must be rewritten on every compaction.
/// It can also exceed the filesystem's file size limit. However, toyDB
/// databases are expected to be small.
///
/// - Compactions lock the database for reads and writes. This is ok since ToyDB
/// * Compactions lock the database for reads and writes. This is ok since toyDB
/// only compacts during node startup and files are expected to be small.
///
/// - Hint files are not used, the log itself is scanned when opened to
/// build the keydir. Hint files only omit values, and ToyDB values are
/// * Hint files are not used, the log itself is scanned when opened to
/// build the keydir. Hint files only omit values, and toyDB values are
/// expected to be small, so the hint files would be nearly as large as
/// the compacted log files themselves.
///
/// - Log entries don't contain timestamps or checksums.
/// * Log entries don't contain timestamps or checksums.
///
/// The structure of a log entry is:
/// The structure of an encoded log entry is:
///
/// - Key length as big-endian u32.
/// - Value length as big-endian i32, or -1 for tombstones.
/// - Key as raw bytes (max 2 GB).
/// - Value as raw bytes (max 2 GB).
/// 1. Key length as big-endian u32 [4 bytes].
/// 2. Value length as big-endian i32, or -1 for tombstones [4 bytes].
/// 3. Key as raw bytes [<= 2 GB].
/// 4. Value as raw bytes [<= 2 GB].
pub struct BitCask {
/// The active append-only log file.
/// The current append-only log file.
log: Log,
/// Maps keys to a value position and length in the log file.
/// Maps keys to a value's offset and length in `log`.
keydir: KeyDir,
}

/// Maps keys to a value position and length in the log file.
type KeyDir = BTreeMap<Vec<u8>, (u64, u32)>;
/// Maps keys to a value's location in the log file.
type KeyDir = BTreeMap<Vec<u8>, ValueLocation>;

/// Where a value lives in the log file: its starting byte offset and its
/// length in bytes.
#[derive(Copy, Clone)]
struct ValueLocation {
    offset: u64,
    length: usize,
}

impl ValueLocation {
    /// The file offset immediately after the value, i.e. where the next log
    /// entry begins.
    fn end(&self) -> u64 {
        let length = self.length as u64;
        self.offset + length
    }
}

impl BitCask {
/// Opens or creates a BitCask database in the given file.
Expand Down Expand Up @@ -131,11 +146,8 @@ impl Engine for BitCask {
}

fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
if let Some((value_pos, value_len)) = self.keydir.get(key) {
Ok(Some(self.log.read_value(*value_pos, *value_len)?))
} else {
Ok(None)
}
let Some(location) = self.keydir.get(key) else { return Ok(None) };
self.log.read_value(*location).map(Some)
}

fn scan(&mut self, range: impl RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_> {
Expand All @@ -150,34 +162,30 @@ impl Engine for BitCask {
}

fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
let (pos, len) = self.log.write_entry(key, Some(&*value))?;
let value_len = value.len() as u32;
self.keydir.insert(key.to_vec(), (pos + len as u64 - value_len as u64, value_len));
let value_location = self.log.write_entry(key, Some(&*value))?;
self.keydir.insert(key.to_vec(), value_location);
Ok(())
}

fn status(&mut self) -> Result<Status> {
let keys = self.keydir.len() as u64;
let size = self
.keydir
.iter()
.map(|(key, (_, value_len))| key.len() as u64 + *value_len as u64)
.sum();
let size =
self.keydir.iter().map(|(key, value_loc)| (key.len() + value_loc.length) as u64).sum();
let disk_size = self.log.file.metadata()?.len();
let live_disk_size = size + 8 * keys; // account for length prefixes
Ok(Status { name: "bitcask".to_string(), keys, size, disk_size, live_disk_size })
}
}

pub struct ScanIterator<'a> {
inner: std::collections::btree_map::Range<'a, Vec<u8>, (u64, u32)>,
inner: Range<'a, Vec<u8>, ValueLocation>,
log: &'a mut Log,
}

impl ScanIterator<'_> {
fn map(&mut self, item: (&Vec<u8>, &(u64, u32))) -> <Self as Iterator>::Item {
let (key, (value_pos, value_len)) = item;
Ok((key.clone(), self.log.read_value(*value_pos, *value_len)?))
fn map(&mut self, item: (&Vec<u8>, &ValueLocation)) -> <Self as Iterator>::Item {
let (key, value_loc) = item;
Ok((key.clone(), self.log.read_value(*value_loc)?))
}
}

Expand Down Expand Up @@ -217,10 +225,10 @@ impl BitCask {
let mut new_keydir = KeyDir::new();
let mut new_log = Log::new(path)?;
new_log.file.set_len(0)?; // truncate file if it exists
for (key, (value_pos, value_len)) in self.keydir.iter() {
let value = self.log.read_value(*value_pos, *value_len)?;
let (pos, len) = new_log.write_entry(key, Some(&value))?;
new_keydir.insert(key.clone(), (pos + len as u64 - *value_len as u64, *value_len));
for (key, value_loc) in self.keydir.iter() {
let value = self.log.read_value(*value_loc)?;
let value_loc = new_log.write_entry(key, Some(&value))?;
new_keydir.insert(key.clone(), value_loc);
}
Ok((new_log, new_keydir))
}
Expand Down Expand Up @@ -275,88 +283,97 @@ impl Log {
let mut keydir = KeyDir::new();
let file_len = self.file.metadata()?.len();
let mut r = BufReader::new(&mut self.file);
let mut pos = r.seek(SeekFrom::Start(0))?;
let mut offset = r.seek(SeekFrom::Start(0))?;

while pos < file_len {
// Read the next entry from the file, returning the key, value
// position, and value length or None for tombstones.
let result = || -> std::result::Result<(Vec<u8>, u64, Option<u32>), std::io::Error> {
while offset < file_len {
// Read the next entry from the file, returning the key and value
// location, or None for tombstones.
let result = || -> StdResult<(Vec<u8>, Option<ValueLocation>), std::io::Error> {
// Read key length: 4-byte u32.
r.read_exact(&mut len_buf)?;
let key_len = u32::from_be_bytes(len_buf);

// Read value length: 4-byte i32, -1 for tombstones.
r.read_exact(&mut len_buf)?;
let value_len_or_tombstone = match i32::from_be_bytes(len_buf) {
l if l >= 0 => Some(l as u32),
_ => None, // -1 for tombstones
let value_loc = match i32::from_be_bytes(len_buf) {
..0 => None, // tombstone
len => Some(ValueLocation {
offset: offset + 8 + key_len as u64,
length: len as usize,
}),
};
let value_pos = pos + 4 + 4 + key_len as u64;

// Read the key.
let mut key = vec![0; key_len as usize];
r.read_exact(&mut key)?;

if let Some(value_len) = value_len_or_tombstone {
if value_pos + value_len as u64 > file_len {
// Skip past the value.
if let Some(value_loc) = value_loc {
if value_loc.end() > file_len {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"value extends beyond end of file",
));
}
r.seek_relative(value_len as i64)?; // avoids discarding buffer
r.seek_relative(value_loc.length as i64)?;
}

Ok((key, value_pos, value_len_or_tombstone))
// Update the current file offset.
offset += 8 + key_len as u64 + value_loc.map_or(0, |v| v.length) as u64;

Ok((key, value_loc))
}();

// Update the keydir with the entry.
match result {
// Populate the keydir with the entry, or remove it on tombstones.
Ok((key, value_pos, Some(value_len))) => {
keydir.insert(key, (value_pos, value_len));
pos = value_pos + value_len as u64;
}
Ok((key, value_pos, None)) => {
keydir.remove(&key);
pos = value_pos;
}
Ok((key, Some(value_loc))) => keydir.insert(key, value_loc),
Ok((key, None)) => keydir.remove(&key),
// If an incomplete entry was found at the end of the file, assume an
// incomplete write and truncate the file.
Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
error!("Found incomplete entry at offset {}, truncating file", pos);
self.file.set_len(pos)?;
error!("Found incomplete entry at offset {offset}, truncating file");
self.file.set_len(offset)?;
break;
}
Err(err) => return Err(err.into()),
}
};
}

Ok(keydir)
}

/// Reads a value from the log file.
fn read_value(&mut self, value_pos: u64, value_len: u32) -> Result<Vec<u8>> {
let mut value = vec![0; value_len as usize];
self.file.seek(SeekFrom::Start(value_pos))?;
/// Reads a value from the log file at the given location.
fn read_value(&mut self, location: ValueLocation) -> Result<Vec<u8>> {
let mut value = vec![0; location.length];
self.file.seek(SeekFrom::Start(location.offset))?;
self.file.read_exact(&mut value)?;
Ok(value)
}

/// Appends a key/value entry to the log file, using a None value for
/// tombstones. It returns the position and length of the entry.
fn write_entry(&mut self, key: &[u8], value: Option<&[u8]>) -> Result<(u64, u32)> {
let key_len = key.len() as u32;
let value_len = value.map_or(0, |v| v.len() as u32);
let value_len_or_tombstone = value.map_or(-1, |v| v.len() as i32);
let len = 4 + 4 + key_len + value_len;

let pos = self.file.seek(SeekFrom::End(0))?;
let mut w = BufWriter::with_capacity(len as usize, &mut self.file);
w.write_all(&key_len.to_be_bytes())?;
w.write_all(&value_len_or_tombstone.to_be_bytes())?;
/// tombstones. It returns the location in the log of the value inside the
/// entry, for use with the `KeyDir`.
fn write_entry(&mut self, key: &[u8], value: Option<&[u8]>) -> Result<ValueLocation> {
let length = 8 + key.len() + value.map_or(0, |v| v.len());
let offset = self.file.seek(SeekFrom::End(0))?;
let mut w = BufWriter::with_capacity(length, &mut self.file);

// Key length: 4-byte u32.
w.write_all(&(key.len() as u32).to_be_bytes())?;

// Value length: 4-byte i32, -1 for tombstones.
w.write_all(&value.map_or(-1, |v| v.len() as i32).to_be_bytes())?;

// The actual key and value.
w.write_all(key)?;
if let Some(value) = value {
w.write_all(value)?;
}
w.write_all(value.unwrap_or_default())?;
w.flush()?;

Ok((pos, len))
// Translate the entry location into a value location.
Ok(ValueLocation {
offset: offset + 8 + key.len() as u64,
length: value.map_or(0, |v| v.len()),
})
}
}

Expand All @@ -365,7 +382,6 @@ impl Log {
mod tests {
use std::error::Error as StdError;
use std::fmt::Write as _;
use std::result::Result as StdResult;

use tempfile::TempDir;
use test_case::test_case;
Expand Down Expand Up @@ -412,14 +428,14 @@ mod tests {
let mut log = Log::new(path.clone())?;

let mut ends = vec![];
let (pos, len) = log.write_entry("deleted".as_bytes(), Some(&[1, 2, 3]))?;
ends.push(pos + len as u64);
let (pos, len) = log.write_entry("deleted".as_bytes(), None)?;
ends.push(pos + len as u64);
let (pos, len) = log.write_entry(&[], Some(&[]))?;
ends.push(pos + len as u64);
let (pos, len) = log.write_entry("key".as_bytes(), Some(&[1, 2, 3, 4, 5]))?;
ends.push(pos + len as u64);
let value_loc = log.write_entry("deleted".as_bytes(), Some(&[1, 2, 3]))?;
ends.push(value_loc.end());
let value_loc = log.write_entry("deleted".as_bytes(), None)?;
ends.push(value_loc.end());
let value_loc = log.write_entry(&[], Some(&[]))?;
ends.push(value_loc.end());
let value_loc = log.write_entry("key".as_bytes(), Some(&[1, 2, 3, 4, 5]))?;
ends.push(value_loc.end());
drop(log);

// Copy the file, and truncate it at each byte, then try to open it
Expand Down

0 comments on commit be78237

Please sign in to comment.