From 95fce110e6edfd5733ee66bc9824921c5eac6588 Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Wed, 29 Jan 2025 12:51:44 -0700 Subject: [PATCH] Vendor kerchunk netCDF3 reader (#397) --- docs/releases.rst | 2 + virtualizarr/readers/netcdf3.py | 5 +- virtualizarr/tests/test_readers/conftest.py | 11 + .../tests/test_readers/test_netcdf3.py | 30 ++ virtualizarr/vendor/kerchunk/LICENSE.txt | 21 ++ virtualizarr/vendor/kerchunk/__init__.py | 0 virtualizarr/vendor/kerchunk/netCDF3.py | 307 ++++++++++++++++++ virtualizarr/vendor/kerchunk/utils.py | 81 +++++ 8 files changed, 456 insertions(+), 1 deletion(-) create mode 100644 virtualizarr/tests/test_readers/test_netcdf3.py create mode 100644 virtualizarr/vendor/kerchunk/LICENSE.txt create mode 100644 virtualizarr/vendor/kerchunk/__init__.py create mode 100644 virtualizarr/vendor/kerchunk/netCDF3.py create mode 100644 virtualizarr/vendor/kerchunk/utils.py diff --git a/docs/releases.rst b/docs/releases.rst index a5e3fba6..abc3fb05 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -60,6 +60,8 @@ Documentation Internal Changes ~~~~~~~~~~~~~~~~ +- Vendor netCDF3 reader from kerchunk. (:pull:`397`) By `Tom Nicholas `_. + .. _v1.2.0: v1.2.0 (5th Dec 2024) diff --git a/virtualizarr/readers/netcdf3.py b/virtualizarr/readers/netcdf3.py index 816df75e..529b0c1b 100644 --- a/virtualizarr/readers/netcdf3.py +++ b/virtualizarr/readers/netcdf3.py @@ -26,7 +26,7 @@ def open_virtual_dataset( virtual_backend_kwargs: Optional[dict] = None, reader_options: Optional[dict] = None, ) -> Dataset: - from kerchunk.netCDF3 import NetCDF3ToZarr + from virtualizarr.vendor.kerchunk.netCDF3 import NetCDF3ToZarr if virtual_backend_kwargs: raise NotImplementedError( @@ -38,6 +38,9 @@ def open_virtual_dataset( loadable_variables, ) + if reader_options is None: + reader_options = {} + refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate() # both group=None and group='' mean to read root group diff --git a/virtualizarr/tests/test_readers/conftest.py b/virtualizarr/tests/test_readers/conftest.py index 98663223..719bafc4 100644 --- a/virtualizarr/tests/test_readers/conftest.py +++ b/virtualizarr/tests/test_readers/conftest.py @@ -1,3 +1,4 @@ +import pathlib import warnings import h5py # type: ignore @@ -322,3 +323,13 @@ def root_coordinates_hdf5_file(tmpdir, np_uncompressed_int16): f.create_dataset(name="lon", data=data) f.attrs.create(name="coordinates", data="lat lon") return filepath + + +@pytest.fixture +def netcdf3_file(tmp_path: pathlib.Path) -> pathlib.Path: + ds = xr.Dataset({"foo": ("x", np.array([1, 2, 3]))}) + + filepath = tmp_path / "file.nc" + ds.to_netcdf(filepath, format="NETCDF3_CLASSIC") + + return filepath diff --git a/virtualizarr/tests/test_readers/test_netcdf3.py b/virtualizarr/tests/test_readers/test_netcdf3.py new file mode 100644 index 00000000..66b3ba98 --- /dev/null +++ b/virtualizarr/tests/test_readers/test_netcdf3.py @@ -0,0 +1,30 @@ +import numpy as np +import xarray as xr +import xarray.testing as xrt + +from virtualizarr import open_virtual_dataset +from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.tests import requires_scipy +from virtualizarr.zarr import ZArray + + +@requires_scipy +def test_read_netcdf3(netcdf3_file): + filepath = str(netcdf3_file) + vds = open_virtual_dataset(filepath) + + assert isinstance(vds, xr.Dataset) + assert list(vds.variables.keys()) == ["foo"] + assert isinstance(vds["foo"].data, ManifestArray) + + expected_manifest = ChunkManifest( + entries={"0": {"path": 
filepath, "offset": 80, "length": 12}} + ) + expected_zarray = ZArray(dtype=np.dtype(">i4"), shape=(3,), chunks=(3,)) + expected_ma = ManifestArray(chunkmanifest=expected_manifest, zarray=expected_zarray) + expected_vds = xr.Dataset({"foo": xr.Variable(data=expected_ma, dims=["x"])}) + + xrt.assert_identical(vds, expected_vds) + + +# TODO test loading data against xarray backend, see issue #394 for context diff --git a/virtualizarr/vendor/kerchunk/LICENSE.txt b/virtualizarr/vendor/kerchunk/LICENSE.txt new file mode 100644 index 00000000..a6ad28df --- /dev/null +++ b/virtualizarr/vendor/kerchunk/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Intake + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/virtualizarr/vendor/kerchunk/__init__.py b/virtualizarr/vendor/kerchunk/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/virtualizarr/vendor/kerchunk/netCDF3.py b/virtualizarr/vendor/kerchunk/netCDF3.py new file mode 100644 index 00000000..00190821 --- /dev/null +++ b/virtualizarr/vendor/kerchunk/netCDF3.py @@ -0,0 +1,307 @@ +from functools import reduce +from operator import mul + +import fsspec +import numpy as np +from fsspec.implementations.reference import LazyReferenceMapper + +from virtualizarr.vendor.kerchunk.utils import _encode_for_JSON, inline_array + +try: + from scipy.io._netcdf import ( # type: ignore[import-untyped] + NC_VARIABLE, + ZERO, + netcdf_file, + netcdf_variable, + ) +except ModuleNotFoundError: # pragma: no cover + raise ImportError( + "Scipy is required for kerchunking NetCDF3 files. Please install with " + "`pip/conda install scipy`. See https://scipy.org/install/ for more details." + ) + + +class NetCDF3ToZarr(netcdf_file): + """Generate references for a netCDF3 file + + Uses scipy's netCDF3 reader, but only reads the metadata. Note that instances + do behave like actual scipy netcdf files, but contain no valid data. + Also appears to work for netCDF2, although this is not currently tested. + """ + + def __init__( + self, + filename, + storage_options=None, + inline_threshold=100, + max_chunk_size=0, + out=None, + **kwargs, + ): + """ + Parameters + ---------- + filename: str + location of the input + storage_options: dict + passed to fsspec when opening filename + inline_threshold: int + Byte size below which an array will be embedded in the output. Use 0 + to disable inlining. + max_chunk_size: int + How big a chunk can be before triggering subchunking. 
If 0, there is no + subchunking, and there is never subchunking for coordinate/dimension arrays. + E.g., if an array contains 10,000bytes, and this value is 6000, there will + be two output chunks, split on the biggest available dimension. [TBC] + out: dict-like or None + This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper + to write out parquet as the references get filled, or some other dictionary-like class + to customise how references get stored + args, kwargs: passed to scipy superclass ``scipy.io.netcdf.netcdf_file`` + """ + assert kwargs.pop("mmap", False) is False + assert kwargs.pop("mode", "r") == "r" + assert kwargs.pop("maskandscale", False) is False + + # attributes set before super().__init__ don't accidentally turn into + # dataset attributes + self.chunks = {} + self.threshold = inline_threshold + self.max_chunk_size = max_chunk_size + self.out = out or {} + self.storage_options = storage_options + self.fp = fsspec.open(filename, **(storage_options or {})).open() + magic = self.fp.read(4) + assert magic[:3] == b"CDF" + version = kwargs.pop("version", None) or magic[3] + self.fp.seek(0) + super().__init__( + self.fp, + mmap=False, + mode="r", + maskandscale=False, + version=version, + ) + self.filename = filename # this becomes an attribute, so must ignore on write + + def _read_var_array(self): + header = self.fp.read(4) + if header not in [ZERO, NC_VARIABLE]: + raise ValueError("Unexpected header.") + + begin = 0 + dtypes = {"names": [], "formats": []} + rec_vars = [] + count = self._unpack_int() + for var in range(count): + ( + name, + dimensions, + shape, + attributes, + typecode, + size, + dtype_, + begin_, + vsize, + ) = self._read_var() + if shape and shape[0] is None: # record variable + rec_vars.append(name) + # The netCDF "record size" is calculated as the sum of + # the vsize's of all the record variables. + self.__dict__["_recsize"] += vsize + if begin == 0: + begin = begin_ + dtypes["names"].append(name) + dtypes["formats"].append(str(shape[1:]) + dtype_) + + # Handle padding with a virtual variable. + if typecode in "bch": + actual_size = reduce(mul, (1,) + shape[1:]) * size + padding = -actual_size % 4 + if padding: + dtypes["names"].append("_padding_%d" % var) + dtypes["formats"].append("(%d,)>b" % padding) + + # Data will be set later. + data = None + else: # not a record variable + # Calculate size to avoid problems with vsize (above) + a_size = reduce(mul, shape, 1) * size + self.chunks[name] = [begin_, a_size, dtype_, shape] + if name in ["latitude", "longitude", "time"]: + pos = self.fp.tell() + self.fp.seek(begin_) + data = np.frombuffer(self.fp.read(a_size), dtype=dtype_).copy() + # data.shape = shape + self.fp.seek(pos) + else: + data = np.empty(1, dtype=dtype_) + + # Add variable. + self.variables[name] = netcdf_variable( + data, + typecode, + size, + shape, + dimensions, + attributes, + maskandscale=self.maskandscale, + ) + + if rec_vars: + # Remove padding when only one record variable. 
+ if len(rec_vars) == 1: + dtypes["names"] = dtypes["names"][:1] + dtypes["formats"] = dtypes["formats"][:1] + + pos = self.fp.tell() + self.fp.seek(begin) + self.chunks.setdefault("record_array", []).append( + [begin, self._recs * self._recsize, dtypes] + ) + self.fp.seek(pos) + + def translate(self): + """ + Produce references dictionary + + Parameters + ---------- + """ + import zarr + + out = self.out + z = zarr.open(out, mode="w") + for dim, var in self.variables.items(): + if dim in self.chunks: + shape = self.chunks[dim][-1] + elif dim in self.dimensions: + shape = self.dimensions[dim] + else: + # defer record array + continue + if isinstance(shape, int): + shape = (shape,) + if shape is None or (len(shape) and shape[0] is None): + # defer record array + continue + else: + # simple array block + # TODO: chance to sub-chunk + fill = var._attributes.get("missing_value", None) + if fill is None: + fill = var._attributes.get("_FillValue", None) + if fill is not None and var.data.dtype.kind == "f": + fill = float(fill) + if fill is not None and var.data.dtype.kind == "i": + fill = int(fill) + arr = z.create_dataset( + name=dim, + shape=shape, + dtype=var.data.dtype, + fill_value=fill, + chunks=shape, + compression=None, + ) + part = ".".join(["0"] * len(shape)) or "0" + k = f"{dim}/{part}" + out[k] = [ + self.filename, + int(self.chunks[dim][0]), + int(self.chunks[dim][1]), + ] + arr.attrs.update( + { + k: v.decode() if isinstance(v, bytes) else str(v) + for k, v in var._attributes.items() + if k not in ["_FillValue", "missing_value"] + } + ) + for k in ["add_offset", "scale_factor"]: + if k in arr.attrs: + arr.attrs[k] = float(arr.attrs[k]) + arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dimensions) + if "record_array" in self.chunks: + # native chunks version (no codec, no options) + start, size, dt = self.chunks["record_array"][0] + dt = np.dtype(dt) + itemsize = sum(dt[_].itemsize for _ in dt.names) + outer_shape = size // itemsize + offset = start + for name in dt.names: + dtype = dt[name] + + # Skip padding, but increment offset. + if name.startswith("_padding_"): + offset += dtype.itemsize + continue + + # the order of the names if fixed and important! + var = self.variables[name] + base = dtype.base # actual dtype + shape = (outer_shape,) + dtype.shape + + # TODO: avoid this code repeat + fill = var._attributes.get("missing_value", None) + if fill is None: + fill = var._attributes.get("_FillValue", None) + if fill is not None and base.kind == "f": + fill = float(fill) + if fill is not None and base.kind == "i": + fill = int(fill) + arr = z.create_dataset( + name=name, + shape=shape, + dtype=base, + fill_value=fill, + chunks=(1,) + dtype.shape, + compression=None, + ) + arr.attrs.update( + { + k: v.decode() if isinstance(v, bytes) else str(v) + for k, v in var._attributes.items() + if k not in ["_FillValue", "missing_value"] + } + ) + for k in ["add_offset", "scale_factor"]: + if k in arr.attrs: + arr.attrs[k] = float(arr.attrs[k]) + + arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dimensions) + + suffix = ( + ("." 
+ ".".join("0" for _ in dtype.shape)) if dtype.shape else "" + ) + for i in range(outer_shape): + out[f"{name}/{i}{suffix}"] = [ + self.filename, + int(offset + i * dt.itemsize), + int(dtype.itemsize), + ] + + offset += dtype.itemsize + z.attrs.update( + { + k: v.decode() if isinstance(v, bytes) else str(v) + for k, v in self._attributes.items() + if k != "filename" # special "attribute" + } + ) + if self.threshold: + out = inline_array( + out, + self.threshold, + remote_options=dict(remote_options=self.storage_options), + ) + + if isinstance(out, LazyReferenceMapper): + out.flush() + return out + else: + out = _encode_for_JSON(out) + return {"version": 1, "refs": out} + + +netcdf_recording_file = NetCDF3ToZarr # old name diff --git a/virtualizarr/vendor/kerchunk/utils.py b/virtualizarr/vendor/kerchunk/utils.py new file mode 100644 index 00000000..f404a35b --- /dev/null +++ b/virtualizarr/vendor/kerchunk/utils.py @@ -0,0 +1,81 @@ +import base64 + +import fsspec +import ujson +import zarr + + +def _encode_for_JSON(store): + """Make store JSON encodable""" + for k, v in store.copy().items(): + if isinstance(v, list): + continue + else: + try: + # minify JSON + v = ujson.dumps(ujson.loads(v)) + except (ValueError, TypeError): + pass + try: + store[k] = v.decode() if isinstance(v, bytes) else v + except UnicodeDecodeError: + store[k] = "base64:" + base64.b64encode(v).decode() + return store + + +def _inline_array(group, threshold, names, prefix=""): + for name, thing in group.items(): + if prefix: + prefix1 = f"{prefix}.{name}" + else: + prefix1 = name + if isinstance(thing, zarr.Group): + _inline_array(thing, threshold=threshold, prefix=prefix1, names=names) + else: + cond1 = threshold and thing.nbytes < threshold + cond2 = prefix1 in names + if cond1 or cond2: + original_attrs = dict(thing.attrs) + arr = group.create_dataset( + name=name, + dtype=thing.dtype, + shape=thing.shape, + data=thing[:], + chunks=thing.shape, + compression=None, + overwrite=True, + fill_value=thing.fill_value, + ) + arr.attrs.update(original_attrs) + + +def inline_array(store, threshold=1000, names=None, remote_options=None): + """Inline whole arrays by threshold or name, replace with a single metadata chunk + + Inlining whole arrays results in fewer keys. If the constituent keys were + already inlined, this also results in a smaller file overall. No action is taken + for arrays that are already of one chunk (they should be in + + Parameters + ---------- + store: dict/JSON file + reference set + threshold: int + Size in bytes below which to inline. Set to 0 to prevent inlining by size + names: list[str] | None + It the array name (as a dotted full path) appears in this list, it will + be inlined irrespective of the threshold size. Useful for coordinates. + remote_options: dict | None + Needed to fetch data, if the required keys are not already individually inlined + in the data. + + Returns + ------- + amended references set (simple style) + """ + fs = fsspec.filesystem( + "reference", fo=store, **(remote_options or {}), skip_instance_cache=True + ) + g = zarr.open_group(fs.get_mapper(), mode="r+") + _inline_array(g, threshold, names=names or []) + return fs.references
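

A minimal end-to-end sketch of the new code path, mirroring the netcdf3_file fixture and test_read_netcdf3 added above (the file.nc path and the "foo" variable are just the fixture's values; scipy must be installed, as the import guard in the vendored module notes):

    import numpy as np
    import xarray as xr

    from virtualizarr import open_virtual_dataset

    # Write a tiny classic-format netCDF3 file, as the test fixture does.
    ds = xr.Dataset({"foo": ("x", np.array([1, 2, 3]))})
    ds.to_netcdf("file.nc", format="NETCDF3_CLASSIC")

    # The vendored reader indexes the file without loading its data: "foo"
    # becomes a ManifestArray whose single chunk points at a byte range
    # inside file.nc rather than an in-memory numpy array.
    vds = open_virtual_dataset("file.nc")
    print(vds["foo"].data)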
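
The vendored reader can also be driven directly, which is roughly what virtualizarr/readers/netcdf3.py now does internally; this is a sketch of the kerchunk-style references it produces, not a public API:

    from virtualizarr.vendor.kerchunk.netCDF3 import NetCDF3ToZarr

    # translate() returns {"version": 1, "refs": {...}}, where chunk keys map
    # to [path, offset, length] byte ranges and zarr metadata keys hold JSON.
    refs = NetCDF3ToZarr("file.nc", inline_threshold=0).translate()
    print(refs["refs"]["foo/0"])  # e.g. ["file.nc", 80, 12] for the file above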