diff --git a/conftest.py b/conftest.py
index 32b3581..58906de 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,6 +1,8 @@
 import h5py
+import numpy as np
 import pytest
 import xarray as xr
+from xarray.core.variable import Variable


 def pytest_addoption(parser):
@@ -82,3 +84,16 @@ def hdf5_scalar(tmpdir):
         dataset = f.create_dataset("scalar", data=0.1, dtype="float32")
         dataset.attrs["scalar"] = "true"
     return filepath
+
+
+@pytest.fixture
+def simple_netcdf4(tmpdir):
+    filepath = f"{tmpdir}/simple.nc"
+
+    arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4)
+    var = Variable(data=arr, dims=["x", "y"])
+    ds = xr.Dataset({"foo": var})
+
+    ds.to_netcdf(filepath)
+
+    return filepath
diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py
new file mode 100644
index 0000000..28c5b3d
--- /dev/null
+++ b/virtualizarr/tests/test_writers/conftest.py
@@ -0,0 +1,27 @@
+import numpy as np
+import pytest
+from xarray import Dataset
+from xarray.core.variable import Variable
+
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+
+
+@pytest.fixture
+def vds_with_manifest_arrays() -> Dataset:
+    arr = ManifestArray(
+        chunkmanifest=ChunkManifest(
+            entries={"0.0": dict(path="/test.nc", offset=6144, length=48)}
+        ),
+        zarray=dict(
+            shape=(2, 3),
+            dtype=np.dtype("<i8"),
+            chunks=(2, 3),
+            compressor={"id": "zlib", "level": 1},
+            filters=None,
+            fill_value=0,
+            order="C",
+            zarr_format=3,
+        ),
+    )
+    var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"})
+    return Dataset({"a": var}, attrs={"something": 0})
diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py
new file mode 100644
--- /dev/null
+++ b/virtualizarr/tests/test_writers/test_icechunk.py
+import asyncio
+from typing import TYPE_CHECKING
+
+import numpy as np
+import pytest
+from xarray import Dataset
+from zarr import Array, Group, group
+
+from virtualizarr.writers.icechunk import dataset_to_icechunk
+
+if TYPE_CHECKING:
+    from icechunk import IcechunkStore
+
+
+@pytest.fixture
+def icechunk_filestore(tmpdir) -> "IcechunkStore":
+    from icechunk import IcechunkStore, StorageConfig
+
+    storage = StorageConfig.filesystem(str(tmpdir))
+
+    # TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here
+    # TODO is this the correct mode to use?
+    store = asyncio.run(IcechunkStore.open(storage=storage, mode="r+"))
+
+    # TODO instead yield store then store.close() ??
+    return store
+
+
+class TestWriteVirtualRefs:
+    def test_write_new_virtual_variable(
+        self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset
+    ):
+        vds = vds_with_manifest_arrays
+
+        dataset_to_icechunk(vds, icechunk_filestore)
+
+        # check attrs
+        root_group = group(store=icechunk_filestore)
+        assert isinstance(root_group, Group)
+        assert root_group.attrs == {"something": 0}
+
+        # TODO check against vds, then perhaps parametrize?
+
+        # check array exists
+        assert "a" in root_group
+        arr = root_group["a"]
+        assert isinstance(arr, Array)
+
+        # check array metadata
+        # TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array?
+        # assert arr.zarr_format == 3
+        assert arr.shape == (2, 3)
+        assert arr.chunks == (2, 3)
+        assert arr.dtype == np.dtype("<i8")
diff --git a/virtualizarr/tests/test_writers/test_zarr.py b/virtualizarr/tests/test_writers/test_zarr.py
--- a/virtualizarr/tests/test_writers/test_zarr.py
+++ b/virtualizarr/tests/test_writers/test_zarr.py
-@pytest.fixture
-def vds_with_manifest_arrays() -> Dataset:
-    arr = ManifestArray(
-        chunkmanifest=ChunkManifest(
-            entries={"0.0": dict(path="test.nc", offset=6144, length=48)}
-        ),
-        zarray=dict(
-            shape=(2, 3),
-            dtype=np.dtype("<i8"),
-            chunks=(2, 3),
-            compressor={"id": "zlib", "level": 1},
-            filters=None,
-            fill_value=0,
-            order="C",
-            zarr_format=3,
-        ),
-    )
-    var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"})
-    return Dataset({"a": var}, attrs={"something": 0})
-
-
 def isconfigurable(value: dict) -> bool:
     """
     Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict
diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py
new file mode 100644
index 0000000..b7a5982
--- /dev/null
+++ b/virtualizarr/writers/icechunk.py
@@ -0,0 +1,180 @@
+import asyncio
+from typing import TYPE_CHECKING
+
+import numpy as np
+from xarray import Dataset, conventions
+from xarray.backends.zarr import encode_zarr_attr_value
+from xarray.core.variable import Variable
+from zarr import Group
+
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.zarr import encode_dtype
+
+if TYPE_CHECKING:
+    from icechunk import IcechunkStore
+
+
+VALID_URI_PREFIXES = {
+    "s3://",
+    # "gs://",
+    # "azure://",
+    # "r2://",
+    # "cos://",
+    # "minio://",
+    "file:///",
+}
+
+
+def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
+    """
+    Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.
+
+    Currently requires all variables to be backed by ManifestArray objects.
+
+    Parameters
+    ----------
+    ds: xr.Dataset
+    store: IcechunkStore
+    """
+    from icechunk import IcechunkStore
+
+    if not isinstance(store, IcechunkStore):
+        raise TypeError(f"expected type IcechunkStore, but got type {type(store)}")
+
+    if not store.supports_writes:
+        raise ValueError("supplied store does not support writes")
+
+    # TODO only supports writing to the root group currently
+    # TODO pass zarr_format kwarg?
+    root_group = Group.from_store(store=store)
+
+    # TODO this is Frozen, the API for setting attributes must be something else
+    # root_group.attrs = ds.attrs
+    for k, v in ds.attrs.items():
+        root_group.attrs[k] = encode_zarr_attr_value(v)
+
+    return write_variables_to_icechunk_group(
+        ds.variables,
+        store=store,
+        group=root_group,
+    )
+
+
+def write_variables_to_icechunk_group(
+    variables,
+    store,
+    group,
+):
+    for name, var in variables.items():
+        write_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+
+
+def write_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: Group,
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single (possibly virtual) variable into an icechunk store"""
+
+    if isinstance(var.data, ManifestArray):
+        write_virtual_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+    else:
+        # TODO is writing loadable_variables just normal xarray ds.to_zarr?
+        # raise NotImplementedError()
+        print("skipping non-virtual variable", name)
+
+
+def write_virtual_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: Group,
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single virtual variable into an icechunk store"""
+    ma = var.data
+    zarray = ma.zarray
+
+    # creates array if it doesn't already exist
+    arr = group.require_array(
+        name=name,
+        shape=zarray.shape,
+        chunk_shape=zarray.chunks,
+        dtype=encode_dtype(zarray.dtype),
+        codecs=zarray._v3_codec_pipeline(),
+        dimension_names=var.dims,
+        fill_value=zarray.fill_value,
+        # TODO fill_value?
+    )
+
+    # TODO it would be nice if we could assign directly to the .attrs property
+    for k, v in var.attrs.items():
+        arr.attrs[k] = encode_zarr_attr_value(v)
+    arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims)
+
+    _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"}
+    for k, v in var.encoding.items():
+        if k in _encoding_keys:
+            arr.attrs[k] = encode_zarr_attr_value(v)
+
+    write_manifest_virtual_refs(
+        store=store,
+        group=group,
+        arr_name=name,
+        manifest=ma.manifest,
+    )
+
+
+def write_manifest_virtual_refs(
+    store: "IcechunkStore",
+    group: Group,
+    arr_name: str,
+    manifest: ChunkManifest,
+) -> None:
+    """Write all the virtual references for one array manifest at once."""
+
+    key_prefix = f"{group.name}{arr_name}"
+
+    # loop over every reference in the ChunkManifest for that array
+    # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once,
+    # but Icechunk needs to expose a suitable API first
+    it = np.nditer(
+        [manifest._paths, manifest._offsets, manifest._lengths],
+        flags=[
+            "refs_ok",
+            "multi_index",
+            "c_index",  # TODO is "c_index" correct? what's the convention for zarr chunk keys?
+        ],
+        op_flags=[["readonly"]] * 3,
+    )
+    for path, offset, length in it:
+        index = it.multi_index
+        chunk_key = "/".join(str(i) for i in index)
+
+        # set each reference individually
+        store.set_virtual_ref(
+            # TODO it would be marginally neater if I could pass the group and name as separate args
+            key=f"{key_prefix}/c/{chunk_key}",  # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks
+            location=as_file_uri(path.item()),
+            offset=offset.item(),
+            length=length.item(),
+        )
+
+
+def as_file_uri(path):
+    # TODO a more robust solution to this requirement exists in https://github.com/zarr-developers/VirtualiZarr/pull/243
+    if not any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES) and path != "":
+        # assume path is local
+        return f"file://{path}"
+    else:
+        return path
diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py
index f62b126..d047f91 100644
--- a/virtualizarr/zarr.py
+++ b/virtualizarr/zarr.py
@@ -74,8 +74,9 @@ def codec(self) -> Codec:
     @classmethod
     def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray":
         # coerce type of fill_value as kerchunk can be inconsistent with this
+        dtype = np.dtype(decoded_arr_refs_zarray["dtype"])
        fill_value = decoded_arr_refs_zarray["fill_value"]
-        if fill_value is None or fill_value == "NaN" or fill_value == "nan":
+        if np.issubdtype(dtype, np.floating) and (fill_value is None or fill_value == "NaN" or fill_value == "nan"):
             fill_value = np.nan

         compressor = decoded_arr_refs_zarray["compressor"]
@@ -86,7 +87,7 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray":
         return ZArray(
             chunks=tuple(decoded_arr_refs_zarray["chunks"]),
             compressor=compressor,
-            dtype=np.dtype(decoded_arr_refs_zarray["dtype"]),
+            dtype=dtype,
             fill_value=fill_value,
             filters=decoded_arr_refs_zarray["filters"],
             order=decoded_arr_refs_zarray["order"],
@@ -140,7 +141,7 @@ def replace(
             replacements["zarr_format"] = zarr_format
         return dataclasses.replace(self, **replacements)

-    def _v3_codec_pipeline(self) -> list:
+    def _v3_codec_pipeline(self) -> Any:
         """
         VirtualiZarr internally uses the `filters`, `compressor`, and `order` attributes from zarr v2,
         but to create conformant zarr v3 metadata those 3 must be turned into `codecs` objects.
@@ -153,44 +154,46 @@ def _v3_codec_pipeline(self) -> list:
         post_compressor: Iterable[BytesBytesCodec] #optional
         ```
         """
-        if self.filters:
-            filter_codecs_configs = [
-                numcodecs.get_codec(filter).get_config() for filter in self.filters
-            ]
-            filters = [
-                dict(name=codec.pop("id"), configuration=codec)
-                for codec in filter_codecs_configs
-            ]
-        else:
-            filters = []
-
-        # Noting here that zarr v3 has very few codecs specificed in the official spec,
-        # and that there are far more codecs in `numcodecs`. We take a gamble and assume
-        # that the codec names and configuration are simply mapped into zarrv3 "configurables".
-        if self.compressor:
-            compressor = [_num_codec_config_to_configurable(self.compressor)]
-        else:
-            compressor = []
+        try:
+            from zarr.core.metadata.v3 import parse_codecs
+        except ImportError:
+            raise ImportError(
+                "zarr v3 is required to generate v3 codec pipelines"
+            )
+
+        codec_configs = []

         # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1
         # Either "C" or "F", defining the layout of bytes within each chunk of the array.
         # "C" means row-major order, i.e., the last dimension varies fastest;
         # "F" means column-major order, i.e., the first dimension varies fastest.
-        if self.order == "C":
-            order = tuple(range(len(self.shape)))
-        elif self.order == "F":
+        # For now, we only need a transpose codec if the order is not "C"
+        if self.order == "F":
             order = tuple(reversed(range(len(self.shape))))
+            transpose = dict(name="transpose", configuration=dict(order=order))
+            codec_configs.append(transpose)

-        transpose = dict(name="transpose", configuration=dict(order=order))
         # https://github.com/zarr-developers/zarr-python/pull/1944#issuecomment-2151994097
         # "If no ArrayBytesCodec is supplied, we can auto-add a BytesCodec"
         bytes = dict(
             name="bytes", configuration={}
         )  # TODO need to handle endianess configuration
+        codec_configs.append(bytes)
+
+        # Noting here that zarr v3 has very few codecs specified in the official spec,
+        # and that there are far more codecs in `numcodecs`. We take a gamble and assume
+        # that the codec names and configuration are simply mapped into zarrv3 "configurables".
+        if self.filters:
+            codec_configs.extend([
+                _num_codec_config_to_configurable(filter) for filter in self.filters
+            ])
+
+        if self.compressor:
+            codec_configs.append(_num_codec_config_to_configurable(self.compressor))
+
+        # convert the pipeline repr into actual v3 codec objects
+        codec_pipeline = parse_codecs(codec_configs)

-        # The order here is significant!
-        # [ArrayArray] -> ArrayBytes -> [BytesBytes]
-        codec_pipeline = [transpose, bytes] + compressor + filters
         return codec_pipeline


@@ -213,4 +216,5 @@ def _num_codec_config_to_configurable(num_codec: dict) -> dict:
     Convert a numcodecs codec into a zarr v3 configurable.
     """
     num_codec_copy = num_codec.copy()
-    return {"name": num_codec_copy.pop("id"), "configuration": num_codec_copy}
+    name = "numcodecs." + num_codec_copy.pop("id")
+    return {"name": name, "configuration": num_codec_copy}
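
To make the codec translation concrete: for a ZArray with order="F", no filters, and the test fixture's zlib compressor {"id": "zlib", "level": 1}, the intermediate `codec_configs` list handed to `parse_codecs` would look roughly like this (an illustration derived from the logic above, not output captured from the code):

    codec_configs = [
        # ArrayArray codec, only added because order == "F"
        {"name": "transpose", "configuration": {"order": (1, 0)}},
        # ArrayBytes codec, auto-added because no explicit one is supplied
        {"name": "bytes", "configuration": {}},
        # BytesBytes codec, the numcodecs config mapped to a "numcodecs."-prefixed configurable
        {"name": "numcodecs.zlib", "configuration": {"level": 1}},
    ]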
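
Putting the pieces together, the intended end-to-end usage looks roughly like the sketch below. This is not part of the diff: it reuses the ManifestArray from the `vds_with_manifest_arrays` fixture and mirrors the `StorageConfig.filesystem` / `IcechunkStore.open` calls from the `icechunk_filestore` test fixture; the store path is hypothetical and the icechunk API may still change:

    import asyncio

    import numpy as np
    from icechunk import IcechunkStore, StorageConfig
    from xarray import Dataset
    from xarray.core.variable import Variable

    from virtualizarr.manifests import ChunkManifest, ManifestArray
    from virtualizarr.writers.icechunk import dataset_to_icechunk

    # a dataset wrapping one virtual variable, as in vds_with_manifest_arrays
    arr = ManifestArray(
        chunkmanifest=ChunkManifest(
            entries={"0.0": dict(path="/test.nc", offset=6144, length=48)}
        ),
        zarray=dict(
            shape=(2, 3),
            dtype=np.dtype("<i8"),
            chunks=(2, 3),
            compressor=None,
            filters=None,
            fill_value=0,
            order="C",
            zarr_format=3,
        ),
    )
    vds = Dataset({"a": Variable(dims=["x", "y"], data=arr)})

    # open a writable icechunk store backed by a local directory (hypothetical path)
    storage = StorageConfig.filesystem("/tmp/icechunk-test")
    store = asyncio.run(IcechunkStore.open(storage=storage, mode="r+"))

    # writes the array metadata plus one virtual ref under key "/a/c/0/0",
    # with the path "/test.nc" coerced to the URI "file:///test.nc" by as_file_uri
    dataset_to_icechunk(vds, store)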