(feat): full v2 compat via python fallback (#84)
* chore(deps): bump zarr to 3.0.0rc1

* fmt

* (feat): python fallback

* (fix): dtypes

* (fix): `object` dtypes + `v2` tests

* (fix): `object` dtypes + `v2` tests

* (fix): `object` dtypes + `v2` tests

* (fix): `object` dtypes in rust

* (fix): blosc support

* (refactor): handle `None` fill-value more gracefully

* fix: V2 codec pipeline creation

* fix: zfpy/pcodec metadata handling

* (fix): fall back for unsupported codecs

* (fix): our decode codec pipeline does not support vlen

* (fix): string dtype test to match zarr-python

* (chore): add note

* (fix): ruff

* (fix): rustfmt

* (fix): `pyi`

* (fix): try removing zarr main branch dep

* fix: use upstream implicit fill values

* fix: use upstream metadata handling

There is a lot of additional logic already taken care of by `zarrs`, like handling multiple versions of codec metadata.

* fix: cleanup fill value handling for string dtype

* Revert "fix: cleanup fill value handling for string dtype"

This reverts commit 6ff6c2b.

* fix: cleanup fill value handling for string dtype

* fix: fmt and clippy warnings

* fix: zarr-python 0 fill value handling

---------

Co-authored-by: Lachlan Deakin <ljdgit@gmail.com>
ilan-gold and LDeakin authored Feb 11, 2025
1 parent 5558c5e commit 1d4e3cb
Showing 14 changed files with 694 additions and 250 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -22,6 +22,7 @@ pyo3-stub-gen = "0.6.2"
opendal = { version = "0.51.0", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.5.0"
zarrs_metadata = "0.3.3" # require recent zarr-python compatibility fixes (remove with zarrs 0.20)

[profile.release]
lto = true
11 changes: 7 additions & 4 deletions README.md
@@ -21,7 +21,7 @@ You can then use your `zarr` as normal (with some caveats)!

## API

We export a `ZarrsCodecPipeline` class for use by `zarr-python`; it is not meant to be instantiated, and we do not guarantee the stability of its API beyond what `zarr-python` requires, so it is not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError`, that can be thrown while converting to indexers that `zarrs` can understand (see below for more details).
We export a `ZarrsCodecPipeline` class for use by `zarr-python`; it is not meant to be instantiated, and we do not guarantee the stability of its API beyond what `zarr-python` requires, so it is not documented here.
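For orientation, wiring `zarr-python` up to this pipeline is a one-line config change; a sketch of typical usage (the config key follows this project's install instructions and is assumed here rather than shown in this diff):

```python
import zarr

# Tell zarr-python to route codec work through the rust pipeline.
# With this commit, unsupported codecs, dtypes, and metadata fall back
# to the default BatchedCodecPipeline automatically.
zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})
```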

At the moment, we only support a subset of the `zarr-python` stores:

@@ -86,7 +86,7 @@ Chunk concurrency is typically favored because:

## Supported Indexing Methods

We **do not** officially support the following indexing methods. Some of these methods may error out, others may not:
The following methods will fall back to the default `zarr-python` pipeline:

1. Any `oindex` or `vindex` integer `np.ndarray` indexing with dimensionality >=3 i.e.,

@@ -116,6 +116,9 @@ We **do not** officially support the following
arr[0:10, ..., 0:5]
```

Otherwise, we believe we support your indexing case: slices, ints, and all integer `np.ndarray` indices in 2D for reading, contiguous integer `np.ndarray` indices along one axis for writing, etc. Please file an issue if you find more holes in our coverage than we are aware of, or if you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock many of these cases!

That being said, non-contiguous integer `np.ndarray` indexing for reads may not be as fast as the other supported methods. Until `zarrs` supports integer indexing, only chunk fetching happens in rust; the indexing itself then occurs in python.
Furthermore, using anything except contiguous indexers (i.e., slices or consecutive integer `np.ndarray`s) on numeric data will fall back to the default `zarr-python` implementation.
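The contiguity rule above can be sketched in a few lines (`is_contiguous_index` is a name invented for this sketch, not an API of this package):

```python
def is_contiguous_index(indices: list[int]) -> bool:
    """True when the integer indices form a consecutive ascending run,
    i.e. they could be rewritten as a single slice."""
    return all(b - a == 1 for a, b in zip(indices, indices[1:]))

print(is_contiguous_index([2, 3, 4, 5]))  # True -> equivalent to slice(2, 6), stays on the rust path
print(is_contiguous_index([0, 2, 5]))     # False -> falls back to zarr-python
```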

Please file an issue if you believe we have more holes in our coverage than we are aware of, or if you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock the use of the rust pipeline for that use case (very useful for mini-batch training, perhaps!).

Further, any codecs not supported by `zarrs` will also automatically fall back to the python implementation.
13 changes: 0 additions & 13 deletions docs/api.md

This file was deleted.

1 change: 0 additions & 1 deletion docs/index.md
@@ -5,6 +5,5 @@
:hidden: true
:maxdepth: 1
api
contributing
```
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -30,7 +30,7 @@ dependencies = [
'donfig',
'pytest',
'universal_pathlib>=0.2.0',
'zarr>=3.0.0',
"zarr",
]

[project.optional-dependencies]
4 changes: 0 additions & 4 deletions python/zarrs/_internal.pyi
@@ -4,7 +4,6 @@
import typing
from enum import Enum, auto

import numpy
import numpy.typing

class Basic:
@@ -27,9 +26,6 @@ class CodecPipelineImpl:
chunk_descriptions: typing.Sequence[WithSubset],
value: numpy.typing.NDArray[typing.Any],
) -> None: ...
def retrieve_chunks(
self, chunk_descriptions: typing.Sequence[Basic]
) -> list[numpy.typing.NDArray[numpy.uint8]]: ...
def store_chunks_with_indices(
self,
chunk_descriptions: typing.Sequence[WithSubset],
154 changes: 115 additions & 39 deletions python/zarrs/pipeline.py
@@ -2,15 +2,17 @@

import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
@@ -20,28 +22,64 @@
from zarr.core.common import ChunkCoords
from zarr.core.indexing import SelectorTuple

from ._internal import CodecPipelineImpl
from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
make_chunk_info_for_rust,
FillValueNoneError,
make_chunk_info_for_rust_with_indices,
)


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl:
return CodecPipelineImpl(
codec_metadata_json,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
),
chunk_concurrent_maximum=config.get(
"codec_pipeline.chunk_concurrent_maximum", None
),
num_threads=config.get("threading.max_workers", None),
)
class UnsupportedDataTypeError(Exception):
pass


class UnsupportedMetadataError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
try:
return CodecPipelineImpl(
codec_metadata_json,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
),
chunk_concurrent_maximum=config.get(
"codec_pipeline.chunk_concurrent_maximum", None
),
num_threads=config.get("threading.max_workers", None),
)
except TypeError as e:
if re.match(r"codec (delta|zlib) is not supported", str(e)):
return None
else:
raise e
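The error-matching above can be isolated into a tiny sketch: only `delta`/`zlib` codec errors from the rust pipeline are swallowed (returning `None`, which triggers the python fallback); any other `TypeError` is re-raised. `should_fall_back` is a name invented for this sketch.

```python
import re

# The same pattern used in get_codec_pipeline_impl above.
UNSUPPORTED = re.compile(r"codec (delta|zlib) is not supported")

def should_fall_back(message: str) -> bool:
    # re.match anchors at the start of the string.
    return UNSUPPORTED.match(message) is not None

print(should_fall_back("codec delta is not supported"))  # True
print(should_fall_back("codec lz4 is not supported"))    # False
```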


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
for codec in codecs:
if codec.__class__.__name__ == "V2Codec":
codec_dict = codec.to_dict()
if codec_dict.get("filters", None) is not None:
filters = [
json.dumps(filter.get_config())
for filter in codec_dict.get("filters")
]
else:
filters = None
if codec_dict.get("compressor", None) is not None:
compressor = json.dumps(codec_dict.get("compressor").get_config())
else:
compressor = None
codecs_v3 = codec_metadata_v2_to_v3(filters, compressor)
for codec in codecs_v3:
yield json.loads(codec)
else:
yield codec.to_dict()
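For intuition about what `codecs_to_dict` hands to the rust side, here is a toy stand-in for `codec_metadata_v2_to_v3` (an assumption for illustration only; the real rust conversion also renames codecs and rewrites parameters): a zarr v2 codec config names the codec under `"id"`, while v3 metadata nests the remaining parameters under `"configuration"`.

```python
import json

def v2_config_to_v3(config: dict) -> dict:
    """Toy sketch: lift a v2 codec config into the v3 metadata shape."""
    config = dict(config)  # avoid mutating the caller's dict
    return {"name": config.pop("id"), "configuration": config}

v2 = {"id": "blosc", "cname": "zstd", "clevel": 5, "shuffle": 1}
v3 = v2_config_to_v3(v2)
print(json.dumps(v3))
```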


class ZarrsCodecPipelineState(TypedDict):
@@ -52,8 +90,9 @@ class ZarrsCodecPipelineState(TypedDict):
@dataclass
class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
impl: CodecPipelineImpl
impl: CodecPipelineImpl | None
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
@@ -62,13 +101,14 @@ def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = [codec.to_dict() for codec in codecs]
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
@@ -78,6 +118,7 @@ def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

@property
@@ -120,29 +161,32 @@ async def read(
drop_axes: tuple[int, ...] = (), # FIXME: unused
) -> None:
# FIXME: Error if array is not in host memory
out: NDArrayLike = out.as_ndarray_like()
if not out.dtype.isnative:
raise RuntimeError("Non-native byte order not supported")
try:
if self.impl is None:
raise UnsupportedMetadataError()
self._raise_error_on_unsupported_batch_dtype(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, out.shape
)
except (DiscontiguousArrayError, CollapsedDimensionError):
chunks_desc = make_chunk_info_for_rust(batch_info)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.read(batch_info, out, drop_axes)
return None
else:
out: NDArrayLike = out.as_ndarray_like()
await asyncio.to_thread(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc,
out,
)
return None
chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc)
for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info):
chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
chunk_selected = chunk_reshaped[selection]
if drop_axes:
chunk_selected = np.squeeze(chunk_selected, axis=drop_axes)
out[out_selection] = chunk_selected

async def write(
self,
@@ -152,14 +196,46 @@ async def write(
value: NDBuffer, # type: ignore
drop_axes: tuple[int, ...] = (),
) -> None:
# FIXME: Error if array is not in host memory
value: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value.dtype.isnative:
value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
elif not value.flags.c_contiguous:
value = np.ascontiguousarray(value)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value)
return None
try:
if self.impl is None:
raise UnsupportedMetadataError()
self._raise_error_on_unsupported_batch_dtype(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
except (
UnsupportedMetadataError,
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.write(batch_info, value, drop_axes)
return None
else:
# FIXME: Error if array is not in host memory
value_np: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value_np.dtype.isnative:
value_np = np.ascontiguousarray(
value_np, dtype=value_np.dtype.newbyteorder("=")
)
elif not value_np.flags.c_contiguous:
value_np = np.ascontiguousarray(value_np)
await asyncio.to_thread(
self.impl.store_chunks_with_indices, chunks_desc, value_np
)
return None

def _raise_error_on_unsupported_batch_dtype(
self,
batch_info: Iterable[
tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
):
# https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L289-L293 explains why the V, S, U, M, and m dtype kinds are unsupported
# Further, our decode pipeline does not support variable-length objects due to limitations of decode_into, so the object (O) kind is also out
if any(
info.dtype.kind in {"V", "S", "U", "M", "m", "O"}
for (_, info, _, _) in batch_info
):
raise UnsupportedDataTypeError()
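The gate above keys off `np.dtype.kind`; a quick illustration of which dtypes take which path (the kind letters are standard NumPy: V = void/structured, S = bytes, U = unicode, M = datetime64, m = timedelta64, O = object):

```python
import numpy as np

# The same kind set checked in _raise_error_on_unsupported_batch_dtype.
UNSUPPORTED_KINDS = {"V", "S", "U", "M", "m", "O"}

for name in ["float32", "int64", "U10", "datetime64[ns]", "object"]:
    kind = np.dtype(name).kind
    route = "python fallback" if kind in UNSUPPORTED_KINDS else "rust pipeline"
    print(f"{name}: kind={kind} -> {route}")
```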
34 changes: 21 additions & 13 deletions python/zarrs/utils.py
@@ -3,10 +3,12 @@
import operator
import os
from functools import reduce
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

import numpy as np
from zarr.core.array_spec import ArraySpec
from zarr.core.indexing import SelectorTuple, is_integer
from zarr.core.metadata.v2 import _default_fill_value

from zarrs._internal import Basic, WithSubset

@@ -15,7 +17,6 @@
from types import EllipsisType

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.core.array_spec import ArraySpec


# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
@@ -31,6 +32,10 @@ class CollapsedDimensionError(Exception):
pass


class FillValueNoneError(Exception):
pass


# This is a (mostly) copy of the function from zarr.core.indexing that fixes:
# DeprecationWarning: Conversion of an array with ndim > 0 to a scalar is deprecated
# TODO: Upstream this fix
@@ -134,6 +139,12 @@ def get_shape_for_selector(
return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad)


def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any:
if fill_value is None:
fill_value = _default_fill_value(dtype)
return fill_value
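To show what `get_implicit_fill_value` accomplishes, here is a toy stand-in for zarr-python's `_default_fill_value` (an assumption for illustration; the real function covers more dtypes): v2 metadata may store `fill_value: null`, so a zero-like default is substituted before the chunk spec reaches rust.

```python
import numpy as np

def default_fill_value(dtype: np.dtype):
    """Toy sketch: pick a zero-like default fill value per dtype kind."""
    if dtype.kind in "UO":
        return ""
    if dtype.kind == "S":
        return b""
    return dtype.type(0)

print(default_fill_value(np.dtype("float32")))  # 0.0
print(default_fill_value(np.dtype("<U4")))      # ''
```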


def make_chunk_info_for_rust_with_indices(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
@@ -144,6 +155,14 @@ def make_chunk_info_for_rust_with_indices(
shape = shape if shape else (1,) # constant array
chunk_info_with_indices: list[WithSubset] = []
for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info:
if chunk_spec.fill_value is None:
chunk_spec = ArraySpec(
chunk_spec.shape,
chunk_spec.dtype,
get_implicit_fill_value(chunk_spec.dtype, chunk_spec.fill_value),
chunk_spec.config,
chunk_spec.prototype,
)
chunk_info = Basic(byte_getter, chunk_spec)
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
@@ -169,14 +188,3 @@
)
)
return chunk_info_with_indices


def make_chunk_info_for_rust(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
) -> list[Basic]:
return [
Basic(byte_interface, chunk_spec)
for (byte_interface, chunk_spec, _, _) in batch_info
]
