Adding to_icechunk as another type of roundtrip tests #393

23 changes: 23 additions & 0 deletions virtualizarr/tests/__init__.py
@@ -1,8 +1,10 @@
import importlib
import itertools

import fsspec
import numpy as np
import pytest
import xarray as xr
from packaging.version import Version

from virtualizarr.manifests import ChunkManifest, ManifestArray
@@ -35,6 +37,7 @@ def _importorskip(


has_astropy, requires_astropy = _importorskip("astropy")
has_icechunk, requires_icechunk = _importorskip("icechunk")
has_kerchunk, requires_kerchunk = _importorskip("kerchunk")
has_s3fs, requires_s3fs = _importorskip("s3fs")
has_scipy, requires_scipy = _importorskip("scipy")
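The body of _importorskip is collapsed in this diff. For reference, here is a rough sketch of what such a helper typically looks like, modeled on the utility of the same name in xarray's test suite; this is a hypothetical reconstruction, not necessarily this repo's exact code:

import importlib

import pytest
from packaging.version import Version


def _importorskip(modname: str, minversion: str | None = None):
    # Hypothetical reconstruction of the collapsed helper; the real body may differ.
    try:
        mod = importlib.import_module(modname)
        has = True
        if minversion is not None and Version(mod.__version__) < Version(minversion):
            has = False
    except ImportError:
        has = False
    reason = f"requires {modname}"
    if minversion is not None:
        reason += f">={minversion}"
    return has, pytest.mark.skipif(not has, reason=reason)

Each call such as has_icechunk, requires_icechunk = _importorskip("icechunk") then yields both a boolean usable in conditional logic (as in the parametrize list further down) and a skipif marker usable as a test decorator.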
@@ -105,3 +108,23 @@ def offset_from_chunk_key(ind: tuple[int, ...]) -> int:

def length_from_chunk_key(ind: tuple[int, ...]) -> int:
return sum(ind) + 5


def open_dataset_kerchunk(
    filename_or_obj: str, *, storage_options=None, **kwargs
) -> xr.Dataset:
    """Equivalent to ``xr.open_dataset(..., engine="kerchunk")``, but without
    depending on the kerchunk library.
    """
    m = fsspec.filesystem(
        "reference", fo=filename_or_obj, **(storage_options or {})
    ).get_mapper()
    return xr.open_dataset(m, engine="zarr", consolidated=False, **kwargs)


def in_memory_icechunk_session():
    from icechunk import Repository, Storage

    repo = Repository.create(storage=Storage.new_in_memory())
    session = repo.writable_session("main")
    return session
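Taken together, the tests below exercise these two helpers roughly as follows. This is a sketch assembled from the calls that appear in this diff, using air.nc as a stand-in input file:

import xarray as xr

from virtualizarr import open_virtual_dataset
from virtualizarr.tests import in_memory_icechunk_session, open_dataset_kerchunk

vds = open_virtual_dataset("air.nc", indexes={})  # any virtual dataset

# kerchunk route: serialize references to an in-memory dict, reopen via fsspec + zarr
refs = vds.virtualize.to_kerchunk(format="dict")
roundtrip = open_dataset_kerchunk(refs)

# icechunk route: write into an in-memory session, commit, reopen with the zarr engine
session = in_memory_icechunk_session()
vds.virtualize.to_icechunk(session.store)
session.commit("add data")
roundtrip = xr.open_zarr(session.store, zarr_format=3, consolidated=False)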
3 changes: 2 additions & 1 deletion virtualizarr/tests/test_backend.py
@@ -15,6 +15,7 @@
from virtualizarr.readers.hdf import HDFVirtualBackend
from virtualizarr.tests import (
    has_astropy,
    open_dataset_kerchunk,
    parametrize_over_hdf_backends,
    requires_hdf5plugin,
    requires_imagecodecs,
@@ -321,7 +322,7 @@ def test_virtualizarr_vs_local_nisar(self, hdf_backend):
)
tmpref = "/tmp/cmip6.json"
vds.virtualize.to_kerchunk(tmpref, format="json")
dsV = xr.open_dataset(tmpref, engine="kerchunk")
dsV = open_dataset_kerchunk(tmpref)

# xrt.assert_identical(dsXR, dsV) #Attribute order changes
xrt.assert_equal(dsXR, dsV)
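The replacement above is intended to be behavior-preserving: the old and new calls read the same references file, only without importing kerchunk.

# before: needs the kerchunk package to provide the "kerchunk" xarray engine
dsV = xr.open_dataset(tmpref, engine="kerchunk")
# after: only fsspec + zarr are needed
dsV = open_dataset_kerchunk(tmpref)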
138 changes: 69 additions & 69 deletions virtualizarr/tests/test_integration.py
@@ -8,7 +8,15 @@

from virtualizarr import open_virtual_dataset
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.tests import parametrize_over_hdf_backends, requires_kerchunk
from virtualizarr.tests import (
    has_icechunk,
    has_kerchunk,
    in_memory_icechunk_session,
    open_dataset_kerchunk,
    parametrize_over_hdf_backends,
    requires_kerchunk,
    requires_zarr_python,
)
from virtualizarr.translators.kerchunk import (
dataset_from_kerchunk_refs,
)
@@ -34,16 +42,16 @@ def test_kerchunk_roundtrip_in_memory_no_concat():
),
chunkmanifest=manifest,
)
ds = xr.Dataset({"a": (["x", "y"], marr)})
vds = xr.Dataset({"a": (["x", "y"], marr)})

# Use accessor to write it out to kerchunk reference dict
ds_refs = ds.virtualize.to_kerchunk(format="dict")
ds_refs = vds.virtualize.to_kerchunk(format="dict")

# Use dataset_from_kerchunk_refs to reconstruct the dataset
roundtrip = dataset_from_kerchunk_refs(ds_refs)

# Assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
xrt.assert_equal(roundtrip, vds)


@requires_kerchunk
@@ -84,11 +92,55 @@ def test_numpy_arrays_to_inlined_kerchunk_refs(
assert refs["refs"]["time/0"] == expected["refs"]["time/0"]


@requires_kerchunk
@pytest.mark.parametrize("format", ["dict", "json", "parquet"])
class TestKerchunkRoundtrip:
def roundtrip_as_kerchunk_dict(vds: xr.Dataset, tmpdir, **kwargs):
    # write those references to an in-memory kerchunk-formatted references dictionary
    ds_refs = vds.virtualize.to_kerchunk(format="dict")

    # use fsspec to read the dataset from the kerchunk references dict
    return open_dataset_kerchunk(ds_refs, **kwargs)


def roundtrip_as_kerchunk_json(vds: xr.Dataset, tmpdir, **kwargs):
    # write those references to disk as kerchunk references format
    vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json")

    # use fsspec to read the dataset from disk via the kerchunk references
    return open_dataset_kerchunk(f"{tmpdir}/refs.json", **kwargs)


def roundtrip_as_kerchunk_parquet(vds: xr.Dataset, tmpdir, **kwargs):
    # write those references to disk as kerchunk references format
    vds.virtualize.to_kerchunk(f"{tmpdir}/refs.parquet", format="parquet")

    # use fsspec to read the dataset from disk via the kerchunk references
    return open_dataset_kerchunk(f"{tmpdir}/refs.parquet", **kwargs)


def roundtrip_as_in_memory_icechunk(vds: xr.Dataset, tmpdir, **kwargs):
    # write those references to an in-memory icechunk store
    icechunk_session = in_memory_icechunk_session()
    vds.virtualize.to_icechunk(icechunk_session.store)
    icechunk_session.commit("add data")

    # read the dataset from icechunk
    return xr.open_zarr(
        icechunk_session.store, zarr_format=3, consolidated=False, **kwargs
    )


@requires_zarr_python
@pytest.mark.parametrize(
    "roundtrip_func",
    [
        roundtrip_as_kerchunk_dict,
        roundtrip_as_kerchunk_json,
        *([roundtrip_as_kerchunk_parquet] if has_kerchunk else []),
        *([roundtrip_as_in_memory_icechunk] if has_icechunk else []),
    ],
)
class TestRoundtrip:
@parametrize_over_hdf_backends
def test_kerchunk_roundtrip_no_concat(self, tmpdir, format, hdf_backend):
def test_roundtrip_no_concat(self, tmpdir, roundtrip_func, hdf_backend):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

@@ -98,20 +150,7 @@ def test_kerchunk_roundtrip_no_concat(self, tmpdir, format, hdf_backend):
# use open_dataset_via_kerchunk to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={}, backend=hdf_backend)

if format == "dict":
# write those references to an in-memory kerchunk-formatted references dictionary
ds_refs = vds.virtualize.to_kerchunk(format=format)

# use fsspec to read the dataset from the kerchunk references dict
roundtrip = xr.open_dataset(ds_refs, engine="kerchunk", decode_times=False)
else:
# write those references to disk as kerchunk references format
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# use fsspec to read the dataset from disk via the kerchunk references
roundtrip = xr.open_dataset(
f"{tmpdir}/refs.{format}", engine="kerchunk", decode_times=False
)
roundtrip = roundtrip_func(vds, tmpdir, decode_times=False)

# assert all_close to original dataset
xrt.assert_allclose(roundtrip, ds)
@@ -123,7 +162,7 @@ def test_kerchunk_roundtrip_no_concat(self, tmpdir, format, hdf_backend):
@parametrize_over_hdf_backends
@pytest.mark.parametrize("decode_times,time_vars", [(False, []), (True, ["time"])])
def test_kerchunk_roundtrip_concat(
self, tmpdir, format, hdf_backend, decode_times, time_vars
self, tmpdir, roundtrip_func, hdf_backend, decode_times, time_vars
):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=decode_times)
@@ -159,22 +198,7 @@ def test_kerchunk_roundtrip_concat(
# concatenate virtually along time
vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")

if format == "dict":
# write those references to an in-memory kerchunk-formatted references dictionary
ds_refs = vds.virtualize.to_kerchunk(format=format)

# use fsspec to read the dataset from the kerchunk references dict
roundtrip = xr.open_dataset(
ds_refs, engine="kerchunk", decode_times=decode_times
)
else:
# write those references to disk as kerchunk references format
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# use fsspec to read the dataset from disk via the kerchunk references
roundtrip = xr.open_dataset(
f"{tmpdir}/refs.{format}", engine="kerchunk", decode_times=decode_times
)
roundtrip = roundtrip_func(vds, tmpdir, decode_times=decode_times)

if decode_times is False:
# assert all_close to original dataset
@@ -191,7 +215,7 @@ def test_kerchunk_roundtrip_concat(
assert roundtrip.time.encoding["calendar"] == ds.time.encoding["calendar"]

@parametrize_over_hdf_backends
def test_non_dimension_coordinates(self, tmpdir, format, hdf_backend):
def test_non_dimension_coordinates(self, tmpdir, roundtrip_func, hdf_backend):
# regression test for GH issue #105

if hdf_backend:
@@ -209,20 +233,7 @@ def test_non_dimension_coordinates(self, tmpdir, format, hdf_backend):
assert "lat" in vds.coords
assert "coordinates" not in vds.attrs

if format == "dict":
# write those references to an in-memory kerchunk-formatted references dictionary
ds_refs = vds.virtualize.to_kerchunk(format=format)

# use fsspec to read the dataset from the kerchunk references dict
roundtrip = xr.open_dataset(ds_refs, engine="kerchunk", decode_times=False)
else:
# write those references to disk as kerchunk references format
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# use fsspec to read the dataset from disk via the kerchunk references
roundtrip = xr.open_dataset(
f"{tmpdir}/refs.{format}", engine="kerchunk", decode_times=False
)
roundtrip = roundtrip_func(vds, tmpdir)

# assert equal to original dataset
xrt.assert_allclose(roundtrip, ds)
@@ -231,7 +242,7 @@ def test_non_dimension_coordinates(self, tmpdir, format, hdf_backend):
for coord in ds.coords:
assert ds.coords[coord].attrs == roundtrip.coords[coord].attrs

def test_datetime64_dtype_fill_value(self, tmpdir, format):
def test_datetime64_dtype_fill_value(self, tmpdir, roundtrip_func):
chunks_dict = {
"0.0.0": {"path": "/foo.nc", "offset": 100, "length": 100},
}
@@ -249,7 +260,7 @@ def test_datetime64_dtype_fill_value(self, tmpdir, format):
zarr_format=2,
)
marr1 = ManifestArray(zarray=zarray, chunkmanifest=manifest)
ds = xr.Dataset(
vds = xr.Dataset(
{
"a": xr.DataArray(
marr1,
@@ -260,20 +271,9 @@ def test_datetime64_dtype_fill_value(self, tmpdir, format):
}
)

if format == "dict":
# write those references to an in-memory kerchunk-formatted references dictionary
ds_refs = ds.virtualize.to_kerchunk(format=format)

# use fsspec to read the dataset from the kerchunk references dict
roundtrip = xr.open_dataset(ds_refs, engine="kerchunk")
else:
# write those references to disk as kerchunk references format
ds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# use fsspec to read the dataset from disk via the kerchunk references
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")
roundtrip = roundtrip_func(vds, tmpdir)

assert roundtrip.a.attrs == ds.a.attrs
assert roundtrip.a.attrs == vds.a.attrs


@parametrize_over_hdf_backends
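For clarity, here is a self-contained illustration of the parametrization pattern this file now uses. All names below are invented; the point is that optional backends are spliced into the parameter list only when their library is importable, so test collection never fails on a missing optional dependency:

import importlib.util

import pytest

has_icechunk = importlib.util.find_spec("icechunk") is not None


def roundtrip_via_json(value):
    return value  # stand-in for a real write/read cycle


def roundtrip_via_icechunk(value):
    return value  # stand-in for a real write/read cycle


@pytest.mark.parametrize(
    "roundtrip_func",
    [
        roundtrip_via_json,
        # the *-splat adds this case only when icechunk is installed
        *([roundtrip_via_icechunk] if has_icechunk else []),
    ],
)
def test_roundtrip(roundtrip_func):
    assert roundtrip_func(42) == 42

Because inclusion happens at collection time via the contents of the list, the parquet and icechunk cases simply disappear from the test matrix when kerchunk or icechunk is absent, rather than showing up as skips or errors.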
19 changes: 12 additions & 7 deletions virtualizarr/tests/test_readers/test_hdf/test_hdf_integration.py
@@ -4,16 +4,21 @@

import virtualizarr
from virtualizarr.readers.hdf import HDFVirtualBackend
from virtualizarr.tests import requires_kerchunk
from virtualizarr.tests import (
    open_dataset_kerchunk,
    requires_hdf5plugin,
    requires_imagecodecs,
)


@requires_kerchunk
@requires_hdf5plugin
@requires_imagecodecs
class TestIntegration:
@pytest.mark.xfail(
reason="0 time start is being interpreted as fillvalue see issues/280"
)
def test_filters_h5netcdf_roundtrip(
self, tmpdir, filter_encoded_roundtrip_hdf5_file, backend=HDFVirtualBackend
self, tmpdir, filter_encoded_roundtrip_hdf5_file
):
ds = xr.open_dataset(filter_encoded_roundtrip_hdf5_file, decode_times=True)
vds = virtualizarr.open_virtual_dataset(
@@ -24,7 +29,7 @@ def test_filters_h5netcdf_roundtrip(
)
kerchunk_file = f"{tmpdir}/kerchunk.json"
vds.virtualize.to_kerchunk(kerchunk_file, format="json")
roundtrip = xr.open_dataset(kerchunk_file, engine="kerchunk", decode_times=True)
roundtrip = open_dataset_kerchunk(kerchunk_file, decode_times=True)
xrt.assert_allclose(ds, roundtrip)

@pytest.mark.xfail(
@@ -37,8 +42,8 @@ def test_filters_netcdf4_roundtrip(
ds = xr.open_dataset(filepath)
vds = virtualizarr.open_virtual_dataset(filepath, backend=HDFVirtualBackend)
kerchunk_file = f"{tmpdir}/kerchunk.json"
vds.virtualize.to_kerchunk(kerchunk_file, format="json")
roundtrip = xr.open_dataset(kerchunk_file, engine="kerchunk")
vds.virtualize.to_kerchunk(kerchunk_file, format="json")
roundtrip = open_dataset_kerchunk(kerchunk_file)
xrt.assert_equal(ds, roundtrip)

def test_filter_and_cf_roundtrip(self, tmpdir, filter_and_cf_roundtrip_hdf5_file):
@@ -48,5 +53,5 @@ def test_filter_and_cf_roundtrip(self, tmpdir, filter_and_cf_roundtrip_hdf5_file
)
kerchunk_file = f"{tmpdir}/filter_cf_kerchunk.json"
vds.virtualize.to_kerchunk(kerchunk_file, format="json")
roundtrip = xr.open_dataset(kerchunk_file, engine="kerchunk")
roundtrip = open_dataset_kerchunk(kerchunk_file)
xrt.assert_allclose(ds, roundtrip)