From ca604af8439c223604ef4577063059234f01173a Mon Sep 17 00:00:00 2001 From: Sajad Karim Date: Tue, 1 Oct 2024 14:06:06 +0200 Subject: [PATCH] Add some changes related to compression. --- betree/Cargo.toml | 1 + betree/src/compression/lz4.rs | 34 ++++++++++++++++-- betree/src/compression/mod.rs | 1 + betree/src/compression/none.rs | 5 +++ betree/src/compression/zstd.rs | 60 ++++++++++++++++++++++++------- betree/src/data_management/dmu.rs | 6 ++-- betree/src/tree/imp/mod.rs | 2 +- 7 files changed, 90 insertions(+), 19 deletions(-) diff --git a/betree/Cargo.toml b/betree/Cargo.toml index 71d5a446..25017e91 100644 --- a/betree/Cargo.toml +++ b/betree/Cargo.toml @@ -46,6 +46,7 @@ env_logger = { version = "0.9", optional = true } core_affinity = "0.5" async-trait = "0.1" +lz4-sys = "1.9" lz4 = "1.23.1" zstd = { version = "0.9", default-features = false } zstd-safe = { version = "4.0", default-features = false, features = ["experimental"] } diff --git a/betree/src/compression/lz4.rs b/betree/src/compression/lz4.rs index e85a6af8..aa991401 100644 --- a/betree/src/compression/lz4.rs +++ b/betree/src/compression/lz4.rs @@ -103,7 +103,16 @@ use std::time::Instant; use speedy::{Readable, Writable}; const DATA_OFF: usize = mem::size_of::(); +use lz4_sys::{LZ4F_compressBound, LZ4FPreferences, LZ4FCompressionContext, LZ4F_createCompressionContext}; +use std::ptr; +use lz4_sys::LZ4FFrameInfo; + impl CompressionState for Lz4Compression { + fn finishext2(&mut self, data: &[u8]) -> Result + { + panic!(".."); + } + fn finishext(&mut self, data: &[u8]) -> Result> { let size = data.len(); @@ -124,7 +133,16 @@ impl CompressionState for Lz4Compression { fn finish(&mut self, data: Buf) -> Result { let size = data.as_ref().len(); - let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32)); + // let prefs = LZ4FPreferences { + // frame_info: Default::default(), + // compression_level: self.config.level as u32, + // auto_flush: 1, + // favor_dec_speed: 0, + // reserved: [0; 3], + // }; + + + let mut buf: BufWrite = BufWrite::with_capacity(Block::round_up_from_bytes( (size as u32))); let mut encoder = EncoderBuilder::new() .level(u32::from(self.config.level)) @@ -133,10 +151,18 @@ impl CompressionState for Lz4Compression { .block_mode(BlockMode::Linked) .build(buf)?; + //io::copy(&mut data.as_ref(), &mut encoder)?; encoder.write_all(data.as_ref())?; let (compressed_data, result) = encoder.finish(); - - Ok(compressed_data.into_buf()) + + if let Err(e) = result { + panic!("Compression failed: {:?}", e); + } + + let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(compressed_data.as_slice().len() as u32)); + buf2.write_all(compressed_data.as_slice()); + + Ok(buf2.into_buf()) } // fn finish(&mut self) -> Buf { // let (v, result) = self.encoder.finish(); @@ -146,6 +172,8 @@ impl CompressionState for Lz4Compression { } + + impl DecompressionState for Lz4Decompression { fn decompressext(&mut self, data: &[u8]) -> Result> { diff --git a/betree/src/compression/mod.rs b/betree/src/compression/mod.rs index 95ebc091..e4d2a833 100644 --- a/betree/src/compression/mod.rs +++ b/betree/src/compression/mod.rs @@ -78,6 +78,7 @@ pub trait CompressionState: Write { /// compressed data. fn finish(&mut self, data: Buf) -> Result; fn finishext(&mut self, data: &[u8]) -> Result>; + fn finishext2(&mut self, data: &[u8]) -> Result; } pub trait DecompressionState { diff --git a/betree/src/compression/none.rs b/betree/src/compression/none.rs index 0a0cb1d5..47aa8c22 100644 --- a/betree/src/compression/none.rs +++ b/betree/src/compression/none.rs @@ -57,6 +57,11 @@ impl io::Write for NoneCompression { } impl CompressionState for NoneCompression { + fn finishext2(&mut self, data: &[u8]) -> Result + { + panic!(".."); + } + fn finishext(&mut self, data: &[u8]) -> Result> { Ok(data.clone().to_vec()) diff --git a/betree/src/compression/zstd.rs b/betree/src/compression/zstd.rs index 2349112c..fc2d9f02 100644 --- a/betree/src/compression/zstd.rs +++ b/betree/src/compression/zstd.rs @@ -94,7 +94,7 @@ use speedy::{Readable, Writable}; const DATA_OFF: usize = mem::size_of::(); impl CompressionState for ZstdCompression { - fn finishext(&mut self, data: &[u8]) -> Result> + fn finishext2(&mut self, data: &[u8]) -> Result { let size = zstd_safe::compress_bound(data.len()); let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32)); @@ -119,12 +119,12 @@ impl CompressionState for ZstdCompression { .write_to_buffer(&mut buf.as_mut()[..DATA_OFF]) .unwrap(); - Ok(buf.as_slice().to_vec()) + Ok(buf.into_buf()) } - fn finish(&mut self, data: Buf) -> Result { - let start = Instant::now(); - let size = zstd_safe::compress_bound(data.as_ref().len()); + fn finishext(&mut self, data: &[u8]) -> Result> + { + let size = zstd_safe::compress_bound(data.len()); let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32)); buf.write_all(&[0u8; DATA_OFF])?; @@ -146,10 +146,46 @@ impl CompressionState for ZstdCompression { og_len .write_to_buffer(&mut buf.as_mut()[..DATA_OFF]) .unwrap(); - let duration = start.elapsed(); - //println!("Total time elapsed: {:?}", duration); - //println!("Total time elapsed: {} {}", size, buf.get_len()); - Ok(buf.into_buf()) + + Ok(buf.as_slice().to_vec()) + } + + fn finish(&mut self, data: Buf) -> Result { + //panic!(".."); + let start = Instant::now(); + let size = zstd_safe::compress_bound(data.as_ref().len()); + let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32)); + //buf.write_all(&[0u8; DATA_OFF])?; + + let mut input = zstd::stream::raw::InBuffer::around(&data); + let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, 0); + let mut finished_frame; + loop { + let remaining = self.writer.run(&mut input, &mut output)?; + finished_frame = remaining == 0; + if input.pos() > 0 || data.is_empty() { + break; + } + } + while self.writer.flush(&mut output)? > 0 {} + self.writer.finish(&mut output, finished_frame)?; + + // let og_len = data.len() as u32; + // og_len + // .write_to_buffer(&mut buf.as_mut()[..DATA_OFF]) + // .unwrap(); + // let duration = start.elapsed(); + // let b = buf.get_len(); + let mut buf2 = BufWrite::with_capacity(Block::round_up_from_bytes(output.as_slice().len() as u32)); + buf2.write_all(output.as_slice()); + + let a = output.as_slice().len(); + let b = buf2.into_buf(); + let c = buf.into_buf(); + //println!("== {:?}", data.as_ref()); + //println!("== {:?}", b.as_ref()); + //println!("compressed....: {} {} {} {}", size, data.as_ref().len(), b.as_ref().len(), c.as_ref().len()); + Ok(b) } } @@ -158,7 +194,7 @@ impl DecompressionState for ZstdDecompression { fn decompressext(&mut self, data: &[u8]) -> Result> { //panic!("shukro maula"); - let size = u32::read_from_buffer(data).unwrap(); + let size = data.len() as u32; let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size)); let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]); @@ -192,10 +228,10 @@ impl DecompressionState for ZstdDecompression { //let start = Instant::now(); //panic!("..why"); - let size = u32::read_from_buffer(data.as_ref()).unwrap(); + let size = data.as_ref().len() as u32; let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size)); - let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]); + let mut input = zstd::stream::raw::InBuffer::around(&data[..]); let mut output = zstd::stream::raw::OutBuffer::around(&mut buf); let mut finished_frame; diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index 0ef5c84e..d248518f 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -255,9 +255,9 @@ where //println!("..zz {:?} {}", bytes_to_read, compressed_data.as_ref().len()); let object: Node>> = { /// TODOooooooooooooooooooooooooooooooooooooooooooooooo fix this!!!!!!! layeeeee - //let data = decompression_state.decompress(compressed_data)?; - //Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data.into_boxed_slice(), a)? - Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), compressed_data.into_boxed_slice(), a)? + let data = decompression_state.decompress(compressed_data)?; + Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data.into_boxed_slice(), a)? + //Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), compressed_data.into_boxed_slice(), a)? }; let key = ObjectKey::Unmodified { offset, generation }; diff --git a/betree/src/tree/imp/mod.rs b/betree/src/tree/imp/mod.rs index 7c517a55..64673eac 100644 --- a/betree/src/tree/imp/mod.rs +++ b/betree/src/tree/imp/mod.rs @@ -128,7 +128,7 @@ where dml: X, storage_preference: StoragePreference, ) -> Self { - let root_node = dml.insert(Node::empty_leaf(true), tree_id, PivotKey::Root(tree_id)); + let root_node = dml.insert(Node::empty_leaf(false), tree_id, PivotKey::Root(tree_id)); Tree::new(root_node, tree_id, msg_action, dml, storage_preference) }