(feat): full v2 compat via python fallback #84

Merged
merged 28 commits into main from ig/python_fallback
Feb 11, 2025
Changes from 23 commits
c7fb95a
chore(deps): bump zarr to 3.0.0rc1
LDeakin Jan 3, 2025
0a877e0
fmt
LDeakin Jan 3, 2025
def2e70
(feat): python fallback
ilan-gold Jan 28, 2025
622287d
Merge branch 'main' into ig/python_fallback
ilan-gold Jan 29, 2025
b362759
(fix): dtypes
ilan-gold Jan 30, 2025
fba8226
(fix): `object` dtypes + `v2` tests
ilan-gold Jan 30, 2025
4aa21a3
(fix): `object` dtypes + `v2` tests
ilan-gold Jan 30, 2025
a51e810
(fix): `object` dtypes + `v2` tests
ilan-gold Jan 30, 2025
19e90e3
(fix): `object` dtypes in rust
ilan-gold Feb 2, 2025
4a59ec1
(fix): blosc support
ilan-gold Feb 2, 2025
45efee1
(refactor): handle `None` fill-value more gracefully
ilan-gold Feb 2, 2025
59e60fc
fix: V2 codec pipeline creation
LDeakin Feb 3, 2025
1a6dc77
fix: zfpy/pcodec metadata handling
LDeakin Feb 3, 2025
008fd6a
(fix): fall back for unsupported codecs
ilan-gold Feb 4, 2025
9a0daa9
(fix): our decode codec pipeline does not support vlen
ilan-gold Feb 4, 2025
4637d24
(fix): string dtype test to match zarr-python
ilan-gold Feb 4, 2025
cf2e6b5
(chore): add note
ilan-gold Feb 4, 2025
00e73ed
(fix): ruff
ilan-gold Feb 4, 2025
d8aa2cc
(fix): rustfmt
ilan-gold Feb 4, 2025
8ea80bc
(fix): `pyi`
ilan-gold Feb 4, 2025
db255a9
(fix): try removing zarr main branch dep
ilan-gold Feb 4, 2025
cb4bedc
fix: use upstream implicit fill values
LDeakin Feb 5, 2025
26ee516
fix: use upstream metadata handling
LDeakin Feb 5, 2025
6ff6c2b
fix: cleanup fill value handling for string dtype
LDeakin Feb 7, 2025
abe4dd5
Revert "fix: cleanup fill value handling for string dtype"
LDeakin Feb 7, 2025
a618605
fix: cleanup fill value handling for string dtype
LDeakin Feb 7, 2025
4159751
fix: fmt and clippy warnings
LDeakin Feb 7, 2025
ae194a6
fix: zarr-python 0 fill value handling
LDeakin Feb 8, 2025
1 change: 1 addition & 0 deletions Cargo.toml
@@ -22,6 +22,7 @@ pyo3-stub-gen = "0.6.2"
opendal = { version = "0.51.0", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.5.0"
zarrs_metadata = "0.3.3" # require recent zarr-python compatibility fixes (remove with zarrs 0.20)

[profile.release]
lto = true
11 changes: 7 additions & 4 deletions README.md
@@ -21,7 +21,7 @@ You can then use your `zarr` as normal (with some caveats)!

## API

We export a `ZarrsCodecPipeline` class so that `zarr-python` can use the class but it is not meant to be instantiated and we do not guarantee the stability of its API beyond what is required so that `zarr-python` can use it. Therefore, it is not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError` that can be thrown in the process of converting to indexers that `zarrs` can understand (see below for more details).
We export a `ZarrsCodecPipeline` class so that `zarr-python` can use it. It is not meant to be instantiated directly, and we do not guarantee the stability of its API beyond what `zarr-python` requires, so it is not documented here.

At the moment, we only support a subset of the `zarr-python` stores:

@@ -86,7 +86,7 @@ Chunk concurrency is typically favored because:

## Supported Indexing Methods

We **do not** officially support the following indexing methods. Some of these methods may error out, others may not:
The following indexing methods will trigger a fallback to the old zarr-python pipeline:

1. Any `oindex` or `vindex` integer `np.ndarray` indexing with dimensionality >=3 i.e.,

@@ -116,6 +116,9 @@ We **do not** officially support the following indexing methods. Some of these
arr[0:10, ..., 0:5]
```

Otherwise, we believe that we support your indexing case: slices, ints, and all integer `np.ndarray` indices in 2D for reading, contiguous integer `np.ndarray` indices along one axis for writing, etc. Please file an issue if you believe we have more holes in our coverage than we are aware of or if you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock many of these cases!

That being said, using non-contiguous integer `np.ndarray` indexing for reads may not be as fast as expected given the performance of other supported methods. Until `zarrs` supports integer indexing, only chunk fetching is done in `rust`, while the indexing itself then occurs in `python`.
Furthermore, using anything except contiguous (i.e., slices or consecutive integer) `np.ndarray` for numeric data will fall back to the default `zarr-python` implementation.

Please file an issue if you believe we have more holes in our coverage than we are aware of or you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock much of the use of the rust pipeline for that use-case (very useful for mini-batch training, perhaps!).

Further, any codecs not supported by `zarrs` will also automatically fall back to the python implementation.
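The contiguity rule described above can be sketched in plain NumPy (a minimal illustration only, not the library's actual implementation; the helper name `is_contiguous_index` is made up here):

```python
import numpy as np

def is_contiguous_index(idx: np.ndarray) -> bool:
    # Consecutive integers (step 1) are equivalent to a slice, which the
    # rust pipeline can handle; anything else would fall back to the
    # default zarr-python implementation.
    if idx.ndim != 1 or idx.size == 0:
        return False
    return bool(np.all(np.diff(idx) == 1))

print(is_contiguous_index(np.array([3, 4, 5])))  # consecutive: rust path
print(is_contiguous_index(np.array([0, 2, 4])))  # strided: python fallback
```

A real implementation would also need to handle multi-axis selections and empty selections; this sketch only shows the single-axis criterion.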
13 changes: 0 additions & 13 deletions docs/api.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/index.md
@@ -5,6 +5,5 @@
:hidden: true
:maxdepth: 1

api
contributing
```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -30,7 +30,7 @@ dependencies = [
'donfig',
'pytest',
'universal_pathlib>=0.2.0',
'zarr>=3.0.0',
"zarr",
]

[project.optional-dependencies]
4 changes: 0 additions & 4 deletions python/zarrs/_internal.pyi
@@ -4,7 +4,6 @@
import typing
from enum import Enum, auto

import numpy
import numpy.typing

class Basic:
@@ -27,9 +26,6 @@ class CodecPipelineImpl:
chunk_descriptions: typing.Sequence[WithSubset],
value: numpy.typing.NDArray[typing.Any],
) -> None: ...
def retrieve_chunks(
self, chunk_descriptions: typing.Sequence[Basic]
) -> list[numpy.typing.NDArray[numpy.uint8]]: ...
def store_chunks_with_indices(
self,
chunk_descriptions: typing.Sequence[WithSubset],
154 changes: 115 additions & 39 deletions python/zarrs/pipeline.py
@@ -2,15 +2,17 @@

import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
@@ -20,28 +22,64 @@
from zarr.core.common import ChunkCoords
from zarr.core.indexing import SelectorTuple

from ._internal import CodecPipelineImpl
from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
make_chunk_info_for_rust,
FillValueNoneError,
make_chunk_info_for_rust_with_indices,
)


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl:
return CodecPipelineImpl(
codec_metadata_json,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
),
chunk_concurrent_maximum=config.get(
"codec_pipeline.chunk_concurrent_maximum", None
),
num_threads=config.get("threading.max_workers", None),
)
class UnsupportedDataTypeError(Exception):
pass


class UnsupportedMetadataError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
try:
return CodecPipelineImpl(
codec_metadata_json,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
),
chunk_concurrent_maximum=config.get(
"codec_pipeline.chunk_concurrent_maximum", None
),
num_threads=config.get("threading.max_workers", None),
)
except TypeError as e:
if re.match(r"codec (delta|zlib) is not supported", str(e)):
return None
else:
raise e


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
for codec in codecs:
if codec.__class__.__name__ == "V2Codec":
codec_dict = codec.to_dict()
if codec_dict.get("filters", None) is not None:
filters = [
json.dumps(filter.get_config())
for filter in codec_dict.get("filters")
]
else:
filters = None
if codec_dict.get("compressor", None) is not None:
compressor = json.dumps(codec_dict.get("compressor").get_config())
else:
compressor = None
codecs_v3 = codec_metadata_v2_to_v3(filters, compressor)
for codec in codecs_v3:
yield json.loads(codec)
else:
yield codec.to_dict()


class ZarrsCodecPipelineState(TypedDict):
@@ -52,8 +90,9 @@ class ZarrsCodecPipelineState(TypedDict):
@dataclass
class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
impl: CodecPipelineImpl
impl: CodecPipelineImpl | None
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
@@ -62,13 +101,14 @@ def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = [codec.to_dict() for codec in codecs]
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
@@ -78,6 +118,7 @@ def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

@property
@@ -120,29 +161,32 @@ async def read(
drop_axes: tuple[int, ...] = (), # FIXME: unused
) -> None:
# FIXME: Error if array is not in host memory
out: NDArrayLike = out.as_ndarray_like()
if not out.dtype.isnative:
raise RuntimeError("Non-native byte order not supported")
try:
if self.impl is None:
raise UnsupportedMetadataError()
self._raise_error_on_unsupported_batch_dtype(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, out.shape
)
except (DiscontiguousArrayError, CollapsedDimensionError):
chunks_desc = make_chunk_info_for_rust(batch_info)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.read(batch_info, out, drop_axes)
return None
else:
out: NDArrayLike = out.as_ndarray_like()
await asyncio.to_thread(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc,
out,
)
return None
chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc)
for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info):
chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
chunk_selected = chunk_reshaped[selection]
if drop_axes:
chunk_selected = np.squeeze(chunk_selected, axis=drop_axes)
out[out_selection] = chunk_selected

async def write(
self,
@@ -152,14 +196,46 @@ async def write(
value: NDBuffer, # type: ignore
drop_axes: tuple[int, ...] = (),
) -> None:
# FIXME: Error if array is not in host memory
value: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value.dtype.isnative:
value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
elif not value.flags.c_contiguous:
value = np.ascontiguousarray(value)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value)
return None
try:
if self.impl is None:
raise UnsupportedMetadataError()
self._raise_error_on_unsupported_batch_dtype(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.write(batch_info, value, drop_axes)
return None
else:
# FIXME: Error if array is not in host memory
value_np: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value_np.dtype.isnative:
value_np = np.ascontiguousarray(
value_np, dtype=value_np.dtype.newbyteorder("=")
)
elif not value_np.flags.c_contiguous:
value_np = np.ascontiguousarray(value_np)
await asyncio.to_thread(
self.impl.store_chunks_with_indices, chunks_desc, value_np
)
return None

def _raise_error_on_unsupported_batch_dtype(
self,
batch_info: Iterable[
tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
):
# See https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L289-L293 for the V, S, U, M, and m dtype kinds
# Further, our pipeline does not support variable-length objects due to limitations on decode_into, so the object kind (O) is also out
if any(
info.dtype.kind in {"V", "S", "U", "M", "m", "O"}
for (_, info, _, _) in batch_info
):
raise UnsupportedDataTypeError()
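The dtype gate above can be exercised standalone (a sketch; `supported_by_rust_pipeline` is a made-up helper name, and the kind set mirrors the check in `_raise_error_on_unsupported_batch_dtype`):

```python
import numpy as np

# structured (V), bytes (S), unicode (U), datetime (M), timedelta (m), object (O)
UNSUPPORTED_KINDS = {"V", "S", "U", "M", "m", "O"}

def supported_by_rust_pipeline(dtype: np.dtype) -> bool:
    # Fixed-size numeric/bool dtypes stay on the rust path;
    # anything in UNSUPPORTED_KINDS falls back to zarr-python.
    return dtype.kind not in UNSUPPORTED_KINDS

print(supported_by_rust_pipeline(np.dtype("float32")))       # numeric: rust path
print(supported_by_rust_pipeline(np.dtype("datetime64[s]"))) # falls back
```

In the PR itself this check raises `UnsupportedDataTypeError`, which `read`/`write` catch to dispatch to the batched python pipeline.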
34 changes: 21 additions & 13 deletions python/zarrs/utils.py
@@ -3,10 +3,12 @@
import operator
import os
from functools import reduce
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import numpy as np
from zarr.core.array_spec import ArraySpec
from zarr.core.indexing import SelectorTuple, is_integer
from zarr.core.metadata.v2 import _default_fill_value

from zarrs._internal import Basic, WithSubset

@@ -15,7 +17,6 @@
from types import EllipsisType

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.core.array_spec import ArraySpec


# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
@@ -31,6 +32,10 @@ class CollapsedDimensionError(Exception):
pass


class FillValueNoneError(Exception):
pass


# This is a (mostly) copy of the function from zarr.core.indexing that fixes:
# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated
# TODO: Upstream this fix
@@ -134,6 +139,12 @@ def get_shape_for_selector(
return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad)


def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any:
if fill_value is None:
fill_value = _default_fill_value(dtype)
return fill_value


def make_chunk_info_for_rust_with_indices(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
@@ -144,6 +155,14 @@ def make_chunk_info_for_rust_with_indices(
shape = shape if shape else (1,) # constant array
chunk_info_with_indices: list[WithSubset] = []
for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info:
if chunk_spec.fill_value is None:
chunk_spec = ArraySpec(
chunk_spec.shape,
chunk_spec.dtype,
get_implicit_fill_value(chunk_spec.dtype, chunk_spec.fill_value),
chunk_spec.config,
chunk_spec.prototype,
)
chunk_info = Basic(byte_getter, chunk_spec)
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
@@ -169,14 +188,3 @@ def make_chunk_info_for_rust_with_indices(
)
)
return chunk_info_with_indices


def make_chunk_info_for_rust(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
) -> list[Basic]:
return [
Basic(byte_interface, chunk_spec)
for (byte_interface, chunk_spec, _, _) in batch_info
]
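The implicit fill-value defaulting introduced via `get_implicit_fill_value` can be approximated without zarr internals (a sketch under the assumption of numeric/bool dtypes; zarr-python's `_default_fill_value` has additional cases, e.g. for string and object dtypes):

```python
import numpy as np

def default_fill_value(dtype: np.dtype):
    # Rough analogue of the v2 implicit fill value: when metadata stores
    # fill_value: null, substitute a zero scalar of the array's dtype so
    # the rust pipeline never sees a None fill value.
    return np.zeros((), dtype=dtype)[()]

print(default_fill_value(np.dtype("int32")))    # 0
print(default_fill_value(np.dtype("float64")))  # 0.0
```

In the PR, arrays whose spec still carries `fill_value=None` raise `FillValueNoneError` instead, which routes the operation to the python fallback.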