diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a509737..cc3a3062 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Implement `From` for `DimensionName` - Add `{Async}ReadableWritableListableStorageTraits` and `{Async}ReadableWritableListableStorage` - Add `ArrayCodecTraits::decode_into_array_view_opt` with default implementation - - TODO: Use in more methods - TODO: Same for async - Add `ArrayPartialDecoderTraits::partial_decode_into_array_view_opt` with default implementation - TODO: Same for async diff --git a/src/array.rs b/src/array.rs index a22e25b5..9dcd58b8 100644 --- a/src/array.rs +++ b/src/array.rs @@ -726,8 +726,6 @@ mod tests { }, }; - use self::codec::{array_to_bytes::sharding::ShardingCodecBuilder, GzipCodec}; - use super::*; #[test] @@ -889,211 +887,4 @@ mod tests { // false, // ); // } - - #[cfg(feature = "ndarray")] - #[rustfmt::skip] - fn array_api_sync_read(array: Array) -> Result<(), Box> { - assert_eq!(array.data_type(), &DataType::UInt8); - assert_eq!(array.fill_value().as_ne_bytes(), &[0u8]); - assert_eq!(array.shape(), &[4, 4]); - assert_eq!(array.chunk_shape(&[0, 0]).unwrap(), [2, 2].try_into().unwrap()); - assert_eq!(array.chunk_grid_shape().unwrap(), &[2, 2]); - - // 1 2 | 3 4 - // 5 6 | 7 8 - // -----|----- - // 9 10 | 0 0 - // 0 0 | 0 0 - array.store_chunk(&[0, 0], vec![1, 2, 0, 0])?; - array.store_chunk(&[0, 1], vec![3, 4, 7, 8])?; - array.store_array_subset(&ArraySubset::new_with_ranges(&[1..3, 0..2]), vec![5, 6, 9, 10])?; - - assert!(array.retrieve_chunk(&[0, 0, 0]).is_err()); - assert_eq!(array.retrieve_chunk(&[0, 0])?, [1, 2, 5, 6]); - assert_eq!(array.retrieve_chunk(&[0, 1])?, [3, 4, 7, 8]); - assert_eq!(array.retrieve_chunk(&[1, 0])?, [9, 10, 0, 0]); - assert_eq!(array.retrieve_chunk(&[1, 1])?, [0, 0, 0, 0]); - - assert!(array.retrieve_chunk_if_exists(&[0, 0, 0]).is_err()); - assert_eq!(array.retrieve_chunk_if_exists(&[0, 0])?, Some(vec![1, 2, 5, 6])); - assert_eq!(array.retrieve_chunk_if_exists(&[0, 1])?, Some(vec![3, 4, 7, 8])); - assert_eq!(array.retrieve_chunk_if_exists(&[1, 0])?, Some(vec![9, 10, 0, 0])); - assert_eq!(array.retrieve_chunk_if_exists(&[1, 1])?, None); - - assert!(array.retrieve_chunk_ndarray::(&[0, 0]).is_err()); - assert_eq!(array.retrieve_chunk_ndarray::(&[0, 0])?, ndarray::array![[1, 2], [5, 6]].into_dyn()); - assert_eq!(array.retrieve_chunk_ndarray::(&[0, 1])?, ndarray::array![[3, 4], [7, 8]].into_dyn()); - assert_eq!(array.retrieve_chunk_ndarray::(&[1, 0])?, ndarray::array![[9, 10], [0, 0]].into_dyn()); - assert_eq!(array.retrieve_chunk_ndarray::(&[1, 1])?, ndarray::array![[0, 0], [0, 0]].into_dyn()); - - assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[0, 0])?, Some(ndarray::array![[1, 2], [5, 6]].into_dyn())); - assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[0, 1])?, Some(ndarray::array![[3, 4], [7, 8]].into_dyn())); - assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[1, 0])?, Some(ndarray::array![[9, 10], [0, 0]].into_dyn())); - assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[1, 1])?, None); - - assert!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2])).is_err()); - assert!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).is_err()); - assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 5, 6]); - assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]))?, [1, 2]); - assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2]))?, [2, 6]); - - assert!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).is_err()); - assert!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2])).is_err()); - assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2]))?, ndarray::array![[1, 2], [5, 6]].into_dyn()); - assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]))?, ndarray::array![[1, 2]].into_dyn()); - assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2]))?, ndarray::array![[2], [6]].into_dyn()); - - assert!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2])).is_err()); - assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, Vec::::new()); - assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 0..1]))?, [1, 2, 5, 6]); - assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); - assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 1..2]))?, [3, 4, 7, 8, 0, 0, 0, 0]); - assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 1..3]))?, [3, 4, 0, 0, 7, 8, 0, 0]); - - assert!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2])).is_err()); - assert!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2])).is_err()); - assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); - assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 1..2]))?, ndarray::array![[3, 4], [7, 8], [0, 0], [0, 0]].into_dyn()); - assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..1, 1..3]))?, ndarray::array![[3, 4, 0, 0], [7, 8, 0, 0]].into_dyn()); - - assert!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4])).is_err()); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, Vec::::new()); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, [] as [u8; 0]); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 5, 6]); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4, 0..4]))?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[1..3, 1..3]))?, [6, 7, 10 ,0]); - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[5..7, 5..6]))?, [0, 0]); // OOB -> fill value - assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..5, 0..5]))?, [1, 2, 3, 4, 0, 5, 6, 7, 8, 0, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); // OOB -> fill value - - assert!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4])).is_err()); - assert!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4])).is_err()); - assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, ndarray::Array2::::zeros((0, 0)).into_dyn()); - assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4]))?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); - assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[1..3, 1..3]))?, ndarray::array![[6, 7], [10 ,0]].into_dyn()); - assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[5..7, 5..6]))?, ndarray::array![[0], [0]].into_dyn()); // OOB -> fill value - assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..5, 0..5]))?, ndarray::array![[1, 2, 3, 4, 0], [5, 6, 7, 8, 0], [9, 10, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]].into_dyn()); // OOB -> fill value - - { - // Invalid array view dimensionality - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - assert!(array.retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).is_err()); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view)?; - assert_eq!(data, [0, 0, 1, 2, 0, 0]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[0..2, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_chunk_into_array_view(&[0, 0], &array_view)?; - assert_eq!(data, [1, 2, 5, 6, 0, 0]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..1,0..2]), &array_view)?; - assert_eq!(data, [0, 0, 1, 2, 0, 0]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..2,0..2]), &array_view)?; - assert_eq!(data, [0, 0, 1, 2, 5, 6]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..3, 1..3]), &array_view)?; - assert_eq!(data, [0, 0, 6, 7, 10, 0]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0, 0, 0]; - let shape = &[4, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..4, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..4, 0..2]), &array_view)?; - assert_eq!(data, [0, 0, 5, 6, 9, 10, 0, 0]); - } - - assert!(array.partial_decoder(&[0]).is_err()); - assert!(array.partial_decoder(&[0, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1])]).is_err()); - assert_eq!(array.partial_decoder(&[0, 0])?.partial_decode(&[])?, Vec::>::new()); - assert_eq!(array.partial_decoder(&[5, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2])])?, [vec![0, 0]]); // OOB -> fill value - assert_eq!(array.partial_decoder(&[0, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2]), ArraySubset::new_with_ranges(&[0..2, 1..2])])?, [vec![1, 2], vec![2, 6]]); - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - assert!(array.partial_decoder(&[0, 0])?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).is_ok()); - assert_eq!(data, [0, 0, 1, 2, 0, 0]); - } - { - let mut data = vec![0, 0, 0, 0, 0, 0]; - let shape = &[3, 2]; - let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); - let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; - assert!(array.partial_decoder(&[0, 0])?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..2, 0..2]), &array_view).is_err()); - } - - Ok(()) - } - - #[cfg(feature = "ndarray")] - #[test] - fn array_api_sync_read_uncompressed() -> Result<(), Box> { - let store = Arc::new(MemoryStore::default()); - let array_path = "/array"; - let array = ArrayBuilder::new( - vec![4, 4], // array shape - DataType::UInt8, - vec![2, 2].try_into().unwrap(), // regular chunk shape - FillValue::from(0u8), - ) - .bytes_to_bytes_codecs(vec![]) - // .storage_transformers(vec![].into()) - .build(store, array_path) - .unwrap(); - array_api_sync_read(array) - } - - #[cfg(feature = "ndarray")] - #[test] - fn array_api_sync_read_shard_compress() -> Result<(), Box> { - let store = Arc::new(MemoryStore::default()); - let array_path = "/array"; - let array = ArrayBuilder::new( - vec![4, 4], // array shape - DataType::UInt8, - vec![2, 2].try_into().unwrap(), // regular chunk shape - FillValue::from(0u8), - ) - .array_to_bytes_codec(Box::new( - ShardingCodecBuilder::new(vec![1, 1].try_into().unwrap()) - .bytes_to_bytes_codecs(vec![ - #[cfg(feature = "gzip")] - Box::new(GzipCodec::new(5)?), - ]) - .build(), - )) - // .storage_transformers(vec![].into()) - .build(store, array_path) - .unwrap(); - array_api_sync_read(array) - } } diff --git a/src/array/array_async_readable.rs b/src/array/array_async_readable.rs index 3ca72f9b..760adca9 100644 --- a/src/array/array_async_readable.rs +++ b/src/array/array_async_readable.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::StreamExt; use crate::{ array_subset::ArraySubset, @@ -13,6 +13,7 @@ use super::{ options::CodecOptions, ArrayCodecTraits, ArrayToBytesCodecTraits, AsyncArrayPartialDecoderTraits, AsyncStoragePartialDecoder, }, + concurrency::concurrency_chunks_and_codec, transmute_from_bytes_vec, unsafe_cell_slice::UnsafeCellSlice, validate_element_size, Array, ArrayCreateError, ArrayError, ArrayMetadata, @@ -54,6 +55,11 @@ impl Array { chunk_indices: &[u64], options: &CodecOptions, ) -> Result>, ArrayError> { + if chunk_indices.len() != self.dimensionality() { + return Err(ArrayError::InvalidChunkGridIndicesError( + chunk_indices.to_vec(), + )); + } let storage_handle = Arc::new(StorageHandle::new(self.storage.clone())); let storage_transformer = self .storage_transformers() @@ -303,7 +309,7 @@ impl Array { let array_subset = Arc::new(self.chunks_subset(chunks)?); // Retrieve chunk bytes - let num_chunks = chunks.num_elements(); + let num_chunks = chunks.num_elements_usize(); match num_chunks { 0 => Ok(vec![]), 1 => { @@ -315,28 +321,40 @@ impl Array { let size_output = usize::try_from(array_subset.num_elements() * self.data_type().size() as u64) .unwrap(); + + // Calculate chunk/codec concurrency + let chunk_representation = + self.chunk_array_representation(&vec![0; self.dimensionality()])?; + let codec_concurrency = + self.recommended_codec_concurrency(&chunk_representation)?; + let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec( + options.concurrent_target(), + num_chunks, + &codec_concurrency, + ); + let mut output = Vec::with_capacity(size_output); { let output_slice = UnsafeCellSlice::new(unsafe { crate::vec_spare_capacity_to_mut_slice(&mut output) }); - let mut futures = chunks - .indices() - .into_iter() - .map(|chunk_indices| { - let array_subset = array_subset.clone(); - async move { - self._async_decode_chunk_into_array_subset( - &chunk_indices, - &array_subset, - unsafe { output_slice.get() }, - options, - ) - .await - } - }) - .collect::>(); - while let Some(item) = futures.next().await { + let indices = chunks.indices(); + let futures = indices.into_iter().map(|chunk_indices| { + let options = options.clone(); + let array_subset = array_subset.clone(); + async move { + self._async_decode_chunk_into_array_subset( + &chunk_indices, + &array_subset, + unsafe { output_slice.get() }, + &options, + ) + .await + } + }); + let mut stream = + futures::stream::iter(futures).buffer_unordered(chunk_concurrent_limit); + while let Some(item) = stream.next().await { item?; } } @@ -493,7 +511,7 @@ impl Array { }; // Retrieve chunk bytes - let num_chunks = chunks.num_elements(); + let num_chunks = chunks.num_elements_usize(); match num_chunks { 0 => Ok(vec![]), 1 => { @@ -527,6 +545,17 @@ impl Array { usize::try_from(array_subset.num_elements() * self.data_type().size() as u64) .unwrap(); + // Calculate chunk/codec concurrency + let chunk_representation = + self.chunk_array_representation(&vec![0; self.dimensionality()])?; + let codec_concurrency = + self.recommended_codec_concurrency(&chunk_representation)?; + let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec( + options.concurrent_target(), + num_chunks, + &codec_concurrency, + ); + // let mut output = vec![0; size_output]; // let output_slice = output.as_mut_slice(); let mut output = Vec::with_capacity(size_output); @@ -534,104 +563,92 @@ impl Array { let output_slice = UnsafeCellSlice::new(unsafe { crate::vec_spare_capacity_to_mut_slice(&mut output) }); - let mut futures = chunks - .indices() - .into_iter() - .map(|chunk_indices| { - async move { - // Get the subset of the array corresponding to the chunk - let chunk_subset_in_array = unsafe { - self.chunk_grid() - .subset_unchecked(&chunk_indices, self.shape()) - }; - let Some(chunk_subset_in_array) = chunk_subset_in_array else { - return Err(ArrayError::InvalidArraySubset( - array_subset.clone(), - self.shape().to_vec(), - )); - }; - - // Decode the subset of the chunk which intersects array_subset - let overlap = unsafe { - array_subset.overlap_unchecked(&chunk_subset_in_array) - }; - let array_subset_in_chunk_subset = unsafe { - overlap.relative_to_unchecked(chunk_subset_in_array.start()) - }; - - let storage_handle = - Arc::new(StorageHandle::new(self.storage.clone())); - let storage_transformer = self - .storage_transformers() - .create_async_readable_transformer(storage_handle); - let input_handle = Box::new(AsyncStoragePartialDecoder::new( - storage_transformer, - data_key( - self.path(), - &chunk_indices, - self.chunk_key_encoding(), - ), + let indices = chunks.indices(); + let futures = indices.into_iter().map(|chunk_indices| { + let options = options.clone(); + async move { + // Get the subset of the array corresponding to the chunk + let chunk_subset_in_array = unsafe { + self.chunk_grid() + .subset_unchecked(&chunk_indices, self.shape()) + }; + let Some(chunk_subset_in_array) = chunk_subset_in_array else { + return Err(ArrayError::InvalidArraySubset( + array_subset.clone(), + self.shape().to_vec(), )); - - let decoded_bytes = { - let chunk_representation = - self.chunk_array_representation(&chunk_indices)?; - let partial_decoder = self - .codecs() - .async_partial_decoder_opt( - input_handle, - &chunk_representation, - options, // FIXME: Adjust internal decode options - ) - .await?; - - partial_decoder - .partial_decode_opt( - &[array_subset_in_chunk_subset], - options, - ) // FIXME: Adjust internal decode options - .await? - .remove(0) - }; - - // Copy decoded bytes to the output - let element_size = self.data_type().size() as u64; - let chunk_subset_in_array_subset = - unsafe { overlap.relative_to_unchecked(array_subset.start()) }; - let mut decoded_offset = 0; - let contiguous_indices = unsafe { - chunk_subset_in_array_subset - .contiguous_linearised_indices_unchecked( - array_subset.shape(), - ) - }; - let length = usize::try_from( - contiguous_indices.contiguous_elements() * element_size, - ) - .unwrap(); - for (array_subset_element_index, _num_elements) in - &contiguous_indices - { - let output_offset = - usize::try_from(array_subset_element_index * element_size) - .unwrap(); - debug_assert!((output_offset + length) <= size_output); - debug_assert!((decoded_offset + length) <= decoded_bytes.len()); - unsafe { - let output_slice = output_slice.get(); - output_slice[output_offset..output_offset + length] - .copy_from_slice( - &decoded_bytes - [decoded_offset..decoded_offset + length], - ); - } - decoded_offset += length; + }; + + // Decode the subset of the chunk which intersects array_subset + let overlap = + unsafe { array_subset.overlap_unchecked(&chunk_subset_in_array) }; + let array_subset_in_chunk_subset = unsafe { + overlap.relative_to_unchecked(chunk_subset_in_array.start()) + }; + + let storage_handle = Arc::new(StorageHandle::new(self.storage.clone())); + let storage_transformer = self + .storage_transformers() + .create_async_readable_transformer(storage_handle); + let input_handle = Box::new(AsyncStoragePartialDecoder::new( + storage_transformer, + data_key(self.path(), &chunk_indices, self.chunk_key_encoding()), + )); + + // TODO: Fast path if its the entire chunk + + let decoded_bytes = { + let chunk_representation = + self.chunk_array_representation(&chunk_indices)?; + let partial_decoder = self + .codecs() + .async_partial_decoder_opt( + input_handle, + &chunk_representation, + &options, + ) + .await?; + + partial_decoder + .partial_decode_opt(&[array_subset_in_chunk_subset], &options) + .await? + .remove(0) + }; + + // Copy decoded bytes to the output + let element_size = self.data_type().size() as u64; + let chunk_subset_in_array_subset = + unsafe { overlap.relative_to_unchecked(array_subset.start()) }; + let mut decoded_offset = 0; + let contiguous_indices = unsafe { + chunk_subset_in_array_subset + .contiguous_linearised_indices_unchecked(array_subset.shape()) + }; + let length = usize::try_from( + contiguous_indices.contiguous_elements() * element_size, + ) + .unwrap(); + for (array_subset_element_index, _num_elements) in &contiguous_indices { + let output_offset = + usize::try_from(array_subset_element_index * element_size) + .unwrap(); + debug_assert!((output_offset + length) <= size_output); + debug_assert!((decoded_offset + length) <= decoded_bytes.len()); + unsafe { + let output_slice = output_slice.get(); + output_slice[output_offset..output_offset + length] + .copy_from_slice( + &decoded_bytes[decoded_offset..decoded_offset + length], + ); } - Ok(()) + decoded_offset += length; } - }) - .collect::>(); - while let Some(item) = futures.next().await { + Ok(()) + } + }); + let mut stream = + futures::stream::iter(futures).buffer_unordered(chunk_concurrent_limit); + while let Some(item) = stream.next().await { item?; } } diff --git a/src/array/array_async_readable_writable.rs b/src/array/array_async_readable_writable.rs index 172d2096..0798507f 100644 --- a/src/array/array_async_readable_writable.rs +++ b/src/array/array_async_readable_writable.rs @@ -1,11 +1,13 @@ -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::StreamExt; use crate::{ array_subset::ArraySubset, storage::{data_key, AsyncReadableWritableStorageTraits}, }; -use super::{codec::options::CodecOptions, Array, ArrayError}; +use super::{ + codec::options::CodecOptions, concurrency::concurrency_chunks_and_codec, Array, ArrayError, +}; impl Array { /// Encode `subset_bytes` and store in `array_subset`. @@ -87,52 +89,58 @@ impl Array>(); - let mut futures = chunks_to_update - .iter() - .map( - |( - chunk_indices, - chunk_subset_in_array_subset, - array_subset_in_chunk_subset, - )| { - let chunk_subset_bytes = unsafe { - chunk_subset_in_array_subset.extract_bytes_unchecked( - &subset_bytes, - array_subset.shape(), - element_size, - ) - }; + let options = options.clone(); + async move { self.async_store_chunk_subset_opt( - chunk_indices, - array_subset_in_chunk_subset, + &chunk_indices, + &array_subset_in_chunk_subset, chunk_subset_bytes, - options, + &options, ) - }, - ) - .collect::>(); - while let Some(item) = futures.next().await { + .await + } + }, + ); + let mut stream = + futures::stream::iter(futures).buffer_unordered(chunk_concurrent_limit); + while let Some(item) = stream.next().await { item?; } } diff --git a/src/array/array_async_writable.rs b/src/array/array_async_writable.rs index 1a96384b..a8aae64e 100644 --- a/src/array/array_async_writable.rs +++ b/src/array/array_async_writable.rs @@ -9,6 +9,7 @@ use crate::{ use super::{ codec::{options::CodecOptions, ArrayCodecTraits}, + concurrency::concurrency_chunks_and_codec, Array, ArrayError, }; @@ -194,41 +195,51 @@ impl Array { )); } + // Calculate chunk/codec concurrency + let chunk_representation = + self.chunk_array_representation(&vec![0; self.dimensionality()])?; + let codec_concurrency = self.recommended_codec_concurrency(&chunk_representation)?; + let (chunk_concurrent_limit, options) = concurrency_chunks_and_codec( + options.concurrent_target(), + num_chunks, + &codec_concurrency, + ); + let element_size = self.data_type().size(); - let mut futures = chunks - .indices() - .into_iter() - .map(|chunk_indices| { - let chunk_subset_in_array = unsafe { - self.chunk_grid() - .subset_unchecked(&chunk_indices, self.shape()) - .unwrap() - }; - let overlap = unsafe { array_subset.overlap_unchecked(&chunk_subset_in_array) }; - let chunk_subset_in_array_subset = - unsafe { overlap.relative_to_unchecked(array_subset.start()) }; - #[allow(clippy::similar_names)] - let chunk_bytes = unsafe { - chunk_subset_in_array_subset.extract_bytes_unchecked( - &chunks_bytes, - array_subset.shape(), - element_size, - ) - }; + let indices = chunks.indices(); + let futures = indices.into_iter().map(|chunk_indices| { + let chunk_subset_in_array = unsafe { + self.chunk_grid() + .subset_unchecked(&chunk_indices, self.shape()) + .unwrap() + }; + let overlap = unsafe { array_subset.overlap_unchecked(&chunk_subset_in_array) }; + let chunk_subset_in_array_subset = + unsafe { overlap.relative_to_unchecked(array_subset.start()) }; + #[allow(clippy::similar_names)] + let chunk_bytes = unsafe { + chunk_subset_in_array_subset.extract_bytes_unchecked( + &chunks_bytes, + array_subset.shape(), + element_size, + ) + }; - debug_assert_eq!( - chunk_subset_in_array.num_elements(), - chunk_subset_in_array_subset.num_elements() - ); + debug_assert_eq!( + chunk_subset_in_array.num_elements(), + chunk_subset_in_array_subset.num_elements() + ); - async move { - self.async_store_chunk_opt(&chunk_indices, chunk_bytes, options) - .await - } - }) - .collect::>(); - while let Some(item) = futures.next().await { + let options = options.clone(); + async move { + self.async_store_chunk_opt(&chunk_indices, chunk_bytes, &options) + .await + } + }); + let mut stream = + futures::stream::iter(futures).buffer_unordered(chunk_concurrent_limit); + while let Some(item) = stream.next().await { item?; } } diff --git a/src/array/codec/array_to_array/transpose/transpose_partial_decoder.rs b/src/array/codec/array_to_array/transpose/transpose_partial_decoder.rs index 908ef10c..ad493967 100644 --- a/src/array/codec/array_to_array/transpose/transpose_partial_decoder.rs +++ b/src/array/codec/array_to_array/transpose/transpose_partial_decoder.rs @@ -114,6 +114,15 @@ impl AsyncArrayPartialDecoderTraits for AsyncTransposePartialDecoder<'_> { decoded_regions: &[ArraySubset], options: &CodecOptions, ) -> Result>, CodecError> { + for array_subset in decoded_regions { + if array_subset.dimensionality() != self.decoded_representation.dimensionality() { + return Err(CodecError::InvalidArraySubsetDimensionalityError( + array_subset.clone(), + self.decoded_representation.dimensionality(), + )); + } + } + // Get transposed array subsets let mut decoded_regions_transposed = Vec::with_capacity(decoded_regions.len()); for decoded_region in decoded_regions { diff --git a/src/array/codec/array_to_bytes/bytes/bytes_partial_decoder.rs b/src/array/codec/array_to_bytes/bytes/bytes_partial_decoder.rs index e54627ce..56a78bf2 100644 --- a/src/array/codec/array_to_bytes/bytes/bytes_partial_decoder.rs +++ b/src/array/codec/array_to_bytes/bytes/bytes_partial_decoder.rs @@ -124,6 +124,15 @@ impl AsyncArrayPartialDecoderTraits for AsyncBytesPartialDecoder<'_> { decoded_regions: &[ArraySubset], options: &CodecOptions, ) -> Result>, CodecError> { + for array_subset in decoded_regions { + if array_subset.dimensionality() != self.decoded_representation.dimensionality() { + return Err(CodecError::InvalidArraySubsetDimensionalityError( + array_subset.clone(), + self.decoded_representation.dimensionality(), + )); + } + } + let mut bytes = Vec::with_capacity(decoded_regions.len()); let chunk_shape = self.decoded_representation.shape_u64(); for array_subset in decoded_regions { diff --git a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs index b868482f..8f794dce 100644 --- a/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs +++ b/src/array/codec/array_to_bytes/sharding/sharding_partial_decoder.rs @@ -469,6 +469,15 @@ impl AsyncArrayPartialDecoderTraits for AsyncShardingPartialDecoder<'_> { array_subsets: &[ArraySubset], options: &CodecOptions, ) -> Result>, CodecError> { + for array_subset in array_subsets { + if array_subset.dimensionality() != self.decoded_representation.dimensionality() { + return Err(CodecError::InvalidArraySubsetDimensionalityError( + array_subset.clone(), + self.decoded_representation.dimensionality(), + )); + } + } + let Some(shard_index) = &self.shard_index else { return Ok(array_subsets .iter() diff --git a/src/array/codec/array_to_bytes/zfp/zfp_partial_decoder.rs b/src/array/codec/array_to_bytes/zfp/zfp_partial_decoder.rs index ca776ef1..8ba212a9 100644 --- a/src/array/codec/array_to_bytes/zfp/zfp_partial_decoder.rs +++ b/src/array/codec/array_to_bytes/zfp/zfp_partial_decoder.rs @@ -147,6 +147,15 @@ impl AsyncArrayPartialDecoderTraits for AsyncZfpPartialDecoder<'_> { decoded_regions: &[ArraySubset], options: &CodecOptions, ) -> Result>, CodecError> { + for array_subset in decoded_regions { + if array_subset.dimensionality() != self.decoded_representation.dimensionality() { + return Err(CodecError::InvalidArraySubsetDimensionalityError( + array_subset.clone(), + self.decoded_representation.dimensionality(), + )); + } + } + let encoded_value = self.input_handle.decode_opt(options).await?; let chunk_shape = self.decoded_representation.shape_u64(); let mut out = Vec::with_capacity(decoded_regions.len()); diff --git a/tests/array_async.rs b/tests/array_async.rs new file mode 100644 index 00000000..6740067c --- /dev/null +++ b/tests/array_async.rs @@ -0,0 +1,219 @@ +use std::sync::Arc; + +use zarrs::array::codec::{array_to_bytes::sharding::ShardingCodecBuilder, GzipCodec}; +use zarrs::array::{Array, ArrayBuilder, DataType, FillValue}; +use zarrs::array_subset::ArraySubset; + +#[cfg(feature = "object_store")] +use zarrs::object_store::memory::InMemory; + +#[cfg(all(feature = "async", feature = "object_store"))] +use zarrs::storage::store::AsyncObjectStore; + +#[cfg(all(feature = "ndarray", feature = "async", feature = "object_store"))] +#[rustfmt::skip] +async fn array_async_read(array: Array>) -> Result<(), Box> { + + assert_eq!(array.data_type(), &DataType::UInt8); + assert_eq!(array.fill_value().as_ne_bytes(), &[0u8]); + assert_eq!(array.shape(), &[4, 4]); + assert_eq!(array.chunk_shape(&[0, 0]).unwrap(), [2, 2].try_into().unwrap()); + assert_eq!(array.chunk_grid_shape().unwrap(), &[2, 2]); + + // 1 2 | 3 4 + // 5 6 | 7 8 + // -----|----- + // 9 10 | 0 0 + // 0 0 | 0 0 + array.async_store_chunk(&[0, 0], vec![1, 2, 0, 0]).await?; + array.async_store_chunk(&[0, 1], vec![3, 4, 7, 8]).await?; + array.async_store_array_subset(&ArraySubset::new_with_ranges(&[1..3, 0..2]), vec![5, 6, 9, 10]).await?; + + assert!(array.async_retrieve_chunk(&[0, 0, 0]).await.is_err()); + assert_eq!(array.async_retrieve_chunk(&[0, 0]).await?, [1, 2, 5, 6]); + assert_eq!(array.async_retrieve_chunk(&[0, 1]).await?, [3, 4, 7, 8]); + assert_eq!(array.async_retrieve_chunk(&[1, 0]).await?, [9, 10, 0, 0]); + assert_eq!(array.async_retrieve_chunk(&[1, 1]).await?, [0, 0, 0, 0]); + + assert!(array.async_retrieve_chunk_if_exists(&[0, 0, 0]).await.is_err()); + assert_eq!(array.async_retrieve_chunk_if_exists(&[0, 0]).await?, Some(vec![1, 2, 5, 6])); + assert_eq!(array.async_retrieve_chunk_if_exists(&[0, 1]).await?, Some(vec![3, 4, 7, 8])); + assert_eq!(array.async_retrieve_chunk_if_exists(&[1, 0]).await?, Some(vec![9, 10, 0, 0])); + assert_eq!(array.async_retrieve_chunk_if_exists(&[1, 1]).await?, None); + + assert!(array.async_retrieve_chunk_ndarray::(&[0, 0]).await.is_err()); + assert_eq!(array.async_retrieve_chunk_ndarray::(&[0, 0]).await?, ndarray::array![[1, 2], [5, 6]].into_dyn()); + assert_eq!(array.async_retrieve_chunk_ndarray::(&[0, 1]).await?, ndarray::array![[3, 4], [7, 8]].into_dyn()); + assert_eq!(array.async_retrieve_chunk_ndarray::(&[1, 0]).await?, ndarray::array![[9, 10], [0, 0]].into_dyn()); + assert_eq!(array.async_retrieve_chunk_ndarray::(&[1, 1]).await?, ndarray::array![[0, 0], [0, 0]].into_dyn()); + + assert_eq!(array.async_retrieve_chunk_ndarray_if_exists::(&[0, 0]).await?, Some(ndarray::array![[1, 2], [5, 6]].into_dyn())); + assert_eq!(array.async_retrieve_chunk_ndarray_if_exists::(&[0, 1]).await?, Some(ndarray::array![[3, 4], [7, 8]].into_dyn())); + assert_eq!(array.async_retrieve_chunk_ndarray_if_exists::(&[1, 0]).await?, Some(ndarray::array![[9, 10], [0, 0]].into_dyn())); + assert_eq!(array.async_retrieve_chunk_ndarray_if_exists::(&[1, 1]).await?, None); + + assert!(array.async_retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2])).await.is_err()); + assert!(array.async_retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).await.is_err()); + assert_eq!(array.async_retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2])).await?, [1, 2, 5, 6]); + assert_eq!(array.async_retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2])).await?, [1, 2]); + assert_eq!(array.async_retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2])).await?, [2, 6]); + + assert!(array.async_retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).await.is_err()); + assert!(array.async_retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2])).await.is_err()); + assert_eq!(array.async_retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2])).await?, ndarray::array![[1, 2], [5, 6]].into_dyn()); + assert_eq!(array.async_retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2])).await?, ndarray::array![[1, 2]].into_dyn()); + assert_eq!(array.async_retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2])).await?, ndarray::array![[2], [6]].into_dyn()); + + assert!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2])).await.is_err()); + assert_eq!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..0, 0..0])).await?, Vec::::new()); + assert_eq!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 0..1])).await?, [1, 2, 5, 6]); + assert_eq!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 0..2])).await?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); + assert_eq!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 1..2])).await?, [3, 4, 7, 8, 0, 0, 0, 0]); + assert_eq!(array.async_retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 1..3])).await?, [3, 4, 0, 0, 7, 8, 0, 0]); + + assert!(array.async_retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2])).await.is_err()); + assert!(array.async_retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2])).await.is_err()); + assert_eq!(array.async_retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2])).await?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); + assert_eq!(array.async_retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 1..2])).await?, ndarray::array![[3, 4], [7, 8], [0, 0], [0, 0]].into_dyn()); + assert_eq!(array.async_retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..1, 1..3])).await?, ndarray::array![[3, 4, 0, 0], [7, 8, 0, 0]].into_dyn()); + + assert!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4])).await.is_err()); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0])).await?, Vec::::new()); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0])).await?, [] as [u8; 0]); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..2, 0..2])).await?, [1, 2, 5, 6]); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4, 0..4])).await?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[1..3, 1..3])).await?, [6, 7, 10 ,0]); + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[5..7, 5..6])).await?, [0, 0]); // OOB -> fill value + assert_eq!(array.async_retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..5, 0..5])).await?, [1, 2, 3, 4, 0, 5, 6, 7, 8, 0, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); // OOB -> fill value + + assert!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4])).await.is_err()); + assert!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4])).await.is_err()); + assert_eq!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..0, 0..0])).await?, ndarray::Array2::::zeros((0, 0)).into_dyn()); + assert_eq!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4])).await?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); + assert_eq!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[1..3, 1..3])).await?, ndarray::array![[6, 7], [10 ,0]].into_dyn()); + assert_eq!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[5..7, 5..6])).await?, ndarray::array![[0], [0]].into_dyn()); // OOB -> fill value + assert_eq!(array.async_retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..5, 0..5])).await?, ndarray::array![[1, 2, 3, 4, 0], [5, 6, 7, 8, 0], [9, 10, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]].into_dyn()); // OOB -> fill value + + // { + // // Invalid array view dimensionality + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // assert!(array.async_retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).await.is_err()); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).await?; + // assert_eq!(data, [0, 0, 1, 2, 0, 0]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[0..2, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_chunk_into_array_view(&[0, 0], &array_view).await?; + // assert_eq!(data, [1, 2, 5, 6, 0, 0]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..1,0..2]), &array_view).await?; + // assert_eq!(data, [0, 0, 1, 2, 0, 0]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..2,0..2]), &array_view).await?; + // assert_eq!(data, [0, 0, 1, 2, 5, 6]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..3, 1..3]), &array_view).await?; + // assert_eq!(data, [0, 0, 6, 7, 10, 0]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0, 0, 0]; + // let shape = &[4, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..4, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // array.async_retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..4, 0..2]), &array_view).await?; + // assert_eq!(data, [0, 0, 5, 6, 9, 10, 0, 0]); + // } + + assert!(array.async_partial_decoder(&[0]).await.is_err()); + assert!(array.async_partial_decoder(&[0, 0]).await?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1])]).await.is_err()); + assert_eq!(array.async_partial_decoder(&[0, 0]).await?.partial_decode(&[]).await?, Vec::>::new()); + assert_eq!(array.async_partial_decoder(&[5, 0]).await?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2])]).await?, [vec![0, 0]]); // OOB -> fill value + assert_eq!(array.async_partial_decoder(&[0, 0]).await?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2]), ArraySubset::new_with_ranges(&[0..2, 1..2])]).await?, [vec![1, 2], vec![2, 6]]); + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // assert!(array.async_partial_decoder(&[0, 0]).await?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).await.is_ok()); + // assert_eq!(data, [0, 0, 1, 2, 0, 0]); + // } + // { + // let mut data = vec![0, 0, 0, 0, 0, 0]; + // let shape = &[3, 2]; + // let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + // let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + // assert!(array.async_partial_decoder(&[0, 0]).await?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..2, 0..2]), &array_view).await.is_err()); + // } + + Ok(()) +} + +#[cfg(all(feature = "ndarray", feature = "async", feature = "object_store"))] +#[tokio::test] +async fn array_async_read_uncompressed() -> Result<(), Box> { + let store = Arc::new(AsyncObjectStore::new(InMemory::new())); + let array_path = "/array"; + let array = ArrayBuilder::new( + vec![4, 4], // array shape + DataType::UInt8, + vec![2, 2].try_into().unwrap(), // regular chunk shape + FillValue::from(0u8), + ) + .bytes_to_bytes_codecs(vec![]) + // .storage_transformers(vec![].into()) + .build(store, array_path) + .unwrap(); + array_async_read(array).await +} + +// #[cfg(all(feature = "ndarray", feature = "async", feature = "object_store"))] +#[tokio::test] +async fn array_async_read_shard_compress() -> Result<(), Box> { + let store = Arc::new(AsyncObjectStore::new(InMemory::new())); + let array_path = "/array"; + let array = ArrayBuilder::new( + vec![4, 4], // array shape + DataType::UInt8, + vec![2, 2].try_into().unwrap(), // regular chunk shape + FillValue::from(0u8), + ) + .array_to_bytes_codec(Box::new( + ShardingCodecBuilder::new(vec![1, 1].try_into().unwrap()) + .bytes_to_bytes_codecs(vec![ + #[cfg(feature = "gzip")] + Box::new(GzipCodec::new(5)?), + ]) + .build(), + )) + // .storage_transformers(vec![].into()) + .build(store, array_path) + .unwrap(); + array_async_read(array).await +} diff --git a/tests/array_sync.rs b/tests/array_sync.rs new file mode 100644 index 00000000..7ac565c3 --- /dev/null +++ b/tests/array_sync.rs @@ -0,0 +1,213 @@ +use std::sync::Arc; + +use zarrs::array::codec::{array_to_bytes::sharding::ShardingCodecBuilder, GzipCodec}; +use zarrs::array::{Array, ArrayBuilder, ArrayView, DataType, FillValue}; +use zarrs::array_subset::ArraySubset; +use zarrs::storage::store::MemoryStore; + +#[cfg(feature = "ndarray")] +#[rustfmt::skip] +fn array_sync_read(array: Array) -> Result<(), Box> { + assert_eq!(array.data_type(), &DataType::UInt8); + assert_eq!(array.fill_value().as_ne_bytes(), &[0u8]); + assert_eq!(array.shape(), &[4, 4]); + assert_eq!(array.chunk_shape(&[0, 0]).unwrap(), [2, 2].try_into().unwrap()); + assert_eq!(array.chunk_grid_shape().unwrap(), &[2, 2]); + + // 1 2 | 3 4 + // 5 6 | 7 8 + // -----|----- + // 9 10 | 0 0 + // 0 0 | 0 0 + array.store_chunk(&[0, 0], vec![1, 2, 0, 0])?; + array.store_chunk(&[0, 1], vec![3, 4, 7, 8])?; + array.store_array_subset(&ArraySubset::new_with_ranges(&[1..3, 0..2]), vec![5, 6, 9, 10])?; + + assert!(array.retrieve_chunk(&[0, 0, 0]).is_err()); + assert_eq!(array.retrieve_chunk(&[0, 0])?, [1, 2, 5, 6]); + assert_eq!(array.retrieve_chunk(&[0, 1])?, [3, 4, 7, 8]); + assert_eq!(array.retrieve_chunk(&[1, 0])?, [9, 10, 0, 0]); + assert_eq!(array.retrieve_chunk(&[1, 1])?, [0, 0, 0, 0]); + + assert!(array.retrieve_chunk_if_exists(&[0, 0, 0]).is_err()); + assert_eq!(array.retrieve_chunk_if_exists(&[0, 0])?, Some(vec![1, 2, 5, 6])); + assert_eq!(array.retrieve_chunk_if_exists(&[0, 1])?, Some(vec![3, 4, 7, 8])); + assert_eq!(array.retrieve_chunk_if_exists(&[1, 0])?, Some(vec![9, 10, 0, 0])); + assert_eq!(array.retrieve_chunk_if_exists(&[1, 1])?, None); + + assert!(array.retrieve_chunk_ndarray::(&[0, 0]).is_err()); + assert_eq!(array.retrieve_chunk_ndarray::(&[0, 0])?, ndarray::array![[1, 2], [5, 6]].into_dyn()); + assert_eq!(array.retrieve_chunk_ndarray::(&[0, 1])?, ndarray::array![[3, 4], [7, 8]].into_dyn()); + assert_eq!(array.retrieve_chunk_ndarray::(&[1, 0])?, ndarray::array![[9, 10], [0, 0]].into_dyn()); + assert_eq!(array.retrieve_chunk_ndarray::(&[1, 1])?, ndarray::array![[0, 0], [0, 0]].into_dyn()); + + assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[0, 0])?, Some(ndarray::array![[1, 2], [5, 6]].into_dyn())); + assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[0, 1])?, Some(ndarray::array![[3, 4], [7, 8]].into_dyn())); + assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[1, 0])?, Some(ndarray::array![[9, 10], [0, 0]].into_dyn())); + assert_eq!(array.retrieve_chunk_ndarray_if_exists::(&[1, 1])?, None); + + assert!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2])).is_err()); + assert!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).is_err()); + assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 5, 6]); + assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]))?, [1, 2]); + assert_eq!(array.retrieve_chunk_subset(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2]))?, [2, 6]); + + assert!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..3, 0..3])).is_err()); + assert!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2])).is_err()); + assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 0..2]))?, ndarray::array![[1, 2], [5, 6]].into_dyn()); + assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]))?, ndarray::array![[1, 2]].into_dyn()); + assert_eq!(array.retrieve_chunk_subset_ndarray::(&[0, 0], &ArraySubset::new_with_ranges(&[0..2, 1..2]))?, ndarray::array![[2], [6]].into_dyn()); + + assert!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2])).is_err()); + assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, Vec::::new()); + assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 0..1]))?, [1, 2, 5, 6]); + assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); + assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..2, 1..2]))?, [3, 4, 7, 8, 0, 0, 0, 0]); + assert_eq!(array.retrieve_chunks(&ArraySubset::new_with_ranges(&[0..1, 1..3]))?, [3, 4, 0, 0, 7, 8, 0, 0]); + + assert!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2])).is_err()); + assert!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2])).is_err()); + assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); + assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..2, 1..2]))?, ndarray::array![[3, 4], [7, 8], [0, 0], [0, 0]].into_dyn()); + assert_eq!(array.retrieve_chunks_ndarray::(&ArraySubset::new_with_ranges(&[0..1, 1..3]))?, ndarray::array![[3, 4, 0, 0], [7, 8, 0, 0]].into_dyn()); + + assert!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4])).is_err()); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, Vec::::new()); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, [] as [u8; 0]); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..2, 0..2]))?, [1, 2, 5, 6]); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..4, 0..4]))?, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0]); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[1..3, 1..3]))?, [6, 7, 10 ,0]); + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[5..7, 5..6]))?, [0, 0]); // OOB -> fill value + assert_eq!(array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[0..5, 0..5]))?, [1, 2, 3, 4, 0, 5, 6, 7, 8, 0, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]); // OOB -> fill value + + assert!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4])).is_err()); + assert!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4])).is_err()); + assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..0, 0..0]))?, ndarray::Array2::::zeros((0, 0)).into_dyn()); + assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..4, 0..4]))?, ndarray::array![[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 0, 0], [0, 0, 0, 0]].into_dyn()); + assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[1..3, 1..3]))?, ndarray::array![[6, 7], [10 ,0]].into_dyn()); + assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[5..7, 5..6]))?, ndarray::array![[0], [0]].into_dyn()); // OOB -> fill value + assert_eq!(array.retrieve_array_subset_ndarray::(&ArraySubset::new_with_ranges(&[0..5, 0..5]))?, ndarray::array![[1, 2, 3, 4, 0], [5, 6, 7, 8, 0], [9, 10, 0, 0, 0], [0, 0, 0, 0, 0], [0, 0, 0, 0, 0]].into_dyn()); // OOB -> fill value + + { + // Invalid array view dimensionality + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + assert!(array.retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).is_err()); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_chunk_subset_into_array_view(&[0, 0], &ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view)?; + assert_eq!(data, [0, 0, 1, 2, 0, 0]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[0..2, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_chunk_into_array_view(&[0, 0], &array_view)?; + assert_eq!(data, [1, 2, 5, 6, 0, 0]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..1,0..2]), &array_view)?; + assert_eq!(data, [0, 0, 1, 2, 0, 0]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[0..2,0..2]), &array_view)?; + assert_eq!(data, [0, 0, 1, 2, 5, 6]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..3, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..3, 1..3]), &array_view)?; + assert_eq!(data, [0, 0, 6, 7, 10, 0]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0, 0, 0]; + let shape = &[4, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..4, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + array.retrieve_array_subset_into_array_view(&ArraySubset::new_with_ranges(&[1..4, 0..2]), &array_view)?; + assert_eq!(data, [0, 0, 5, 6, 9, 10, 0, 0]); + } + + assert!(array.partial_decoder(&[0]).is_err()); + assert!(array.partial_decoder(&[0, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1])]).is_err()); + assert_eq!(array.partial_decoder(&[0, 0])?.partial_decode(&[])?, Vec::>::new()); + assert_eq!(array.partial_decoder(&[5, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2])])?, [vec![0, 0]]); // OOB -> fill value + assert_eq!(array.partial_decoder(&[0, 0])?.partial_decode(&[ArraySubset::new_with_ranges(&[0..1, 0..2]), ArraySubset::new_with_ranges(&[0..2, 1..2])])?, [vec![1, 2], vec![2, 6]]); + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + assert!(array.partial_decoder(&[0, 0])?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..1, 0..2]), &array_view).is_ok()); + assert_eq!(data, [0, 0, 1, 2, 0, 0]); + } + { + let mut data = vec![0, 0, 0, 0, 0, 0]; + let shape = &[3, 2]; + let array_view_subset = ArraySubset::new_with_ranges(&[1..2, 0..2]); + let array_view = ArrayView::new(&mut data, shape, array_view_subset)?; + assert!(array.partial_decoder(&[0, 0])?.partial_decode_into_array_view(&ArraySubset::new_with_ranges(&[0..2, 0..2]), &array_view).is_err()); + } + + Ok(()) +} + +#[cfg(feature = "ndarray")] +#[test] +fn array_sync_read_uncompressed() -> Result<(), Box> { + let store = Arc::new(MemoryStore::default()); + let array_path = "/array"; + let array = ArrayBuilder::new( + vec![4, 4], // array shape + DataType::UInt8, + vec![2, 2].try_into().unwrap(), // regular chunk shape + FillValue::from(0u8), + ) + .bytes_to_bytes_codecs(vec![]) + // .storage_transformers(vec![].into()) + .build(store, array_path) + .unwrap(); + array_sync_read(array) +} + +#[cfg(feature = "ndarray")] +#[test] +fn array_sync_read_shard_compress() -> Result<(), Box> { + let store = Arc::new(MemoryStore::default()); + let array_path = "/array"; + let array = ArrayBuilder::new( + vec![4, 4], // array shape + DataType::UInt8, + vec![2, 2].try_into().unwrap(), // regular chunk shape + FillValue::from(0u8), + ) + .array_to_bytes_codec(Box::new( + ShardingCodecBuilder::new(vec![1, 1].try_into().unwrap()) + .bytes_to_bytes_codecs(vec![ + #[cfg(feature = "gzip")] + Box::new(GzipCodec::new(5)?), + ]) + .build(), + )) + // .storage_transformers(vec![].into()) + .build(store, array_path) + .unwrap(); + array_sync_read(array) +}