Skip to content

Commit

Permalink
fix: use upstream metadata handling
Browse files Browse the repository at this point in the history
There is a lot of additional logic already taken care of by `zarrs`, like handling multiple versions of codec metadata.
  • Loading branch information
LDeakin committed Feb 5, 2025
1 parent cb4bedc commit 26ee516
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pyo3-stub-gen = "0.6.2"
opendal = { version = "0.51.0", features = ["services-http"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
zarrs_opendal = "0.5.0"
zarrs_metadata = "0.3.3" # require recent zarr-python compatibility fixes (remove with zarrs 0.20)

[profile.release]
lto = true
67 changes: 13 additions & 54 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.codecs import BytesCodec
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config

Expand All @@ -23,7 +22,7 @@
from zarr.core.common import ChunkCoords
from zarr.core.indexing import SelectorTuple

from ._internal import CodecPipelineImpl
from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3
from .utils import (
CollapsedDimensionError,
DiscontiguousArrayError,
Expand Down Expand Up @@ -62,63 +61,23 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
# See https://github.com/LDeakin/zarrs/blob/9070e12ea06c297532347af3668be9927ba35fa1/zarrs_metadata/src/v2_to_v3.rs#L69
for codec in codecs:
if codec.__class__.__name__ == "V2Codec":
codec_dict = codec.to_dict()
has_array_to_bytes = False
if codec_dict.get("filters", None) is not None:
for filter in codec_dict.get("filters"):
filter = filter.get_config()
name = filter.pop("id")
if name in [
"vlen-array",
"vlen-bytes",
"vlen-utf8",
]:
has_array_to_bytes = True
as_dict = {"name": name, "configuration": filter}
yield as_dict
compressor = {}
filters = [
json.dumps(filter.get_config())
for filter in codec_dict.get("filters")
]
else:
filters = None
if codec_dict.get("compressor", None) is not None:
compressor = codec_dict["compressor"].get_config()
if compressor.get("id") in ["zfpy", "pcodec"]:
has_array_to_bytes = True
if not has_array_to_bytes:
yield BytesCodec().to_dict()
if compressor:
if compressor.get("id") == "zstd":
as_dict = {
"name": "zstd",
"configuration": {
"level": int(compressor["level"]),
"checksum": compressor["checksum"],
},
}
elif compressor.get("id") == "blosc":
as_dict = {
"name": "blosc",
"configuration": {
"cname": compressor["cname"],
"clevel": int(compressor["clevel"]),
"blocksize": int(compressor["blocksize"]),
},
}
if typesize := compressor.get("typesize", None) is not None:
as_dict["typesize"] = typesize
if shuffle := compressor.get("shuffler", None) is not None:
# https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v3/array/codec/blosc.rs#L46-L54
match shuffle:
case 0:
as_dict["shuffle"] = "noshuffle"
case 1:
as_dict["shuffle"] = "shuffle"
case 2:
as_dict["shuffle"] = "bitshuffle"
else:
name = compressor.pop("id")
as_dict = {"name": name, "configuration": compressor}
yield as_dict
compressor = json.dumps(codec_dict.get("compressor").get_config())
else:
compressor = None
codecs_v3 = codec_metadata_v2_to_v3(filters, compressor)
for codec in codecs_v3:
yield json.loads(codec)
else:
yield codec.to_dict()

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use zarrs::metadata::v3::MetadataV3;

mod chunk_item;
mod concurrency;
mod metadata_v2;
mod runtime;
mod store;
#[cfg(test)]
Expand All @@ -31,6 +32,7 @@ mod utils;

use crate::chunk_item::ChunksItem;
use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::metadata_v2::codec_metadata_v2_to_v3;
use crate::store::StoreManager;
use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};

Expand Down Expand Up @@ -424,6 +426,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<CodecPipelineImpl>()?;
m.add_class::<chunk_item::Basic>()?;
m.add_class::<chunk_item::WithSubset>()?;
m.add_function(wrap_pyfunction!(codec_metadata_v2_to_v3, m)?)?;
Ok(())
}

Expand Down
54 changes: 54 additions & 0 deletions src/metadata_v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use pyo3::{exceptions::PyRuntimeError, pyfunction, PyErr, PyResult};
use zarrs::metadata::{
v2::{array::ArrayMetadataV2Order, MetadataV2},
v3::array::data_type::DataTypeMetadataV3,
};

#[pyfunction]
#[pyo3(signature = (filters=None, compressor=None))]
pub fn codec_metadata_v2_to_v3(
filters: Option<Vec<String>>,
compressor: Option<String>,
) -> PyResult<Vec<String>> {
// Try and convert filters/compressor to V2 metadata
let filters = if let Some(filters) = filters {
Some(
filters
.into_iter()
.map(|filter| {
serde_json::from_str::<MetadataV2>(&filter)
.map_err(|err| PyErr::new::<PyRuntimeError, _>(err.to_string()))
})
.collect::<Result<Vec<_>, _>>()?,
)
} else {
None
};
let compressor = if let Some(compressor) = compressor {
Some(
serde_json::from_str::<MetadataV2>(&compressor)
.map_err(|err| PyErr::new::<PyRuntimeError, _>(err.to_string()))?,
)
} else {
None
};

// FIXME: The array order, dimensionality, data type, and endianness are needed to exhaustively support all Zarr V2 data that zarrs can handle.
// However, CodecPipeline.from_codecs does not supply this information, and CodecPipeline.evolve_from_array_spec is seemingly never called.
let metadata = zarrs::metadata::v2_to_v3::codec_metadata_v2_to_v3(
ArrayMetadataV2Order::C,
0, // unused with C order
&DataTypeMetadataV3::Bool, // FIXME
None,
&filters,
&compressor,
)
.map_err(|err| {
// TODO: More informative error messages from zarrs for ArrayMetadataV2ToV3ConversionError
PyErr::new::<PyRuntimeError, _>(err.to_string())
})?;
Ok(metadata
.into_iter()
.map(|metadata| serde_json::to_string(&metadata).expect("infallible")) // TODO: Add method to zarrs
.collect())
}

0 comments on commit 26ee516

Please sign in to comment.