diff --git a/Cargo.toml b/Cargo.toml index b305263..66b9e1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ pyo3-stub-gen = "0.6.2" opendal = { version = "0.51.0", features = ["services-http"] } tokio = { version = "1.41.1", features = ["rt-multi-thread"] } zarrs_opendal = "0.5.0" +zarrs_metadata = "0.3.3" # require recent zarr-python compatibility fixes (remove with zarrs 0.20) [profile.release] lto = true diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index bad2da8..0f089e5 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -8,7 +8,6 @@ import numpy as np from zarr.abc.codec import Codec, CodecPipeline -from zarr.codecs import BytesCodec from zarr.core import BatchedCodecPipeline from zarr.core.config import config @@ -23,7 +22,7 @@ from zarr.core.common import ChunkCoords from zarr.core.indexing import SelectorTuple -from ._internal import CodecPipelineImpl +from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3 from .utils import ( CollapsedDimensionError, DiscontiguousArrayError, @@ -62,63 +61,23 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]: - # See https://github.com/LDeakin/zarrs/blob/9070e12ea06c297532347af3668be9927ba35fa1/zarrs_metadata/src/v2_to_v3.rs#L69 for codec in codecs: if codec.__class__.__name__ == "V2Codec": codec_dict = codec.to_dict() - has_array_to_bytes = False if codec_dict.get("filters", None) is not None: - for filter in codec_dict.get("filters"): - filter = filter.get_config() - name = filter.pop("id") - if name in [ - "vlen-array", - "vlen-bytes", - "vlen-utf8", - ]: - has_array_to_bytes = True - as_dict = {"name": name, "configuration": filter} - yield as_dict - compressor = {} + filters = [ + json.dumps(filter.get_config()) + for filter in codec_dict.get("filters") + ] + else: + filters = None if codec_dict.get("compressor", None) is not None: - 
compressor = codec_dict["compressor"].get_config() - if compressor.get("id") in ["zfpy", "pcodec"]: - has_array_to_bytes = True - if not has_array_to_bytes: - yield BytesCodec().to_dict() - if compressor: - if compressor.get("id") == "zstd": - as_dict = { - "name": "zstd", - "configuration": { - "level": int(compressor["level"]), - "checksum": compressor["checksum"], - }, - } - elif compressor.get("id") == "blosc": - as_dict = { - "name": "blosc", - "configuration": { - "cname": compressor["cname"], - "clevel": int(compressor["clevel"]), - "blocksize": int(compressor["blocksize"]), - }, - } - if typesize := compressor.get("typesize", None) is not None: - as_dict["typesize"] = typesize - if shuffle := compressor.get("shuffler", None) is not None: - # https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v3/array/codec/blosc.rs#L46-L54 - match shuffle: - case 0: - as_dict["shuffle"] = "noshuffle" - case 1: - as_dict["shuffle"] = "shuffle" - case 2: - as_dict["shuffle"] = "bitshuffle" - else: - name = compressor.pop("id") - as_dict = {"name": name, "configuration": compressor} - yield as_dict + compressor = json.dumps(codec_dict.get("compressor").get_config()) + else: + compressor = None + codecs_v3 = codec_metadata_v2_to_v3(filters, compressor) + for codec in codecs_v3: + yield json.loads(codec) else: yield codec.to_dict() diff --git a/src/lib.rs b/src/lib.rs index b485e73..cae087c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ use zarrs::metadata::v3::MetadataV3; mod chunk_item; mod concurrency; +mod metadata_v2; mod runtime; mod store; #[cfg(test)] @@ -31,6 +32,7 @@ mod utils; use crate::chunk_item::ChunksItem; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; +use crate::metadata_v2::codec_metadata_v2_to_v3; use crate::store::StoreManager; use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _}; @@ -424,6 +426,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::<CodecPipelineImpl>()?;
m.add_class::<chunk_item::Basic>()?; m.add_class::<chunk_item::WithSubset>()?; + m.add_function(wrap_pyfunction!(codec_metadata_v2_to_v3, m)?)?; Ok(()) } diff --git a/src/metadata_v2.rs b/src/metadata_v2.rs new file mode 100644 index 0000000..de14123 --- /dev/null +++ b/src/metadata_v2.rs @@ -0,0 +1,54 @@ +use pyo3::{exceptions::PyRuntimeError, pyfunction, PyErr, PyResult}; +use zarrs::metadata::{ + v2::{array::ArrayMetadataV2Order, MetadataV2}, + v3::array::data_type::DataTypeMetadataV3, +}; + +#[pyfunction] +#[pyo3(signature = (filters=None, compressor=None))] +pub fn codec_metadata_v2_to_v3( + filters: Option<Vec<String>>, + compressor: Option<String>, +) -> PyResult<Vec<String>> { + // Try and convert filters/compressor to V2 metadata + let filters = if let Some(filters) = filters { + Some( + filters + .into_iter() + .map(|filter| { + serde_json::from_str::<MetadataV2>(&filter) + .map_err(|err| PyErr::new::<PyRuntimeError, _>(err.to_string())) + }) + .collect::<Result<Vec<_>, _>>()?, + ) + } else { + None + }; + let compressor = if let Some(compressor) = compressor { + Some( + serde_json::from_str::<MetadataV2>(&compressor) + .map_err(|err| PyErr::new::<PyRuntimeError, _>(err.to_string()))?, + ) + } else { + None + }; + + // FIXME: The array order, dimensionality, data type, and endianness are needed to exhaustively support all Zarr V2 data that zarrs can handle. + // However, CodecPipeline.from_codecs does not supply this information, and CodecPipeline.evolve_from_array_spec is seemingly never called. + let metadata = zarrs::metadata::v2_to_v3::codec_metadata_v2_to_v3( + ArrayMetadataV2Order::C, + 0, // unused with C order + &DataTypeMetadataV3::Bool, // FIXME + None, + &filters, + &compressor, + ) + .map_err(|err| { + // TODO: More informative error messages from zarrs for ArrayMetadataV2ToV3ConversionError + PyErr::new::<PyRuntimeError, _>(err.to_string()) + })?; + Ok(metadata + .into_iter() + .map(|metadata| serde_json::to_string(&metadata).expect("infallible")) // TODO: Add method to zarrs + .collect())