feat: add HTTP store support #61

Merged on Dec 17, 2024 (48 commits)
Changes from 44 commits
c2ac57b
chore(deps): bump zarrs to 0.18.0
LDeakin Nov 27, 2024
4074328
feat: add HTTP store support
LDeakin Nov 27, 2024
2e8599c
update stubs
LDeakin Nov 27, 2024
f61183e
fix: disallow storage_options for HTTP store
LDeakin Nov 28, 2024
3f2d077
add another todo for store info on codec init
LDeakin Nov 28, 2024
bf4e365
Merge remote-tracking branch 'origin/main' into ld/http_store
LDeakin Nov 29, 2024
390f23b
Merge branch 'main' into ld/http_store
LDeakin Dec 2, 2024
3244bee
refactor store initialisation
LDeakin Dec 6, 2024
95ba17b
try build with latest ring
LDeakin Dec 6, 2024
fa4fca5
try add nasm
LDeakin Dec 6, 2024
38636fd
Revert "try add nasm"
LDeakin Dec 6, 2024
3bf21da
Revert "try build with latest ring"
LDeakin Dec 6, 2024
1bae158
CD: bump maturin to 1.7.8
LDeakin Dec 7, 2024
11f4c87
set python architecture for windows aarch64
LDeakin Dec 7, 2024
4020a7c
Revert "set python architecture for windows aarch64"
LDeakin Dec 7, 2024
08476b3
Set MATURIN_USE_XWIN=1 on windows aarch64
LDeakin Dec 7, 2024
56d69c6
Exclude windows aarch64
LDeakin Dec 7, 2024
96c1a24
fix: store/str type hints
LDeakin Dec 9, 2024
d57e037
fix: get_store to get_store_from_config
LDeakin Dec 9, 2024
86883c5
fix: use match in StoreConfig::extract_bound
LDeakin Dec 9, 2024
5e80adf
fix: elide 'py lifetimes in HTTPStoreConfig
LDeakin Dec 9, 2024
31e9101
fix: remove unneeded return statement
LDeakin Dec 9, 2024
60e867b
fix: address clippy::upper_case_acronyms
LDeakin Dec 9, 2024
63fa081
fix: address clippy::single_match_else
LDeakin Dec 9, 2024
301f6d5
Revert "CD: bump maturin to 1.7.8"
LDeakin Dec 9, 2024
190becd
fix: match on remote store name in StoreConfig::extract_bound
LDeakin Dec 9, 2024
be244e1
fix: remove MATURIN_USE_XWIN
LDeakin Dec 9, 2024
d1e3405
fix: remove zarrs config setup in http tests
LDeakin Dec 9, 2024
dd3ac5b
Merge branch 'main' into ld/http_store
flying-sheep Dec 10, 2024
71f1698
fix: TryFrom<&StoreConfig> for store
LDeakin Dec 10, 2024
0643735
fix: add StoreConfig super class
LDeakin Dec 10, 2024
94806cf
fix: convert local store root to string
LDeakin Dec 10, 2024
9b70af1
fix: change unsupported store to NotImplementedError
LDeakin Dec 10, 2024
0a3bf0f
fix: add docs for HTTP store
LDeakin Dec 10, 2024
6e42799
refactor: move store logic into store module
LDeakin Dec 10, 2024
db7c622
refactor: remove CodecPipelineStore
LDeakin Dec 10, 2024
c288bda
fix: return store directly from opendal_builder_to_sync_store
LDeakin Dec 10, 2024
428710a
fix: use zarrs::ReadableWritableListableStorage
LDeakin Dec 10, 2024
e0704dc
Merge branch 'main' into ld/http_store
LDeakin Dec 10, 2024
fd0c6b4
fix(docs): clarify NotImplementedError
LDeakin Dec 11, 2024
78d9642
Revert store enum changes
flying-sheep Dec 12, 2024
028826e
fmt
flying-sheep Dec 12, 2024
97f9428
update stubs
flying-sheep Dec 12, 2024
41ee02e
wtf
flying-sheep Dec 12, 2024
5873e2a
fix: http store root to endpoint
LDeakin Dec 12, 2024
223c57d
Instantiate Rust containers from Python (#72)
flying-sheep Dec 14, 2024
014b04f
Abstract away store manager
flying-sheep Dec 14, 2024
2296ca1
fix(deps): constrain zarr<=3.0.0b3
LDeakin Dec 15, 2024
4 changes: 4 additions & 0 deletions .github/workflows/cd.yml
@@ -34,6 +34,10 @@ jobs:
- { os: linux, manylinux: musllinux_1_2, target: armv7 }
# windows
- { os: windows, target: i686, python-architecture: x86 }
exclude:
# https://github.com/rust-cross/cargo-xwin/issues/76
- os: windows
target: aarch64
runs-on: ${{ (matrix.os == 'linux' && 'ubuntu') || matrix.os }}-latest
steps:
- uses: actions/checkout@v4
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -19,6 +19,9 @@ numpy = "0.23.0"
unsafe_cell_slice = "0.2.0"
serde_json = "1.0.128"
pyo3-stub-gen = "0.6.1"
opendal = { version = "0.50.2", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.4.0"

[profile.release]
lto = true
9 changes: 8 additions & 1 deletion README.md
@@ -30,7 +30,14 @@ You can then use your `zarr` as normal (with some caveats)!

We export a `ZarrsCodecPipeline` class so that `zarr-python` can use the class but it is not meant to be instantiated and we do not guarantee the stability of its API beyond what is required so that `zarr-python` can use it. Therefore, it is not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError` that can be thrown in the process of converting to indexers that `zarrs` can understand (see below for more details).

At the moment, we only support local filesystems but intend to support more in the future: https://github.com/ilan-gold/zarrs-python/issues/44
At the moment, we only support a subset of the `zarr-python` stores:

- [x] [LocalStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/local/index.html) (FileSystem)
- [RemoteStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/remote/index.html)
- [x] [HTTPFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.http.HTTPFileSystem)

A `NotImplementedError` will be raised if a store is not supported.
We intend to support more stores in the future: https://github.com/ilan-gold/zarrs-python/issues/44.
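
The support rule described in this README change can be pictured as a small dispatch function. What follows is only a minimal sketch under stated assumptions: `LocalStoreStub`, `RemoteStoreStub`, `FilesystemConfig`, `HttpConfig`, and `store_to_config` are hypothetical names invented for illustration and are not part of `zarr-python` or `zarrs-python`.

```python
from dataclasses import dataclass


@dataclass
class LocalStoreStub:
    """Hypothetical stand-in for a local filesystem store."""

    root: str


@dataclass
class RemoteStoreStub:
    """Hypothetical stand-in for a remote store that wraps some protocol."""

    protocol: str
    url: str


@dataclass(frozen=True)
class FilesystemConfig:
    """Hypothetical config handed to the backend for local stores."""

    root: str


@dataclass(frozen=True)
class HttpConfig:
    """Hypothetical config handed to the backend for HTTP stores."""

    endpoint: str


def store_to_config(store):
    """Map a supported store to a config, or raise NotImplementedError."""
    if isinstance(store, LocalStoreStub):
        return FilesystemConfig(root=store.root)
    if isinstance(store, RemoteStoreStub) and store.protocol in ("http", "https"):
        return HttpConfig(endpoint=store.url)
    # Anything else (for example an S3-backed remote store) is not supported.
    raise NotImplementedError(f"unsupported store: {store!r}")
```

Raising `NotImplementedError` rather than a type error signals that the store is valid but not yet handled, which matches the wording of the README text above.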

### Configuration

21 changes: 18 additions & 3 deletions python/zarrs/_internal.pyi
@@ -2,6 +2,7 @@
# ruff: noqa: E501, F401

import typing
from enum import Enum, auto

import numpy
import numpy.typing
@@ -21,7 +22,9 @@ class CodecPipelineImpl:
self,
chunk_descriptions: typing.Sequence[
tuple[
tuple[str, typing.Sequence[int], str, typing.Sequence[int]],
tuple[
StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int]
],
typing.Sequence[slice],
typing.Sequence[slice],
]
@@ -31,17 +34,29 @@ class CodecPipelineImpl:
def retrieve_chunks(
self,
chunk_descriptions: typing.Sequence[
tuple[str, typing.Sequence[int], str, typing.Sequence[int]]
tuple[StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int]]
],
) -> list[numpy.typing.NDArray[numpy.uint8]]: ...
def store_chunks_with_indices(
self,
chunk_descriptions: typing.Sequence[
tuple[
tuple[str, typing.Sequence[int], str, typing.Sequence[int]],
tuple[
StoreConfig, str, typing.Sequence[int], str, typing.Sequence[int]
],
typing.Sequence[slice],
typing.Sequence[slice],
]
],
value: numpy.NDArray[typing.Any],
) -> None: ...

class FilesystemStoreConfig:
root: str

class HttpStoreConfig:
root: str

class StoreConfig(Enum):
Filesystem = auto()
Http = auto()
13 changes: 7 additions & 6 deletions python/zarrs/utils.py
@@ -12,7 +12,7 @@
from collections.abc import Iterable
from types import EllipsisType

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.common import ChunkCoords

@@ -63,10 +63,11 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli


def convert_chunk_to_primitive(
byte_getter: ByteGetter | ByteSetter, chunk_spec: ArraySpec
) -> tuple[str, ChunkCoords, str, Any]:
byte_interface: ByteGetter | ByteSetter, chunk_spec: ArraySpec
) -> tuple[Store, str, ChunkCoords, str, Any]:
return (
str(byte_getter),
byte_interface.store,
byte_interface.path,
chunk_spec.shape,
str(chunk_spec.dtype),
chunk_spec.fill_value.tobytes(),
@@ -149,7 +150,7 @@ def make_chunk_info_for_rust_with_indices(
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
drop_axes: tuple[int, ...],
) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]:
) -> list[tuple[tuple[Store, str, ChunkCoords, str, Any], list[slice], list[slice]]]:
chunk_info_with_indices = []
for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info:
chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec)
Expand Down Expand Up @@ -178,7 +179,7 @@ def make_chunk_info_for_rust(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
) -> list[tuple[str, ChunkCoords, str, Any]]:
) -> list[tuple[Store, str, ChunkCoords, str, Any]]:
return list(
convert_chunk_to_primitive(byte_getter, chunk_spec)
for (byte_getter, chunk_spec, _, _) in batch_info
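
The `utils.py` change above replaces a single stringified store path with a separate store object and key path, so each chunk description becomes a five-element tuple. This can be sketched roughly as below. The stub classes and `chunk_to_primitive` are hypothetical stand-ins written for illustration; in particular, the fill value is modelled with a one-element `array.array` only because it also offers a `tobytes()` method.

```python
from array import array
from dataclasses import dataclass
from typing import Any


@dataclass
class ByteInterfaceStub:
    """Hypothetical stand-in for an object exposing `.store` and `.path`."""

    store: Any
    path: str


@dataclass
class ChunkSpecStub:
    """Hypothetical stand-in for a chunk spec with shape, dtype and fill value."""

    shape: tuple
    dtype: str
    fill_value: array


def chunk_to_primitive(byte_interface, chunk_spec):
    """Build a (store, path, shape, dtype, fill bytes) chunk description."""
    return (
        byte_interface.store,  # store object, passed through unchanged
        byte_interface.path,  # key of the chunk within the store
        chunk_spec.shape,
        str(chunk_spec.dtype),
        chunk_spec.fill_value.tobytes(),
    )


description = chunk_to_primitive(
    ByteInterfaceStub(store="my-store", path="array/c/0/0"),
    ChunkSpecStub(shape=(2, 2), dtype="float64", fill_value=array("d", [0.0])),
)
```

Keeping the store and the path as separate elements means the receiving side no longer has to parse a URL-like string to work out which kind of store it is dealing with.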
45 changes: 29 additions & 16 deletions src/chunk_item.rs
@@ -1,4 +1,4 @@
use std::{num::NonZeroU64, sync::Arc};
use std::num::NonZeroU64;

use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
@@ -9,13 +9,15 @@ use zarrs::{
array::{ChunkRepresentation, DataType, FillValue},
array_subset::ArraySubset,
metadata::v3::{array::data_type::DataTypeMetadataV3, MetadataV3},
storage::{MaybeBytes, ReadableWritableListableStorageTraits, StorageError, StoreKey},
storage::{MaybeBytes, ReadableWritableListableStorage, StorageError, StoreKey},
};

use crate::utils::PyErrExt;
use crate::{utils::PyErrExt, StoreConfig};

pub(crate) type Raw<'a> = (
// store path
// store
StoreConfig,
// path
String,
// shape
Vec<u64>,
@@ -34,17 +36,18 @@ pub(crate) type RawWithIndices<'a> = (
);

pub(crate) trait IntoItem<T, S>: std::marker::Sized {
fn store_path(&self) -> &str;
fn store_config(&self) -> &StoreConfig;
fn path(&self) -> &str;
fn into_item(
self,
store: Arc<dyn ReadableWritableListableStorageTraits>,
store: ReadableWritableListableStorage,
key: StoreKey,
shape: S,
) -> PyResult<T>;
}

pub(crate) trait ChunksItem {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits>;
fn store(&self) -> ReadableWritableListableStorage;
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;

@@ -54,7 +57,7 @@ pub(crate) trait ChunksItem {
}

pub(crate) struct Basic {
store: Arc<dyn ReadableWritableListableStorageTraits>,
store: ReadableWritableListableStorage,
key: StoreKey,
representation: ChunkRepresentation,
}
@@ -66,7 +69,7 @@ pub(crate) struct WithSubset {
}

impl ChunksItem for Basic {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits> {
fn store(&self) -> ReadableWritableListableStorage {
self.store.clone()
}
fn key(&self) -> &StoreKey {
@@ -78,7 +81,7 @@ impl ChunksItem for Basic {
}

impl ChunksItem for WithSubset {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits> {
fn store(&self) -> ReadableWritableListableStorage {
self.item.store.clone()
}
fn key(&self) -> &StoreKey {
@@ -90,16 +93,21 @@ impl ChunksItem for WithSubset {
}

impl<'a> IntoItem<Basic, ()> for Raw<'a> {
fn store_path(&self) -> &str {
fn store_config(&self) -> &StoreConfig {
&self.0
}

fn path(&self) -> &str {
&self.1
}

fn into_item(
self,
store: Arc<dyn ReadableWritableListableStorageTraits>,
store: ReadableWritableListableStorage,
key: StoreKey,
(): (),
) -> PyResult<Basic> {
let (_, chunk_shape, dtype, fill_value) = self;
let (_, _, chunk_shape, dtype, fill_value) = self;
let representation = get_chunk_representation(chunk_shape, &dtype, fill_value)?;
Ok(Basic {
store,
@@ -110,17 +118,22 @@ impl<'a> IntoItem<Basic, ()> for Raw<'a> {
}

impl IntoItem<WithSubset, &[u64]> for RawWithIndices<'_> {
fn store_path(&self) -> &str {
fn store_config(&self) -> &StoreConfig {
&self.0 .0
}

fn path(&self) -> &str {
&self.0 .1
}

fn into_item(
self,
store: Arc<dyn ReadableWritableListableStorageTraits>,
store: ReadableWritableListableStorage,
key: StoreKey,
shape: &[u64],
) -> PyResult<WithSubset> {
let (raw, selection, chunk_selection) = self;
let chunk_shape = raw.1.clone();
let chunk_shape = raw.2.clone();
let item = raw.into_item(store.clone(), key, ())?;
Ok(WithSubset {
item,
47 changes: 0 additions & 47 deletions src/codec_pipeline_store_filesystem.rs

This file was deleted.

42 changes: 17 additions & 25 deletions src/lib.rs
@@ -1,4 +1,5 @@
#![warn(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use chunk_item::{ChunksItem, IntoItem};
use concurrency::ChunkConcurrentLimitAndCodecOptions;
@@ -12,6 +13,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use store::StoreConfig;
use unsafe_cell_slice::UnsafeCellSlice;
use zarrs::array::codec::{
ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder, StoragePartialDecoder,
@@ -21,58 +23,46 @@ use zarrs::array::{
};
use zarrs::array_subset::ArraySubset;
use zarrs::metadata::v3::MetadataV3;
use zarrs::storage::{ReadableWritableListableStorageTraits, StorageHandle, StoreKey};
use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};

mod chunk_item;
mod codec_pipeline_store_filesystem;
mod concurrency;
mod runtime;
mod store;
#[cfg(test)]
mod tests;
mod utils;

use codec_pipeline_store_filesystem::CodecPipelineStoreFilesystem;
use utils::{PyErrExt, PyUntypedArrayExt};

trait CodecPipelineStore: Send + Sync {
fn store(&self) -> Arc<dyn ReadableWritableListableStorageTraits>;
fn chunk_path(&self, store_path: &str) -> PyResult<String>;
}

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
#[pyclass]
pub struct CodecPipelineImpl {
pub(crate) codec_chain: Arc<CodecChain>,
pub(crate) store: Mutex<Option<Arc<dyn CodecPipelineStore>>>,
pub(crate) store: Mutex<Option<ReadableWritableListableStorage>>,
pub(crate) codec_options: CodecOptions,
pub(crate) chunk_concurrent_minimum: usize,
pub(crate) chunk_concurrent_maximum: usize,
pub(crate) num_threads: usize,
}

impl CodecPipelineImpl {
fn get_store_and_path(
fn get_store_from_config(
&self,
store_path: &str,
) -> PyResult<(Arc<dyn ReadableWritableListableStorageTraits>, String)> {
config: &StoreConfig,
) -> PyResult<ReadableWritableListableStorage> {
let mut gstore = self.store.lock().map_err(|_| {
PyErr::new::<PyRuntimeError, _>("failed to lock the store mutex".to_string())
})?;

#[allow(clippy::collapsible_if)]
if gstore.is_none() {
if store_path.starts_with("file://") {
*gstore = Some(Arc::new(CodecPipelineStoreFilesystem::new()?));
}
// TODO: Add support for more stores
}

// TODO: Request upstream change to get store on codec pipeline initialisation, do not want to do all of this here
if let Some(gstore) = gstore.as_ref() {
Ok((gstore.store(), gstore.chunk_path(store_path)?))
Ok(gstore.clone())
} else {
Err(PyErr::new::<PyTypeError, _>(format!(
"unsupported store for {store_path}"
)))
let store: ReadableWritableListableStorage = config.try_into()?;
*gstore = Some(store.clone());
Ok(store)
}
}
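
The caching pattern in `get_store_from_config` (take the lock, reuse the store if one exists, otherwise build it from the config and remember it) can be sketched in Python as follows. This is an illustrative analogue only, not the Rust implementation: `StoreCache` and the `factory` callable are hypothetical names, and `threading.Lock` stands in for the Rust `Mutex`.

```python
import threading


class StoreCache:
    """Hypothetical lazily initialised, lock-protected single-store cache."""

    def __init__(self, factory):
        self._factory = factory  # callable that builds a store from a config
        self._lock = threading.Lock()
        self._store = None

    def get(self, config):
        """Return the cached store, building it on first use."""
        with self._lock:
            if self._store is None:
                # Only the first config is ever used to build the store.
                self._store = self._factory(config)
            return self._store


built_from = []


def build_store(config):
    """Toy factory that records which configs triggered a build."""
    built_from.append(config)
    return object()


cache = StoreCache(build_store)
first = cache.get("config-a")
second = cache.get("config-b")
```

Because the cache is not keyed on the config, the second call returns the store built from the first config. That is the same assumption noted in the TODO in the diff: all chunks handled by one pipeline are expected to share a store.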

@@ -84,7 +74,9 @@ impl CodecPipelineImpl {
chunk_descriptions
.into_iter()
.map(|raw| {
let (store, path) = self.get_store_and_path(raw.store_path())?;
// TODO: Prefer to get the store once, and assume it is the same for all chunks
let store = self.get_store_from_config(raw.store_config())?;
let path = raw.path();
let key = StoreKey::new(path).map_py_err::<PyValueError>()?;
raw.into_item(store, key, shape)
})