From 2d1b0c5b1a73f7b7791e155ecfee030b0ca720c5 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 15:49:51 -0800
Subject: [PATCH 1/8] vendor kerchunk netCDF3 reader and its required utils

---
 virtualizarr/vendor/kerchunk/__init__.py |   0
 virtualizarr/vendor/kerchunk/netCDF3.py  | 302 +++++++++++++++++++++++
 virtualizarr/vendor/kerchunk/utils.py    |  81 ++++++
 3 files changed, 383 insertions(+)
 create mode 100644 virtualizarr/vendor/kerchunk/__init__.py
 create mode 100644 virtualizarr/vendor/kerchunk/netCDF3.py
 create mode 100644 virtualizarr/vendor/kerchunk/utils.py

diff --git a/virtualizarr/vendor/kerchunk/__init__.py b/virtualizarr/vendor/kerchunk/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/virtualizarr/vendor/kerchunk/netCDF3.py b/virtualizarr/vendor/kerchunk/netCDF3.py
new file mode 100644
index 00000000..1f22f67d
--- /dev/null
+++ b/virtualizarr/vendor/kerchunk/netCDF3.py
@@ -0,0 +1,302 @@
+from functools import reduce
+from operator import mul
+
+import numpy as np
+from fsspec.implementations.reference import LazyReferenceMapper
+import fsspec
+
+from virtualizarr.vendor.kerchunk.utils import _encode_for_JSON, inline_array
+
+try:
+    from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
+except ModuleNotFoundError:  # pragma: no cover
+    raise ImportError(
+        "Scipy is required for kerchunking NetCDF3 files. Please install with "
+        "`pip/conda install scipy`. See https://scipy.org/install/ for more details."
+    )
+
+
+class NetCDF3ToZarr(netcdf_file):
+    """Generate references for a netCDF3 file
+
+    Uses scipy's netCDF3 reader, but only reads the metadata. Note that instances
+    do behave like actual scipy netcdf files, but contain no valid data.
+    Also appears to work for netCDF2, although this is not currently tested.
+    """
+
+    def __init__(
+        self,
+        filename,
+        storage_options=None,
+        inline_threshold=100,
+        max_chunk_size=0,
+        out=None,
+        **kwargs,
+    ):
+        """
+        Parameters
+        ----------
+        filename: str
+            location of the input
+        storage_options: dict
+            passed to fsspec when opening filename
+        inline_threshold: int
+            Byte size below which an array will be embedded in the output. Use 0
+            to disable inlining.
+        max_chunk_size: int
+            How big a chunk can be before triggering subchunking. If 0, there is no
+            subchunking, and there is never subchunking for coordinate/dimension arrays.
+            E.g., if an array contains 10,000 bytes, and this value is 6000, there will
+            be two output chunks, split on the biggest available dimension. [TBC]
+        out: dict-like or None
+            This allows you to supply an fsspec.implementations.reference.LazyReferenceMapper
+            to write out parquet as the references get filled, or some other dictionary-like class
+            to customise how references get stored
+        args, kwargs: passed to scipy superclass ``scipy.io.netcdf.netcdf_file``
+        """
+        assert kwargs.pop("mmap", False) is False
+        assert kwargs.pop("mode", "r") == "r"
+        assert kwargs.pop("maskandscale", False) is False
+
+        # attributes set before super().__init__ don't accidentally turn into
+        # dataset attributes
+        self.chunks = {}
+        self.threshold = inline_threshold
+        self.max_chunk_size = max_chunk_size
+        self.out = out or {}
+        self.storage_options = storage_options
+        self.fp = fsspec.open(filename, **(storage_options or {})).open()
+        magic = self.fp.read(4)
+        assert magic[:3] == b"CDF"
+        version = kwargs.pop("version", None) or magic[3]
+        self.fp.seek(0)
+        super().__init__(
+            self.fp,
+            mmap=False,
+            mode="r",
+            maskandscale=False,
+            version=version,
+        )
+        self.filename = filename  # this becomes an attribute, so must ignore on write
+
+    def _read_var_array(self):
+        header = self.fp.read(4)
+        if header not in [ZERO, NC_VARIABLE]:
+            raise ValueError("Unexpected header.")
+
+        begin = 0
+        dtypes = {"names": [], "formats": []}
+        rec_vars = []
+        count = self._unpack_int()
+        for var in range(count):
+            (
+                name,
+                dimensions,
+                shape,
+                attributes,
+                typecode,
+                size,
+                dtype_,
+                begin_,
+                vsize,
+            ) = self._read_var()
+            if shape and shape[0] is None:  # record variable
+                rec_vars.append(name)
+                # The netCDF "record size" is calculated as the sum of
+                # the vsize's of all the record variables.
+                self.__dict__["_recsize"] += vsize
+                if begin == 0:
+                    begin = begin_
+                dtypes["names"].append(name)
+                dtypes["formats"].append(str(shape[1:]) + dtype_)
+
+                # Handle padding with a virtual variable.
+                if typecode in "bch":
+                    actual_size = reduce(mul, (1,) + shape[1:]) * size
+                    padding = -actual_size % 4
+                    if padding:
+                        dtypes["names"].append("_padding_%d" % var)
+                        dtypes["formats"].append("(%d,)>b" % padding)
+
+                # Data will be set later.
+                data = None
+            else:  # not a record variable
+                # Calculate size to avoid problems with vsize (above)
+                a_size = reduce(mul, shape, 1) * size
+                self.chunks[name] = [begin_, a_size, dtype_, shape]
+                if name in ["latitude", "longitude", "time"]:
+                    pos = self.fp.tell()
+                    self.fp.seek(begin_)
+                    data = np.frombuffer(self.fp.read(a_size), dtype=dtype_).copy()
+                    # data.shape = shape
+                    self.fp.seek(pos)
+                else:
+                    data = np.empty(1, dtype=dtype_)
+
+            # Add variable.
+            self.variables[name] = netcdf_variable(
+                data,
+                typecode,
+                size,
+                shape,
+                dimensions,
+                attributes,
+                maskandscale=self.maskandscale,
+            )
+
+        if rec_vars:
+            # Remove padding when only one record variable.
+            if len(rec_vars) == 1:
+                dtypes["names"] = dtypes["names"][:1]
+                dtypes["formats"] = dtypes["formats"][:1]
+
+            pos = self.fp.tell()
+            self.fp.seek(begin)
+            self.chunks.setdefault("record_array", []).append(
+                [begin, self._recs * self._recsize, dtypes]
+            )
+            self.fp.seek(pos)
+
+    def translate(self):
+        """
+        Produce references dictionary
+
+        Parameters
+        ----------
+        """
+        import zarr
+
+        out = self.out
+        z = zarr.open(out, mode="w")
+        for dim, var in self.variables.items():
+            if dim in self.chunks:
+                shape = self.chunks[dim][-1]
+            elif dim in self.dimensions:
+                shape = self.dimensions[dim]
+            else:
+                # defer record array
+                continue
+            if isinstance(shape, int):
+                shape = (shape,)
+            if shape is None or (len(shape) and shape[0] is None):
+                # defer record array
+                continue
+            else:
+                # simple array block
+                # TODO: chance to sub-chunk
+                fill = var._attributes.get("missing_value", None)
+                if fill is None:
+                    fill = var._attributes.get("_FillValue", None)
+                if fill is not None and var.data.dtype.kind == "f":
+                    fill = float(fill)
+                if fill is not None and var.data.dtype.kind == "i":
+                    fill = int(fill)
+                arr = z.create_dataset(
+                    name=dim,
+                    shape=shape,
+                    dtype=var.data.dtype,
+                    fill_value=fill,
+                    chunks=shape,
+                    compression=None,
+                )
+                part = ".".join(["0"] * len(shape)) or "0"
+                k = f"{dim}/{part}"
+                out[k] = [
+                    self.filename,
+                    int(self.chunks[dim][0]),
+                    int(self.chunks[dim][1]),
+                ]
+            arr.attrs.update(
+                {
+                    k: v.decode() if isinstance(v, bytes) else str(v)
+                    for k, v in var._attributes.items()
+                    if k not in ["_FillValue", "missing_value"]
+                }
+            )
+            for k in ["add_offset", "scale_factor"]:
+                if k in arr.attrs:
+                    arr.attrs[k] = float(arr.attrs[k])
+            arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dimensions)
+        if "record_array" in self.chunks:
+            # native chunks version (no codec, no options)
+            start, size, dt = self.chunks["record_array"][0]
+            dt = np.dtype(dt)
+            itemsize = sum(dt[_].itemsize for _ in dt.names)
+            outer_shape = size // itemsize
+            offset = start
+            for name in dt.names:
+                dtype = dt[name]
+
+                # Skip padding, but increment offset.
+                if name.startswith("_padding_"):
+                    offset += dtype.itemsize
+                    continue
+
+                # the order of the names is fixed and important!
+                var = self.variables[name]
+                base = dtype.base  # actual dtype
+                shape = (outer_shape,) + dtype.shape
+
+                # TODO: avoid this code repeat
+                fill = var._attributes.get("missing_value", None)
+                if fill is None:
+                    fill = var._attributes.get("_FillValue", None)
+                if fill is not None and base.kind == "f":
+                    fill = float(fill)
+                if fill is not None and base.kind == "i":
+                    fill = int(fill)
+                arr = z.create_dataset(
+                    name=name,
+                    shape=shape,
+                    dtype=base,
+                    fill_value=fill,
+                    chunks=(1,) + dtype.shape,
+                    compression=None,
+                )
+                arr.attrs.update(
+                    {
+                        k: v.decode() if isinstance(v, bytes) else str(v)
+                        for k, v in var._attributes.items()
+                        if k not in ["_FillValue", "missing_value"]
+                    }
+                )
+                for k in ["add_offset", "scale_factor"]:
+                    if k in arr.attrs:
+                        arr.attrs[k] = float(arr.attrs[k])
+
+                arr.attrs["_ARRAY_DIMENSIONS"] = list(var.dimensions)
+
+                suffix = (
+                    ("." + ".".join("0" for _ in dtype.shape)) if dtype.shape else ""
+ ".".join("0" for _ in dtype.shape)) if dtype.shape else "" + ) + for i in range(outer_shape): + out[f"{name}/{i}{suffix}"] = [ + self.filename, + int(offset + i * dt.itemsize), + int(dtype.itemsize), + ] + + offset += dtype.itemsize + z.attrs.update( + { + k: v.decode() if isinstance(v, bytes) else str(v) + for k, v in self._attributes.items() + if k != "filename" # special "attribute" + } + ) + if self.threshold: + out = inline_array( + out, + self.threshold, + remote_options=dict(remote_options=self.storage_options), + ) + + if isinstance(out, LazyReferenceMapper): + out.flush() + return out + else: + out = _encode_for_JSON(out) + return {"version": 1, "refs": out} + + +netcdf_recording_file = NetCDF3ToZarr # old name diff --git a/virtualizarr/vendor/kerchunk/utils.py b/virtualizarr/vendor/kerchunk/utils.py new file mode 100644 index 00000000..f404a35b --- /dev/null +++ b/virtualizarr/vendor/kerchunk/utils.py @@ -0,0 +1,81 @@ +import base64 + +import fsspec +import ujson +import zarr + + +def _encode_for_JSON(store): + """Make store JSON encodable""" + for k, v in store.copy().items(): + if isinstance(v, list): + continue + else: + try: + # minify JSON + v = ujson.dumps(ujson.loads(v)) + except (ValueError, TypeError): + pass + try: + store[k] = v.decode() if isinstance(v, bytes) else v + except UnicodeDecodeError: + store[k] = "base64:" + base64.b64encode(v).decode() + return store + + +def _inline_array(group, threshold, names, prefix=""): + for name, thing in group.items(): + if prefix: + prefix1 = f"{prefix}.{name}" + else: + prefix1 = name + if isinstance(thing, zarr.Group): + _inline_array(thing, threshold=threshold, prefix=prefix1, names=names) + else: + cond1 = threshold and thing.nbytes < threshold + cond2 = prefix1 in names + if cond1 or cond2: + original_attrs = dict(thing.attrs) + arr = group.create_dataset( + name=name, + dtype=thing.dtype, + shape=thing.shape, + data=thing[:], + chunks=thing.shape, + compression=None, + overwrite=True, + fill_value=thing.fill_value, + ) + arr.attrs.update(original_attrs) + + +def inline_array(store, threshold=1000, names=None, remote_options=None): + """Inline whole arrays by threshold or name, replace with a single metadata chunk + + Inlining whole arrays results in fewer keys. If the constituent keys were + already inlined, this also results in a smaller file overall. No action is taken + for arrays that are already of one chunk (they should be in + + Parameters + ---------- + store: dict/JSON file + reference set + threshold: int + Size in bytes below which to inline. Set to 0 to prevent inlining by size + names: list[str] | None + It the array name (as a dotted full path) appears in this list, it will + be inlined irrespective of the threshold size. Useful for coordinates. + remote_options: dict | None + Needed to fetch data, if the required keys are not already individually inlined + in the data. 
+
+    Returns
+    -------
+    amended references set (simple style)
+    """
+    fs = fsspec.filesystem(
+        "reference", fo=store, **(remote_options or {}), skip_instance_cache=True
+    )
+    g = zarr.open_group(fs.get_mapper(), mode="r+")
+    _inline_array(g, threshold, names=names or [])
+    return fs.references

From aeb6b6734a980ebc0fd88ecde951090fce2555ec Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 15:52:11 -0800
Subject: [PATCH 2/8] change netcdf3 reader to use vendored version of kerchunk reader

---
 virtualizarr/readers/netcdf3.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/virtualizarr/readers/netcdf3.py b/virtualizarr/readers/netcdf3.py
index 816df75e..1b0a5d0c 100644
--- a/virtualizarr/readers/netcdf3.py
+++ b/virtualizarr/readers/netcdf3.py
@@ -26,7 +26,7 @@ def open_virtual_dataset(
     virtual_backend_kwargs: Optional[dict] = None,
     reader_options: Optional[dict] = None,
 ) -> Dataset:
-    from kerchunk.netCDF3 import NetCDF3ToZarr
+    from virtualizarr.vendor.kerchunk.netCDF3 import NetCDF3ToZarr

     if virtual_backend_kwargs:
         raise NotImplementedError(

From 2627ffc3450330b2967cc98d59ad76479535aff0 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 15:52:53 -0800
Subject: [PATCH 3/8] add test for netcdf3 reader

---
 virtualizarr/tests/test_readers/conftest.py | 11 +++++++
 .../tests/test_readers/test_netcdf3.py      | 29 +++++++++++++++++++
 2 files changed, 40 insertions(+)
 create mode 100644 virtualizarr/tests/test_readers/test_netcdf3.py

diff --git a/virtualizarr/tests/test_readers/conftest.py b/virtualizarr/tests/test_readers/conftest.py
index f96447db..e367ef35 100644
--- a/virtualizarr/tests/test_readers/conftest.py
+++ b/virtualizarr/tests/test_readers/conftest.py
@@ -1,3 +1,4 @@
+import pathlib
 import warnings

 import h5py  # type: ignore
@@ -322,3 +323,13 @@ def root_coordinates_hdf5_file(tmpdir, np_uncompressed_int16):
         f.create_dataset(name="lon", data=data)
         f.attrs.create(name="coordinates", data="lat lon")
     return filepath
+
+
+@pytest.fixture
+def netcdf3_file(tmp_path: pathlib.Path) -> pathlib.Path:
+    ds = xr.Dataset({"foo": ("x", np.array([1, 2, 3]))})
+
+    filepath = tmp_path / "file.nc"
+    ds.to_netcdf(filepath, format="NETCDF3_CLASSIC")
+
+    return filepath
diff --git a/virtualizarr/tests/test_readers/test_netcdf3.py b/virtualizarr/tests/test_readers/test_netcdf3.py
new file mode 100644
index 00000000..20da57e2
--- /dev/null
+++ b/virtualizarr/tests/test_readers/test_netcdf3.py
@@ -0,0 +1,29 @@
+import numpy as np
+import xarray as xr
+import xarray.testing as xrt
+
+from virtualizarr import open_virtual_dataset
+from virtualizarr.tests import requires_scipy
+from virtualizarr.manifests import ManifestArray, ChunkManifest
+from virtualizarr.zarr import ZArray
+
+
+@requires_scipy
+def test_read_netcdf3(netcdf3_file):
+    filepath = str(netcdf3_file)
+    vds = open_virtual_dataset(filepath)
+
+    assert isinstance(vds, xr.Dataset)
+    assert list(vds.variables.keys()) == ["foo"]
+    assert isinstance(vds["foo"].data, ManifestArray)
+
+    expected_manifest = ChunkManifest(
+        entries={"0": {"path": filepath, "offset": 80, "length": 12}}
+    )
+    expected_zarray = ZArray(dtype=np.dtype(">i4"), shape=(3,), chunks=(3,))
+    expected_ma = ManifestArray(chunkmanifest=expected_manifest, zarray=expected_zarray)
+    expected_vds = xr.Dataset(
+        {"foo": xr.Variable(data=expected_ma, dims=["x"])}
+    )
+
+    xrt.assert_identical(vds, expected_vds)

From a33e674b35cbc9eabe1c083e8fe14dbc468b2b8e Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 29 Jan 2025 00:49:26 +0000
Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 virtualizarr/tests/test_readers/test_netcdf3.py | 10 ++++------
 virtualizarr/vendor/kerchunk/netCDF3.py         |  4 ++--
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/virtualizarr/tests/test_readers/test_netcdf3.py b/virtualizarr/tests/test_readers/test_netcdf3.py
index 20da57e2..607e7f87 100644
--- a/virtualizarr/tests/test_readers/test_netcdf3.py
+++ b/virtualizarr/tests/test_readers/test_netcdf3.py
@@ -3,8 +3,8 @@
 import xarray.testing as xrt

 from virtualizarr import open_virtual_dataset
+from virtualizarr.manifests import ChunkManifest, ManifestArray
 from virtualizarr.tests import requires_scipy
-from virtualizarr.manifests import ManifestArray, ChunkManifest
 from virtualizarr.zarr import ZArray


@@ -12,18 +12,16 @@ def test_read_netcdf3(netcdf3_file):
     filepath = str(netcdf3_file)
     vds = open_virtual_dataset(filepath)
-
+
     assert isinstance(vds, xr.Dataset)
     assert list(vds.variables.keys()) == ["foo"]
     assert isinstance(vds["foo"].data, ManifestArray)
-
+
     expected_manifest = ChunkManifest(
         entries={"0": {"path": filepath, "offset": 80, "length": 12}}
     )
     expected_zarray = ZArray(dtype=np.dtype(">i4"), shape=(3,), chunks=(3,))
     expected_ma = ManifestArray(chunkmanifest=expected_manifest, zarray=expected_zarray)
-    expected_vds = xr.Dataset(
-        {"foo": xr.Variable(data=expected_ma, dims=["x"])}
-    )
+    expected_vds = xr.Dataset({"foo": xr.Variable(data=expected_ma, dims=["x"])})

     xrt.assert_identical(vds, expected_vds)
diff --git a/virtualizarr/vendor/kerchunk/netCDF3.py b/virtualizarr/vendor/kerchunk/netCDF3.py
index 1f22f67d..08bd9152 100644
--- a/virtualizarr/vendor/kerchunk/netCDF3.py
+++ b/virtualizarr/vendor/kerchunk/netCDF3.py
@@ -1,14 +1,14 @@
 from functools import reduce
 from operator import mul

+import fsspec
 import numpy as np
 from fsspec.implementations.reference import LazyReferenceMapper
-import fsspec

 from virtualizarr.vendor.kerchunk.utils import _encode_for_JSON, inline_array

 try:
-    from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable
+    from scipy.io._netcdf import NC_VARIABLE, ZERO, netcdf_file, netcdf_variable
 except ModuleNotFoundError:  # pragma: no cover
     raise ImportError(
         "Scipy is required for kerchunking NetCDF3 files. Please install with "

From 1355f238bf5f977d9c2234e37bb65557e265d494 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 16:55:25 -0800
Subject: [PATCH 5/8] type checking

---
 virtualizarr/readers/netcdf3.py         | 3 +++
 virtualizarr/vendor/kerchunk/netCDF3.py | 7 ++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/virtualizarr/readers/netcdf3.py b/virtualizarr/readers/netcdf3.py
index 1b0a5d0c..529b0c1b 100644
--- a/virtualizarr/readers/netcdf3.py
+++ b/virtualizarr/readers/netcdf3.py
@@ -38,6 +38,9 @@ def open_virtual_dataset(
         loadable_variables,
     )

+    if reader_options is None:
+        reader_options = {}
+
     refs = NetCDF3ToZarr(filepath, inline_threshold=0, **reader_options).translate()

     # both group=None and group='' mean to read root group
diff --git a/virtualizarr/vendor/kerchunk/netCDF3.py b/virtualizarr/vendor/kerchunk/netCDF3.py
index 08bd9152..00190821 100644
--- a/virtualizarr/vendor/kerchunk/netCDF3.py
+++ b/virtualizarr/vendor/kerchunk/netCDF3.py
@@ -8,7 +8,12 @@
 from virtualizarr.vendor.kerchunk.utils import _encode_for_JSON, inline_array

 try:
-    from scipy.io._netcdf import NC_VARIABLE, ZERO, netcdf_file, netcdf_variable
+    from scipy.io._netcdf import (  # type: ignore[import-untyped]
+        NC_VARIABLE,
+        ZERO,
+        netcdf_file,
+        netcdf_variable,
+    )
 except ModuleNotFoundError:  # pragma: no cover
     raise ImportError(
         "Scipy is required for kerchunking NetCDF3 files. Please install with "

From 5b7f2befbf8bb073eff13a62beb87bc5743c34f6 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 16:57:41 -0800
Subject: [PATCH 6/8] release note

---
 docs/releases.rst | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/releases.rst b/docs/releases.rst
index 02b270bf..a5df9329 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -49,6 +49,8 @@ Documentation
 Internal Changes
 ~~~~~~~~~~~~~~~~

+- Vendor netCDF3 reader from kerchunk. (:pull:`397`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
+
 .. _v1.2.0:

 v1.2.0 (5th Dec 2024)

From 426cc2f2f17ee4f5e8ed1236c55cab0b73a9b566 Mon Sep 17 00:00:00 2001
From: TomNicholas
Date: Tue, 28 Jan 2025 17:00:56 -0800
Subject: [PATCH 7/8] add TODO about testing against xarray backend

---
 virtualizarr/tests/test_readers/test_netcdf3.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/virtualizarr/tests/test_readers/test_netcdf3.py b/virtualizarr/tests/test_readers/test_netcdf3.py
index 607e7f87..66b3ba98 100644
--- a/virtualizarr/tests/test_readers/test_netcdf3.py
+++ b/virtualizarr/tests/test_readers/test_netcdf3.py
@@ -25,3 +25,6 @@ def test_read_netcdf3(netcdf3_file):
     expected_vds = xr.Dataset({"foo": xr.Variable(data=expected_ma, dims=["x"])})

     xrt.assert_identical(vds, expected_vds)
+
+
+# TODO test loading data against xarray backend, see issue #394 for context

From 08e2eb8b6ee5c51c621b86be90f10cb0be3964b1 Mon Sep 17 00:00:00 2001
From: Max Jones <14077947+maxrjones@users.noreply.github.com>
Date: Wed, 29 Jan 2025 14:37:27 -0500
Subject: [PATCH 8/8] Add Kerchunk's license to vendor dir

---
 virtualizarr/vendor/kerchunk/LICENSE.txt | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
 create mode 100644 virtualizarr/vendor/kerchunk/LICENSE.txt

diff --git a/virtualizarr/vendor/kerchunk/LICENSE.txt b/virtualizarr/vendor/kerchunk/LICENSE.txt
new file mode 100644
index 00000000..a6ad28df
--- /dev/null
+++ b/virtualizarr/vendor/kerchunk/LICENSE.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2020 Intake
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:

+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
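
---

Reviewer note: for anyone trying this series locally, a minimal end-to-end
sketch of the behaviour it vendors, mirroring the `netcdf3_file` fixture and
`test_read_netcdf3` above. This assumes scipy, numpy, and xarray are
installed, and the "file.nc" path is illustrative:

    import numpy as np
    import xarray as xr

    from virtualizarr import open_virtual_dataset

    # Write a small NETCDF3_CLASSIC file, exactly as the test fixture does.
    ds = xr.Dataset({"foo": ("x", np.array([1, 2, 3]))})
    ds.to_netcdf("file.nc", format="NETCDF3_CLASSIC")

    # open_virtual_dataset now routes NetCDF3 files through the vendored
    # virtualizarr.vendor.kerchunk.netCDF3.NetCDF3ToZarr reader instead of
    # importing the external kerchunk package.
    vds = open_virtual_dataset("file.nc")

    # The variable wraps a ManifestArray of byte ranges into file.nc rather
    # than loaded values; the test above pins these to offset=80, length=12.
    print(vds["foo"].data)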