diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 90d8bb9d..9bd3142c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [310, 311, 312] + python-version: [311, 312] steps: - uses: actions/checkout@v4 @@ -19,8 +19,9 @@ jobs: - name: Install kerchunk shell: bash -l {0} run: | - pip install -e . + pip install -e . --no-deps + pip list - name: Test with pytest shell: bash -l {0} run: | - pytest -v --cov + pytest -v --timeout 60 --cov diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9a7486bd..3f4955e6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,17 +1,17 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.3.0 + rev: v4.6.0 hooks: - id: check-yaml - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 25.1.0 hooks: - id: black exclude: ^docs/ - repo: https://github.com/pycqa/flake8 - rev: '4.0.1' + rev: '7.1.1' hooks: - id: flake8 exclude: tests/|^docs/|__init__.py diff --git a/ci/environment-docs.yml b/ci/environment-docs.yml index aa62a1ce..f321b27b 100644 --- a/ci/environment-docs.yml +++ b/ci/environment-docs.yml @@ -3,7 +3,7 @@ channels: - conda-forge - defaults dependencies: - - python=3.10 + - python=3.12 - dask - zarr - xarray>=2024.10.0 @@ -17,7 +17,7 @@ dependencies: - aiohttp - pytest-cov - pytest-subtests - - fsspec<=2024.12.0 + - fsspec - dask - scipy - s3fs diff --git a/ci/environment-py310.yml b/ci/environment-py310.yml deleted file mode 100644 index 2ded3ed3..00000000 --- a/ci/environment-py310.yml +++ /dev/null @@ -1,38 +0,0 @@ -name: test_env -channels: - - conda-forge - - nodefaults -dependencies: - - python=3.10 - - dask - - zarr - - xarray>=2024.10.0 - - h5netcdf - - h5py - - pandas - - cfgrib - - cftime - # Temporary workaround for #508 - - eccodes <2.38 - - - astropy - - requests - - aiohttp - - pytest-cov - - pytest-subtests - - fsspec<=2024.12.0 - - dask - - scipy - - s3fs - - gcsfs - - python-blosc - - flake8 - - black - - fastparquet - - pip - - pyopenssl - - tifffile - - netCDF4 -# - pip: -# - git+https://github.com/fsspec/filesystem_spec -# - ipfsspec diff --git a/ci/environment-py311.yml b/ci/environment-py311.yml index b3ed2467..e3bd38e7 100644 --- a/ci/environment-py311.yml +++ b/ci/environment-py311.yml @@ -13,14 +13,14 @@ dependencies: - cfgrib # Temporary workaround for #508 - eccodes <2.38 - + - ujson - cftime - astropy - requests - aiohttp - pytest-cov - pytest-subtests - - fsspec<=2024.12.0 + - pytest-timeout - dask - scipy - s3fs @@ -34,5 +34,6 @@ dependencies: - tifffile - rioxarray - netCDF4 -# - pip: -# - git+https://github.com/fsspec/filesystem_spec + - pip: + - git+https://github.com/fsspec/filesystem_spec + - git+https://github.com/zarr-developers/zarr-python diff --git a/ci/environment-py312.yml b/ci/environment-py312.yml index 045f8c49..b8b807d5 100644 --- a/ci/environment-py312.yml +++ b/ci/environment-py312.yml @@ -13,14 +13,14 @@ dependencies: - cfgrib # Temporary workaround for #508 - eccodes <2.38 - + - ujson - cftime - astropy - requests - aiohttp - pytest-cov - pytest-subtests - - fsspec<=2024.12.0 + - pytest-timeout - gcsfs - dask - scipy @@ -34,5 +34,6 @@ dependencies: - tifffile - rioxarray - netCDF4 -# - pip: -# - git+https://github.com/fsspec/filesystem_spec + - pip: + - git+https://github.com/fsspec/filesystem_spec + - git+https://github.com/zarr-developers/zarr-python diff --git a/kerchunk/__init__.py b/kerchunk/__init__.py index 21b4e540..85863c32 100644 --- a/kerchunk/__init__.py +++ b/kerchunk/__init__.py @@ -1,4 +1,4 @@ -from . import codecs +from kerchunk import codecs from importlib.metadata import version as _version diff --git a/kerchunk/_grib_idx.py b/kerchunk/_grib_idx.py index e038749d..11255793 100644 --- a/kerchunk/_grib_idx.py +++ b/kerchunk/_grib_idx.py @@ -744,7 +744,8 @@ def _extract_single_group(grib_group: dict, idx: int, storage_options: Dict): return None dt = xr.open_datatree( - fsspec.filesystem("reference", fo=grib_tree_store).get_mapper(""), + "reference://", + storage_options={"fo": grib_tree_store}, engine="zarr", consolidated=False, ) diff --git a/kerchunk/codecs.py b/kerchunk/codecs.py index 852076ea..f97bef32 100644 --- a/kerchunk/codecs.py +++ b/kerchunk/codecs.py @@ -1,4 +1,5 @@ import ast +from dataclasses import dataclass import io import numcodecs @@ -6,6 +7,11 @@ import numpy as np import threading import zlib +from zarr.core.array_spec import ArraySpec +from zarr.abc.codec import ArrayBytesCodec +from zarr.core.buffer import Buffer, NDBuffer +from zarr.core.common import JSON, parse_named_configuration +from zarr.registry import register_codec class FillStringsCodec(Codec): @@ -115,6 +121,78 @@ def decode(self, buf, out=None): numcodecs.register_codec(GRIBCodec, "grib") +@dataclass(frozen=True) +class GRIBZarrCodec(ArrayBytesCodec): + eclock = threading.RLock() + + var: str + dtype: np.dtype + + def __init__(self, *, var: str, dtype: np.dtype) -> None: + object.__setattr__(self, "var", var) + object.__setattr__(self, "dtype", dtype) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> "GRIBZarrCodec": + _, configuration_parsed = parse_named_configuration( + data, "bytes", require_configuration=True + ) + configuration_parsed = configuration_parsed or {} + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> dict[str, JSON]: + if self.endian is None: + return {"name": "grib"} + else: + return { + "name": "grib", + "configuration": {"var": self.var, "dtype": self.dtype}, + } + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + assert isinstance(chunk_bytes, Buffer) + import eccodes + + if self.var in ["latitude", "longitude"]: + var = self.var + "s" + dt = self.dtype or "float64" + else: + var = "values" + dt = self.dtype or "float32" + + with self.eclock: + mid = eccodes.codes_new_from_message(chunk_bytes.to_bytes()) + try: + data = eccodes.codes_get_array(mid, var) + missingValue = eccodes.codes_get_string(mid, "missingValue") + if var == "values" and missingValue: + data[data == float(missingValue)] = np.nan + return data.astype(dt, copy=False) + + finally: + eccodes.codes_release(mid) + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + # This is a one way codec + raise NotImplementedError + + def compute_encoded_size( + self, input_byte_length: int, _chunk_spec: ArraySpec + ) -> int: + raise NotImplementedError + + +register_codec("grib", GRIBZarrCodec) + + class AsciiTableCodec(numcodecs.abc.Codec): """Decodes ASCII-TABLE extensions in FITS files""" @@ -166,7 +244,6 @@ def decode(self, buf, out=None): arr2 = np.empty((self.nrow,), dtype=dt_out) heap = buf[arr.nbytes :] for name in dt_out.names: - if dt_out[name] == "O": dt = np.dtype(self.ftypes[self.types[name]]) counts = arr[name][:, 0] @@ -245,7 +322,7 @@ class ZlibCodec(Codec): codec_id = "zlib" def __init__(self): - ... + pass def decode(self, data, out=None): if out: diff --git a/kerchunk/combine.py b/kerchunk/combine.py index b30b8707..0a52383b 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -1,3 +1,4 @@ +import asyncio import collections.abc import logging import re @@ -10,8 +11,9 @@ import numcodecs import ujson import zarr +from zarr.core.buffer.core import default_buffer_prototype -from kerchunk.utils import consolidate +from kerchunk.utils import consolidate, fs_as_store logger = logging.getLogger("kerchunk.combine") @@ -193,20 +195,28 @@ def append( """ import xarray as xr - fs = fsspec.filesystem( - "reference", + storage_options = dict( fo=original_refs, remote_protocol=remote_protocol, remote_options=remote_options, target_options=target_options, + asynchronous=True, ) ds = xr.open_dataset( - fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False} + "reference://", + engine="zarr", + backend_kwargs={"consolidated": False}, + storage_options=storage_options, + ) + z = zarr.open( + "reference://", + zarr_format=2, + storage_options=storage_options, + use_consolidated=False, ) - z = zarr.open(fs.get_mapper()) mzz = MultiZarrToZarr( path, - out=fs.references, # dict or parquet/lazy + out=z.store.fs.references, # normalised dict or parquet/lazy remote_protocol=remote_protocol, remote_options=remote_options, target_options=target_options, @@ -264,7 +274,9 @@ def fss(self): self._paths = [] for of in fsspec.open_files(self.path, **self.target_options): self._paths.append(of.full_name) - fs = fsspec.core.url_to_fs(self.path[0], **self.target_options)[0] + fs = fsspec.core.url_to_fs( + self.path[0], asynchronous=True, **self.target_options + )[0] try: # JSON path fo_list = fs.cat(self.path) @@ -352,6 +364,16 @@ def _get_value(self, index, z, var, fn=None): logger.debug("Decode: %s -> %s", (selector, index, var, fn), o) return o + async def _read_meta_files(self, m, files): + """Helper to load multiple metadata files asynchronously""" + res = {} + for fn in files: + exists = await m.exists(fn) + if exists: + content = await m.get(fn, prototype=default_buffer_prototype()) + res[fn] = ujson.dumps(ujson.loads(content.to_bytes())) + return res + def first_pass(self): """Accumulate the set of concat coords values across all inputs""" @@ -364,7 +386,8 @@ def first_pass(self): fs._dircache_from_items() logger.debug("First pass: %s", i) - z = zarr.open_group(fs.get_mapper("")) + z_store = fs_as_store(fs, read_only=False) + z = zarr.open_group(z_store, zarr_format=2, use_consolidated=False) for var in self.concat_dims: value = self._get_value(i, z, var, fn=self._paths[i]) if isinstance(value, np.ndarray): @@ -390,16 +413,16 @@ def store_coords(self): Write coordinate arrays into the output """ kv = {} - store = zarr.storage.KVStore(kv) - group = zarr.open(store) - m = self.fss[0].get_mapper("") - z = zarr.open(m) + store = zarr.storage.MemoryStore(kv) + group = zarr.open_group(store, zarr_format=2, use_consolidated=False) + m = fs_as_store(self.fss[0], read_only=False) + z = zarr.open(m, zarr_format=2, use_consolidated=False) for k, v in self.coos.items(): if k == "var": # The names of the variables to write in the second pass, not a coordinate continue # parametrize the threshold value below? - compression = numcodecs.Zstd() if len(v) > 100 else None + compressor = numcodecs.Zstd() if len(v) > 100 else None kw = {} if self.cf_units and k in self.cf_units: if "M" not in self.coo_dtypes.get(k, ""): @@ -424,14 +447,15 @@ def store_coords(self): elif k in z: # Fall back to existing fill value kw["fill_value"] = z[k].fill_value - arr = group.create_dataset( + arr = group.create_array( name=k, - data=data, + shape=data.shape, overwrite=True, - compressor=compression, + compressor=compressor, dtype=self.coo_dtypes.get(k, data.dtype), **kw, ) + arr[:] = data if k in z: # copy attributes if values came from an original variable arr.attrs.update(z[k].attrs) @@ -445,10 +469,9 @@ def store_coords(self): # TODO: rewrite .zarray/.zattrs with ujson to save space. Maybe make them by hand anyway. self.out.update(kv) logger.debug("Written coordinates") - for fn in [".zgroup", ".zattrs"]: - # top-level group attributes from first input - if fn in m: - self.out[fn] = ujson.dumps(ujson.loads(m[fn])) + + metadata = asyncio.run(self._read_meta_files(m, [".zgroup", ".zattrs"])) + self.out.update(metadata) logger.debug("Written global metadata") self.done.add(2) @@ -464,8 +487,8 @@ def second_pass(self): for i, fs in enumerate(self.fss): to_download = {} - m = fs.get_mapper("") - z = zarr.open(m) + m = fs_as_store(fs, read_only=False) + z = zarr.open(m, zarr_format=2, use_consolidated=False) if no_deps is None: # done first time only @@ -495,9 +518,10 @@ def second_pass(self): if f"{v}/.zgroup" in fns: # recurse into groups - copy meta, add to dirs to process and don't look # for references in this dir - self.out[f"{v}/.zgroup"] = m[f"{v}/.zgroup"] - if f"{v}/.zattrs" in fns: - self.out[f"{v}/.zattrs"] = m[f"{v}/.zattrs"] + metadata = asyncio.run( + self._read_meta_files(m, [f"{v}/.zgroup", f"{v}/.zattrs"]) + ) + self.out.update(metadata) dirs.extend([f for f in fns if not f.startswith(f"{v}/.z")]) continue if v in self.identical_dims: @@ -509,7 +533,10 @@ def second_pass(self): continue logger.debug("Second pass: %s, %s", i, v) - zarray = ujson.loads(m[f"{v}/.zarray"]) + zarray = asyncio.run(self._read_meta_files(m, [f"{v}/.zarray"]))[ + f"{v}/.zarray" + ] + zarray = ujson.loads(zarray) if v not in chunk_sizes: chunk_sizes[v] = zarray["chunks"] elif chunk_sizes[v] != zarray["chunks"]: @@ -520,7 +547,8 @@ def second_pass(self): chunks so far: {zarray["chunks"]}""" ) chunks = chunk_sizes[v] - zattrs = ujson.loads(m.get(f"{v}/.zattrs", "{}")) + zattr_meta = asyncio.run(self._read_meta_files(m, [f"{v}/.zattrs"])) + zattrs = ujson.loads(zattr_meta.get(f"{v}/.zattrs", "{}")) coords = zattrs.get("_ARRAY_DIMENSIONS", []) if zarray["shape"] and not coords: coords = list("ikjlm")[: len(zarray["shape"])] diff --git a/kerchunk/df.py b/kerchunk/df.py index 7bd2bfb5..ce3fff44 100644 --- a/kerchunk/df.py +++ b/kerchunk/df.py @@ -22,7 +22,9 @@ def _proc_raw(r): - if not isinstance(r, bytes): + if hasattr(r, "to_bytes"): + r = r.to_bytes() + elif not isinstance(r, bytes): r = r.encode() if r.startswith(b"base64:"): return base64.b64decode(r[7:]) diff --git a/kerchunk/fits.py b/kerchunk/fits.py index 4e2d53de..aa6374bf 100644 --- a/kerchunk/fits.py +++ b/kerchunk/fits.py @@ -8,7 +8,7 @@ from fsspec.implementations.reference import LazyReferenceMapper -from kerchunk.utils import class_factory +from kerchunk.utils import class_factory, dict_to_store, translate_refs_serializable from kerchunk.codecs import AsciiTableCodec, VarArrCodec try: @@ -72,7 +72,8 @@ def process_file( storage_options = storage_options or {} out = out or {} - g = zarr.open(out) + store = dict_to_store(out) + g = zarr.open_group(store=store, zarr_format=2) with fsspec.open(url, mode="rb", **storage_options) as f: infile = fits.open(f, do_not_scale_image_data=True) @@ -93,7 +94,7 @@ def process_file( hdu.header.__str__() # causes fixing of invalid cards attrs = dict(hdu.header) - kwargs = {} + kwargs = {"compressor": None} if hdu.is_image: # for images/cubes (i.e., ndarrays with simple type) nax = hdu.header["NAXIS"] @@ -139,10 +140,12 @@ def process_file( # contains var fields length = hdu.fileinfo()["datSpan"] dt2 = [ - (name, "O") - if hdu.columns[name].format.startswith(("P", "Q")) - else (name, str(dtype[name].base)) - + ((dtype[name].shape,) if dtype[name].shape else ()) + ( + (name, "O") + if hdu.columns[name].format.startswith(("P", "Q")) + else (name, str(dtype[name].base)) + + ((dtype[name].shape,) if dtype[name].shape else ()) + ) for name in dtype.names ] types = { @@ -150,9 +153,10 @@ def process_file( for name in dtype.names if hdu.columns[name].format.startswith(("P", "Q")) } - kwargs["object_codec"] = VarArrCodec( + kwargs["compressor"] = VarArrCodec( str(dtype), str(dt2), nrows, types ) + kwargs["fill_value"] = None dtype = dt2 else: length = dtype.itemsize * nrows @@ -163,8 +167,8 @@ def process_file( # one chunk for whole thing. # TODO: we could sub-chunk on biggest dimension name = hdu.name or str(ext) - arr = g.empty( - name, dtype=dtype, shape=shape, chunks=shape, compression=None, **kwargs + arr = g.create_array( + name=name, dtype=dtype, shape=shape, chunks=shape, **kwargs ) arr.attrs.update( { @@ -190,6 +194,7 @@ def process_file( ) if isinstance(out, LazyReferenceMapper): out.flush() + out = translate_refs_serializable(out) return out @@ -248,7 +253,7 @@ def add_wcs_coords(hdu, zarr_group=None, dataset=None, dtype="float32"): } if zarr_group is not None: arr = zarr_group.empty( - name, shape=shape, chunks=shape, overwrite=True, dtype=dtype + name, shape=shape, chunks=shape, dtype=dtype, exists_ok=True ) arr.attrs.update(attrs) arr[:] = world_coord.value.reshape(shape) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index 37dcb3ff..885b80c9 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -11,7 +11,13 @@ import xarray import numpy as np -from kerchunk.utils import class_factory, _encode_for_JSON +from kerchunk.utils import ( + class_factory, + _encode_for_JSON, + dict_to_store, + fs_as_store, + translate_refs_serializable, +) from kerchunk.codecs import GRIBCodec from kerchunk.combine import MultiZarrToZarr, drop from kerchunk._grib_idx import ( @@ -81,13 +87,13 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): shape = tuple(data.shape or ()) if nbytes < inline_threshold: logger.debug(f"Store {var} inline") - d = z.create_dataset( + d = z.create_array( name=var, shape=shape, chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), - compressor=False, + compressor=None, ) if hasattr(data, "tobytes"): b = data.tobytes() @@ -101,15 +107,14 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): store[f"{var}/0"] = b.decode("ascii") else: logger.debug(f"Store {var} reference") - d = z.create_dataset( + d = z.create_array( name=var, shape=shape, chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), filters=[GRIBCodec(var=var, dtype=str(data.dtype))], - compressor=False, - overwrite=True, + compressor=None, ) store[f"{var}/" + ".".join(["0"] * len(shape))] = ["{{u}}", offset, size] d.attrs.update(attr) @@ -163,7 +168,9 @@ def scan_grib( with fsspec.open(url, "rb", **storage_options) as f: logger.debug(f"File {url}") for offset, size, data in _split_file(f, skip=skip): - store = {} + store_dict = {} + store = dict_to_store(store_dict) + mid = eccodes.codes_new_from_message(data) m = cfgrib.cfmessage.CfMessage(mid) @@ -201,7 +208,7 @@ def scan_grib( if good is False: continue - z = zarr.open_group(store) + z = zarr.open_group(store, zarr_format=2) global_attrs = { f"GRIB_{k}": m[k] for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS @@ -237,7 +244,9 @@ def scan_grib( varName = m["cfVarName"] if varName in ("undef", "unknown"): varName = m["shortName"] - _store_array(store, z, vals, varName, inline_threshold, offset, size, attrs) + _store_array( + store_dict, z, vals, varName, inline_threshold, offset, size, attrs + ) if "typeOfLevel" in message_keys and "level" in message_keys: name = m["typeOfLevel"] coordinates.append(name) @@ -251,7 +260,7 @@ def scan_grib( attrs = {} attrs["_ARRAY_DIMENSIONS"] = [] _store_array( - store, z, data, name, inline_threshold, offset, size, attrs + store_dict, z, data, name, inline_threshold, offset, size, attrs ) dims = ( ["y", "x"] @@ -308,7 +317,7 @@ def scan_grib( dims = [coord] attrs = cfgrib.dataset.COORD_ATTRS[coord] _store_array( - store, + store_dict, z, x, coord, @@ -321,10 +330,11 @@ def scan_grib( if coordinates: z.attrs["coordinates"] = " ".join(coordinates) + translate_refs_serializable(store_dict) out.append( { "version": 1, - "refs": _encode_for_JSON(store), + "refs": _encode_for_JSON(store_dict), "templates": {"u": url}, } ) @@ -407,8 +417,9 @@ def grib_tree( filters = ["stepType", "typeOfLevel"] # TODO allow passing a LazyReferenceMapper as output? - zarr_store = {} - zroot = zarr.open_group(store=zarr_store) + zarr_store_dict = {} + zarr_store = dict_to_store(zarr_store_dict) + zroot = zarr.open_group(store=zarr_store, zarr_format=2) aggregations: Dict[str, List] = defaultdict(list) aggregation_dims: Dict[str, Set] = defaultdict(set) @@ -527,17 +538,18 @@ def grib_tree( for key, value in group["refs"].items(): if key not in [".zattrs", ".zgroup"]: - zarr_store[f"{path}/{key}"] = value + zarr_store._store_dict[f"{path}/{key}"] = value # Force all stored values to decode as string, not bytes. String should be correct. # ujson will reject bytes values by default. # Using 'reject_bytes=False' one write would fail an equality check on read. - zarr_store = { + zarr_dict = { key: (val.decode() if isinstance(val, bytes) else val) - for key, val in zarr_store.items() + for key, val in zarr_store._store_dict.items() } # TODO handle other kerchunk reference spec versions? - result = dict(refs=zarr_store, version=1) + translate_refs_serializable(zarr_dict) + result = dict(refs=zarr_dict, version=1) return result @@ -578,7 +590,8 @@ def correct_hrrr_subhf_step(group: Dict) -> Dict: group["refs"][".zattrs"] = ujson.dumps(attrs) fo = fsspec.filesystem("reference", fo=group, mode="r") - xd = xarray.open_dataset(fo.get_mapper(), engine="zarr", consolidated=False) + fstore = fs_as_store(fo, read_only=True) + xd = xarray.open_dataset(fstore, engine="zarr", consolidated=False) correct_step = xd.valid_time.values - xd.time.values diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index e3edd267..8af0a5f1 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -11,7 +11,12 @@ import numcodecs from .codecs import FillStringsCodec -from .utils import _encode_for_JSON +from .utils import ( + _encode_for_JSON, + encode_fill_value, + dict_to_store, + translate_refs_serializable, +) try: import h5py @@ -22,12 +27,6 @@ "for more details." ) -try: - from zarr.meta import encode_fill_value -except ModuleNotFoundError: - # https://github.com/zarr-developers/zarr-python/issues/2021 - from zarr.v2.meta import encode_fill_value - lggr = logging.getLogger("h5-to-zarr") _HIDDEN_ATTRS = { # from h5netcdf.attrs "REFERENCE_LIST", @@ -39,6 +38,7 @@ "_nc3_strict", "_NCProperties", } +fsspec.utils.setup_logging(lggr) class SingleHdf5ToZarr: @@ -112,9 +112,9 @@ def __init__( if vlen_encode not in ["embed", "null", "leave", "encode"]: raise NotImplementedError self.vlen = vlen_encode - self.store = out or {} - self._zroot = zarr.group(store=self.store, overwrite=True) - + self.store_dict = out or {} + self.store = dict_to_store(self.store_dict) + self._zroot = zarr.group(store=self.store, zarr_format=2) self._uri = url self.error = error lggr.debug(f"HDF5 file URI: {self._uri}") @@ -141,7 +141,6 @@ def translate(self, preserve_linked_dsets=False): """ lggr.debug("Translation begins") self._transfer_attrs(self._h5f, self._zroot) - self._h5f.visititems(self._translator) if preserve_linked_dsets: @@ -158,7 +157,8 @@ def translate(self, preserve_linked_dsets=False): self.store.flush() return self.store else: - store = _encode_for_JSON(self.store) + translate_refs_serializable(self.store_dict) + store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} def _unref(self, ref): @@ -180,6 +180,7 @@ def _transfer_attrs( An equivalent Zarr group or array to the HDF5 group or dataset with attributes. """ + upd = {} for n, v in h5obj.attrs.items(): if n in _HIDDEN_ATTRS: continue @@ -203,11 +204,19 @@ def _transfer_attrs( if v == "DIMENSION_SCALE": continue try: - zobj.attrs[n] = v + if isinstance(v, (str, int, float)): + upd[n] = v + elif isinstance(v, (tuple, set, list)) and all( + isinstance(_, (str, int, float)) for _ in v + ): + upd[n] = list(v) + else: + upd[n] = str(v) except TypeError: lggr.debug( f"TypeError transferring attr, skipping:\n {n}@{h5obj.name} = {v} ({type(v)})" ) + zobj.attrs.update(upd) def _decode_filters(self, h5obj: Union[h5py.Dataset, h5py.Group]): if h5obj.scaleoffset: @@ -279,7 +288,7 @@ def _translator( ): """Produce Zarr metadata for all groups and datasets in the HDF5 file.""" try: # method must not raise exception - kwargs = {} + kwargs = {"compressor": None} if isinstance(h5obj, (h5py.SoftLink, h5py.HardLink)): h5obj = self._h5f[name] @@ -296,9 +305,9 @@ def _translator( if h5obj.id.get_create_plist().get_layout() == h5py.h5d.COMPACT: # Only do if h5obj.nbytes < self.inline?? kwargs["data"] = h5obj[:] - filters = [] + kwargs["filters"] = [] else: - filters = self._decode_filters(h5obj) + kwargs["filters"] = self._decode_filters(h5obj) dt = None # Get storage info of this HDF5 dataset... cinfo = self._storage_info(h5obj) @@ -332,11 +341,11 @@ def _translator( for v in val ] kwargs["data"] = out - kwargs["object_codec"] = numcodecs.JSON() + kwargs["filters"] = [numcodecs.JSON()] fill = None elif self.vlen == "null": dt = "O" - kwargs["object_codec"] = FillStringsCodec(dtype="S16") + kwargs["filters"] = [FillStringsCodec(dtype="S16")] fill = " " elif self.vlen == "leave": dt = "S16" @@ -351,9 +360,9 @@ def _translator( index.decode(): label.decode() for index, label in zip(indexes, labels) } - kwargs["object_codec"] = FillStringsCodec( - dtype="S16", id_map=mapping - ) + kwargs["filters"] = [ + FillStringsCodec(dtype="S16", id_map=mapping) + ] fill = " " else: raise NotImplementedError @@ -391,9 +400,9 @@ def _translator( ) } ) - kwargs["object_codec"] = FillStringsCodec( - dtype=str(dt), id_map=mapping - ) + kwargs["filters"] = [ + FillStringsCodec(dtype=str(dt), id_map=mapping) + ] dt = [ ( v, @@ -417,7 +426,7 @@ def _translator( ) for v in h5obj.dtype.names ] - kwargs["object_codec"] = FillStringsCodec(dtype=str(dt)) + kwargs["filters"] = [FillStringsCodec(dtype=str(dt))] dt = [ ( v, @@ -458,7 +467,7 @@ def _translator( ) dt = "O" kwargs["data"] = data2 - kwargs["object_codec"] = numcodecs.JSON() + kwargs["filters"] = [numcodecs.JSON()] fill = None else: raise NotImplementedError @@ -466,30 +475,37 @@ def _translator( if h5py.h5ds.is_scale(h5obj.id) and not cinfo: return if h5obj.attrs.get("_FillValue") is not None: - fill = encode_fill_value( - h5obj.attrs.get("_FillValue"), dt or h5obj.dtype - ) + fill = h5obj.attrs.get("_FillValue") + fill = encode_fill_value(fill, dt or h5obj.dtype) + + adims = self._get_array_dims(h5obj) - # Create a Zarr array equivalent to this HDF5 dataset... - za = self._zroot.require_dataset( - h5obj.name, + # Create a Zarr array equivalent to this HDF5 dataset. + data = kwargs.pop("data", None) + za = self._zroot.require_array( + name=h5obj.name.lstrip("/"), shape=h5obj.shape, dtype=dt or h5obj.dtype, - chunks=h5obj.chunks or False, + chunks=h5obj.chunks or h5obj.shape, fill_value=fill, - compression=None, - filters=filters, + attributes={ + "_ARRAY_DIMENSIONS": adims, + }, overwrite=True, **kwargs, ) lggr.debug(f"Created Zarr array: {za}") self._transfer_attrs(h5obj, za) - adims = self._get_array_dims(h5obj) - za.attrs["_ARRAY_DIMENSIONS"] = adims - lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") - if "data" in kwargs: - return # embedded bytes, no chunks to copy + lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") + if data is not None: + try: + za[:] = data + except (ValueError, TypeError): + self.store_dict[f"{za.path}/0"] = kwargs["filters"][0].encode( + data + ) + return # Store chunk location metadata... if cinfo: @@ -497,6 +513,12 @@ def _translator( if h5obj.fletcher32: logging.info("Discarding fletcher32 checksum") v["size"] -= 4 + key = ( + str.removeprefix(h5obj.name, "/") + + "/" + + ".".join(map(str, k)) + ) + if ( self.inline and isinstance(v, dict) @@ -509,9 +531,10 @@ def _translator( data.decode("ascii") except UnicodeDecodeError: data = b"base64:" + base64.b64encode(data) - self.store[za._chunk_key(k)] = data + + self.store_dict[key] = data else: - self.store[za._chunk_key(k)] = [ + self.store_dict[key] = [ self._uri, v["offset"], v["size"], @@ -519,14 +542,14 @@ def _translator( elif isinstance(h5obj, h5py.Group): lggr.debug(f"HDF5 group: {h5obj.name}") - zgrp = self._zroot.require_group(h5obj.name) + zgrp = self._zroot.require_group(h5obj.name.lstrip("/")) self._transfer_attrs(h5obj, zgrp) except Exception as e: import traceback msg = "\n".join( [ - "The following excepion was caught and quashed while traversing HDF5", + "The following exception was caught and quashed while traversing HDF5", str(e), traceback.format_exc(limit=5), ] diff --git a/kerchunk/hdf4.py b/kerchunk/hdf4.py index 483ffba7..729b7e9e 100644 --- a/kerchunk/hdf4.py +++ b/kerchunk/hdf4.py @@ -2,6 +2,8 @@ import numpy as np import ujson +from kerchunk.utils import refs_as_store + decoders = {} @@ -138,32 +140,28 @@ def translate(self, filename=None, storage_options=None): output = self._descend_vg(*sorted(roots, key=lambda t: t[1])[-1]) prot = fo.fs.protocol prot = prot[0] if isinstance(prot, tuple) else prot - fs = fsspec.filesystem( - "reference", - fo=self.out, - remote_protocol=prot, - remote_options=self.st, - ) - g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + store = refs_as_store(self.out, remote_protocol=prot, remote_options=self.st) + g = zarr.open_group(store, zarr_format=2, use_consolidated=False) refs = {} for k, v in output.items(): if isinstance(v, dict): - compression = ZlibCodec() if "refs" in v else None - arr = g.create_dataset( + compressor = ZlibCodec() if "refs" in v else None + arr = g.require_array( name=k, shape=v["dims"], dtype=v["dtype"], chunks=v.get("chunks", v["dims"]), - compressor=compression, - overwrite=True, + compressor=compressor, ) arr.attrs.update( dict( - _ARRAY_DIMENSIONS=[f"{k}_x", f"{k}_y"][: len(v["dims"])] - if "refs" in v - else ["0"], + _ARRAY_DIMENSIONS=( + [f"{k}_x", f"{k}_y"][: len(v["dims"])] + if "refs" in v + else ["0"] + ), **{ - i: j + i: j.tolist() if isinstance(j, np.generic) else j for i, j in v.items() if i not in {"chunk", "dims", "dtype", "refs"} }, @@ -177,14 +175,14 @@ def translate(self, filename=None, storage_options=None): if not k.startswith( ("CoreMetadata.", "ArchiveMetadata.", "StructMetadata.") ): - attrs[k] = v - fs.references.update(refs) + attrs[k] = v.tolist() if isinstance(v, np.generic) else v + store.fs.references.update(refs) g.attrs.update(attrs) if filename is None: - return fs.references + return store.fs.references with fsspec.open(filename, **(storage_options or {})) as f: - ujson.dumps(dict(fs.references), f) + ujson.dumps(dict(store.fs.references), f) def _descend_vg(self, tag, ref): info = self.tags[(tag, ref)] diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index d43b6b97..1fad9b65 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -5,7 +5,13 @@ from fsspec.implementations.reference import LazyReferenceMapper import fsspec -from kerchunk.utils import _encode_for_JSON, inline_array +import kerchunk.utils +from kerchunk.utils import ( + _encode_for_JSON, + dict_to_store, + inline_array, + translate_refs_serializable, +) try: from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable @@ -167,7 +173,9 @@ def translate(self): import zarr out = self.out - z = zarr.open(out, mode="w") + store = dict_to_store(out) + z = zarr.open_group(store, mode="w", zarr_format=2) + for dim, var in self.variables.items(): if dim in self.chunks: shape = self.chunks[dim][-1] @@ -191,13 +199,13 @@ def translate(self): fill = float(fill) if fill is not None and var.data.dtype.kind == "i": fill = int(fill) - arr = z.create_dataset( + arr = z.create_array( name=dim, shape=shape, dtype=var.data.dtype, fill_value=fill, chunks=shape, - compression=None, + compressor=None, ) part = ".".join(["0"] * len(shape)) or "0" k = f"{dim}/{part}" @@ -245,13 +253,14 @@ def translate(self): fill = float(fill) if fill is not None and base.kind == "i": fill = int(fill) - arr = z.create_dataset( + arr = z.create_array( name=name, shape=shape, dtype=base, fill_value=fill, chunks=(1,) + dtype.shape, - compression=None, + compressor=None, + overwrite=True, ) arr.attrs.update( { @@ -284,17 +293,18 @@ def translate(self): if k != "filename" # special "attribute" } ) + out = kerchunk.utils.translate_refs_serializable(out) if self.threshold: out = inline_array( out, self.threshold, remote_options=dict(remote_options=self.storage_options), ) - if isinstance(out, LazyReferenceMapper): out.flush() return out else: + translate_refs_serializable(out) out = _encode_for_JSON(out) return {"version": 1, "refs": out} diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 838c3cb1..77830565 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -1,12 +1,92 @@ import base64 import copy import itertools +import fsspec.asyn +from packaging.version import Version +from typing import Any, cast import warnings import ujson -import fsspec -import zarr +import fsspec.implementations.asyn_wrapper +import numpy as np +import zarr.storage + + +def refs_as_fs( + refs, + fs=None, + remote_protocol=None, + remote_options=None, + asynchronous=True, + **kwargs, +): + """Convert a reference set to an fsspec filesystem""" + fs = fsspec.filesystem( + "reference", + fo=refs, + fs=fs, + remote_protocol=remote_protocol, + remote_options=remote_options, + **kwargs, + asynchronous=asynchronous, + ) + return fs + + +def refs_as_store( + refs, read_only=False, fs=None, remote_protocol=None, remote_options=None +): + """Convert a reference set to a zarr store""" + remote_options = remote_options or {} + remote_options["asynchronous"] = True + + fss = refs_as_fs( + refs, + fs=fs, + remote_protocol=remote_protocol, + remote_options=remote_options, + ) + return fs_as_store(fss, read_only=read_only) + + +def is_zarr3(): + """Check if the installed zarr version is version 3""" + return Version(zarr.__version__) >= Version("3.0.0.b2") + + +def dict_to_store(store_dict: dict): + """Create an in memory zarr store backed by the given dictionary""" + if is_zarr3(): + return zarr.storage.MemoryStore(read_only=False, store_dict=store_dict) + else: + return zarr.storage.KVStore(store_dict) + + +def fs_as_store(fs: fsspec.asyn.AsyncFileSystem, read_only=False): + """Open the refs as a zarr store + + Parameters + ---------- + fs: fsspec.async.AsyncFileSystem + read_only: bool + + Returns + ------- + zarr.storage.Store or zarr.storage.Mapper, fsspec.AbstractFileSystem + """ + if not fs.async_impl: + try: + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + fs = AsyncFileSystemWrapper(fs) + except ImportError: + raise ImportError( + "Only fsspec>2024.10.0 supports the async filesystem wrapper " + "required for working with reference filesystems. " + ) + fs.asynchronous = True + return zarr.storage.FsspecStore(fs, read_only=read_only) def class_factory(func): @@ -42,6 +122,8 @@ def consolidate(refs): """Turn raw references into output""" out = {} for k, v in refs.items(): + if hasattr(v, "to_bytes"): + v = v.to_bytes() if isinstance(v, bytes): try: # easiest way to test if data is ascii @@ -72,7 +154,7 @@ def rename_target(refs, renames): ------- dict: the altered reference set, which can be saved """ - fs = fsspec.filesystem("reference", fo=refs) # to produce normalised refs + fs = refs_as_fs(refs) # to produce normalised refs refs = fs.references out = {} for k, v in refs.items(): @@ -134,17 +216,58 @@ def _encode_for_JSON(store): return store +def encode_fill_value(v: Any, dtype: np.dtype, compressor: Any = None) -> Any: + # early out + if v is None: + return v + if dtype.kind == "V" and dtype.hasobject: + if compressor is None: + raise ValueError("missing compressor for object array") + v = compressor.encode(v) + v = str(base64.standard_b64encode(v), "ascii") + return v + if dtype.kind == "f": + if np.isnan(v): + return "NaN" + elif np.isposinf(v): + return "Infinity" + elif np.isneginf(v): + return "-Infinity" + else: + return float(v) + elif dtype.kind in "ui": + return int(v) + elif dtype.kind == "b": + return bool(v) + elif dtype.kind in "c": + c = cast(np.complex128, np.dtype(complex).type()) + v = ( + encode_fill_value(v.real, c.real.dtype, compressor), + encode_fill_value(v.imag, c.imag.dtype, compressor), + ) + return v + elif dtype.kind in "SV": + v = str(base64.standard_b64encode(v), "ascii") + return v + elif dtype.kind == "U": + return v + elif dtype.kind in "mM": + return int(v.view("i8")) + else: + return v + + def do_inline(store, threshold, remote_options=None, remote_protocol=None): """Replace short chunks with the value of that chunk and inline metadata The chunk may need encoding with base64 if not ascii, so actual length may be larger than threshold. """ - fs = fsspec.filesystem( - "reference", - fo=store, - remote_options=remote_options, + fs = refs_as_fs( + store, remote_protocol=remote_protocol, + remote_options=remote_options, + asynchronous=False, ) out = fs.references.copy() @@ -174,7 +297,7 @@ def do_inline(store, threshold, remote_options=None, remote_protocol=None): def _inline_array(group, threshold, names, prefix=""): - for name, thing in group.items(): + for name, thing in group.members(): if prefix: prefix1 = f"{prefix}.{name}" else: @@ -186,16 +309,15 @@ def _inline_array(group, threshold, names, prefix=""): cond2 = prefix1 in names if cond1 or cond2: original_attrs = dict(thing.attrs) - arr = group.create_dataset( + arr = group.create_array( name=name, dtype=thing.dtype, shape=thing.shape, - data=thing[:], chunks=thing.shape, - compression=None, - overwrite=True, fill_value=thing.fill_value, + overwrite=True, ) + arr[:] = thing[:] arr.attrs.update(original_attrs) @@ -223,10 +345,9 @@ def inline_array(store, threshold=1000, names=None, remote_options=None): ------- amended references set (simple style) """ - fs = fsspec.filesystem( - "reference", fo=store, **(remote_options or {}), skip_instance_cache=True - ) - g = zarr.open_group(fs.get_mapper(), mode="r+") + fs = refs_as_fs(store, remote_options=remote_options or {}) + zarr_store = fs_as_store(fs, read_only=False) + g = zarr.open_group(zarr_store, zarr_format=2) _inline_array(g, threshold, names=names or []) return fs.references @@ -299,7 +420,7 @@ def subchunk(store, variable, factor): else: (url,) = v offset = 0 - size = fs.size(k) + size = fs.info(k)["size"] for subpart in range(factor): new_index = ( chunk_index[:ind] @@ -440,3 +561,34 @@ def templateize(strings, min_length=10, template_name="u"): else: template = {} return template, strings + + +def translate_refs_serializable(refs: dict): + """Translate a reference set to a serializable form, given that zarr + v3 memory stores store data in buffers by default. This modifies the + input dictionary in place, and returns a reference to it. + + It also fixes keys that have a leading slash, which is not appropriate for + zarr v3 keys + + Parameters + ---------- + refs: dict + The reference set + + Returns + ------- + dict + A serializable form of the reference set + """ + keys_to_remove = [] + new_keys = {} + for k, v in refs.items(): + if isinstance(v, zarr.core.buffer.cpu.Buffer): + key = k.removeprefix("/") + new_keys[key] = v.to_bytes() + keys_to_remove.append(k) + for k in keys_to_remove: + del refs[k] + refs.update(new_keys) + return refs diff --git a/kerchunk/xarray_backend.py b/kerchunk/xarray_backend.py index ca377f6d..fc9197c8 100644 --- a/kerchunk/xarray_backend.py +++ b/kerchunk/xarray_backend.py @@ -1,7 +1,8 @@ from xarray.backends import BackendEntrypoint import xarray as xr import os -import fsspec + +from kerchunk.utils import refs_as_store class KerchunkBackend(BackendEntrypoint): @@ -41,6 +42,8 @@ def open_reference_dataset( if open_dataset_options is None: open_dataset_options = {} - m = fsspec.get_mapper("reference://", fo=filename_or_obj, **storage_options) + store = refs_as_store(filename_or_obj, **storage_options) - return xr.open_dataset(m, engine="zarr", consolidated=False, **open_dataset_options) + return xr.open_zarr( + store, zarr_format=2, consolidated=False, **open_dataset_options + ) diff --git a/kerchunk/zarr.py b/kerchunk/zarr.py index ea0612de..320a2c0e 100644 --- a/kerchunk/zarr.py +++ b/kerchunk/zarr.py @@ -2,6 +2,7 @@ from fsspec.implementations.reference import LazyReferenceMapper import kerchunk.utils +import ujson def single_zarr( @@ -35,11 +36,22 @@ def single_zarr( """ if isinstance(uri_or_store, str): mapper = fsspec.get_mapper(uri_or_store, **(storage_options or {})) + prot = mapper.fs.protocol + protocol = prot[0] if isinstance(prot, tuple) else prot else: mapper = uri_or_store if isinstance(mapper, fsspec.FSMap) and storage_options is None: storage_options = mapper.fs.storage_options + prot = mapper.fs.protocol + protocol = prot[0] if isinstance(prot, tuple) else prot + else: + protocol = None + try: + check = ujson.loads(mapper[".zgroup"]) + assert check["zarr_format"] == 2 + except (KeyError, ValueError, TypeError) as e: + raise ValueError("Failed to load dataset as V2 zarr") from e refs = out or {} for k in mapper: if k.startswith("."): @@ -48,9 +60,13 @@ def single_zarr( refs[k] = [fsspec.utils._unstrip_protocol(mapper._key_to_str(k), mapper.fs)] from kerchunk.utils import do_inline - inline_threshold = inline or inline_threshold - if inline_threshold: - refs = do_inline(refs, inline_threshold, remote_options=storage_options) + inline_threshold = inline if inline is not None else inline_threshold + refs = do_inline( + refs, + inline_threshold, + remote_options=storage_options, + remote_protocol=protocol, + ) if isinstance(refs, LazyReferenceMapper): refs.flush() refs = kerchunk.utils.consolidate(refs) diff --git a/pyproject.toml b/pyproject.toml index 38720c2e..767c3dc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ name = "kerchunk" description = "Functions to make reference descriptions for ReferenceFileSystem" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.11" dynamic = ["version"] license = {text = "MIT"} authors = [ @@ -20,11 +20,11 @@ classifiers = [ dependencies = [ - "fsspec<=2024.12.0", + "fsspec", "numcodecs", "numpy", "ujson", - "zarr<3", + "zarr>3", ] [project.optional-dependencies] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..e83bb177 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_default_fixture_loop_scope=session diff --git a/tests/MOD14.hdf4 b/tests/MOD14.hdf4 new file mode 100644 index 00000000..d831200c Binary files /dev/null and b/tests/MOD14.hdf4 differ diff --git a/tests/test__grib_idx.py b/tests/test__grib_idx.py index 1e83d2f9..49736418 100644 --- a/tests/test__grib_idx.py +++ b/tests/test__grib_idx.py @@ -25,6 +25,7 @@ import os import numpy as np import pandas as pd +import pytest import xarray as xr from kerchunk.grib2 import ( scan_grib, @@ -40,6 +41,7 @@ read_store, write_store, ) +from kerchunk.utils import refs_as_store import fsspec import zarr import ujson @@ -47,6 +49,9 @@ import typing import io +# https://github.com/pydata/xarray/issues/9984 +# until datatree/xarray supports zarr3 +pytest.skip(allow_module_level=True) logger = logging.getLogger(__name__) THIS_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -62,8 +67,7 @@ def test_integration(self): correct_hrrr_subhf_step(msg) for msg in scanned_msg_groups ] grib_tree_store = grib_tree(corrected_msg_groups) - fs = fsspec.filesystem("reference", fo=grib_tree_store) - zg = zarr.open_group(fs.get_mapper("")) + zg = zarr.open_group("reference://", storage_options={"fo": grib_tree_store}) self.assertIsInstance(zg["refc/instant/atmosphere/refc"], zarr.Array) self.assertIsInstance(zg["vbdsf/avg/surface/vbdsf"], zarr.Array) self.assertEqual( @@ -75,25 +79,26 @@ def test_integration(self): "atmosphere latitude longitude step time valid_time", ) # Assert that the fill value is set correctly - self.assertIs(zg.refc.instant.atmosphere.step.fill_value, np.nan) + self.assertIs(zg["refc"]["instant"]["atmosphere"]["step"].fill_value, np.nan) np.testing.assert_array_equal( - zg.refc.instant.atmosphere.time[:], np.array([1665709200]) + zg["refc"]["instant"]["atmosphere"]["time"][:], np.array([1665709200]) ) # Read it with data tree and assert the same... dt = xr.open_datatree( - fs.get_mapper(""), + "reference://", + storage_options={"fo": grib_tree_store}, engine="zarr", consolidated=False, ) # Assert a few things... but if it loads we are mostly done. np.testing.assert_array_equal( - dt.refc.instant.atmosphere.time.values[:], + dt["refc"]["instant"]["atmosphere"]["time"].values[:], np.array([np.datetime64("2022-10-14T01:00:00")]), ) self.assertDictEqual( - dt.refc.attrs, dict(name="Maximum/Composite radar reflectivity") + dt["refc"].attrs, dict(name="Maximum/Composite radar reflectivity") ) # Now try the extract and reinflate methods @@ -133,9 +138,9 @@ def test_integration(self): # Back to the same number of keys! self.assertEqual(len(zstore["refs"]), 55) - fs = fsspec.filesystem("reference", fo=zstore) dt = xr.open_datatree( - fs.get_mapper(""), + "reference://", + storage_options={"fo": zstore}, engine="zarr", consolidated=False, ) @@ -351,9 +356,8 @@ def test_kerchunk_indexing(self): grib_tree_store = grib_tree(scan_grib(basename)) dt = xr.open_datatree( - fsspec.filesystem("reference", fo=grib_tree_store).get_mapper( - "" - ), + "reference://", + storage_options={"fo": grib_tree_store}, engine="zarr", consolidated=False, ) @@ -403,9 +407,9 @@ def _read_sample_prefix(self, sample_prefix: str) -> tuple[xr.DataTree, dict]: scanned_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] grib_tree_store = grib_tree(scanned_msgs) - fs = fsspec.filesystem("reference", fo=grib_tree_store) dt = xr.open_datatree( - fs.get_mapper(""), + "reference://", + storage_options={"fo": grib_tree_store}, engine="zarr", consolidated=False, ) @@ -574,12 +578,7 @@ def _reinflate_grib_store( os.path.join(THIS_DIR, "grib_idx_fixtures", dataset) ), ) - fs = fsspec.filesystem("reference", fo=zstore) - dt = xr.open_datatree( - fs.get_mapper(""), - engine="zarr", - consolidated=False, - ) + dt = xr.open_datatree(refs_as_store(zstore), engine="zarr") for node in dt.subtree: if not node.has_data: continue diff --git a/tests/test_combine.py b/tests/test_combine.py index ffd62051..75821395 100644 --- a/tests/test_combine.py +++ b/tests/test_combine.py @@ -4,215 +4,236 @@ import dask.array as da import pytest import xarray as xr -import zarr +import zarr.storage import kerchunk.combine from kerchunk.zarr import single_zarr from kerchunk.combine import MultiZarrToZarr +from kerchunk.utils import fs_as_store, refs_as_store, consolidate + fs = fsspec.filesystem("memory") arr = np.random.rand(1, 10, 10) -static = xr.DataArray(data=np.random.rand(10, 10), dims=["x", "y"], name="static") -data = xr.DataArray( - data=arr.squeeze(), - dims=["x", "y"], - name="data", -) -xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr("memory://simple1.zarr") - -data = xr.DataArray( - data=arr.squeeze() + 1, - dims=["x", "y"], - name="data", -) -xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr("memory://simple2.zarr") - -data = xr.DataArray( - data=arr.squeeze(), - dims=["x", "y"], - name="datum", -) -xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr("memory://simple_var1.zarr") -data = xr.DataArray( - data=arr.squeeze() + 1, - dims=["x", "y"], - name="datum", -) -xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr("memory://simple_var2.zarr") - -data = xr.DataArray( - data=arr, - coords={"time": np.array([1])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 3}, -) -xr.Dataset({"data": data, "static": static}, attrs={"attr1": 5}).to_zarr( - "memory://single1.zarr" -) +@pytest.fixture(scope="module", autouse=True) +def datasets(): + # if something fails here, it won't crash the test run + static = xr.DataArray(data=np.random.rand(10, 10), dims=["x", "y"], name="static") + data = xr.DataArray( + data=arr.squeeze(), + dims=["x", "y"], + name="data", + ) + xr.Dataset({"data": data}, attrs={"attr0": 3}).to_zarr( + "memory://simple1.zarr", zarr_format=2 + ) -data = xr.DataArray( - data=arr, - coords={"time": np.array([2])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 4}, -) -xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( - "memory://single2.zarr" -) + data = xr.DataArray( + data=arr.squeeze() + 1, + dims=["x", "y"], + name="data", + ) + xr.Dataset({"data": data}, attrs={"attr0": 4}).to_zarr( + "memory://simple2.zarr", zarr_format=2 + ) -data = xr.DataArray( - data=arr, - coords={"time": np.array([3])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 4}, -) -xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( - "memory://single3.zarr" -) + data = xr.DataArray( + data=arr.squeeze(), + dims=["x", "y"], + name="datum", + ) + xr.Dataset({"datum": data}, attrs={"attr0": 3}).to_zarr( + "memory://simple_var1.zarr", zarr_format=2 + ) -data = xr.DataArray( - data=np.vstack([arr] * 4), - coords={"time": np.array([1, 2, 3, 4])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr") -xr.Dataset({"data": data}).to_zarr("memory://group1.zarr", group="group") - -data = xr.DataArray( - data=np.vstack([arr] * 4), - coords={"time": np.array([5, 6, 7, 8])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr") -xr.Dataset({"data": data}).to_zarr("memory://group2.zarr", group="group") - -data = xr.DataArray( - data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), - coords={"time": np.array([1, 2, 3, 4])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr") - -data = xr.DataArray( - data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), - coords={"time": np.array([5, 6, 7, 8])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr") - -data = xr.DataArray( - data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), - coords={"time": np.array([1, 2, 3, 4])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr") - -data = xr.DataArray( - data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), - coords={"time": np.array([5, 6, 7, 8])}, - dims=["time", "x", "y"], - name="data", - attrs={"attr0": 0}, -) -xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr") - -# simple time arrays - xarray can't make these! -m = fs.get_mapper("time1.zarr") -z = zarr.open(m, mode="w") -ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]")) -ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) -ar = z.create_dataset("data", data=arr) -ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) - -m = fs.get_mapper("time2.zarr") -z = zarr.open(m, mode="w") -ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]")) -ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) -ar = z.create_dataset("data", data=arr) -ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) - - -# cftime arrays - standard -tdata1 = xr.DataArray( - data=arr, - coords={"time": np.array([1])}, - dims=["time", "x", "y"], - name="data", -) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr") -fs.pipe( - "cfstdtime1.zarr/time/.zattrs", - b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' - b'1970-01-01T00:00:00"}', -) + data = xr.DataArray( + data=arr.squeeze() + 1, + dims=["x", "y"], + name="datum", + ) + xr.Dataset({"datum": data}, attrs={"attr0": 4}).to_zarr( + "memory://simple_var2.zarr", zarr_format=2 + ) -tdata1 = xr.DataArray( - data=arr, - coords={"time": np.array([2])}, - dims=["time", "x", "y"], - name="data", -) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr") -fs.pipe( - "cfstdtime2.zarr/time/.zattrs", - b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' - b'1970-01-01T00:00:00"}', -) + data = xr.DataArray( + data=arr, + coords={"time": np.array([1])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 3}, + ) + xr.Dataset({"data": data, "static": static}, attrs={"attr1": 5}).to_zarr( + "memory://single1.zarr", zarr_format=2 + ) -tdata1 = xr.DataArray( - data=arr, - coords={"time": np.array([3])}, - dims=["time", "x", "y"], - name="data", -) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr") -fs.pipe( - "cfstdtime3.zarr/time/.zattrs", - b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' - b'1970-01-01T00:00:00"}', -) + data = xr.DataArray( + data=arr, + coords={"time": np.array([2])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 4}, + ) + xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( + "memory://single2.zarr", zarr_format=2 + ) + + data = xr.DataArray( + data=arr, + coords={"time": np.array([3])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 4}, + ) + xr.Dataset({"data": data, "static": static}, attrs={"attr1": 6}).to_zarr( + "memory://single3.zarr", zarr_format=2 + ) -# cftime arrays - non standard -tdata1 = xr.DataArray( - data=arr, - coords={"time": np.array([1])}, - dims=["time", "x", "y"], - name="data", - attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, -) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr") -fs.pipe( - "cfnontime1.zarr/time/.zattrs", - b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', -) + data = xr.DataArray( + data=np.vstack([arr] * 4), + coords={"time": np.array([1, 2, 3, 4])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk1.zarr", zarr_format=2) + xr.Dataset({"data": data}).to_zarr( + "memory://group1.zarr", group="group", zarr_format=2 + ) -tdata1 = xr.DataArray( - data=arr, - coords={"time": np.array([2])}, - dims=["time", "x", "y"], - name="data", - attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, -) -xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr") -fs.pipe( - "cfnontime2.zarr/time/.zattrs", - b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', -) + data = xr.DataArray( + data=np.vstack([arr] * 4), + coords={"time": np.array([5, 6, 7, 8])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_nochunk2.zarr", zarr_format=2) + xr.Dataset({"data": data}).to_zarr( + "memory://group2.zarr", group="group", zarr_format=2 + ) + + data = xr.DataArray( + data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), + coords={"time": np.array([1, 2, 3, 4])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk1.zarr", zarr_format=2) + + data = xr.DataArray( + data=da.from_array(np.vstack([arr] * 4), chunks=(1, 10, 10)), + coords={"time": np.array([5, 6, 7, 8])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_1chunk2.zarr", zarr_format=2) + + data = xr.DataArray( + data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), + coords={"time": np.array([1, 2, 3, 4])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk1.zarr", zarr_format=2) + + data = xr.DataArray( + data=da.from_array(np.vstack([arr] * 4), chunks=(2, 10, 10)), + coords={"time": np.array([5, 6, 7, 8])}, + dims=["time", "x", "y"], + name="data", + attrs={"attr0": 0}, + ) + xr.Dataset({"data": data}).to_zarr("memory://quad_2chunk2.zarr", zarr_format=2) + + # simple time arrays - xarray can't make these! + z = zarr.open("memory://time1.zarr", mode="w", zarr_format=2) + time1_array = np.array([1], dtype="M8[s]") + ar = z.create_array("time", shape=time1_array.shape, dtype=time1_array.dtype) + ar[:] = time1_array + ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) + ar = z.create_array("data", dtype=arr.dtype, shape=arr.shape) + ar[:] = arr + ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) + + z = zarr.open("memory://time2.zarr", mode="w", zarr_format=2) + time2_array = np.array([2], dtype="M8[s]") + ar = z.create_array("time", dtype=time2_array.dtype, shape=time2_array.shape) + ar[:] = time2_array + ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) + ar = z.create_array("data", dtype=arr.dtype, shape=arr.shape) + ar[:] = arr + ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) + + # cftime arrays - standard + tdata1 = xr.DataArray( + data=arr, + coords={"time": np.array([1])}, + dims=["time", "x", "y"], + name="data", + ) + xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime1.zarr", zarr_format=2) + fs.pipe( + "cfstdtime1.zarr/time/.zattrs", + b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' + b'1970-01-01T00:00:00"}', + ) + + tdata1 = xr.DataArray( + data=arr, + coords={"time": np.array([2])}, + dims=["time", "x", "y"], + name="data", + ) + xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime2.zarr", zarr_format=2) + fs.pipe( + "cfstdtime2.zarr/time/.zattrs", + b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' + b'1970-01-01T00:00:00"}', + ) + + tdata1 = xr.DataArray( + data=arr, + coords={"time": np.array([3])}, + dims=["time", "x", "y"], + name="data", + ) + xr.Dataset({"data": tdata1}).to_zarr("memory://cfstdtime3.zarr", zarr_format=2) + fs.pipe( + "cfstdtime3.zarr/time/.zattrs", + b'{"_ARRAY_DIMENSIONS": ["time"], "units": "seconds since ' + b'1970-01-01T00:00:00"}', + ) + + # cftime arrays - non standard + tdata1 = xr.DataArray( + data=arr, + coords={"time": np.array([1])}, + dims=["time", "x", "y"], + name="data", + attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, + ) + xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime1.zarr", zarr_format=2) + fs.pipe( + "cfnontime1.zarr/time/.zattrs", + b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', + ) + + tdata1 = xr.DataArray( + data=arr, + coords={"time": np.array([2])}, + dims=["time", "x", "y"], + name="data", + attrs={"units": "months since 1970-01-01", "calendar": "360_day"}, + ) + xr.Dataset({"data": tdata1}).to_zarr("memory://cfnontime2.zarr", zarr_format=2) + fs.pipe( + "cfnontime2.zarr/time/.zattrs", + b'{"_ARRAY_DIMENSIONS": ["time"], "units": "months since 1970-01-01", "calendar": "360_day"}', + ) @pytest.fixture(scope="module") @@ -226,8 +247,9 @@ def refs(): def test_fixture(refs): # effectively checks that single_zarr works assert "single1" in refs - m = fsspec.get_mapper("reference://", fo=refs["single1"], remote_protocol="memory") - g = xr.open_dataset(m, engine="zarr", backend_kwargs={"consolidated": False}) + fs = fsspec.filesystem("reference", fo=refs["single1"], remote_protocol="memory") + store = zarr.storage.FsspecStore(fs) + g = xr.open_dataset(store, engine="zarr", backend_kwargs={"consolidated": False}) assert g.time.values.tolist() == [1] assert (g.data.values == arr).all() assert g.attrs["attr1"] == 5 @@ -272,7 +294,8 @@ def test_get_coos(refs, selector, expected): mzz.first_pass() assert mzz.coos["time"].tolist() == expected mzz.store_coords() - g = zarr.open(mzz.out) + store = refs_as_store(mzz.out) + g = zarr.open(store, zarr_format=2) assert g["time"][:].tolist() == expected assert dict(g.attrs) @@ -593,19 +616,18 @@ def test_chunked(refs, inputs, chunks): ) out = mzz.translate() z = xr.open_dataset( - "reference://", + f"reference://{'group' if 'group' in inputs[0] else ''}", backend_kwargs={ "storage_options": {"fo": out, "remote_protocol": "memory"}, "consolidated": False, }, engine="zarr", chunks={}, - group="group" if "group" in inputs[0] else None, ) # TODO: make some assert_eq style function - assert z.time.values.tolist() == [1, 2, 3, 4, 5, 6, 7, 8] - assert z.data.shape == (8, 10, 10) - assert z.data.chunks == chunks + assert z["time"].values.tolist() == [1, 2, 3, 4, 5, 6, 7, 8] + assert z["data"].shape == (8, 10, 10) + assert z["data"].chunks == chunks for i in range(z.data.shape[0]): assert (z.data[i].values == arr).all() @@ -772,8 +794,8 @@ def test_no_inline(): """Ensure that inline_threshold=0 disables MultiZarrToZarr checking file size.""" ds = xr.Dataset(dict(x=[1, 2, 3])) ds["y"] = 3 + ds["x"] + ds.to_zarr("memory://zarr_store", mode="w", zarr_format=2, consolidated=False) store = fsspec.get_mapper("memory://zarr_store") - ds.to_zarr(store, mode="w", consolidated=False) ref = kerchunk.utils.consolidate(store) # This type of reference with no offset or total size is produced by # kerchunk.zarr.single_zarr or equivalently ZarrToZarr.translate. diff --git a/tests/test_combine_concat.py b/tests/test_combine_concat.py index 3f7ff823..8590c544 100644 --- a/tests/test_combine_concat.py +++ b/tests/test_combine_concat.py @@ -7,6 +7,7 @@ import kerchunk.combine import kerchunk.zarr import kerchunk.df +from kerchunk.utils import fs_as_store, refs_as_store @pytest.mark.parametrize( @@ -51,8 +52,9 @@ def test_success(tmpdir, arrays, chunks, axis, m): refs = [] for i, x in enumerate(arrays): fn = f"{tmpdir}/out{i}.zarr" - g = zarr.open(fn) - g.create_dataset("x", data=x, chunks=chunks) + g = zarr.open(fn, zarr_format=2) + arr = g.create_array("x", shape=x.shape, dtype=x.dtype, chunks=chunks) + arr[:] = x fns.append(fn) ref = kerchunk.zarr.single_zarr(fn, inline=0) refs.append(ref) @@ -61,33 +63,31 @@ def test_success(tmpdir, arrays, chunks, axis, m): refs, axis=axis, path="x", check_arrays=True ) - mapper = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(mapper) - assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() + store = refs_as_store(out) + g = zarr.open(store, zarr_format=2) + assert (g["x"][:] == np.concatenate(arrays, axis=axis)).all() try: import fastparquet except ImportError: return kerchunk.df.refs_to_dataframe(out, "memory://out.parq") - mapper = fsspec.get_mapper( - "reference://", + storage_options = dict( fo="memory://out.parq", remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper) - assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() + g = zarr.open("reference://", zarr_format=2, storage_options=storage_options) + assert (g["x"][:] == np.concatenate(arrays, axis=axis)).all() kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1) - mapper = fsspec.get_mapper( - "reference://", + storage_options = dict( fo="memory://out.parq", remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper) - assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() + g = zarr.open("reference://", zarr_format=2, storage_options=storage_options) + assert (g["x"][:] == np.concatenate(arrays, axis=axis)).all() def test_fail_chunks(tmpdir): @@ -95,10 +95,12 @@ def test_fail_chunks(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 20) - g = zarr.open(fn1) - g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2) - g.create_dataset("x", data=x2, chunks=(3,)) + g = zarr.open(fn1, zarr_format=2) + arr = g.create_array("x", shape=x1.shape, dtype=x1.dtype, chunks=(2,)) + arr[:] = x1 + g = zarr.open(fn2, zarr_format=2) + arr = g.create_array("x", shape=x2.shape, dtype=x2.dtype, chunks=(3,)) + arr[:] = x2 ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) @@ -112,10 +114,12 @@ def test_fail_shape(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(12).reshape(6, 2) x2 = np.arange(12, 24) - g = zarr.open(fn1) - g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2) - g.create_dataset("x", data=x2, chunks=(2,)) + g = zarr.open(fn1, zarr_format=2) + arr = g.create_array("x", shape=x1.shape, dtype=x1.dtype) + arr[:] = x1 + g = zarr.open(fn2, zarr_format=2) + arr = g.create_array("x", shape=x2.shape, dtype=x2.dtype) + arr[:] = x2 ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) @@ -129,10 +133,12 @@ def test_fail_irregular_chunk_boundaries(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 24) - g = zarr.open(fn1) - g.create_dataset("x", data=x1, chunks=(4,)) - g = zarr.open(fn2) - g.create_dataset("x", data=x2, chunks=(4,)) + g = zarr.open(fn1, zarr_format=2) + arr = g.create_array("x", shape=x1.shape, dtype=x1.dtype, chunks=(4,)) + arr[:] = x1 + g = zarr.open(fn2, zarr_format=2) + arr = g.create_array("x", shape=x2.shape, dtype=x2.dtype, chunks=(4,)) + arr[:] = x2 ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) ref2 = kerchunk.zarr.single_zarr(fn2, inline=0) diff --git a/tests/test_combine_dask.py b/tests/test_combine_dask.py index f652abf2..b572d673 100644 --- a/tests/test_combine_dask.py +++ b/tests/test_combine_dask.py @@ -5,6 +5,7 @@ import xarray as xr from kerchunk.combine import auto_dask +from kerchunk.utils import refs_as_store from kerchunk.zarr import ZarrToZarr dask = pytest.importorskip("dask") @@ -33,9 +34,8 @@ def test_simplest(m, n_batches): "coo_dtypes": {"count": "i4"}, }, ) - fs = fsspec.filesystem("reference", fo=out) ds = xr.open_dataset( - fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False} + refs_as_store(out), engine="zarr", backend_kwargs={"consolidated": False} ) assert ds["count"].values.tolist() == [0, 1, 2, 3] assert ds.data.shape == (4, 3) diff --git a/tests/test_df.py b/tests/test_df.py index 0d0fafb1..45bcb9bc 100644 --- a/tests/test_df.py +++ b/tests/test_df.py @@ -18,7 +18,7 @@ def test_1(m, url): "a/4": ["memory://url4.file"], "a/5": ["memory://url5.file"], "a/6": b"data", - "a/.zarray": b"""{"shape": [7], "chunks":[1], "filters": [], "compression": null}""", + "a/.zarray": b"""{"shape": [7], "chunks":[1], "filters": [], "compressor": null}""", ".zgroup": b'{"zarr_format": 2}', } u = "memory://myrefs.json" diff --git a/tests/test_fits.py b/tests/test_fits.py index 2ec19216..78f92491 100644 --- a/tests/test_fits.py +++ b/tests/test_fits.py @@ -2,6 +2,8 @@ import fsspec import pytest +from kerchunk.utils import refs_as_store + fits = pytest.importorskip("astropy.io.fits") import kerchunk.fits @@ -17,8 +19,18 @@ def test_image(): # this one directly hits a remote server - should cache? url = "https://fits.gsfc.nasa.gov/samples/WFPC2ASSNu5780205bx.fits" out = kerchunk.fits.process_file(url) - m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m) + g = zarr.open( + "reference://", + storage_options=dict( + fo=out, + remote_protocol="https", + asynchronous=True, + remote_options={"asynchronous": True}, + ), + ) + + # store = refs_as_store(out, remote_protocol="https") + # g = zarr.open(store) arr = g["PRIMARY"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2ASSNu5780205bx.fits" @@ -31,8 +43,8 @@ def test_ascii_table(): # this one directly hits a remote server - should cache? url = "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" out = kerchunk.fits.process_file(url, extension=1) - m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m) + store = refs_as_store(out, remote_protocol="https") + g = zarr.open(store, zarr_format=2) arr = g["u5780205r_cvt.c0h.tab"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" @@ -44,25 +56,25 @@ def test_ascii_table(): def test_binary_table(): out = kerchunk.fits.process_file(btable, extension=1) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["1"] with open(btable, "rb") as f: hdul = fits.open(f) attr2 = dict(arr.attrs) assert attr2.pop("_ARRAY_DIMENSIONS") == ["x"] assert attr2 == dict(hdul[1].header) - assert (arr["order"] == hdul[1].data["order"]).all() - assert (arr["mag"] == hdul[1].data["mag"]).all() + assert (arr[:]["order"] == hdul[1].data["order"]).all() + assert (arr[:]["mag"] == hdul[1].data["mag"]).all() assert ( - arr["name"].astype("U") == hdul[1].data["name"] + arr[:]["name"].astype("U") == hdul[1].data["name"] ).all() # string come out as bytes def test_cube(): out = kerchunk.fits.process_file(range_im) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -74,8 +86,8 @@ def test_with_class(): ftz = kerchunk.fits.FitsToZarr(range_im) out = ftz.translate() assert "fits" in repr(ftz) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -89,10 +101,10 @@ def test_var(): ftz = kerchunk.fits.FitsToZarr(var) out = ftz.translate() - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["1"] - vars = [_.tolist() for _ in arr["var"]] + vars = [_.tolist() for _ in arr[:]["var"]] assert vars == expected - assert (z["1"]["xyz"] == data["xyz"]).all() + assert (z["1"][:]["xyz"] == data["xyz"]).all() diff --git a/tests/test_grib.py b/tests/test_grib.py index ac297b01..2c5387fd 100644 --- a/tests/test_grib.py +++ b/tests/test_grib.py @@ -8,6 +8,7 @@ import xarray as xr import zarr import ujson +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper from kerchunk.grib2 import ( scan_grib, _split_file, @@ -20,6 +21,7 @@ extract_dataset_chunk_index, extract_datatree_chunk_index, ) +from kerchunk.utils import fs_as_store, refs_as_store eccodes_ver = tuple(int(i) for i in eccodes.__version__.split(".")) cfgrib = pytest.importorskip("cfgrib") @@ -30,11 +32,10 @@ def test_one(): # from https://dd.weather.gc.ca/model_gem_regional/10km/grib2/00/000 fn = os.path.join(here, "CMC_reg_DEPR_ISBL_10_ps10km_2022072000_P000.grib2") out = scan_grib(fn) - ds = xr.open_dataset( - "reference://", - engine="zarr", - backend_kwargs={"consolidated": False, "storage_options": {"fo": out[0]}}, - ) + + fs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out[0], fs=fs) + ds = xr.open_zarr(store, zarr_format=2, consolidated=False) assert ds.attrs["GRIB_centre"] == "cwao" ds2 = xr.open_dataset(fn, engine="cfgrib", backend_kwargs={"indexpath": ""}) @@ -67,18 +68,10 @@ def _fetch_first(url): def test_archives(tmpdir, url): grib = GribToZarr(url, storage_options={"anon": True}, skip=1) out = grib.translate()[0] - ours = xr.open_dataset( - "reference://", - engine="zarr", - backend_kwargs={ - "consolidated": False, - "storage_options": { - "fo": out, - "remote_protocol": "s3", - "remote_options": {"anon": True}, - }, - }, - ) + + store = refs_as_store(out, remote_options={"anon": True}) + + ours = xr.open_zarr(store, zarr_format=2, consolidated=False) data = _fetch_first(url) fn = os.path.join(tmpdir, "grib.grib2") @@ -118,7 +111,8 @@ def test_grib_tree(): corrected_msg_groups = [correct_hrrr_subhf_step(msg) for msg in scanned_msg_groups] result = grib_tree(corrected_msg_groups) fs = fsspec.filesystem("reference", fo=result) - zg = zarr.open_group(fs.get_mapper("")) + store = fs_as_store(fs) + zg = zarr.open_group(store, mode="r", zarr_format=2) assert isinstance(zg["refc/instant/atmosphere/refc"], zarr.Array) assert isinstance(zg["vbdsf/avg/surface/vbdsf"], zarr.Array) assert set(zg["vbdsf/avg/surface"].attrs["coordinates"].split()) == set( @@ -128,7 +122,7 @@ def test_grib_tree(): "atmosphere latitude longitude step time valid_time".split() ) # Assert that the fill value is set correctly - assert zg.refc.instant.atmosphere.step.fill_value is np.nan + assert np.isnan(zg["refc/instant/atmosphere/step"].fill_value) # The following two tests use json fixture data generated from calling scan grib @@ -146,14 +140,18 @@ def test_correct_hrrr_subhf_group_step(): scanned_msgs = ujson.load(fobj) original_zg = [ - zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper("")) + zarr.open_group( + fs_as_store(fsspec.filesystem("reference", fo=val)), mode="r", zarr_format=2 + ) for val in scanned_msgs ] corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] corrected_zg = [ - zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper("")) + zarr.open_group( + fs_as_store(fsspec.filesystem("reference", fo=val)), mode="r", zarr_format=2 + ) for val in corrected_msgs ] @@ -162,10 +160,10 @@ def test_correct_hrrr_subhf_group_step(): assert not all(["step" in zg.array_keys() for zg in original_zg]) # The step values are corrected to floating point hour - assert all([zg.step[()] <= 1.0 for zg in corrected_zg]) + assert all([zg["step"][()] <= 1.0 for zg in corrected_zg]) # The original seems to have values in minutes for some step variables! assert not all( - [zg.step[()] <= 1.0 for zg in original_zg if "step" in zg.array_keys()] + [zg["step"][()] <= 1.0 for zg in original_zg if "step" in zg.array_keys()] ) @@ -176,35 +174,32 @@ def test_hrrr_subhf_corrected_grib_tree(): corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] merged = grib_tree(corrected_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper("")) + z_fs = fsspec.filesystem("reference", fo=merged, asynchronous=True) + zstore = fs_as_store(z_fs) + zg = zarr.open_group(zstore, mode="r", zarr_format=2) # Check the values and shape of the time coordinates - assert zg.u.instant.heightAboveGround.step[:].tolist() == [ + assert zg["u/instant/heightAboveGround/step"][:].tolist() == [ 0.0, 0.25, 0.5, 0.75, 1.0, ] - assert zg.u.instant.heightAboveGround.step.shape == (5,) - - assert zg.u.instant.heightAboveGround.valid_time[:].tolist() == [ + assert zg["u/instant/heightAboveGround/step"].shape == (5,) + assert zg["u/instant/heightAboveGround/valid_time"][:].tolist() == [ [1695862800, 1695863700, 1695864600, 1695865500, 1695866400] ] - assert zg.u.instant.heightAboveGround.valid_time.shape == (1, 5) - - assert zg.u.instant.heightAboveGround.time[:].tolist() == [1695862800] - assert zg.u.instant.heightAboveGround.time.shape == (1,) - - assert zg.dswrf.avg.surface.step[:].tolist() == [0.0, 0.25, 0.5, 0.75, 1.0] - assert zg.dswrf.avg.surface.step.shape == (5,) - - assert zg.dswrf.avg.surface.valid_time[:].tolist() == [ + assert zg["u/instant/heightAboveGround/valid_time"].shape == (1, 5) + assert zg["u/instant/heightAboveGround/time"][:].tolist() == [1695862800] + assert zg["u/instant/heightAboveGround/time"].shape == (1,) + assert zg["dswrf/avg/surface/step"][:].tolist() == [0.0, 0.25, 0.5, 0.75, 1.0] + assert zg["dswrf/avg/surface/step"].shape == (5,) + assert zg["dswrf/avg/surface/valid_time"][:].tolist() == [ [1695862800, 1695863700, 1695864600, 1695865500, 1695866400] ] - assert zg.dswrf.avg.surface.valid_time.shape == (1, 5) - - assert zg.dswrf.avg.surface.time[:].tolist() == [1695862800] - assert zg.dswrf.avg.surface.time.shape == (1,) + assert zg["dswrf/avg/surface/valid_time"].shape == (1, 5) + assert zg["dswrf/avg/surface/time"][:].tolist() == [1695862800] + assert zg["dswrf/avg/surface/time"].shape == (1,) # The following two test use json fixture data generated from calling scan grib @@ -219,24 +214,22 @@ def test_hrrr_sfcf_grib_tree(): with open(fpath, "rb") as fobj: scanned_msgs = ujson.load(fobj) merged = grib_tree(scanned_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper("")) + store = fs_as_store(fsspec.filesystem("reference", fo=merged)) + zg = zarr.open_group(store, mode="r", zarr_format=2) # Check the heightAboveGround level shape of the time coordinates - assert zg.u.instant.heightAboveGround.heightAboveGround[()] == 80.0 - assert zg.u.instant.heightAboveGround.heightAboveGround.shape == () - - assert zg.u.instant.heightAboveGround.step[:].tolist() == [0.0, 1.0] - assert zg.u.instant.heightAboveGround.step.shape == (2,) - - assert zg.u.instant.heightAboveGround.valid_time[:].tolist() == [ + assert zg["u/instant/heightAboveGround/heightAboveGround"][()] == 80.0 + assert zg["u/instant/heightAboveGround/heightAboveGround"].shape == () + assert zg["u/instant/heightAboveGround/step"][:].tolist() == [0.0, 1.0] + assert zg["u/instant/heightAboveGround/step"].shape == (2,) + assert zg["u/instant/heightAboveGround/valid_time"][:].tolist() == [ [1695862800, 1695866400] ] - assert zg.u.instant.heightAboveGround.valid_time.shape == (1, 2) - - assert zg.u.instant.heightAboveGround.time[:].tolist() == [1695862800] - assert zg.u.instant.heightAboveGround.time.shape == (1,) + assert zg["u/instant/heightAboveGround/valid_time"].shape == (1, 2) + assert zg["u/instant/heightAboveGround/time"][:].tolist() == [1695862800] + assert zg["u/instant/heightAboveGround/time"].shape == (1,) # Check the isobaricInhPa level shape and time coordinates - assert zg.u.instant.isobaricInhPa.isobaricInhPa[:].tolist() == [ + assert zg["u/instant/isobaricInhPa/isobaricInhPa"][:].tolist() == [ 250.0, 300.0, 500.0, @@ -245,10 +238,9 @@ def test_hrrr_sfcf_grib_tree(): 925.0, 1000.0, ] - assert zg.u.instant.isobaricInhPa.isobaricInhPa.shape == (7,) - - assert zg.u.instant.isobaricInhPa.step[:].tolist() == [0.0, 1.0] - assert zg.u.instant.isobaricInhPa.step.shape == (2,) + assert zg["u/instant/isobaricInhPa/isobaricInhPa"].shape == (7,) + assert zg["u/instant/isobaricInhPa/step"][:].tolist() == [0.0, 1.0] + assert zg["u/instant/isobaricInhPa/step"].shape == (2,) # Valid time values get exploded by isobaricInhPa aggregation # Is this a feature or a bug? @@ -258,29 +250,29 @@ def test_hrrr_sfcf_grib_tree(): [1695866400 for _ in range(7)], ] ] - assert zg.u.instant.isobaricInhPa.valid_time[:].tolist() == expected_valid_times - assert zg.u.instant.isobaricInhPa.valid_time.shape == (1, 2, 7) - - assert zg.u.instant.isobaricInhPa.time[:].tolist() == [1695862800] - assert zg.u.instant.isobaricInhPa.time.shape == (1,) - - -def test_hrrr_sfcf_grib_datatree(): - fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") - with open(fpath, "rb") as fobj: - scanned_msgs = ujson.load(fobj) - merged = grib_tree(scanned_msgs) - dt = xr.open_datatree( - fsspec.filesystem("reference", fo=merged).get_mapper(""), - engine="zarr", - consolidated=False, - ) - # Assert a few things... but if it loads we are mostly done. - np.testing.assert_array_equal( - dt.u.instant.heightAboveGround.step.values[:], - np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), - ) - assert dt.u.attrs == dict(name="U component of wind") + assert zg["u/instant/isobaricInhPa/valid_time"][:].tolist() == expected_valid_times + assert zg["u/instant/isobaricInhPa/valid_time"].shape == (1, 2, 7) + + assert zg["u/instant/isobaricInhPa/time"][:].tolist() == [1695862800] + assert zg["u/instant/isobaricInhPa/time"].shape == (1,) + + +# def test_hrrr_sfcf_grib_datatree(): +# fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") +# with open(fpath, "rb") as fobj: +# scanned_msgs = ujson.load(fobj) +# merged = grib_tree(scanned_msgs) +# dt = datatree.open_datatree( +# fsspec.filesystem("reference", fo=merged).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) +# # Assert a few things... but if it loads we are mostly done. +# np.testing.assert_array_equal( +# dt.u.instant.heightAboveGround.step.values[:], +# np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), +# ) +# assert dt.u.attrs == dict(name="U component of wind") def test_parse_grib_idx_invalid_url(): @@ -292,11 +284,11 @@ def test_parse_grib_idx_invalid_url(): def test_parse_grib_idx_no_file(): - with pytest.raises((FileNotFoundError, PermissionError)): + with pytest.raises(PermissionError): # the url is spelled wrong parse_grib_idx( "s3://noaahrrr-bdp-pds/hrrr.20220804/conus/hrrr.t01z.wrfsfcf01.grib2", - storage_options=dict(anon=True), + storage_options={"anon": True}, ) @@ -344,19 +336,20 @@ def test_parse_grib_idx_content(idx_url, storage_options): assert idx_df.iloc[message_no]["length"] == output[message_no]["refs"][variable][2] -@pytest.fixture -def zarr_tree_and_datatree_instance(): - fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") - tree_store = tree_store = grib_tree(scan_grib(fn)) - dt_instance = xr.open_datatree( - fsspec.filesystem("reference", fo=tree_store).get_mapper(""), - engine="zarr", - consolidated=False, - ) +# @pytest.fixture +# def zarr_tree_and_datatree_instance(): +# fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") +# tree_store = tree_store = grib_tree(scan_grib(fn)) +# dt_instance = datatree.open_datatree( +# fsspec.filesystem("reference", fo=tree_store).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) - return tree_store, dt_instance, fn +# return tree_store, dt_instance, fn +@pytest.mark.skip(reason="datatree support should be updated to use xarray.Datatree") def test_extract_dataset_chunk_index(zarr_tree_and_datatree_instance): tree_store, dt_instance, fn = zarr_tree_and_datatree_instance @@ -387,6 +380,7 @@ def test_extract_dataset_chunk_index(zarr_tree_and_datatree_instance): ) +@pytest.mark.skip(reason="datatree support should be updated to use xarray.Datatree") def test_extract_datatree_chunk_index(zarr_tree_and_datatree_instance): tree_store, dt_instance, fn = zarr_tree_and_datatree_instance @@ -440,6 +434,7 @@ def test_extract_datatree_chunk_index(zarr_tree_and_datatree_instance): ).all() +@pytest.mark.skip(reason="datatree support should be updated to use xarray.Datatree") def test_extract_methods_grib_parameter(zarr_tree_and_datatree_instance): tree_store, dt_instance, _ = zarr_tree_and_datatree_instance diff --git a/tests/test_hdf.py b/tests/test_hdf.py index 6d374e8a..b40cdc07 100644 --- a/tests/test_hdf.py +++ b/tests/test_hdf.py @@ -1,15 +1,22 @@ +import asyncio import fsspec +import json import os.path as osp +import zarr.core +import zarr.core.buffer +import zarr.core.group + import kerchunk.hdf import numpy as np import pytest import xarray as xr import zarr -import h5py +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links from kerchunk.combine import MultiZarrToZarr, drop +from kerchunk.utils import fs_as_store, refs_as_fs, refs_as_store here = osp.dirname(__file__) @@ -18,18 +25,19 @@ def test_single(): """Test creating references for a single HDF file""" url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") + with fsspec.open(url, **so) as f: - h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) + h5chunks = SingleHdf5ToZarr(f, url, storage_options=so, inline_threshold=1) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + with open("test_dict.json", "w") as f: + json.dump(test_dict, f) + + store = refs_as_store(test_dict, remote_options=dict(asynchronous=True, anon=True)) + ds = xr.open_zarr(store, zarr_format=2, consolidated=False) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") - xr.testing.assert_equal(ds.drop_vars("crs"), expected.drop_vars("crs")) @@ -42,22 +50,20 @@ def test_single_direct_open(): h5f=url, inline_threshold=300, storage_options=so ).translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) + store = refs_as_store(test_dict, remote_options=dict(asynchronous=True, anon=True)) + ds_direct = xr.open_dataset( - m, engine="zarr", backend_kwargs=dict(consolidated=False) + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) with fsspec.open(url, **so) as f: h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) + store = refs_as_store(test_dict, remote_options=dict(asynchronous=True, anon=True)) + ds_from_file_opener = xr.open_dataset( - m, engine="zarr", backend_kwargs=dict(consolidated=False) + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) xr.testing.assert_equal( @@ -81,10 +87,10 @@ def test_multizarr(generate_mzz): mzz = generate_mzz test_dict = mzz.translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + store = refs_as_store(test_dict, remote_options=dict(asynchronous=True, anon=True)) + ds = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) - ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) with fsspec.open_files(urls, **so) as fs: expts = [xr.open_dataset(f, engine="h5netcdf") for f in fs] @@ -158,11 +164,11 @@ def test_times(times_data): h5chunks = SingleHdf5ToZarr(f, url) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", - fo=test_dict, + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(test_dict, fs=localfs) + result = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) - result = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -174,11 +180,11 @@ def test_times_str(times_data): h5chunks = SingleHdf5ToZarr(url) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", - fo=test_dict, + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(test_dict, fs=localfs) + result = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) - result = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -189,14 +195,17 @@ def test_times_str(times_data): def test_string_embed(): fn = osp.join(here, "vlen.h5") - h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="embed") + h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="embed", error="pdb") out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - assert txt in fs.references["vlen_str/0"] - z = zarr.open(fs.get_mapper()) - assert z.vlen_str.dtype == "O" - assert z.vlen_str[0] == txt - assert (z.vlen_str[1:] == "").all() + + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + fs = refs_as_fs(out, fs=localfs) + # assert txt in fs.references["vlen_str/0"] + store = fs_as_store(fs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"].dtype == "O" + assert z["vlen_str"][0] == txt + assert (z["vlen_str"][1:] == "").all() def test_string_pathlib(): @@ -208,20 +217,21 @@ def test_string_pathlib(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) assert txt in fs.references["vlen_str/0"] - z = zarr.open(fs.get_mapper()) - assert z.vlen_str.dtype == "O" - assert z.vlen_str[0] == txt - assert (z.vlen_str[1:] == "").all() + z = zarr.open(fs_as_store(fs)) + assert z["vlen_str"].dtype == "O" + assert z["vlen_str"][0] == txt + assert (z["vlen_str"][1:] == "").all() def test_string_null(): fn = osp.join(here, "vlen.h5") h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="null", inline_threshold=0) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - assert z.vlen_str.dtype == "O" - assert (z.vlen_str[:] == None).all() + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"].dtype == "O" + assert (z["vlen_str"][:] == None).all() def test_string_leave(): @@ -231,11 +241,13 @@ def test_string_leave(): f, fn, vlen_encode="leave", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - assert z.vlen_str.dtype == "S16" - assert z.vlen_str[0] # some obscured ID - assert (z.vlen_str[1:] == b"").all() + + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"].dtype == "S16" + assert z["vlen_str"][0] # some obscured ID + assert (z["vlen_str"][1:] == b"").all() def test_string_decode(): @@ -245,11 +257,13 @@ def test_string_decode(): f, fn, vlen_encode="encode", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + fs = refs_as_fs(out, fs=localfs) assert txt in fs.cat("vlen_str/.zarray").decode() # stored in filter def - z = zarr.open(fs.get_mapper()) - assert z.vlen_str[0] == txt - assert (z.vlen_str[1:] == "").all() + store = fs_as_store(fs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"][0] == txt + assert (z["vlen_str"][1:] == "").all() def test_compound_string_null(): @@ -257,11 +271,12 @@ def test_compound_string_null(): with open(fn, "rb") as f: h = kerchunk.hdf.SingleHdf5ToZarr(f, fn, vlen_encode="null", inline_threshold=0) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - assert z.vlen_str[0].tolist() == (10, None) - assert (z.vlen_str["ints"][1:] == 0).all() - assert (z.vlen_str["strs"][1:] == None).all() + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"][0].tolist() == (10, None) + assert (z["vlen_str"][1:]["ints"] == 0).all() + assert (z["vlen_str"][1:]["strs"] == None).all() def test_compound_string_leave(): @@ -271,12 +286,13 @@ def test_compound_string_leave(): f, fn, vlen_encode="leave", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - assert z.vlen_str["ints"][0] == 10 - assert z.vlen_str["strs"][0] # random ID - assert (z.vlen_str["ints"][1:] == 0).all() - assert (z.vlen_str["strs"][1:] == b"").all() + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"][0]["ints"] == 10 + assert z["vlen_str"][0]["strs"] # random ID + assert (z["vlen_str"][1:]["ints"] == 0).all() + assert (z["vlen_str"][1:]["strs"] == b"").all() def test_compound_string_encode(): @@ -286,12 +302,13 @@ def test_compound_string_encode(): f, fn, vlen_encode="encode", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - assert z.vlen_str["ints"][0] == 10 - assert z.vlen_str["strs"][0] == "water" - assert (z.vlen_str["ints"][1:] == 0).all() - assert (z.vlen_str["strs"][1:] == "").all() + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) + assert z["vlen_str"][0]["ints"] == 10 + assert z["vlen_str"][0]["strs"] == "water" + assert (z["vlen_str"][1:]["ints"] == 0).all() + assert (z["vlen_str"][1:]["strs"] == "").all() # def test_compact(): @@ -317,29 +334,31 @@ def test_compress(): h.translate() continue out = h.translate() - m = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(m) - assert np.mean(g.data) == 49.5 - + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + g = zarr.open(store, zarr_format=2) + assert np.mean(g["data"]) == 49.5 -def test_embed(): - fn = osp.join(here, "NEONDSTowerTemperatureData.hdf5") - h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed") - out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) - data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] - assert data[0].tolist() == [ - "2014-04-01 00:00:00.0", - "60", - "6.72064364129017", - "6.667845743708792", - "6.774491093631761", - "0.0012746926446369846", - "0.004609216572327277", - "0.01298182345556785", - ] +# def test_embed(): +# fn = osp.join(here, "NEONDSTowerTemperatureData.hdf5") +# h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed", error="pdb") +# out = h.translate() +# +# store = refs_as_store(out) +# z = zarr.open(store, zarr_format=2) +# data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] +# assert data[0].tolist() == [ +# "2014-04-01 00:00:00.0", +# "60", +# "6.72064364129017", +# "6.667845743708792", +# "6.774491093631761", +# "0.0012746926446369846", +# "0.004609216572327277", +# "0.01298182345556785", +# ] +# def test_inline_threshold(): @@ -362,8 +381,9 @@ def test_translate_links(): out = kerchunk.hdf.SingleHdf5ToZarr(fn, inline_threshold=50).translate( preserve_linked_dsets=True ) - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + localfs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs=localfs) + z = zarr.open(store, zarr_format=2) # 1. Test the hard linked datasets were translated correctly # 2. Test the soft linked datasets were translated correctly diff --git a/tests/test_hdf4.py b/tests/test_hdf4.py new file mode 100644 index 00000000..42a059d9 --- /dev/null +++ b/tests/test_hdf4.py @@ -0,0 +1,16 @@ +import os.path + +import zarr + +import kerchunk.hdf4 +from kerchunk.utils import refs_as_store + + +def test1(): + here = os.path.dirname(__file__) + fn = os.path.join(here, "MOD14.hdf4") + + out = kerchunk.hdf4.HDF4ToZarr(fn).translate() + store = refs_as_store(out) + g = zarr.open(store, zarr_format=2) + assert g["fire mask"][:].max() == 5 diff --git a/tests/test_netcdf.py b/tests/test_netcdf.py index 43b6021b..b7143398 100644 --- a/tests/test_netcdf.py +++ b/tests/test_netcdf.py @@ -1,12 +1,14 @@ import os - import fsspec import numpy as np from packaging.version import Version import pytest from kerchunk import netCDF3 +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper +from kerchunk.utils import refs_as_store + xr = pytest.importorskip("xarray") @@ -28,12 +30,17 @@ def test_one(m): m.pipe("data.nc3", bdata) h = netCDF3.netcdf_recording_file("memory://data.nc3") out = h.translate() + + print(out) + + store = refs_as_store(out) + ds = xr.open_dataset( - "reference://", + store, engine="zarr", backend_kwargs={ "consolidated": False, - "storage_options": {"fo": out, "remote_protocol": "memory"}, + "zarr_format": 2, }, ) assert (ds.data == data).all() @@ -81,13 +88,14 @@ def test_unlimited(unlimited_dataset): expected = xr.open_dataset(fn, engine="scipy") h = netCDF3.NetCDF3ToZarr(fn) out = h.translate() - ds = xr.open_dataset( - "reference://", - engine="zarr", - backend_kwargs={ - "consolidated": False, - "storage_options": {"fo": out}, - }, + + fs = AsyncFileSystemWrapper(fsspec.filesystem("file")) + store = refs_as_store(out, fs) + + ds = xr.open_zarr( + store, + zarr_format=2, + consolidated=False, ) assert ds.attrs["title"] == "testing" assert ds.temp.attrs["units"] == "K" diff --git a/tests/test_tiff.py b/tests/test_tiff.py index 3cc52471..3e4ea1c7 100644 --- a/tests/test_tiff.py +++ b/tests/test_tiff.py @@ -5,6 +5,8 @@ import pytest import xarray as xr +from kerchunk.utils import refs_as_store + pytest.importorskip("tifffile") pytest.importorskip("rioxarray") import kerchunk.tiff @@ -15,8 +17,8 @@ def test_one(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert list(z) == ["0", "1", "2"] assert z.attrs["multiscales"] == [ { @@ -33,9 +35,9 @@ def test_one(): def test_coord(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) # highest res is the one xarray picks - out = kerchunk.tiff.generate_coords(z.attrs, z[0].shape) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) # highest res is the one xarray picks + out = kerchunk.tiff.generate_coords(z.attrs, z["0"].shape) ds = xr.open_dataset(fn) assert (ds.x == out["x"]).all() diff --git a/tests/test_utils.py b/tests/test_utils.py index a1bb094d..f6c7e5ef 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -8,6 +8,8 @@ import pytest import zarr +from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + def test_rename(): old = {"version": 1, "refs": {"v0": ["oldpath", 0, 0], "bin": "data"}} @@ -72,21 +74,20 @@ def test_inline_array(): "data/1": b"\x02\x00\x00\x00", "data/.zattrs": '{"foo": "bar"}', } - fs = fsspec.filesystem("reference", fo=refs) out1 = kerchunk.utils.inline_array(refs, threshold=1) # does nothing assert out1 == refs out2 = kerchunk.utils.inline_array(refs, threshold=1, names=["data"]) # explicit - assert "data/1" not in out2 assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"]) - fs = fsspec.filesystem("reference", fo=out2) - g = zarr.open(fs.get_mapper()) - assert g.data[:].tolist() == [1, 2] + + localfs = fsspec.filesystem("file") + store = kerchunk.utils.refs_as_store(out2, fs=localfs) + g = zarr.open(store, mode="r", zarr_format=2) + assert g["data"][:].tolist() == [1, 2] # What is g.data??? out3 = kerchunk.utils.inline_array(refs, threshold=1000) # inlines because of size - assert "data/1" not in out3 - fs = fsspec.filesystem("reference", fo=out3) - g = zarr.open(fs.get_mapper()) - assert g.data[:].tolist() == [1, 2] + store = kerchunk.utils.refs_as_store(out3, localfs) + g = zarr.open(store, mode="r", zarr_format=2) + assert g["data"][:].tolist() == [1, 2] # What is g.data??? def test_json(): @@ -98,25 +99,30 @@ def test_json(): @pytest.mark.parametrize("chunks", [[10, 10], [5, 10]]) def test_subchunk_exact(m, chunks): - store = m.get_mapper("test.zarr") - g = zarr.open_group(store, mode="w") + g = zarr.open_group("memory://test.zarr", mode="w", zarr_format=2) data = np.arange(100).reshape(10, 10) - arr = g.create_dataset("data", data=data, chunks=chunks, compression=None) + arr = g.create_array( + "data", dtype=data.dtype, shape=data.shape, chunks=chunks, compressor=None + ) + arr[:] = data ref = kerchunk.zarr.single_zarr("memory://test.zarr")["refs"] extra = [] if chunks[0] == 10 else ["data/1.0"] - assert list(ref) == [".zgroup", "data/.zarray", "data/0.0"] + extra + ref2 = list(_ for _ in ref if not _.endswith("zattrs")) # ignore empty attrs + assert ref2 == [".zgroup", "data/.zarray", "data/0.0"] + extra out = kerchunk.utils.subchunk(ref, "data", 5) nchunk = 10 // chunks[0] * 5 - assert list(out) == [".zgroup", "data/.zarray"] + [ - f"data/{_}.0" for _ in range(nchunk) - ] + out2 = list(_ for _ in out if not _.endswith("zattrs")) + assert out2 == [".zgroup", "data/.zarray"] + [f"data/{_}.0" for _ in range(nchunk)] - g2 = zarr.open_group( - "reference://", storage_options={"fo": out, "remote_protocol": "memory"} - ) - assert (g2.data[:] == data).all() + store = kerchunk.utils.refs_as_store(out, remote_protocol="memory") + g2 = zarr.open_group(store, mode="r", zarr_format=2) + + # g2 = zarr.open_group( + # "reference://", storage_options={"fo": out, "remote_protocol": "memory"}, zarr_format=2 + # ) + assert (g2["data"][:] == data).all() @pytest.mark.parametrize("archive", ["zip", "tar"]) diff --git a/tests/test_zarr.py b/tests/test_zarr.py index 94af8939..4852ab28 100644 --- a/tests/test_zarr.py +++ b/tests/test_zarr.py @@ -46,7 +46,7 @@ def _zip(file): return filename fn = f"{tmpdir}/test.zarr" - ds.to_zarr(fn, mode="w") + ds.to_zarr(fn, mode="w", zarr_format=2) return _zip(fn) @@ -54,6 +54,7 @@ def test_zarr_in_zip(zarr_in_zip, ds): out = kerchunk.zarr.ZarrToZarr( url="zip://", storage_options={"fo": zarr_in_zip} ).translate() + ds2 = xr.open_dataset( out, engine="kerchunk", @@ -75,7 +76,7 @@ def test_zarr_in_zip(zarr_in_zip, ds): def test_zarr_combine(tmpdir, ds): fn1 = f"{tmpdir}/test1.zarr" - ds.to_zarr(fn1) + ds.to_zarr(fn1, zarr_format=2) one = kerchunk.zarr.ZarrToZarr(fn1, inline_threshold=0).translate() fn = f"{tmpdir}/out.parq" @@ -89,7 +90,7 @@ def test_zarr_combine(tmpdir, ds): def test_zarr_json_dump_succeeds(tmpdir, ds): fn1 = f"{tmpdir}/test1.zarr" - ds.to_zarr(fn1) + ds.to_zarr(fn1, zarr_format=2) one = kerchunk.zarr.ZarrToZarr( fn1,