Skip to content

Commit

Permalink
Add a few changes to evaluate trees.
Browse files Browse the repository at this point in the history
  • Loading branch information
SajadKarim committed Jul 24, 2024
1 parent d0a93fe commit 6f7f4cc
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
2 changes: 1 addition & 1 deletion betree/src/compression/zstd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl CompressionState for ZstdCompression {
impl DecompressionState for ZstdDecompression {
fn decompress(&mut self, data: &[u8]) -> Result<Box<[u8]>> {
self.writer.write_all(data)?;
self.writer.finish()?;
self.writer.finish();

Ok(mem::replace(
self.writer.writer_mut(),
Expand Down
25 changes: 19 additions & 6 deletions betree/src/cow_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ impl<S: Serializer + ?Sized + ScratchSpace> Serialize<S> for CowBytes {
serializer: &mut S
) -> Result<Self::Resolver, S::Error> {
//panic!("----------------------");
let compression = CompressionConfiguration::None;
/*let compression = CompressionConfiguration::Zstd(Zstd {
//let compression = CompressionConfiguration::None;
let compression = CompressionConfiguration::Zstd(Zstd {
level: 1,
});*/
});
let default_compression = compression.to_builder();

let compression = &*default_compression.read().unwrap();
Expand All @@ -212,10 +212,23 @@ impl<D: Fallible + ?Sized> Deserialize<CowBytes, D> for ArchivedVec<u8> {
//panic!("----------------------");
let vec: Vec<u8> = self.deserialize(deserializer)?;

let d = DecompressionTag::None;
let mut decompression_state = d.new_decompression();

let data = decompression_state.unwrap().decompress(vec.as_slice()).unwrap();
//let compression = CompressionConfiguration::None;
let compression = CompressionConfiguration::Zstd(Zstd {
level: 1,
});
let default_compression = compression.to_builder();

let compression = &*default_compression.read().unwrap();





//let d = DecompressionTag::None;
let mut decompression_state = compression.decompression_tag().new_decompression().unwrap();//d.new_decompression();

let data = decompression_state.decompress(vec.as_slice()).unwrap();
let arc_vec = Arc::new(data.to_vec());

Ok(CowBytes { inner: arc_vec })
Expand Down
2 changes: 1 addition & 1 deletion betree/src/tree/imp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
dml: X,
storage_preference: StoragePreference,
) -> Self {
let root_node = dml.insert(Node::empty_leaf(false), tree_id, PivotKey::Root(tree_id));
let root_node = dml.insert(Node::empty_leaf(true), tree_id, PivotKey::Root(tree_id));
Tree::new(root_node, tree_id, msg_action, dml, storage_preference)
}

Expand Down
52 changes: 49 additions & 3 deletions betree/src/tree/imp/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub(super) enum TakeChildBufferWrapper<'a, N: 'a + 'static> {
NVMTakeChildBuffer(Option<NVMTakeChildBuffer<'a, N>>),
}

use crate::buffer::BufWrite;
const DEFAULT_BUFFER_SIZE: crate::vdev::Block<u32> = crate::vdev::Block(1);

impl<'a, N: Size + HasStoragePreference> TakeChildBufferWrapper<'a, N> {
pub fn node_pointer_mut(&mut self) -> &mut TakeChildBufferWrapper<'a, N> where N: ObjectReference{
// TODO: Karim... add comments...
Expand Down Expand Up @@ -231,6 +234,7 @@ impl<R: ObjectReference + HasStoragePreference + StaticSize> Object<R> for Node<
//let compression_nvm = CompressionConfiguration::None;
//let default_compression_nvm = compression_nvm.to_builder();

/*
let compression_nvm = &*compressor.read().unwrap();
//let compressed_data = [0u8, 10];
let compressed_data_nvm = {
Expand Down Expand Up @@ -258,11 +262,33 @@ impl<R: ObjectReference + HasStoragePreference + StaticSize> Object<R> for Node<
}
return Ok(writer.finish());
};
*/

let mut bw = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
let mut serializer_meta_data = rkyv::ser::serializers::AllocSerializer::<0>::default();
serializer_meta_data.serialize_value(&leaf.meta_data).unwrap();
let bytes_meta_data = serializer_meta_data.into_serializer().into_inner();

let mut serializer_data = rkyv::ser::serializers::AllocSerializer::<0>::default();
serializer_data.serialize_value(leaf.data.read().as_ref().unwrap().as_ref().unwrap()).unwrap();
let bytes_data = serializer_data.into_serializer().into_inner();
//println!("ln m:{} d:{}", bytes_meta_data.len(), bytes_data.len());
bw.write_all((NodeInnerType::NVMLeaf as u32).to_be_bytes().as_ref())?;
bw.write_all(bytes_meta_data.len().to_be_bytes().as_ref())?;
bw.write_all(bytes_data.len().to_be_bytes().as_ref())?;

bw.write_all(&bytes_meta_data.as_ref())?;
bw.write_all(&bytes_data.as_ref())?;

*metadata_size = 4 + 8 + 8 + bytes_meta_data.len(); //TODO: fix this.. magic nos!
return Ok(bw.into_buf())

},
NVMInternal(ref nvminternal) => {
//let compression_nvm = CompressionConfiguration::None;
//let default_compression_nvm = compression_nvm.to_builder();


/*
let compression_nvm = &*compressor.read().unwrap();
//let compressed_data = [0u8, 10];
let compressed_data_nvm = {
Expand Down Expand Up @@ -291,6 +317,26 @@ impl<R: ObjectReference + HasStoragePreference + StaticSize> Object<R> for Node<
}
return Ok(writer.finish());
};
*/
let mut bw = BufWrite::with_capacity(DEFAULT_BUFFER_SIZE);
let mut serializer_meta_data = rkyv::ser::serializers::AllocSerializer::<0>::default();
serializer_meta_data.serialize_value(&nvminternal.meta_data).unwrap();
let bytes_meta_data = serializer_meta_data.into_serializer().into_inner();

let mut serializer_data = rkyv::ser::serializers::AllocSerializer::<0>::default();
serializer_data.serialize_value(nvminternal.data.read().as_ref().unwrap().as_ref().unwrap()).unwrap();
let bytes_data = serializer_data.into_serializer().into_inner();

bw.write_all((NodeInnerType::NVMInternal as u32).to_be_bytes().as_ref())?;
bw.write_all(bytes_meta_data.len().to_be_bytes().as_ref())?;
bw.write_all(bytes_data.len().to_be_bytes().as_ref())?;

bw.write_all(&bytes_meta_data.as_ref())?;
bw.write_all(&bytes_data.as_ref())?;

*metadata_size = 4 + 8 + 8 + bytes_meta_data.len();//TODO: fix this

return Ok(bw.into_buf());
},
}

Expand Down Expand Up @@ -554,8 +600,8 @@ impl<R: ObjectReference + HasStoragePreference + StaticSize> Object<R> for Node<

fn unpack_and_decompress(size: crate::vdev::Block<u32>, checksum: crate::checksum::XxHash, pool: RootSpu, _offset: DiskOffset, d_id: DatasetId, data: Box<[u8]>, d: DecompressionTag) -> Result<Self, io::Error> {
//println!("unpack_and_decompress...");
let mut decompression_state = d.new_decompression();
let data = decompression_state.unwrap().decompress(&data).unwrap();
//let mut decompression_state = d.new_decompression();
//let data = decompression_state.unwrap().decompress(&data).unwrap();

if data[0..4] == (NodeInnerType::Internal as u32).to_be_bytes() {
match deserialize::<InternalNode<_>>(&data[4..]) {
Expand Down

0 comments on commit 6f7f4cc

Please sign in to comment.