Skip to content

Commit

Permalink
Added the `array_sharded_ext::{ArrayShardedExt,ArrayShardedReadableEx…
Browse files Browse the repository at this point in the history
…t}` array extension traits

Fix a typo in `ArrayShardedExt`
  • Loading branch information
LDeakin committed Apr 29, 2024
1 parent ba06442 commit fe7ae6b
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Added the `array_sharded_ext::{ArrayShardedExt,ArrayShardedReadableExt}` array extension traits to simplify working with sharded arrays
- Abstracts the chunk grid to an "inner chunk grid" to simplify inner chunk retrieval
- Shard indexes are cached in an `ArrayShardedReadableExtCache`

### Changed
- Allow float fill values to be created from int fill value metadata
- Make `chunk_grid::{regular,rectangular}` public
Expand Down
8 changes: 8 additions & 0 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ mod fill_value_metadata;
mod nan_representations;
mod unsafe_cell_slice;

#[cfg(feature = "sharding")]
mod array_sharded_ext;

use std::sync::Arc;

pub use self::{
Expand All @@ -49,6 +52,11 @@ pub use self::{
unsafe_cell_slice::UnsafeCellSlice,
};

#[cfg(feature = "sharding")]
pub use array_sharded_ext::{
ArrayShardedExt, ArrayShardedReadableExt, ArrayShardedReadableExtCache,
};

use serde::Serialize;
use thiserror::Error;

Expand Down
296 changes: 296 additions & 0 deletions src/array/array_sharded_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
use std::{collections::HashMap, sync::Arc};

use super::{
codec::{CodecOptions, ShardingCodecConfiguration},
Array, ArrayError, ChunkGrid, ChunkShape,
};
use crate::array::codec::ArrayPartialDecoderTraits;
use crate::storage::ReadableStorageTraits;

/// An [`Array`] extension trait to simplify working with arrays using the `sharding_indexed` codec.
///
/// The methods can also be called on unsharded arrays: the inner chunk shape is then [`None`]
/// and the inner chunk grid is the normal chunk grid.
pub trait ArrayShardedExt {
/// Returns true if the array to bytes codec of the array is `sharding_indexed`.
fn is_sharded(&self) -> bool;

/// Return the inner chunk shape.
///
/// Returns [`None`] for an unsharded array.
fn inner_chunk_shape(&self) -> Option<ChunkShape>;

/// Retrieve the inner chunk grid.
///
/// Returns the normal chunk grid for an unsharded array.
fn inner_chunk_grid(&self) -> ChunkGrid;
}

impl<TStorage: ?Sized> ArrayShardedExt for Array<TStorage> {
    /// Compares the name in the array to bytes codec metadata with the sharding codec identifier.
    ///
    /// Panics if the array to bytes codec does not provide metadata.
    fn is_sharded(&self) -> bool {
        let array_to_bytes_metadata = self
            .codecs
            .array_to_bytes_codec()
            .create_metadata()
            .expect("the array to bytes codec should have metadata");
        // TODO: Add codec::identifier()?
        array_to_bytes_metadata.name() == super::codec::array_to_bytes::sharding::IDENTIFIER
    }

    /// Reads the inner chunk shape out of the sharding codec configuration.
    ///
    /// Yields [`None`] when the array to bytes codec metadata cannot be interpreted as a
    /// sharding codec configuration. Panics if the codec does not provide metadata.
    fn inner_chunk_shape(&self) -> Option<ChunkShape> {
        let array_to_bytes_metadata = self
            .codecs
            .array_to_bytes_codec()
            .create_metadata()
            .expect("the array to bytes codec should have metadata");
        match array_to_bytes_metadata.to_configuration() {
            Ok(ShardingCodecConfiguration::V1(configuration)) => Some(configuration.chunk_shape),
            _ => None,
        }
    }

    /// Builds a regular chunk grid from the inner chunk shape, or clones the array chunk grid
    /// when there is no inner chunk shape.
    fn inner_chunk_grid(&self) -> ChunkGrid {
        self.inner_chunk_shape().map_or_else(
            || self.chunk_grid().clone(),
            |shape| ChunkGrid::new(crate::array::chunk_grid::RegularChunkGrid::new(shape)),
        )
    }
}

/// Map from the chunk indices of a shard to the shared partial decoder created for that shard.
type PartialDecoderHashMap<'a> = HashMap<Vec<u64>, Arc<dyn ArrayPartialDecoderTraits + 'a>>;

/// A cache used for methods in the [`ArrayShardedReadableExt`] trait.
///
/// It stores one partial decoder per shard, plus array properties that are computed once when
/// the cache is created.
pub struct ArrayShardedReadableExtCache<'a> {
// Result of `ArrayShardedExt::is_sharded` for the array the cache was created from.
array_is_sharded: bool,
// Result of `ArrayShardedExt::inner_chunk_grid` for the array the cache was created from.
inner_chunk_grid: ChunkGrid,
// Partial decoders keyed by shard indices, guarded by a mutex so `&self` methods can mutate it.
cache: Arc<parking_lot::Mutex<PartialDecoderHashMap<'a>>>,
}

impl<'a> ArrayShardedReadableExtCache<'a> {
/// Create a new cache for an array.
#[must_use]
pub fn new<TStorage: ?Sized + ReadableStorageTraits + 'static>(
array: &'a Array<TStorage>,
) -> Self {
let inner_chunk_grid = array.inner_chunk_grid();
Self {
array_is_sharded: array.is_sharded(),
inner_chunk_grid,
cache: Arc::new(parking_lot::Mutex::new(HashMap::default())),
}
}

/// Returns true if the array is sharded.
///
/// This is cheaper than calling [`ArrayShardedExt::is_sharded`] repeatedly.
#[must_use]
pub fn array_is_sharded(&self) -> bool {
self.array_is_sharded
}

fn inner_chunk_grid(&self) -> &ChunkGrid {
&self.inner_chunk_grid
}

/// Return the number of shard indexes cached.
#[must_use]
pub fn len(&self) -> usize {
self.cache.lock().len()
}

/// Returns true if the cache contains no cached shard indexes.
#[must_use]
pub fn is_empty(&self) -> bool {
self.cache.lock().is_empty()
}

/// Clear the cache.
pub fn clear(&self) {
self.cache.lock().clear();
}

Check warning on line 112 in src/array/array_sharded_ext.rs

View check run for this annotation

Codecov / codecov/patch

src/array/array_sharded_ext.rs#L110-L112

Added lines #L110 - L112 were not covered by tests

fn retrieve<TStorage: ?Sized + ReadableStorageTraits + 'static>(
&self,
array: &'a Array<TStorage>,
chunk_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialDecoderTraits + 'a>, ArrayError> {
let mut cache = self.cache.lock();
if let Some(partial_decoder) = cache.get(chunk_indices) {
Ok(partial_decoder.clone())

Check warning on line 121 in src/array/array_sharded_ext.rs

View check run for this annotation

Codecov / codecov/patch

src/array/array_sharded_ext.rs#L121

Added line #L121 was not covered by tests
} else {
let partial_decoder: Arc<dyn ArrayPartialDecoderTraits> =
array.partial_decoder(chunk_indices)?.into();
cache.insert(chunk_indices.to_vec(), partial_decoder.clone());
Ok(partial_decoder)
}
}
}

/// An [`Array`] extension trait to efficiently read data (e.g. inner chunks) from arrays using the `sharding_indexed` codec.
///
/// Sharding indexes are cached in an [`ArrayShardedReadableExtCache`] enabling faster retrieval in some cases.
// TODO: Add more methods
pub trait ArrayShardedReadableExt<TStorage: ?Sized + ReadableStorageTraits + 'static> {
/// Read and decode the inner chunk at `inner_chunk_indices` into its bytes with default codec options.
///
/// The provided implementation calls
/// [`retrieve_inner_chunk_opt`](ArrayShardedReadableExt::retrieve_inner_chunk_opt) with
/// [`CodecOptions::default`].
///
/// See [`Array::retrieve_chunk`].
#[allow(clippy::missing_errors_doc)]
fn retrieve_inner_chunk<'a>(
&'a self,
cache: &ArrayShardedReadableExtCache<'a>,
inner_chunk_indices: &[u64],
) -> Result<Vec<u8>, ArrayError> {
self.retrieve_inner_chunk_opt(cache, inner_chunk_indices, &CodecOptions::default())
}

/// Read and decode the inner chunk at `inner_chunk_indices` into its bytes using the supplied codec `options`.
///
/// See [`Array::retrieve_chunk_opt`].
#[allow(clippy::missing_errors_doc)]
fn retrieve_inner_chunk_opt<'a>(
&'a self,
cache: &ArrayShardedReadableExtCache<'a>,
inner_chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Vec<u8>, ArrayError>;
}

impl<TStorage: ?Sized + ReadableStorageTraits + 'static> ArrayShardedReadableExt<TStorage>
    for Array<TStorage>
{
    /// Read and decode the inner chunk at `inner_chunk_indices` using `options`.
    ///
    /// If `cache` reports an unsharded array, this is a plain chunk retrieval at the same
    /// indices. Otherwise the inner chunk is located within its shard and decoded through the
    /// shard's partial decoder obtained from `cache`.
    ///
    /// Returns [`ArrayError::InvalidChunkGridIndicesError`] if the inner chunk indices do not
    /// resolve to an array subset, or if that subset does not map to exactly one shard. Errors
    /// from the chunk grid, the partial decoder and chunk retrieval are propagated.
    fn retrieve_inner_chunk_opt<'a>(
        &'a self,
        cache: &ArrayShardedReadableExtCache<'a>,
        inner_chunk_indices: &[u64],
        options: &CodecOptions,
    ) -> Result<Vec<u8>, ArrayError> {
        if !cache.array_is_sharded() {
            // Unsharded: the inner chunk grid is the normal chunk grid.
            return self.retrieve_chunk_opt(inner_chunk_indices, options);
        }

        // NOTE(review): Codecov reported the error paths using this closure as not covered by
        // tests at this commit.
        let invalid_indices =
            || ArrayError::InvalidChunkGridIndicesError(inner_chunk_indices.to_vec());

        // Region of the array covered by the inner chunk.
        let array_subset = cache
            .inner_chunk_grid()
            .subset(inner_chunk_indices, self.shape())?
            .ok_or_else(invalid_indices)?;

        // Outer chunks (shards) intersecting that region.
        let outer_chunks = self
            .chunks_in_array_subset(&array_subset)?
            .ok_or_else(invalid_indices)?;
        if outer_chunks.num_elements() != 1 {
            // This should not happen, but it is checked just in case.
            return Err(invalid_indices());
        }

        // Express the inner chunk region relative to the origin of its shard.
        let shard_indices = outer_chunks.start();
        let shard_origin = self.chunk_origin(shard_indices)?;
        let shard_subset = array_subset.relative_to(&shard_origin)?;

        let partial_decoder = cache.retrieve(self, shard_indices)?;
        Ok(partial_decoder
            .partial_decode_opt(&[shard_subset], options)?
            .pop()
            .expect("partial_decode_opt called with one subset, returned without error"))
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::{
        array::{
            codec::array_to_bytes::sharding::ShardingCodecBuilder, ArrayBuilder, DataType,
            FillValue,
        },
        array_subset::ArraySubset,
        storage::store::MemoryStore,
    };

    use super::*;

    /// An 8x8 array with 4x4 shards and 2x2 inner chunks exposes a 4x4 inner chunk grid, and
    /// retrieving one inner chunk caches exactly one shard partial decoder.
    #[test]
    fn array_sharded_ext() -> Result<(), Box<dyn std::error::Error>> {
        let sharding_codec = ShardingCodecBuilder::new(vec![2, 2].try_into()?)
            .bytes_to_bytes_codecs(vec![
                #[cfg(feature = "gzip")]
                Box::new(crate::array::codec::GzipCodec::new(5)?),
            ])
            .build();

        let memory_store = Arc::new(MemoryStore::default());
        let array = ArrayBuilder::new(
            vec![8, 8], // array shape
            DataType::UInt8,
            vec![4, 4].try_into()?, // regular chunk shape
            FillValue::from(0u8),
        )
        .array_to_bytes_codec(Box::new(sharding_codec))
        .build(memory_store, "/array")?;

        // Fill the whole array with 0, 1, 2, ... in element order.
        let elements: Vec<u8> = (0..array.shape().iter().product())
            .map(|i| i as u8)
            .collect();
        let whole_array = ArraySubset::new_with_shape(array.shape().to_vec());
        array.store_array_subset(&whole_array, elements)?;

        assert!(array.is_sharded());
        assert_eq!(array.inner_chunk_shape(), Some(vec![2, 2].try_into()?));
        assert_eq!(
            array.inner_chunk_grid().grid_shape(array.shape())?,
            Some(vec![4, 4])
        );

        // Inner chunk [2, 3] covers rows 4..6 and columns 6..8 of the array.
        let cache = ArrayShardedReadableExtCache::new(&array);
        let expected = array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[4..6, 6..8]))?;
        let inner_chunk: Vec<u8> = array.retrieve_inner_chunk(&cache, &[2, 3])?;
        assert_eq!(expected, inner_chunk);
        assert_eq!(cache.len(), 1);

        Ok(())
    }

    /// Without the sharding codec the inner chunk grid is the 2x2 chunk grid, inner chunk
    /// retrieval returns whole chunks, and the cache stays empty.
    #[test]
    fn array_sharded_ext_unsharded() -> Result<(), Box<dyn std::error::Error>> {
        let memory_store = Arc::new(MemoryStore::default());
        let array = ArrayBuilder::new(
            vec![8, 8], // array shape
            DataType::UInt8,
            vec![4, 4].try_into()?, // regular chunk shape
            FillValue::from(0u8),
        )
        .build(memory_store, "/array")?;

        // Fill the whole array with 0, 1, 2, ... in element order.
        let elements: Vec<u8> = (0..array.shape().iter().product())
            .map(|i| i as u8)
            .collect();
        let whole_array = ArraySubset::new_with_shape(array.shape().to_vec());
        array.store_array_subset(&whole_array, elements)?;

        assert!(!array.is_sharded());
        assert_eq!(array.inner_chunk_shape(), None);
        assert_eq!(
            array.inner_chunk_grid().grid_shape(array.shape())?,
            Some(vec![2, 2])
        );

        // "Inner chunk" [1, 1] is the ordinary chunk covering rows 4..8 and columns 4..8.
        let cache = ArrayShardedReadableExtCache::new(&array);
        let expected = array.retrieve_array_subset(&ArraySubset::new_with_ranges(&[4..8, 4..8]))?;
        let chunk: Vec<u8> = array.retrieve_inner_chunk(&cache, &[1, 1])?;
        assert_eq!(expected, chunk);
        assert!(cache.is_empty());

        Ok(())
    }
}

0 comments on commit fe7ae6b

Please sign in to comment.