(feat): full v2 compat via python fallback #84

Merged: 28 commits, Feb 11, 2025. The diff below shows changes from 6 of the 28 commits.

Commits:
- `c7fb95a` chore(deps): bump zarr to 3.0.0rc1 (LDeakin, Jan 3, 2025)
- `0a877e0` fmt (LDeakin, Jan 3, 2025)
- `def2e70` (feat): python fallback (ilan-gold, Jan 28, 2025)
- `622287d` Merge branch 'main' into ig/python_fallback (ilan-gold, Jan 29, 2025)
- `b362759` (fix): dtypes (ilan-gold, Jan 30, 2025)
- `fba8226` (fix): `object` dtypes + `v2` tests (ilan-gold, Jan 30, 2025)
- `4aa21a3` (fix): `object` dtypes + `v2` tests (ilan-gold, Jan 30, 2025)
- `a51e810` (fix): `object` dtypes + `v2` tests (ilan-gold, Jan 30, 2025)
- `19e90e3` (fix): `object` dtypes in rust (ilan-gold, Feb 2, 2025)
- `4a59ec1` (fix): blosc support (ilan-gold, Feb 2, 2025)
- `45efee1` (refactor): handle `None` fill-value more gracefully (ilan-gold, Feb 2, 2025)
- `59e60fc` fix: V2 codec pipeline creation (LDeakin, Feb 3, 2025)
- `1a6dc77` fix: zfpy/pcodec metadata handling (LDeakin, Feb 3, 2025)
- `008fd6a` (fix): fall back for unsupported codecs (ilan-gold, Feb 4, 2025)
- `9a0daa9` (fix): our decode codec pipeline does not support vlen (ilan-gold, Feb 4, 2025)
- `4637d24` (fix): string dtype test to match zarr-python (ilan-gold, Feb 4, 2025)
- `cf2e6b5` (chore): add note (ilan-gold, Feb 4, 2025)
- `00e73ed` (fix): ruff (ilan-gold, Feb 4, 2025)
- `d8aa2cc` (fix): rustfmt (ilan-gold, Feb 4, 2025)
- `8ea80bc` (fix): `pyi` (ilan-gold, Feb 4, 2025)
- `db255a9` (fix): try removing zarr main branch dep (ilan-gold, Feb 4, 2025)
- `cb4bedc` fix: use upstream implicit fill values (LDeakin, Feb 5, 2025)
- `26ee516` fix: use upstream metadata handling (LDeakin, Feb 5, 2025)
- `6ff6c2b` fix: cleanup fill value handling for string dtype (LDeakin, Feb 7, 2025)
- `abe4dd5` Revert "fix: cleanup fill value handling for string dtype" (LDeakin, Feb 7, 2025)
- `a618605` fix: cleanup fill value handling for string dtype (LDeakin, Feb 7, 2025)
- `4159751` fix: fmt and clippy warnings (LDeakin, Feb 7, 2025)
- `ae194a6` fix: zarr-python 0 fill value handling (LDeakin, Feb 8, 2025)
README.md: 9 changes (6 additions, 3 deletions)

@@ -86,7 +86,7 @@ Chunk concurrency is typically favored because:

## Supported Indexing Methods

We **do not** officially support the following indexing methods. Some of these methods may error out, others may not:
The following methods will trigger use of the old zarr-python pipeline:

1. Any `oindex` or `vindex` integer `np.ndarray` indexing with dimensionality >=3 i.e.,

@@ -116,6 +116,9 @@ We **do not** officially support the following indexing methods. Some of these
arr[0:10, ..., 0:5]
```

Otherwise, we believe that we support your indexing case: slices, ints, and all integer `np.ndarray` indices in 2D for reading, contiguous integer `np.ndarray` indices along one axis for writing, etc.

That being said, using non-contiguous integer `np.ndarray` indexing for reads may not be as fast as expected given the performance of the other supported methods. Until `zarrs` supports integer indexing, only the chunk fetching is done in `rust`, while the indexing itself then occurs in `python`.
Furthermore, using anything except contiguous indexing (i.e., slices or consecutive integer `np.ndarray`s) for numeric data will fall back to the default `zarr-python` implementation, as the sketch below illustrates.
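To make the contiguity rule concrete, here is an illustrative sketch (not part of the README diff; the array shape, chunks, and the routing noted in the comments simply mirror the rules above):

```python
import numpy as np
import zarr

# An arbitrary in-memory array for demonstration.
store = zarr.storage.MemoryStore()
arr = zarr.create_array(store, shape=(100, 100), chunks=(10, 10), dtype="float32")
arr[:] = np.random.rand(100, 100).astype("float32")

# Consecutive integer indices behave like a slice: stays on the rust pipeline.
fast = arr[np.arange(10, 20)]

# Non-contiguous integer indices on numeric data: falls back to zarr-python.
slow = arr[np.array([0, 3, 7])]
```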

Please file an issue if you believe we have more holes in our coverage than we are aware of, or if you wish to contribute! For example, we have an [issue in zarrs for integer-array indexing](https://github.com/LDeakin/zarrs/issues/52) that would unblock the use of the rust pipeline for that use-case (very useful for mini-batch training, perhaps!).

Further, any codecs not supported by `zarrs` will also automatically fall back to the python implementation.
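For readers landing on this PR, a minimal sketch of how the pipeline (and therefore the fallback) is enabled, using the `codec_pipeline.path` config key from the project's README:

```python
import zarr

# Route codec work through the rust pipeline; anything it cannot handle
# (unsupported codecs, vlen/object dtypes, exotic indexing) now falls
# back to zarr-python rather than erroring.
zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})
```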
pyproject.toml: 2 changes (1 addition, 1 deletion)

@@ -30,7 +30,7 @@ dependencies = [
'donfig',
'pytest',
'universal_pathlib>=0.2.0',
'zarr>=3.0.0',
'zarr @ git+https://github.com/zarr-developers/zarr-python.git#main',
]

[project.optional-dependencies]
python/zarrs/pipeline.py: 112 changes (88 additions, 24 deletions)

@@ -7,10 +7,12 @@

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.codecs import BytesCodec
from zarr.core.codec_pipeline import BatchedCodecPipeline
from zarr.core.config import config

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
@@ -24,11 +26,18 @@
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
make_chunk_info_for_rust,
make_chunk_info_for_rust_with_indices,
)


class FillValueNoneError(Exception):
pass


class UnsupportedDataTypeError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl:
return CodecPipelineImpl(
codec_metadata_json,
@@ -44,6 +53,29 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl:
)


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
for codec in codecs:
if codec.__class__.__name__ == "V2Codec":
codec_dict = codec.to_dict()
compressor = {}
if codec_dict.get("compressor", None) is not None:
compressor = codec_dict["compressor"].get_config()
elif codec_dict.get("filter", None) is not None:
compressor = codec_dict["filter"].get_config()
if compressor.get("id", None) == "zstd":
yield {
"name": "zstd",
"configuration": {
"level": int(compressor["level"]),
"checksum": compressor["checksum"],
},
}
# TODO: get the endianness added to V2Codec API
yield BytesCodec().to_dict()
else:
yield codec.to_dict()


class ZarrsCodecPipelineState(TypedDict):
codec_metadata_json: str
codecs: tuple[Codec, ...]
@@ -54,6 +86,7 @@ class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
impl: CodecPipelineImpl
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
@@ -62,13 +95,14 @@ def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = [codec.to_dict() for codec in codecs]
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
@@ -78,6 +112,7 @@ def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

@property
@@ -120,29 +155,29 @@ async def read(
drop_axes: tuple[int, ...] = (), # FIXME: unused
) -> None:
# FIXME: Error if array is not in host memory
out: NDArrayLike = out.as_ndarray_like()
if not out.dtype.isnative:
raise RuntimeError("Non-native byte order not supported")
try:
self._raise_error_on_batch_info_error(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, out.shape
)
except (DiscontiguousArrayError, CollapsedDimensionError):
chunks_desc = make_chunk_info_for_rust(batch_info)
except (
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.read(batch_info, out, drop_axes)
return None
else:
out: NDArrayLike = out.as_ndarray_like()
await asyncio.to_thread(
self.impl.retrieve_chunks_and_apply_index,
chunks_desc,
out,
)
return None
chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc)
for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info):
chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
chunk_selected = chunk_reshaped[selection]
if drop_axes:
chunk_selected = np.squeeze(chunk_selected, axis=drop_axes)
out[out_selection] = chunk_selected

async def write(
self,
@@ -152,14 +187,43 @@ async def write(
value: NDBuffer, # type: ignore
drop_axes: tuple[int, ...] = (),
) -> None:
# FIXME: Error if array is not in host memory
value: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value.dtype.isnative:
value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
elif not value.flags.c_contiguous:
value = np.ascontiguousarray(value)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value)
return None
try:
self._raise_error_on_batch_info_error(batch_info)
chunks_desc = make_chunk_info_for_rust_with_indices(
batch_info, drop_axes, value.shape
)
except (
DiscontiguousArrayError,
CollapsedDimensionError,
UnsupportedDataTypeError,
FillValueNoneError,
):
await self.python_impl.write(batch_info, value, drop_axes)
return None
else:
# FIXME: Error if array is not in host memory
value_np: NDArrayLike | np.ndarray = value.as_ndarray_like()
if not value_np.dtype.isnative:
value_np = np.ascontiguousarray(
value_np, dtype=value_np.dtype.newbyteorder("=")
)
elif not value_np.flags.c_contiguous:
value_np = np.ascontiguousarray(value_np)
await asyncio.to_thread(
self.impl.store_chunks_with_indices, chunks_desc, value_np
)
return None

def _raise_error_on_batch_info_error(
self,
batch_info: Iterable[
tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
):
if any(
info.dtype in ["object"] or info.dtype.kind in {"V", "S"}
for (_, info, _, _) in batch_info
):
raise UnsupportedDataTypeError()
if any(info.fill_value is None for (_, info, _, _) in batch_info):
raise FillValueNoneError()
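To illustrate the guards above (a sketch, not from the diff; whether a given dtype actually takes the fallback depends on these checks and on the zarr version in use): a vlen string array should now read and write via `python_impl` instead of raising:

```python
import numpy as np
import zarr

store = zarr.storage.MemoryStore()
# vlen strings are rejected by the rust pipeline (UnsupportedDataTypeError),
# so reads and writes for this array route through BatchedCodecPipeline.
arr = zarr.create_array(store, shape=(10,), chunks=(5,), dtype=str)
arr[:] = np.array([f"s{i}" for i in range(10)], dtype=object)
assert arr[3] == "s3"
```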
python/zarrs/utils.py: 13 changes (1 addition, 12 deletions)

@@ -168,15 +168,4 @@ def make_chunk_info_for_rust_with_indices(
shape=shape,
)
)
return chunk_info_with_indices


def make_chunk_info_for_rust(
batch_info: Iterable[
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
],
) -> list[Basic]:
return [
Basic(byte_interface, chunk_spec)
for (byte_interface, chunk_spec, _, _) in batch_info
]
return chunk_info_with_indices
src/chunk_item.rs: 20 changes (14 additions, 6 deletions)

@@ -3,7 +3,7 @@ use std::num::NonZeroU64;
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
pyclass, pymethods,
types::{PyAnyMethods as _, PySlice, PySliceMethods as _},
types::{PyAnyMethods as _, PyBytes, PyBytesMethods, PySlice, PySliceMethods as _},
Bound, PyAny, PyErr, PyResult,
};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
@@ -44,14 +44,22 @@ impl Basic {
.getattr("dtype")?
.call_method0("__str__")?
.extract()?;
let fill_value = chunk_spec
.getattr("fill_value")?
.call_method0("tobytes")?
.extract()?;
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes: Vec<u8>;
if let Ok(fill_value_downcast) = fill_value.downcast::<PyBytes>() {
fill_value_bytes = fill_value_downcast.as_bytes().to_vec();
} else if fill_value.hasattr("tobytes")? {
fill_value_bytes = fill_value.call_method0("tobytes")?.extract()?;
} else {
return Err(PyErr::new::<PyValueError, _>(format!(
"Unsupported fill value {:?}",
fill_value
)));
}
Ok(Self {
store,
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value)?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
})
}
}
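For context on the new branch (an assumption-laden sketch, not from the diff): numeric fill values typically reach the pipeline as numpy scalars, which expose `tobytes()`, while some v2/string fill values can arrive as raw Python `bytes`, hence the added `PyBytes` downcast:

```python
import numpy as np

# numpy scalar fill value: handled by the tobytes() branch.
assert np.float32(0.0).tobytes() == b"\x00\x00\x00\x00"

# Raw bytes fill value (e.g. from a v2 array): handled by the PyBytes downcast.
raw_fill = b"\x00\x00\x00\x00"
```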
src/lib.rs: 48 changes (0 additions, 48 deletions)

@@ -337,54 +337,6 @@ impl CodecPipelineImpl {
})
}

fn retrieve_chunks<'py>(
&self,
py: Python<'py>,
chunk_descriptions: Vec<chunk_item::Basic>, // FIXME: Ref / iterable?
) -> PyResult<Vec<Bound<'py, PyArray1<u8>>>> {
// Adjust the concurrency based on the codec chain and the first chunk description
let Some((chunk_concurrent_limit, codec_options)) =
chunk_descriptions.get_chunk_concurrent_limit_and_codec_options(self)?
else {
return Ok(vec![]);
};

let chunk_bytes = py.allow_threads(move || {
let get_chunk_subset = |item: chunk_item::Basic| {
Ok(if let Some(chunk_encoded) = self.stores.get(&item)? {
let chunk_encoded: Vec<u8> = chunk_encoded.into();
self.codec_chain
.decode(
Cow::Owned(chunk_encoded),
item.representation(),
&codec_options,
)
.map_py_err::<PyRuntimeError>()?
} else {
// The chunk is missing so we need to create one.
let num_elements = item.representation().num_elements();
let data_type_size = item.representation().data_type().size();
let chunk_shape = ArraySize::new(data_type_size, num_elements);
ArrayBytes::new_fill_value(chunk_shape, item.representation().fill_value())
}
.into_fixed()
.map_py_err::<PyRuntimeError>()?
.into_owned())
};
iter_concurrent_limit!(
chunk_concurrent_limit,
chunk_descriptions,
map,
get_chunk_subset
)
.collect::<PyResult<Vec<Vec<u8>>>>()
})?;
Ok(chunk_bytes
.into_iter()
.map(|x| x.into_pyarray(py))
.collect())
}

fn store_chunks_with_indices(
&self,
py: Python,