From 42c6913ac905d4aa2a6678160a65a9299ff19105 Mon Sep 17 00:00:00 2001 From: Sajad Karim Date: Thu, 5 Sep 2024 16:06:02 +0200 Subject: [PATCH] temp checkin --- betree/src/buffer.rs | 17 ++++ betree/src/compression/mod.rs | 14 ++- betree/src/compression/none.rs | 8 +- betree/src/compression/zstd.rs | 145 +++++++++++++++++++-------- betree/src/cow_bytes.rs | 159 +++++++++++++++++++++++------- betree/src/cow_bytesex.rs | 56 +++++------ betree/src/data_management/dmu.rs | 24 ++++- betree/src/tree/imp/node.rs | 73 +++++++++++--- betree/src/tree/imp/nvmleaf.rs | 46 ++++----- 9 files changed, 393 insertions(+), 149 deletions(-) diff --git a/betree/src/buffer.rs b/betree/src/buffer.rs index 5a896593..f928caca 100644 --- a/betree/src/buffer.rs +++ b/betree/src/buffer.rs @@ -290,6 +290,23 @@ impl io::Write for BufWrite { } } + +unsafe impl zstd::stream::raw::WriteBuf for BufWrite { + fn as_slice(&self) -> &[u8] { + self.as_ref() + } + fn capacity(&self) -> usize { + self.buf.capacity.to_bytes() as usize + } + fn as_mut_ptr(&mut self) -> *mut u8 { + unsafe { self.buf.ptr.as_mut().unwrap() } + } + unsafe fn filled_until(&mut self, n: usize) { + self.size = n as u32 + } + } + + impl io::Seek for BufWrite { fn seek(&mut self, seek: io::SeekFrom) -> io::Result { use io::SeekFrom::*; diff --git a/betree/src/compression/mod.rs b/betree/src/compression/mod.rs index 1dfa18cc..2cfbc409 100644 --- a/betree/src/compression/mod.rs +++ b/betree/src/compression/mod.rs @@ -3,7 +3,7 @@ //! `None` and `Lz4` are provided as implementation. use crate::{ - buffer::Buf, + buffer::{Buf, BufWrite}, size::{Size, StaticSize}, vdev::Block, }; @@ -75,11 +75,11 @@ pub trait CompressionBuilder: Debug + Size + Send + Sync + 'static { pub trait CompressionState: Write { /// Finishes the compression stream and returns a buffer that contains the /// compressed data. - fn finish(&mut self) -> Buf; + fn finish(&mut self, data: Buf) -> Result; } pub trait DecompressionState { - fn decompress(&mut self, data: &[u8]) -> Result>; + fn decompress(&mut self, data: Buf) -> Result; } mod none; @@ -90,3 +90,11 @@ pub use self::none::None; mod zstd; pub use self::zstd::Zstd; + + +lazy_static::lazy_static! { + pub static ref COMPRESSION_VAR: Arc> = + CompressionConfiguration::Zstd(Zstd { + level: 1, + }).to_builder(); +} \ No newline at end of file diff --git a/betree/src/compression/none.rs b/betree/src/compression/none.rs index be82adfe..249b273b 100644 --- a/betree/src/compression/none.rs +++ b/betree/src/compression/none.rs @@ -57,14 +57,14 @@ impl io::Write for NoneCompression { } impl CompressionState for NoneCompression { - fn finish(&mut self) -> Buf { - mem::replace(&mut self.buf, BufWrite::with_capacity(DEFAULT_BUFFER_SIZE)).into_buf() + fn finish(&mut self, buf: Buf) -> Result { + Ok(buf) } } impl DecompressionState for NoneDecompression { - fn decompress(&mut self, data: &[u8]) -> Result> { + fn decompress(&mut self, data: Buf) -> Result { // FIXME: pass-through Buf, reusing alloc - Ok(data.to_vec().into_boxed_slice()) + Ok(data) } } diff --git a/betree/src/compression/zstd.rs b/betree/src/compression/zstd.rs index 419d8361..c7ffe7fd 100644 --- a/betree/src/compression/zstd.rs +++ b/betree/src/compression/zstd.rs @@ -4,20 +4,24 @@ use super::{ }; use crate::{ buffer::{Buf, BufWrite}, + database, size::StaticSize, + vdev::Block, }; use serde::{Deserialize, Serialize}; use std::{ - io::{self, Write}, + io::{self, Cursor, Write}, mem, }; -use zstd::stream::{ - raw::{CParameter, DParameter, Decoder, Encoder}, - zio::Writer, +use zstd::{ + block::{Compressor, Decompressor}, + stream::{ + raw::{CParameter, DParameter, Decoder, Encoder}, + zio::{Reader, Writer}, + }, }; -use zstd_safe::FrameFormat; +use zstd_safe::{FrameFormat, InBuffer, OutBuffer, WriteBuf}; use std::sync::{Arc, Mutex}; - // TODO: investigate pre-created dictionary payoff /// Zstd compression. () @@ -29,10 +33,10 @@ pub struct Zstd { } struct ZstdCompression { - writer: Writer>, + writer: Encoder<'static>, } struct ZstdDecompression { - writer: Writer>, + writer: Decoder<'static>, } impl StaticSize for Zstd { @@ -41,6 +45,8 @@ impl StaticSize for Zstd { } } +use zstd::stream::raw::Operation; + impl CompressionBuilder for Zstd { fn new_compression(&self) -> Result>> { // "The library supports regular compression levels from 1 up to ZSTD_maxCLevel(), @@ -49,14 +55,10 @@ impl CompressionBuilder for Zstd { // Compression format is stored externally, don't need to duplicate it encoder.set_parameter(CParameter::Format(FrameFormat::Magicless))?; - // Integrity is handled at a different layer + // // Integrity is handled at a different layer encoder.set_parameter(CParameter::ChecksumFlag(false))?; - let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE); - - Ok(Arc::new(std::sync::RwLock::new(ZstdCompression { - writer: Writer::new(buf, encoder), - }))) + Ok(Arc::new(std::sync::RwLock::new(ZstdCompression { writer: encoder }))) } fn decompression_tag(&self) -> DecompressionTag { @@ -68,50 +70,115 @@ impl Zstd { pub fn new_decompression() -> Result> { let mut decoder = Decoder::new()?; decoder.set_parameter(DParameter::Format(FrameFormat::Magicless))?; + // decoder.set_parameter(DParameter::ForceIgnoreChecksum(true))?; - let buf = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE); - Ok(Box::new(ZstdDecompression { - writer: Writer::new(buf, decoder), - })) + Ok(Box::new(ZstdDecompression { writer: decoder })) } } impl io::Write for ZstdCompression { fn write(&mut self, buf: &[u8]) -> io::Result { - self.writer.write(buf) + unimplemented!() } fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - self.writer.write_all(buf) + unimplemented!() } fn flush(&mut self) -> io::Result<()> { - self.writer.flush() + unimplemented!() } } +use speedy::{Readable, Writable}; +const DATA_OFF: usize = mem::size_of::(); + impl CompressionState for ZstdCompression { - fn finish(&mut self) -> Buf { - let _ = self.writer.finish(); - - mem::replace( - self.writer.writer_mut(), - BufWrite::with_capacity(DEFAULT_BUFFER_SIZE), - ) - .into_buf() + fn finish(&mut self, data: Buf) -> Result { + let size = zstd_safe::compress_bound(data.as_ref().len()); + let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size as u32)); + buf.write_all(&[0u8; DATA_OFF])?; + + let mut input = zstd::stream::raw::InBuffer::around(&data); + let mut output = zstd::stream::raw::OutBuffer::around_pos(&mut buf, DATA_OFF); + let mut finished_frame; + loop { + let remaining = self.writer.run(&mut input, &mut output)?; + finished_frame = remaining == 0; + if input.pos() > 0 || data.is_empty() { + break; + } + } + + while self.writer.flush(&mut output)? > 0 {} + self.writer.finish(&mut output, finished_frame)?; + + let og_len = data.len() as u32; + og_len + .write_to_buffer(&mut buf.as_mut()[..DATA_OFF]) + .unwrap(); + Ok(buf.into_buf()) } } impl DecompressionState for ZstdDecompression { - fn decompress(&mut self, data: &[u8]) -> Result> { - self.writer.write_all(data)?; - self.writer.finish(); - - Ok(mem::replace( - self.writer.writer_mut(), - BufWrite::with_capacity(DEFAULT_BUFFER_SIZE), - ) - .into_buf() - .into_boxed_slice()) + fn decompress(&mut self, data: Buf) -> Result { + let size = u32::read_from_buffer(data.as_ref()).unwrap(); + let mut buf = BufWrite::with_capacity(Block::round_up_from_bytes(size)); + + let mut input = zstd::stream::raw::InBuffer::around(&data[DATA_OFF..]); + let mut output = zstd::stream::raw::OutBuffer::around(&mut buf); + + let mut finished_frame; + loop { + let remaining = self.writer.run(&mut input, &mut output)?; + finished_frame = remaining == 0; + if remaining > 0 { + if output.dst.capacity() == output.dst.as_ref().len() { + // append faux byte to extend in case that original was + // wrong for some reason (this should not happen but is a + // sanity guard) + output.dst.write(&[0])?; + } + continue; + } + if input.pos() > 0 || data.is_empty() { + break; + } + } + + while self.writer.flush(&mut output)? > 0 {} + self.writer.finish(&mut output, finished_frame)?; + + Ok(buf.into_buf()) } } + +#[cfg(test)] +mod tests { + use rand::RngCore; + + use super::*; + + #[test] + fn encode_then_decode() { + let mut rng = rand::thread_rng(); + let mut buf = vec![42u8; 4 * 1024 * 1024]; + rng.fill_bytes(buf.as_mut()); + let buf = Buf::from_zero_padded(buf); + let zstd = Zstd { level: 1 }; + let mut comp = zstd.new_compression().unwrap(); + let c_buf = comp.finish(buf.clone()).unwrap(); + let mut decomp = zstd.decompression_tag().new_decompression().unwrap(); + let d_buf = decomp.decompress(c_buf).unwrap(); + assert_eq!(buf.as_ref().len(), d_buf.as_ref().len()); + } + + #[test] + fn sanity() { + let buf = [42u8, 42]; + let c_buf = zstd::stream::encode_all(&buf[..], 1).unwrap(); + let d_buf = zstd::stream::decode_all(c_buf.as_slice()).unwrap(); + assert_eq!(&buf, d_buf.as_slice()); + } +} \ No newline at end of file diff --git a/betree/src/cow_bytes.rs b/betree/src/cow_bytes.rs index 4ed8e64b..6d097a6d 100644 --- a/betree/src/cow_bytes.rs +++ b/betree/src/cow_bytes.rs @@ -162,9 +162,14 @@ struct ArchivedCowBytes { inner: ArchivedVec, } +pub struct CowBytesResolver { + len: usize, + inner: VecResolver, +} + impl Archive for CowBytes { type Archived = ArchivedVec; - type Resolver = VecResolver; + type Resolver = CowBytesResolver; unsafe fn resolve( &self, @@ -172,7 +177,9 @@ impl Archive for CowBytes { resolver: Self::Resolver, out: *mut Self::Archived, ) { - ArchivedVec::resolve_from_len(self.inner.len(), pos, resolver, out); + println!("xxxxxxxxxxxxxxxxxxxxxxx {} {}", self.inner.len(), resolver.len); + //ArchivedVec::resolve_from_len(self.inner.len(), pos, resolver, out); + ArchivedVec::resolve_from_len(resolver.len, pos, resolver.inner, out); } } @@ -181,57 +188,137 @@ impl Serialize for CowBytes { &self, serializer: &mut S ) -> Result { - //panic!("----------------------"); + + panic!("----------------------"); //let compression = CompressionConfiguration::None; - let compression = CompressionConfiguration::Zstd(Zstd { + /*let compression = CompressionConfiguration::Zstd(Zstd { level: 1, }); let default_compression = compression.to_builder(); - - let compression = &*default_compression.read().unwrap(); - //let compressed_data = [0u8, 10]; - //panic!("<>"); - let compressed_data = { - // FIXME: cache this - let a = compression.new_compression().unwrap(); - let mut state = a.write().unwrap(); - { - state.write_all(self.inner.as_slice()); - } - state.finish() - }; - - ArchivedVec::serialize_from_slice(compressed_data.as_slice(), serializer) +*/ + + + + // let compression = &*crate::compression::COMPRESSION_VAR.read().unwrap();//default_compression.read().unwrap(); + // //let compressed_data = [0u8, 10]; + // //panic!("<>"); + // let mut compressed_data = { + // let state = compression.new_compression().unwrap(); + // let mut writer = state.write().unwrap(); + // { + // match writer.write_all(self.inner.as_slice()) { + // Ok(obj) => obj, + // Err(e) => panic!("+++++++++++++++++++++++++++++++++++++++++++++++++++++++"), + // } + // } + // writer.finish() + // }; + + // if compressed_data[compressed_data.len()-1] != 0 && compressed_data.len() != 4096{ + // println!("once..."); + // let mut dkd = (*self.inner).clone(); + // dkd.resize(dkd.len() + 4096,0); + + // compressed_data = { + // let state = compression.new_compression().unwrap(); + // let mut writer = state.write().unwrap(); + // { + // match writer.write_all(dkd.as_slice()) { + // Ok(obj) => obj, + // Err(e) => panic!("+++++++++++++++++++++++++++++++++++++++++++++++++++++++"), + // } + // } + // writer.finish() + // }; + + // //println!("11------------------------------------{}, {}", dkd.len(), compressed_data.len()); + // //} + + // //if compressed_data[compressed_data.len()-1] != 0 { + // // panic!("agaiomn!!!! {:?}", dkd.as_slice()); + // //} + // //let mut dkd = (*self.inner).clone(); + // //dkd.resize(dkd.len() + 4096,0); + + + // // if self.inner.len() == 131072 { + // //println!("1------------------------------------{:?}", self.inner.as_slice()); + // //println!("2------------------------------------{:?}", compressed_data.as_slice()); + // //println!("1------------------------------------{}:: {} -- {}",compressed_data[compressed_data.len()-1], compressed_data.as_slice().len(), self.inner.as_slice().len()); + + + // //println!("{} {}", self.inner[self.inner.len()-1], compressed_data[self.inner.len()-1]); + // //println!("{} {}", self.inner[self.inner.len()-2], compressed_data[self.inner.len()-2]); + // //println!("{} {}", self.inner[self.inner.len()-3], compressed_data[self.inner.len()-3]); + + + // //let d = DecompressionTag::None; + // let mut decompression_state = compression.decompression_tag().new_decompression().unwrap();//d.new_decompression(); + + // let data = decompression_state.decompress(&compressed_data).unwrap(); + // let arc_vec = Arc::new(data.to_vec()); + // // panic!(""); + // // } + + // //println!("++++++++++++++++++++{:?}", compressed_data.as_slice()); + // //println!("++++++++++++++++++++"); + + + // //panic!("1------------------------------------{:?}:: {} -- {}", compressed_data.as_slice(), compressed_data.as_slice().len(), self.inner.as_slice().len()); + // } + + // let mut lambda = |data: &dyn AsRef<[u8]>| { + + // Ok(CowBytesResolver { + // len: data.as_ref().len(), + // inner: ArchivedVec::serialize_from_slice(data.as_ref().as_slice(), serializer)?, + // }) + + // //ArchivedVec::serialize_from_slice(data.as_ref().as_slice(), serializer) + // }; + // lambda(&compressed_data) + // //ArchivedVec::serialize_from_slice(compressed_data.as_slice(), serializer) - //ArchivedVec::serialize_from_slice(&self.inner, serializer) + // //ArchivedVec::serialize_from_slice(&self.inner, serializer) } } -impl Deserialize for ArchivedVec { - fn deserialize(&self, deserializer: &mut D) -> Result { -//panic!("----------------------"); - let vec: Vec = self.deserialize(deserializer)?; - - //let compression = CompressionConfiguration::None; - let compression = CompressionConfiguration::Zstd(Zstd { - level: 1, - }); - let default_compression = compression.to_builder(); +use std::io::Write; - let compression = &*default_compression.read().unwrap(); +impl Deserialize for ArchivedVec { + fn deserialize(&self, deserializer: &mut D) -> Result { + panic!(".."); + //println!("pre ====="); + // let vec: Vec = self.deserialize(deserializer)?; + // if vec[vec.len()-1] != 0 { + // panic!("1------------------------------------{:?}", vec.as_slice()); + // } + + // let size = crate::vdev::Block::from_bytes(vec.len() as u32); + // //println!("===== {} {:?}", vec.len(), size); + // let mut buf = crate::buffer::BufWrite::with_capacity(size); + // buf.write_all(vec.as_slice()); + // let dt = buf.into_buf(); + // //println!("===== done {} {:?} {}", vec.len(), size, dt.as_ref().len()); + // //let compression = CompressionConfiguration::None; + // /*let compression = CompressionConfiguration::Zstd(Zstd { + // level: 1, + // }); + // let default_compression = compression.to_builder();*/ + // let compression = &*crate::compression::COMPRESSION_VAR.read().unwrap();//crate::compression::COMPRESSION_VAR.read().unwrap()default_compression.read().unwrap(); - //let d = DecompressionTag::None; - let mut decompression_state = compression.decompression_tag().new_decompression().unwrap();//d.new_decompression(); + // //let d = DecompressionTag::None; + // let mut decompression_state = compression.decompression_tag().new_decompression().unwrap();//d.new_decompression(); - let data = decompression_state.decompress(vec.as_slice()).unwrap(); - let arc_vec = Arc::new(data.to_vec()); + // let data = decompression_state.decompress(&dt/*vec.as_slice()*/).unwrap(); + // let arc_vec = Arc::new(data.to_vec()); - Ok(CowBytes { inner: arc_vec }) + // Ok(CowBytes { inner: arc_vec }) /*let arc_vec = Arc::new(vec); diff --git a/betree/src/cow_bytesex.rs b/betree/src/cow_bytesex.rs index fe6ad10f..4664748e 100644 --- a/betree/src/cow_bytesex.rs +++ b/betree/src/cow_bytesex.rs @@ -181,27 +181,27 @@ impl Serialize for CowBytes2 { &self, serializer: &mut S ) -> Result { - //panic!("----------------------"); - let compression = CompressionConfiguration::None; - /*let compression = CompressionConfiguration::Zstd(Zstd { - level: 1, - });*/ - let default_compression = compression.to_builder(); - - let compression = &*default_compression.read().unwrap(); - //let compressed_data = [0u8, 10]; - //panic!("<>"); - let compressed_data = { - // FIXME: cache this - let a = compression.new_compression().unwrap(); - let mut state = a.write().unwrap(); - { - state.write_all(self.inner.as_slice()); - } - state.finish() - }; - - ArchivedVec::serialize_from_slice(compressed_data.as_slice(), serializer) + panic!("----------------------"); + // let compression = CompressionConfiguration::None; + // /*let compression = CompressionConfiguration::Zstd(Zstd { + // level: 1, + // });*/ + // let default_compression = compression.to_builder(); + + // let compression = &*default_compression.read().unwrap(); + // //let compressed_data = [0u8, 10]; + // //panic!("<>"); + // let compressed_data = { + // // FIXME: cache this + // let a = compression.new_compression().unwrap(); + // let mut state = a.write().unwrap(); + // { + // state.write_all(self.inner.as_slice()); + // } + // state.finish() + // }; + + // ArchivedVec::serialize_from_slice(compressed_data.as_slice(), serializer) //ArchivedVec::serialize_from_slice(&self.inner, serializer) } @@ -209,16 +209,16 @@ impl Serialize for CowBytes2 { impl Deserialize for ArchivedVec { fn deserialize(&self, deserializer: &mut D) -> Result { -//panic!("----------------------"); - let vec: Vec = self.deserialize(deserializer)?; +panic!("----------------------"); + // let vec: Vec = self.deserialize(deserializer)?; - let d = DecompressionTag::None; - let mut decompression_state = d.new_decompression(); + // let d = DecompressionTag::None; + // let mut decompression_state = d.new_decompression(); - let data = decompression_state.unwrap().decompress(vec.as_slice()).unwrap(); - let arc_vec = Arc::new(data.to_vec()); + // let data = decompression_state.unwrap().decompress(vec.as_slice()).unwrap(); + // let arc_vec = Arc::new(data.to_vec()); - Ok(CowBytes2 { inner: arc_vec }) + // Ok(CowBytes2 { inner: arc_vec }) /*let arc_vec = Arc::new(vec); diff --git a/betree/src/data_management/dmu.rs b/betree/src/data_management/dmu.rs index 9c71172e..0537fc7c 100644 --- a/betree/src/data_management/dmu.rs +++ b/betree/src/data_management/dmu.rs @@ -248,8 +248,8 @@ where // }; let object: Node>> = { - let data = decompression_state.decompress(&compressed_data)?; - Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data, a)? + let data = decompression_state.decompress(compressed_data)?; + Object::unpack_and_decompress(op.size(), op.checksum().clone().into(), self.pool.clone().into(), op.offset(), op.info(), data.into_boxed_slice(), a)? }; let key = ObjectKey::Unmodified { offset, generation }; @@ -412,7 +412,23 @@ where // state.finish() // }; + // let compressed_data = { + // // FIXME: cache this + // let mut state = compression.new_compression().unwrap(); + // let mut buf = crate::buffer::BufWrite::with_capacity(Block(128)); + // { + // object.pack(&mut buf, &mut metadata_size)?; + // drop(object); + // } + // let mut newstate = state.write().unwrap(); + // { + // newstate.finish(buf.into_buf()) + // } + + // }; + //panic!("2------------------------------------{:?}", compressed_data); let compressed_data : Buf = object.pack_and_compress(&mut metadata_size, self.default_compression.clone()).unwrap(); + //println!("2------------------------------------{:?}", compressed_data); drop(object); assert!(compressed_data.len() <= u32::max_value() as usize); @@ -967,9 +983,9 @@ where let data = ptr .decompression_tag() .new_decompression()? - .decompress(&compressed_data)?; + .decompress(compressed_data)?; //Object::unpack_at(ptr.size(), ptr.checksum().clone().into() , self.pool.clone().into(), ptr.offset(), ptr.info(), data)? - Object::unpack_and_decompress(ptr.size(), ptr.checksum().clone().into() , self.pool.clone().into(), ptr.offset(), ptr.info(), data, ptr.decompression_tag())? + Object::unpack_and_decompress(ptr.size(), ptr.checksum().clone().into() , self.pool.clone().into(), ptr.offset(), ptr.info(), data.into_boxed_slice(), ptr.decompression_tag())? }; let key = ObjectKey::Unmodified { offset: ptr.offset(), diff --git a/betree/src/tree/imp/node.rs b/betree/src/tree/imp/node.rs index db07f32f..6c79637f 100644 --- a/betree/src/tree/imp/node.rs +++ b/betree/src/tree/imp/node.rs @@ -195,40 +195,89 @@ impl HasStoragePreference for Node { } } +use rand::Rng; + impl Object for Node { fn pack_and_compress(&self, metadata_size: &mut usize, compressor: Arc>) -> Result { - //println!("pack_and_compress..."); + /* + let mut v = vec![5; 131072]; + + let mut rng = rand::thread_rng(); // Create a random number generator + + for i in &mut v { + *i = rng.gen_range(1..=255); // Fill the vector with random numbers between 1 and 100 + } + + + let w = CowBytes { inner: Arc::new(v)}; + + //println!("1: {:?}", w); + + let mut ser = rkyv::ser::serializers::AllocSerializer::<10000000>::default(); + ser.serialize_value(&w).unwrap(); + let sercow = ser.into_serializer().into_inner(); + + //println!("2: {:?}, {}", sercow, sercow.len()); + + + let arccow = rkyv::check_archived_root::(&sercow[..]).unwrap(); + //println!("2.1:"); + let cow: CowBytes = arccow.deserialize(&mut rkyv::Infallible).unwrap(); + + panic!(""); + //panic!("3: {:?}", cow); +*/ match self.0 { PackedLeaf(ref map) => { let builder = &*compressor.read().unwrap(); let state = builder.new_compression().unwrap(); - let mut writer = state.write().unwrap(); + //let mut writer = state.write().unwrap(); + let mut buf = crate::buffer::BufWrite::with_capacity(crate::vdev::Block(128)); { - writer.write_all(map.inner())? + buf.write_all(map.inner())? } - return Ok(writer.finish()); + + let mut newstate = state.write().unwrap(); + { + return Ok(newstate.finish(buf.into_buf()).unwrap()); + } + + panic!(""); }, Leaf(ref leaf) => { let builder = &*compressor.read().unwrap(); let state = builder.new_compression().unwrap(); - let mut writer = state.write().unwrap(); + //let mut writer = state.write().unwrap(); + let mut buf = crate::buffer::BufWrite::with_capacity(crate::vdev::Block(128)); { - writer.write_all((NodeInnerType::Leaf as u32).to_be_bytes().as_ref())?; - PackedMap::pack(leaf, &mut *writer)? + buf.write_all((NodeInnerType::Leaf as u32).to_be_bytes().as_ref())?; + PackedMap::pack(leaf, &mut buf)? } - return Ok(writer.finish()); + + let mut newstate = state.write().unwrap(); + { + return Ok(newstate.finish(buf.into_buf()).unwrap()); + } + + panic!(""); }, Internal(ref internal) => { let builder = &*compressor.read().unwrap(); let state = builder.new_compression().unwrap(); - let mut writer = state.write().unwrap(); + //let mut writer = state.write().unwrap(); + let mut buf = crate::buffer::BufWrite::with_capacity(crate::vdev::Block(128)); { - writer.write_all((NodeInnerType::Internal as u32).to_be_bytes().as_ref())?; - serialize_into(&mut *writer, internal) + buf.write_all((NodeInnerType::Internal as u32).to_be_bytes().as_ref())?; + serialize_into(&mut buf, internal) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? } - return Ok(writer.finish()); + let mut newstate = state.write().unwrap(); + { + return Ok(newstate.finish(buf.into_buf()).unwrap()); + } + + panic!(""); }, NVMLeaf(ref leaf) => { //let compression_nvm = CompressionConfiguration::None; diff --git a/betree/src/tree/imp/nvmleaf.rs b/betree/src/tree/imp/nvmleaf.rs index cbe9b762..f5b7fc87 100644 --- a/betree/src/tree/imp/nvmleaf.rs +++ b/betree/src/tree/imp/nvmleaf.rs @@ -224,29 +224,29 @@ where field: &BTreeMap, serializer: &mut S, ) -> Result { - - let compression = CompressionConfiguration::None; - let default_compression = compression.to_builder(); - - let compression = &*default_compression.read().unwrap(); - //let compressed_data = [0u8, 10]; - let compressed_data = { - // FIXME: cache this - let a = compression.new_compression().unwrap(); - let mut state = a.write().unwrap(); - { - state.write_all(&[0u8, 10]); - } - state.finish() - }; - - ArchivedVec::serialize_from_iter( - field.iter().map(|(key, value)| { - //let (a, b, c, d) = value as (&KeyInfo, &SlicedCowBytes, &u16, &u16); - EntryEx { key: key, value: value } - }), - serializer, - ) +panic!(".."); + // let compression = CompressionConfiguration::None; + // let default_compression = compression.to_builder(); + + // let compression = &*default_compression.read().unwrap(); + // //let compressed_data = [0u8, 10]; + // let compressed_data = { + // // FIXME: cache this + // let a = compression.new_compression().unwrap(); + // let mut state = a.write().unwrap(); + // { + // state.write_all(&[0u8, 10]); + // } + // state.finish() + // }; + + // ArchivedVec::serialize_from_iter( + // field.iter().map(|(key, value)| { + // //let (a, b, c, d) = value as (&KeyInfo, &SlicedCowBytes, &u16, &u16); + // EntryEx { key: key, value: value } + // }), + // serializer, + // ) } }