Skip to content

Commit

Permalink
temp checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
SajadKarim committed Sep 5, 2024
1 parent 6f7f4cc commit 42c6913
Show file tree
Hide file tree
Showing 9 changed files with 393 additions and 149 deletions.
17 changes: 17 additions & 0 deletions betree/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,23 @@ impl io::Write for BufWrite {
}
}


// SAFETY: `WriteBuf` requires that `as_mut_ptr` points to a writable region of
// at least `capacity()` bytes and that `filled_until` records how many of them
// are initialised. NOTE(review): this relies on `self.buf.ptr` addressing an
// allocation of `self.buf.capacity` bytes — confirm against the `BufWrite`
// definition, which is not visible in this hunk.
unsafe impl zstd::stream::raw::WriteBuf for BufWrite {
    /// Returns the buffer contents via `BufWrite`'s `AsRef` implementation
    /// (presumably the initialised prefix — verify against that impl).
    fn as_slice(&self) -> &[u8] {
        self.as_ref()
    }

    /// Capacity of the backing storage, converted to bytes.
    fn capacity(&self) -> usize {
        self.buf.capacity.to_bytes() as usize
    }

    /// Returns the raw pointer to the start of the backing storage.
    ///
    /// # Panics
    ///
    /// Panics if the stored pointer is null.
    fn as_mut_ptr(&mut self) -> *mut u8 {
        // Hand out the stored raw pointer itself. Deriving it from a temporary
        // `&mut u8` would restrict the pointer to a single byte, while zstd
        // writes up to `capacity()` bytes through it.
        let ptr = self.buf.ptr;
        assert!(!ptr.is_null(), "BufWrite backing pointer must not be null");
        ptr
    }

    /// Records that the first `n` bytes of the buffer now hold valid data.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `n <= self.capacity()` and that the
    /// first `n` bytes have been initialised.
    unsafe fn filled_until(&mut self, n: usize) {
        // Also guards the narrowing cast below against silent truncation when
        // the caller breaks the contract (debug builds only).
        debug_assert!(
            n <= self.buf.capacity.to_bytes() as usize,
            "filled_until past the end of the buffer"
        );
        self.size = n as u32
    }
}


impl io::Seek for BufWrite {
fn seek(&mut self, seek: io::SeekFrom) -> io::Result<u64> {
use io::SeekFrom::*;
Expand Down
14 changes: 11 additions & 3 deletions betree/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! `None` and `Lz4` are provided as implementation.
use crate::{
buffer::Buf,
buffer::{Buf, BufWrite},
size::{Size, StaticSize},
vdev::Block,
};
Expand Down Expand Up @@ -75,11 +75,11 @@ pub trait CompressionBuilder: Debug + Size + Send + Sync + 'static {
// State object used to compress data; every implementor must also implement
// `Write` (supertrait bound).
// NOTE(review): this diff view lists two `finish` signatures back to back. The
// first takes no input and returns a `Buf`; the second receives the data to
// compress as a `Buf` and returns a `Result<Buf>`.
pub trait CompressionState: Write {
/// Finishes the compression stream and returns a buffer that contains the
/// compressed data.
fn finish(&mut self) -> Buf;
fn finish(&mut self, data: Buf) -> Result<Buf>;
}

// State object used to decompress data.
// NOTE(review): this diff view lists two `decompress` signatures back to back.
// The first borrows the input as `&[u8]` and returns a boxed slice; the second
// takes ownership of a `Buf` and returns a `Buf`.
pub trait DecompressionState {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>>;
fn decompress(&mut self, data: Buf) -> Result<Buf>;
}

mod none;
Expand All @@ -90,3 +90,11 @@ pub use self::none::None;

mod zstd;
pub use self::zstd::Zstd;


lazy_static::lazy_static! {
    /// Globally shared compression builder, created on first access from a
    /// Zstd configuration with compression level 1.
    pub static ref COMPRESSION_VAR: Arc<std::sync::RwLock<dyn CompressionBuilder>> = {
        let default_zstd = Zstd { level: 1 };
        CompressionConfiguration::Zstd(default_zstd).to_builder()
    };
}
8 changes: 4 additions & 4 deletions betree/src/compression/none.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ impl io::Write for NoneCompression {
}

// "No compression" implementation of `CompressionState`.
// NOTE(review): this diff view shows two variants of `finish`. The first swaps
// the internal `BufWrite` for a fresh one of `DEFAULT_BUFFER_SIZE` and returns
// the old contents as a `Buf`; the second returns the `Buf` it was handed,
// unchanged, wrapped in `Ok`.
impl CompressionState for NoneCompression {
fn finish(&mut self) -> Buf {
mem::replace(&mut self.buf, BufWrite::with_capacity(DEFAULT_BUFFER_SIZE)).into_buf()
fn finish(&mut self, buf: Buf) -> Result<Buf> {
Ok(buf)
}
}

// "No compression" implementation of `DecompressionState`.
// NOTE(review): this diff view shows two variants of `decompress`. The slice
// variant copies the input with `to_vec()` into a boxed slice; the `Buf`
// variant returns its input unchanged, which appears to be the pass-through
// the FIXME below asks for.
impl DecompressionState for NoneDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
fn decompress(&mut self, data: Buf) -> Result<Buf> {
// FIXME: pass-through Buf, reusing alloc
Ok(data.to_vec().into_boxed_slice())
Ok(data)
}
}
145 changes: 106 additions & 39 deletions betree/src/compression/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ use super::{
};
use crate::{
buffer::{Buf, BufWrite},
database,
size::StaticSize,
vdev::Block,
};
use serde::{Deserialize, Serialize};
use std::{
io::{self, Write},
io::{self, Cursor, Write},
mem,
};
use zstd::stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::Writer,
use zstd::{
block::{Compressor, Decompressor},
stream::{
raw::{CParameter, DParameter, Decoder, Encoder},
zio::{Reader, Writer},
},
};
use zstd_safe::FrameFormat;
use zstd_safe::{FrameFormat, InBuffer, OutBuffer, WriteBuf};
use std::sync::{Arc, Mutex};

// TODO: investigate pre-created dictionary payoff

/// Zstd compression. (<https://github.com/facebook/zstd>)
Expand All @@ -29,10 +33,10 @@ pub struct Zstd {
}

// Compression state for Zstd.
// NOTE(review): this diff view shows two declarations of `writer`. The first
// wraps a `BufWrite` sink and an `Encoder` in a zio `Writer`; the second holds
// the raw `Encoder` directly.
struct ZstdCompression {
writer: Writer<BufWrite, Encoder<'static>>,
writer: Encoder<'static>,
}
// Decompression state for Zstd.
// NOTE(review): this diff view shows two declarations of `writer`. The first
// wraps a `BufWrite` sink and a `Decoder` in a zio `Writer`; the second holds
// the raw `Decoder` directly.
struct ZstdDecompression {
writer: Writer<BufWrite, Decoder<'static>>,
writer: Decoder<'static>,
}

impl StaticSize for Zstd {
Expand All @@ -41,6 +45,8 @@ impl StaticSize for Zstd {
}
}

use zstd::stream::raw::Operation;

impl CompressionBuilder for Zstd {
fn new_compression(&self) -> Result<Arc<std::sync::RwLock<dyn CompressionState>>> {
// "The library supports regular compression levels from 1 up to ZSTD_maxCLevel(),
Expand All @@ -49,14 +55,10 @@ impl CompressionBuilder for Zstd {

// Compression format is stored externally, don't need to duplicate it
encoder.set_parameter(CParameter::Format(FrameFormat::Magicless))?;
// Integrity is handled at a different layer
// // Integrity is handled at a different layer
encoder.set_parameter(CParameter::ChecksumFlag(false))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);

Ok(Arc::new(std::sync::RwLock::new(ZstdCompression {
writer: Writer::new(buf, encoder),
})))
Ok(Arc::new(std::sync::RwLock::new(ZstdCompression { writer: encoder })))
}

fn decompression_tag(&self) -> DecompressionTag {
Expand All @@ -68,50 +70,115 @@ impl Zstd {
/// Creates a boxed Zstd decompression state.
///
/// Returns an error if the decoder cannot be created or the frame format
/// parameter cannot be set (both are propagated with `?`).
// NOTE(review): this diff view shows two construction variants. The first
// wraps the decoder and a `BufWrite` of `DEFAULT_BUFFER_SIZE` in a zio
// `Writer`; the second stores the decoder directly.
pub fn new_decompression() -> Result<Box<dyn DecompressionState>> {
let mut decoder = Decoder::new()?;
// Frames are read without the magic number, matching the `Magicless` format
// the encoder is configured with in `new_compression`.
decoder.set_parameter(DParameter::Format(FrameFormat::Magicless))?;
// decoder.set_parameter(DParameter::ForceIgnoreChecksum(true))?;

let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
Ok(Box::new(ZstdDecompression {
writer: Writer::new(buf, decoder),
}))
Ok(Box::new(ZstdDecompression { writer: decoder }))
}
}

// `io::Write` implementation required by the `CompressionState: Write` bound.
// NOTE(review): this diff view shows two bodies per method. The first forwards
// to the inner `writer`; the second is `unimplemented!()`, which panics when
// called, so with that variant data can only be supplied through
// `CompressionState::finish`.
impl io::Write for ZstdCompression {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.writer.write(buf)
unimplemented!()
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.writer.write_all(buf)
unimplemented!()
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
unimplemented!()
}
}

use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

// Zstd implementation of `CompressionState`.
// NOTE(review): this diff view shows two variants of `finish`. The first
// finishes the zio `Writer` (ignoring its result) and swaps out its `BufWrite`;
// the second compresses the `Buf` it is given and is annotated below.
impl CompressionState for ZstdCompression {
fn finish(&mut self) -> Buf {
let _ = self.writer.finish();

mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
// Compresses `data` into a freshly allocated buffer laid out as
// `[uncompressed length: u32][zstd output]`, where the header occupies the
// first `DATA_OFF` bytes.
fn finish(&mut self, data: Buf) -> Result<Buf> {
// Size the output from zstd's compress bound for this input, rounded up
// to whole blocks.
let size = zstd_safe::compress_bound(data.as_ref().len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
// Reserve the header bytes with zeroes; they are overwritten with the
// original length at the end of this function.
buf.write_all(&[0u8; DATA_OFF])?;

let mut input = zstd::stream::raw::InBuffer::around(&data);
// Start writing compressed output right after the reserved header.
let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, DATA_OFF);
let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
// Stops as soon as any input has been consumed or there is no input.
// NOTE(review): this does not check that ALL input was consumed —
// confirm a single `run` always takes the whole buffer here.
if input.pos() > 0 || data.is_empty() {
break;
}
}

// Drain whatever the encoder still buffers, then end the frame.
while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

// Store the uncompressed length in the reserved header.
// NOTE(review): `as u32` truncates lengths above `u32::MAX` — confirm
// that `Buf` sizes cannot exceed this.
let og_len = data.len() as u32;
og_len
.write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
.unwrap();
Ok(buf.into_buf())
}
}

// Zstd implementation of `DecompressionState`.
// NOTE(review): this diff view shows two variants of `decompress`. The first
// feeds the slice through the zio `Writer` and returns its buffer as a boxed
// slice; the second works on an owned `Buf` and is annotated below.
impl DecompressionState for ZstdDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
self.writer.write_all(data)?;
self.writer.finish();

Ok(mem::replace(
self.writer.writer_mut(),
BufWrite::with_capacity(DEFAULT_BUFFER_SIZE),
)
.into_buf()
.into_boxed_slice())
// Expects `data` to start with a `u32` length header followed by the zstd
// payload at offset `DATA_OFF`, the layout written by
// `ZstdCompression::finish`.
fn decompress(&mut self, data: Buf) -> Result<Buf> {
// Read the stored length; `unwrap` panics if the header cannot be decoded.
let size = u32::read_from_buffer(data.as_ref()).unwrap();
// Allocate the output for that length, rounded up to whole blocks.
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

// NOTE(review): slicing from `DATA_OFF` panics if `data` is shorter than
// the header — confirm callers never pass such a buffer.
let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]);
let mut output = zstd::stream::raw::OutBuffer::around(&mut buf);

let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
// A non-zero result means the decoder is not done yet: keep looping, and
// write one extra byte when the destination is already full.
if remaining > 0 {
if output.dst.capacity() == output.dst.as_ref().len() {
// append faux byte to extend in case that original was
// wrong for some reason (this should not happen but is a
// sanity guard)
output.dst.write(&[0])?;
}
continue;
}
// Decoder reported completion: stop once input was consumed or the input
// buffer is empty.
if input.pos() > 0 || data.is_empty() {
break;
}
}

// Drain any output still held by the decoder, then finish the operation.
while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

Ok(buf.into_buf())
}
}

#[cfg(test)]
mod tests {
    use rand::RngCore;

    use super::*;

    /// Round-trips 4 MiB of random bytes through the Zstd compression and
    /// decompression states and checks that the payload survives unchanged.
    #[test]
    fn encode_then_decode() {
        let mut rng = rand::thread_rng();
        let mut raw = vec![42u8; 4 * 1024 * 1024];
        rng.fill_bytes(raw.as_mut());
        let buf = Buf::from_zero_padded(raw);
        let zstd = Zstd { level: 1 };

        // `new_compression` hands out the state behind an `Arc<RwLock<..>>`,
        // so take the write lock to reach the `&mut self` method.
        let comp = zstd.new_compression().unwrap();
        let c_buf = comp.write().unwrap().finish(buf.clone()).unwrap();

        let mut decomp = zstd.decompression_tag().new_decompression().unwrap();
        let d_buf = decomp.decompress(c_buf).unwrap();

        let expected: &[u8] = buf.as_ref();
        let actual: &[u8] = d_buf.as_ref();
        assert_eq!(expected.len(), actual.len());
        // Matching lengths alone would let a corrupting decoder pass; compare
        // the bytes too. `assert!` avoids dumping 4 MiB on failure.
        assert!(
            expected == actual,
            "decompressed bytes differ from the original input"
        );
    }

    /// Checks that the upstream one-shot zstd helpers round-trip a tiny input.
    #[test]
    fn sanity() {
        let buf = [42u8, 42];
        let c_buf = zstd::stream::encode_all(&buf[..], 1).unwrap();
        let d_buf = zstd::stream::decode_all(c_buf.as_slice()).unwrap();
        assert_eq!(&buf, d_buf.as_slice());
    }
}
Loading

0 comments on commit 42c6913

Please sign in to comment.