Codec trait and array refactor #6

Merged · 2 commits · Feb 12, 2024

Changes from all commits
12 changes: 11 additions & 1 deletion CHANGELOG.md
@@ -20,6 +20,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add store lock tests
- Added `contiguous_elements` method to `ContiguousIndicesIterator` and `ContiguousLinearisedIndicesIterator`
- Added `ChunkShape::num_elements`
+- Added `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options`
+- Added new `Array` `_opt` methods which can use the new encode/decode options
+- **Breaking**: existing `Array` `_opt` methods use the new encode/decode options instead of `parallel: bool`
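Concretely, call sites move from a boolean flag to an options argument. A minimal sketch, assuming the options types live under `zarrs::array::codec` as named above; the exact paths, defaults, and return types are not shown in this diff:

```rust
use zarrs::array::{codec::DecodeOptions, Array};
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableStorageTraits;

fn read_subset<TStorage: ReadableStorageTraits + 'static>(
    array: &Array<TStorage>,
    subset: &ArraySubset,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Before: a `parallel: bool` argument, e.g. retrieve_array_subset_opt(subset, true).
    // After: concurrency and related knobs travel in an options struct.
    let options = DecodeOptions::default();
    Ok(array.retrieve_array_subset_opt(subset, &options)?)
}
```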

### Changed
- Dependency bumps
@@ -32,7 +35,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `array_subset_iterators.rs`
- **Major breaking**: storage transformers must be `Arc` wrapped as `StorageTransformerExtension` trait methods now take `self: Arc<Self>`
- Removed lifetimes from `{Async}{Readable,Writable,ReadableWritable,Listable,ReadableListable}Storage`
-- **Breaking**: `Group` and `Array` methods generic on storage now require the storage with a `'static` lifetime
+- **Breaking**: `Group` and `Array` methods generic on storage now require the storage have a `'static` lifetime
+- **Breaking**: remove `Array::{set_}parallel_codecs` and `ArrayBuilder::parallel_codecs`
+- **Breaking**: added `recommended_concurrency` to codec trait methods to facilitate improved parallelisation
+- **Major breaking**: refactor codec traits:
+  - **Breaking**: remove `par_` variants
+  - **Breaking**: `_opt` variants use the new `codec::{Encode,Decode,PartialDecode,PartialDecoder}Options` instead of `parallel: bool`
+  - variants without a prefix/suffix are now parallel rather than serial
+  - TODO: Remove these?
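Read together, the bullets above imply roughly the following trait shape. A sketch only: `recommended_concurrency`, the options types, and the `par_`/`_opt` conventions come from the changelog, while the exact signatures and the stub types below are assumptions:

```rust
// Stubs standing in for the real zarrs::array::codec types.
pub struct RecommendedConcurrency { pub minimum: usize, pub maximum: usize }
#[derive(Default)]
pub struct EncodeOptions { /* concurrency target, etc. */ }
#[derive(Debug)]
pub struct CodecError;

pub trait CodecSketch {
    /// New: reports how much parallelism a codec can usefully employ.
    fn recommended_concurrency(&self) -> RecommendedConcurrency;

    /// `_opt` variant: an options struct replaces `parallel: bool`.
    fn encode_opt(&self, decoded: Vec<u8>, options: &EncodeOptions)
        -> Result<Vec<u8>, CodecError>;

    /// Un-prefixed variant: now the parallel default; `par_encode` is gone.
    fn encode(&self, decoded: Vec<u8>) -> Result<Vec<u8>, CodecError> {
        self.encode_opt(decoded, &EncodeOptions::default())
    }
}
```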

### Removed
- **Breaking**: remove `InvalidArraySubsetError` and `ArrayExtractElementsError`
1 change: 1 addition & 0 deletions Cargo.toml
@@ -71,6 +71,7 @@ walkdir = "2.3.2"
zfp-sys = {version = "0.1.4", features = ["static"], optional = true }
zip = { version = "0.6", optional = true }
zstd = { version = "0.13", features = ["zstdmt"], optional = true }
+rayon_iter_concurrent_limit = "0.1.0-alpha3"

[dev-dependencies]
chrono = "0.4"
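The new dependency is what makes per-call concurrency limits practical: it caps how many rayon tasks run concurrently for a single iterator, rather than resizing the global thread pool. A usage sketch based on the crate's documented macro (the exact invocation is an assumption, not part of this diff):

```rust
use rayon::prelude::*;
use rayon_iter_concurrent_limit::iter_concurrent_limit;

fn main() {
    // Run at most 2 of these tasks concurrently, whatever the pool size —
    // e.g. to stop nested per-chunk codec work from oversubscribing the CPU.
    let doubled: Vec<usize> =
        iter_concurrent_limit!(2, (0..1000).into_par_iter(), map, |i: usize| i * 2)
            .collect();
    assert_eq!(doubled[3], 6);
}
```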
10 changes: 3 additions & 7 deletions benches/array_blosc.rs
@@ -32,9 +32,7 @@ fn array_blosc_write_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -69,13 +67,11 @@ fn array_blosc_read_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
20 changes: 6 additions & 14 deletions benches/array_uncompressed.rs
@@ -19,9 +19,7 @@ fn array_write_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -49,9 +47,7 @@ fn array_write_all_sharded(c: &mut Criterion) {
.unwrap();
let data = vec![1u16; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();
});
});
}
@@ -76,13 +72,11 @@ fn array_read_all(c: &mut Criterion) {
.unwrap();
let data = vec![1u16; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
@@ -110,13 +104,11 @@ fn array_read_all_sharded(c: &mut Criterion) {
.unwrap();
let data = vec![0u8; num_elements.try_into().unwrap()];
let subset = zarrs::array_subset::ArraySubset::new_with_shape(vec![size; 3]);
-array
-    .par_store_array_subset_elements(&subset, data)
-    .unwrap();
+array.store_array_subset_elements(&subset, data).unwrap();

// Benchmark reading the data
b.iter(|| {
-let _bytes = array.par_retrieve_array_subset(&subset).unwrap();
+let _bytes = array.retrieve_array_subset(&subset).unwrap();
});
});
}
6 changes: 0 additions & 6 deletions benches/codecs.rs
@@ -68,12 +68,6 @@ fn codec_blosc(c: &mut Criterion) {
group.bench_function(BenchmarkId::new("decode", size3), |b| {
b.iter(|| codec.decode(data_encoded.clone(), &rep).unwrap());
});
group.bench_function(BenchmarkId::new("par_encode", size3), |b| {
b.iter(|| codec.par_encode(data_decoded.clone()).unwrap());
});
group.bench_function(BenchmarkId::new("par_decode", size3), |b| {
b.iter(|| codec.par_decode(data_encoded.clone(), &rep).unwrap());
});
}
}
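With `par_encode`/`par_decode` gone and the un-prefixed methods now parallel by default, a benchmark that exercises the options path would instead look roughly like this fragment, in the style of the benchmarks above (`encode_opt` and `EncodeOptions::default()` are assumed from the changelog, not shown in this diff):

```rust
// Fragment: the explicit-options path replaces the removed `par_` benchmarks.
group.bench_function(BenchmarkId::new("encode_opt", size3), |b| {
    b.iter(|| {
        codec
            .encode_opt(data_decoded.clone(), &EncodeOptions::default())
            .unwrap()
    });
});
```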

3 changes: 1 addition & 2 deletions examples/array_write_read.rs
@@ -75,15 +75,14 @@ fn array_write_read() -> Result<(), Box<dyn std::error::Error>> {
println!("store_chunk [0, 0] and [0, 1]:\n{data_all:+4.1}\n");

// Store multiple chunks
-array.store_chunks_elements_opt::<f32>(
+array.store_chunks_elements::<f32>(
&ArraySubset::new_with_ranges(&[1..2, 0..2]),
vec![
//
1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
//
1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
],
-true,
)?;
let data_all = array.retrieve_array_subset_ndarray::<f32>(&subset_all)?;
println!("store_chunks [1..2, 0..2]:\n{data_all:+4.1}\n");
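Callers that still want explicit control would use the `_opt` form, with the new options argument in place of the removed `true`. A sketch, assuming `EncodeOptions` is in scope from `zarrs::array::codec`:

```rust
// Sketch: the `_opt` form now takes options where `true` used to go.
array.store_chunks_elements_opt::<f32>(
    &ArraySubset::new_with_ranges(&[1..2, 0..2]),
    vec![
        1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
        1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1, 1.0, 1.0, 1.0, 1.0, 1.1, 1.1, 1.1, 1.1,
    ],
    &EncodeOptions::default(),
)?;
```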
2 changes: 1 addition & 1 deletion examples/sharded_array_write_read.rs
@@ -131,7 +131,7 @@ fn sharded_array_write_read() -> Result<(), Box<dyn std::error::Error>> {
ArraySubset::new_with_start_shape(vec![0, 0], inner_chunk_shape.clone())?,
ArraySubset::new_with_start_shape(vec![0, 4], inner_chunk_shape.clone())?,
];
-let decoded_inner_chunks = partial_decoder.par_partial_decode(&inner_chunks_to_decode)?;
+let decoded_inner_chunks = partial_decoder.partial_decode(&inner_chunks_to_decode)?;
let decoded_inner_chunks = decoded_inner_chunks
.into_iter()
.map(|bytes| {
16 changes: 0 additions & 16 deletions src/array.rs
@@ -215,8 +215,6 @@ pub struct Array<TStorage: ?Sized> {
dimension_names: Option<Vec<DimensionName>>,
/// Additional fields annotated with `"must_understand": false`.
additional_fields: AdditionalFields,
-/// If true, codecs run with multithreading (where supported)
-parallel_codecs: bool,
/// Zarrs metadata.
include_zarrs_metadata: bool,
}
@@ -289,7 +287,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
additional_fields: metadata.additional_fields,
storage_transformers,
dimension_names: metadata.dimension_names,
-parallel_codecs: true,
include_zarrs_metadata: true,
})
}
@@ -371,19 +368,6 @@ impl<TStorage: ?Sized> Array<TStorage> {
&self.additional_fields
}

-/// Returns true if codecs can use multiple threads for encoding and decoding (where supported).
-#[must_use]
-pub const fn parallel_codecs(&self) -> bool {
-    self.parallel_codecs
-}
-
-/// Enable or disable multithreaded codec encoding/decoding. Enabled by default.
-///
-/// It may be advantageous to turn this off if parallelisation is external to avoid thrashing.
-pub fn set_parallel_codecs(&mut self, parallel_codecs: bool) {
-    self.parallel_codecs = parallel_codecs;
-}

/// Enable or disable the inclusion of zarrs metadata in the array attributes. Enabled by default.
///
/// Zarrs metadata includes the zarrs version and some parameters.
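With the flag removed, the old `set_parallel_codecs(false)` behaviour becomes a per-call choice via the new options. A sketch; note the `set_concurrent_target` setter is hypothetical, since only the options type names appear in this PR:

```rust
// Equivalent of the removed `array.set_parallel_codecs(false)`, per call.
// `set_concurrent_target` is illustrative only; `array` and `subset` as above.
let mut options = zarrs::array::codec::DecodeOptions::default();
options.set_concurrent_target(1); // hypothetical: pin codec work to one thread
let bytes = array.retrieve_array_subset_opt(&subset, &options)?;
```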