Skip to content

Commit

Permalink
Add changes related to compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
SajadKarim committed Oct 1, 2024
2 parents 0cd9d90 + ca604af commit 9dd0d0f
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 12 deletions.
1 change: 1 addition & 0 deletions betree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ env_logger = { version = "0.9", optional = true }
core_affinity = "0.5"
async-trait = "0.1"

lz4-sys = "1.9"
lz4 = "1.23.1"
zstd = { version = "0.9", default-features = false }
zstd-safe = { version = "4.0", default-features = false, features = ["experimental"] }
Expand Down
36 changes: 32 additions & 4 deletions betree/src/compression/lz4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ use std::time::Instant;
use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

use lz4_sys::{LZ4F_compressBound, LZ4FPreferences, LZ4FCompressionContext, LZ4F_createCompressionContext};
use std::ptr;
use lz4_sys::LZ4FFrameInfo;

impl CompressionState for Lz4Compression {
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
panic!("..");
}

fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = data.len();
Expand Down Expand Up @@ -132,7 +141,16 @@ impl CompressionState for Lz4Compression {

fn finish(&mut self, data: Buf) -> Result<Buf> {
let size = data.as_ref().len();
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
// let prefs = LZ4FPreferences {
// frame_info: Default::default(),
// compression_level: self.config.level as u32,
// auto_flush: 1,
// favor_dec_speed: 0,
// reserved: [0; 3],
// };


let mut buf: BufWrite = BufWrite::with_capacity(Block::round_up_from_bytes( (size as u32)));

let mut encoder = EncoderBuilder::new()
.level(u32::from(self.config.level))
Expand All @@ -141,9 +159,14 @@ impl CompressionState for Lz4Compression {
.block_mode(BlockMode::Linked)
.build(buf)?;

//io::copy(&mut data.as_ref(), &mut encoder)?;
encoder.write_all(data.as_ref())?;
let (compressed_data, result) = encoder.finish();

//<<<<<<< HEAD
//
//=======
//
//>>>>>>> ca604af8439c223604ef4577063059234f01173a
if let Err(e) = result {
panic!("Compression failed: {:?}", e);
}
Expand All @@ -152,8 +175,11 @@ impl CompressionState for Lz4Compression {
buf2.write_all(compressed_data.as_slice());

Ok(buf2.into_buf())

//Ok(compressed_data.into_buf())
//<<<<<<< HEAD
//
// //Ok(compressed_data.into_buf())
//=======
//>>>>>>> ca604af8439c223604ef4577063059234f01173a
}
// fn finish(&mut self) -> Buf {
// let (v, result) = self.encoder.finish();
Expand All @@ -163,6 +189,8 @@ impl CompressionState for Lz4Compression {
}




impl DecompressionState for Lz4Decompression {
fn decompressext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
Expand Down
1 change: 1 addition & 0 deletions betree/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub trait CompressionState: Write {
/// compressed data.
fn finish(&mut self, data: Buf) -> Result<Buf>;
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>;
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>;
}

pub trait DecompressionState {
Expand Down
5 changes: 5 additions & 0 deletions betree/src/compression/none.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ impl io::Write for NoneCompression {
}

impl CompressionState for NoneCompression {
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
panic!("..");
}

fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
Ok(data.clone().to_vec())
Expand Down
67 changes: 59 additions & 8 deletions betree/src/compression/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use speedy::{Readable, Writable};
const DATA_OFF: usize = mem::size_of::<u32>();

impl CompressionState for ZstdCompression {
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
fn finishext2(&mut self, data: &[u8]) -> Result<Buf>
{
let size = zstd_safe::compress_bound(data.len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
Expand Down Expand Up @@ -122,12 +122,16 @@ impl CompressionState for ZstdCompression {
let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(output.as_slice().len() as u32));
buf2.write_all(output.as_slice());

Ok(buf2.as_slice().to_vec())
//<<<<<<< HEAD
// Ok(buf2.as_slice().to_vec())
//=======
Ok(buf.into_buf())
//>>>>>>> ca604af8439c223604ef4577063059234f01173a
}

fn finish(&mut self, data: Buf) -> Result<Buf> {
let start = Instant::now();
let size = zstd_safe::compress_bound(data.as_ref().len());
fn finishext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
let size = zstd_safe::compress_bound(data.len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
//buf.write_all(&[0u8; DATA_OFF])?;

Expand All @@ -150,15 +154,58 @@ impl CompressionState for ZstdCompression {
og_len
.write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
.unwrap();
//<<<<<<< HEAD
let duration = start.elapsed();
//println!("Total time elapsed: {:?}", duration);
//println!("Total time elapsed: {} {}", size, buf.get_len());
Ok(buf.into_buf())
*/

// let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(output.as_slice().len() as u32));
// buf2.write_all(output.as_slice());
// Ok(buf2.into_buf())
//=======

Ok(buf.as_slice().to_vec())
}

fn finish(&mut self, data: Buf) -> Result<Buf> {
//panic!("..");
let start = Instant::now();
let size = zstd_safe::compress_bound(data.as_ref().len());
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32));
//buf.write_all(&[0u8; DATA_OFF])?;

let mut input = zstd::stream::raw::InBuffer::around(&data);
let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, 0);
let mut finished_frame;
loop {
let remaining = self.writer.run(&mut input, &mut output)?;
finished_frame = remaining == 0;
if input.pos() > 0 || data.is_empty() {
break;
}
}
while self.writer.flush(&mut output)? > 0 {}
self.writer.finish(&mut output, finished_frame)?;

// let og_len = data.len() as u32;
// og_len
// .write_to_buffer(&mut buf.as_mut()[..DATA_OFF])
// .unwrap();
// let duration = start.elapsed();
// let b = buf.get_len();
let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(output.as_slice().len() as u32));
buf2.write_all(output.as_slice());
Ok(buf2.into_buf())

let a = output.as_slice().len();
let b = buf2.into_buf();
let c = buf.into_buf();
//println!("== {:?}", data.as_ref());
//println!("== {:?}", b.as_ref());
//println!("compressed....: {} {} {} {}", size, data.as_ref().len(), b.as_ref().len(), c.as_ref().len());
Ok(b)
//>>>>>>> ca604af8439c223604ef4577063059234f01173a
}
}

Expand All @@ -167,7 +214,7 @@ impl DecompressionState for ZstdDecompression {
fn decompressext(&mut self, data: &[u8]) -> Result<Vec<u8>>
{
//panic!("shukro maula");
let size = u32::read_from_buffer(data).unwrap();
let size = data.len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]);
Expand Down Expand Up @@ -201,10 +248,14 @@ impl DecompressionState for ZstdDecompression {
//let start = Instant::now();
//panic!("..why");

let size = u32::read_from_buffer(data.as_ref()).unwrap();
let size = data.as_ref().len() as u32;
let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size));

//<<<<<<< HEAD
let mut input = zstd::stream::raw::InBuffer::around(&data[/*DATA_OFF*/..]);
//=======
// let mut input = zstd::stream::raw::InBuffer::around(&data[..]);
//>>>>>>> ca604af8439c223604ef4577063059234f01173a
let mut output = zstd::stream::raw::OutBuffer::around(&mut buf);

let mut finished_frame;
Expand Down

0 comments on commit 9dd0d0f

Please sign in to comment.