Allow open_virtual_dataset to read existing Kerchunk references (#251)
* reading existing refs - wip

* ujson stub to mypy overrides in pyproject.toml

* added xfail to kerchunk json

* updated reference writing to remove trailing //

* MYPY TEMP DISABLED

* added section to usage docs + updated releases.rst

* test

* remove test deps from doc.yaml build

* tests passing for reading parquet references to virtual dataset, refactored _fsspec_open... to class

* Update pyproject.toml

Co-authored-by: Justus Magin <keewis@users.noreply.github.com>

* remove ast and replace with ujson

* Update .github/workflows/main.yml

Co-authored-by: Tom Nicholas <tom@cworthy.org>

* Dict -> dict,  ->  engine option. very flaky autodetection

* removed version from parquet refs

* adds path for invalid kerchunk format + test

* updates existing references docs

---------

Co-authored-by: Justus Magin <keewis@users.noreply.github.com>
Co-authored-by: Tom Nicholas <tom@cworthy.org>
3 people authored Oct 16, 2024
1 parent 2d66e88 commit ec8e465
Showing 12 changed files with 197 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -160,3 +160,4 @@ cython_debug/
#.idea/
virtualizarr/_version.py
docs/generated/
examples/
1 change: 0 additions & 1 deletion ci/doc.yml
@@ -13,4 +13,3 @@ dependencies:
- "sphinx_design"
- "sphinx_togglebutton"
- "sphinx-autodoc-typehints"
- -e "..[test]"
14 changes: 14 additions & 0 deletions conftest.py
@@ -33,6 +33,20 @@ def netcdf4_file(tmpdir):
return filepath


@pytest.fixture
def netcdf4_virtual_dataset(netcdf4_file):
from virtualizarr import open_virtual_dataset

return open_virtual_dataset(netcdf4_file, indexes={})


@pytest.fixture
def netcdf4_inlined_ref(netcdf4_file):
from kerchunk.hdf import SingleHdf5ToZarr

return SingleHdf5ToZarr(netcdf4_file, inline_threshold=1000).translate()


@pytest.fixture
def hdf5_groups_file(tmpdir):
# Set up example xarray dataset
5 changes: 5 additions & 0 deletions docs/releases.rst
@@ -8,6 +8,11 @@ v1.0.1 (unreleased)

New Features
~~~~~~~~~~~~


- Can open `kerchunk` reference files with ``open_virtual_dataset``.
(:pull:`251`, :pull:`186`) By `Raphael Hagen <https://github.com/norlandrhagen>`_ & `Kristen Thyng <https://github.com/kthyng>`_.

- Adds defaults for `open_virtual_dataset_from_v3_store` in (:pull:`234`)
By `Raphael Hagen <https://github.com/norlandrhagen>`_.

12 changes: 12 additions & 0 deletions docs/usage.md
@@ -421,6 +421,18 @@ Currently there are not yet any zarr v3 readers which understand the chunk manifest storage format
This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`.
```
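
For instance, a previously-written chunk-manifest store can be re-opened as a virtual dataset. A minimal sketch — the `store.zarr` path here is a hypothetical placeholder:

```python
from virtualizarr import open_virtual_dataset

# Re-open a zarr v3 store containing chunk manifests as a virtual dataset
vds = open_virtual_dataset('store.zarr', filetype='zarr_v3', indexes={})
```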

## Opening Kerchunk references as virtual datasets

You can open existing Kerchunk `json` or `parquet` references as VirtualiZarr virtual datasets. This can be useful for converting references that were generated with Kerchunk into storage formats such as [Icechunk](https://icechunk.io/).

```python
from virtualizarr import open_virtual_dataset

vds = open_virtual_dataset('combined.json', filetype='kerchunk')
# or
vds = open_virtual_dataset('combined.parquet', filetype='kerchunk')
```
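
Reference files in object storage can be opened the same way, with fsspec storage options passed through `reader_options`. A minimal sketch, assuming an anonymously readable S3 bucket (the bucket and key below are hypothetical) and that `s3fs` is installed:

```python
from virtualizarr import open_virtual_dataset

# Hypothetical S3 location, shown for illustration only
vds = open_virtual_dataset(
    's3://example-bucket/refs/combined.json',
    filetype='kerchunk',
    reader_options={'storage_options': {'anon': True}},
)
```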

## Rewriting existing manifests

Sometimes it can be useful to rewrite the contents of an already-generated manifest or virtual dataset.
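
For example, if the referenced files have since been copied to a new bucket, the paths in the chunk manifest can be updated with the `rename_paths` accessor covered in this section. A minimal sketch — the `s3://new-bucket/` location is hypothetical:

```python
def move_to_new_bucket(old_path: str) -> str:
    # Keep each filename but point it at the new (hypothetical) bucket
    filename = old_path.split('/')[-1]
    return f's3://new-bucket/{filename}'

# rename_paths accepts either a single replacement string or a function
renamed_vds = vds.virtualize.rename_paths(move_to_new_bucket)
```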
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -87,6 +87,10 @@ ignore_missing_imports = true
module = "kerchunk.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "ujson.*"
ignore_missing_imports = true

[tool.ruff]
# Same as Black.
line-length = 88
50 changes: 44 additions & 6 deletions virtualizarr/backend.py
@@ -16,7 +16,8 @@
from xarray.core.variable import IndexVariable

from virtualizarr.manifests import ManifestArray
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore

@@ -39,6 +40,7 @@ class FileType(AutoName):
zarr = auto()
dmrpp = auto()
zarr_v3 = auto()
kerchunk = auto()


class ManifestBackendArray(ManifestArray, BackendArray):
@@ -67,13 +69,14 @@ def open_virtual_dataset(
Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.
Parameters
----------
filepath : str, default None
File path to open as a set of virtualized zarr arrays.
filetype : FileType, default None
Type of file to be opened. Used to determine which kerchunk file format backend to use.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3'}.
Can be one of {'netCDF3', 'netCDF4', 'HDF', 'TIFF', 'GRIB', 'FITS', 'zarr_v3', 'kerchunk'}.
If not provided will attempt to automatically infer the correct filetype from header bytes.
group : str, default is None
Path to the HDF5/netCDF4 group in the given file to open. Given as a str, supported by filetypes “netcdf4” and “hdf5”.
@@ -133,9 +136,44 @@ def open_virtual_dataset(
raise NotImplementedError()

# if filetype is user defined, convert to FileType

if filetype is not None:
filetype = FileType(filetype)

if filetype == FileType.kerchunk:
from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options)

# The Kerchunk .parquet storage format isn't actually a single Parquet file, but a directory containing a named Parquet file for each group/variable.
if fs.filepath.endswith("ref.parquet"):
from fsspec.implementations.reference import LazyReferenceMapper

lrm = LazyReferenceMapper(filepath, fs.fs)

# build the reference dict from key/value pairs in the LazyReferenceMapper
# is there a better / more performant way to extract this?
array_refs = {k: lrm[k] for k in lrm.keys()}

full_reference = {"refs": array_refs}

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))

# JSON has no magic bytes, but a Kerchunk version 1 reference file starts with '{"version':
# https://fsspec.github.io/kerchunk/spec.html
elif fs.read_bytes(9).startswith(b'{"version'):
import ujson

with fs.open_file() as of:
refs = ujson.load(of)

return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

else:
raise ValueError(
"The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues"
)

if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store
@@ -151,9 +189,9 @@
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
@@ -189,9 +227,9 @@
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.
4 changes: 3 additions & 1 deletion virtualizarr/manifests/manifest.py
@@ -309,7 +309,9 @@ def _from_kerchunk_chunk_dict(
chunk_entries: dict[ChunkKey, ChunkDictEntry] = {}
for k, v in kerchunk_chunk_dict.items():
if isinstance(v, (str, bytes)):
raise NotImplementedError("TODO: handle inlined data")
raise NotImplementedError(
"Reading inlined reference data is currently not supported. [ToDo]"
)
elif not isinstance(v, (tuple, list)):
raise TypeError(f"Unexpected type {type(v)} for chunk value: {v}")
chunk_entries[k] = ChunkEntry.from_kerchunk(v).dict()
6 changes: 3 additions & 3 deletions virtualizarr/readers/kerchunk.py
@@ -13,7 +13,7 @@
KerchunkArrRefs,
KerchunkStoreRefs,
)
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath
from virtualizarr.zarr import ZArray, ZAttrs


@@ -28,9 +28,9 @@ def _automatically_determine_filetype(
raise NotImplementedError()

# Read magic bytes from local or remote file
fpath = _fsspec_openfile_from_filepath(
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
)
).open_file()
magic_bytes = fpath.read(8)
fpath.close()

74 changes: 74 additions & 0 deletions virtualizarr/tests/test_backend.py
@@ -336,3 +336,77 @@ def test_open_dataset_with_scalar(self, hdf5_scalar, tmpdir):
vds = open_virtual_dataset(hdf5_scalar)
assert vds.scalar.dims == ()
assert vds.scalar.attrs == {"scalar": "true"}


@pytest.mark.parametrize(
"reference_format",
["json", "parquet", "invalid"],
)
def test_open_virtual_dataset_existing_kerchunk_refs(
tmp_path, netcdf4_virtual_dataset, reference_format
):
example_reference_dict = netcdf4_virtual_dataset.virtualize.to_kerchunk(
format="dict"
)

if reference_format == "invalid":
# Test invalid file format leads to ValueError
ref_filepath = tmp_path / "ref.csv"
with open(ref_filepath.as_posix(), mode="w") as of:
of.write("tmp")

with pytest.raises(ValueError):
open_virtual_dataset(
filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
)

else:
# Test valid json and parquet reference formats

if reference_format == "json":
ref_filepath = tmp_path / "ref.json"

import ujson

with open(ref_filepath, "w") as json_file:
ujson.dump(example_reference_dict, json_file)

if reference_format == "parquet":
from kerchunk.df import refs_to_dataframe

ref_filepath = tmp_path / "ref.parquet"
refs_to_dataframe(fo=example_reference_dict, url=ref_filepath.as_posix())

vds = open_virtual_dataset(
filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
)

# Inconsistent results! https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
# assert vds.virtualize.to_kerchunk(format='dict') == example_reference_dict
refs = vds.virtualize.to_kerchunk(format="dict")
expected_refs = netcdf4_virtual_dataset.virtualize.to_kerchunk(format="dict")
assert refs["refs"]["air/0.0.0"] == expected_refs["refs"]["air/0.0.0"]
assert refs["refs"]["lon/0"] == expected_refs["refs"]["lon/0"]
assert refs["refs"]["lat/0"] == expected_refs["refs"]["lat/0"]
assert refs["refs"]["time/0"] == expected_refs["refs"]["time/0"]

assert list(vds) == list(netcdf4_virtual_dataset)
assert set(vds.coords) == set(netcdf4_virtual_dataset.coords)
assert set(vds.variables) == set(netcdf4_virtual_dataset.variables)


def test_notimplemented_read_inline_refs(tmp_path, netcdf4_inlined_ref):
# For now, we raise a NotImplementedError if we read existing references that have inlined data
# https://github.com/zarr-developers/VirtualiZarr/pull/251#pullrequestreview-2361916932

ref_filepath = tmp_path / "ref.json"

import ujson

with open(ref_filepath, "w") as json_file:
ujson.dump(netcdf4_inlined_ref, json_file)

with pytest.raises(NotImplementedError):
open_virtual_dataset(
filepath=ref_filepath.as_posix(), filetype="kerchunk", indexes={}
)
6 changes: 3 additions & 3 deletions virtualizarr/tests/test_utils.py
@@ -7,7 +7,7 @@
import pytest
import xarray as xr

from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.utils import _FsspecFSFromFilepath


@pytest.fixture
@@ -21,7 +21,7 @@ def test_fsspec_openfile_from_path(tmp_path: pathlib.Path, dataset: xr.Dataset)
f = tmp_path / "dataset.nc"
dataset.to_netcdf(f)

result = _fsspec_openfile_from_filepath(filepath=f.as_posix())
result = _FsspecFSFromFilepath(filepath=f.as_posix()).open_file()
assert isinstance(result, fsspec.implementations.local.LocalFileOpener)


@@ -32,6 +32,6 @@ def test_fsspec_openfile_memory(dataset: xr.Dataset):
with fs.open("dataset.nc", mode="wb") as f:
dataset.to_netcdf(f, engine="h5netcdf")

result = _fsspec_openfile_from_filepath(filepath="memory://dataset.nc")
result = _FsspecFSFromFilepath(filepath="memory://dataset.nc").open_file()
with result:
assert isinstance(result, fsspec.implementations.memory.MemoryFile)
59 changes: 34 additions & 25 deletions virtualizarr/utils.py
@@ -13,42 +13,51 @@
]


def _fsspec_openfile_from_filepath(
*,
filepath: str,
reader_options: Optional[dict] = None,
) -> OpenFileType:
"""Converts input filepath to fsspec openfile object.
from dataclasses import dataclass, field


@dataclass
class _FsspecFSFromFilepath:
"""Class to create fsspec Filesystem from input filepath.
Parameters
----------
filepath : str
Input filepath
reader_options : dict, optional
Dict containing kwargs to pass to file opener, by default {}
Returns
-------
OpenFileType
An open file-like object, specific to the protocol supplied in filepath.
dict containing kwargs to pass to file opener, by default {}
fs : fsspec.AbstractFileSystem
The fsspec filesystem object, created in __post_init__
Raises
------
NotImplementedError
Raised if the filepath protocol is not supported.
"""

import fsspec
from upath import UPath
filepath: str
reader_options: Optional[dict] = field(default_factory=dict)
fs: fsspec.AbstractFileSystem = field(init=False)

def open_file(self) -> OpenFileType:
"""Calls `.open` on fsspec.Filesystem instantiation using self.filepath as an input.
Returns
-------
OpenFileType
file opened with fsspec
"""
return self.fs.open(self.filepath)

universal_filepath = UPath(filepath)
protocol = universal_filepath.protocol
def read_bytes(self, n_bytes: int) -> bytes:
with self.open_file() as of:
return of.read(n_bytes)

if reader_options is None:
reader_options = {}
def __post_init__(self) -> None:
"""Initialize the fsspec filesystem object"""
import fsspec
from upath import UPath

storage_options = reader_options.get("storage_options", {}) # type: ignore
universal_filepath = UPath(self.filepath)
protocol = universal_filepath.protocol

fpath = fsspec.filesystem(protocol, **storage_options).open(filepath)
self.reader_options = self.reader_options or {}
storage_options = self.reader_options.get("storage_options", {}) # type: ignore

return fpath
self.fs = fsspec.filesystem(protocol, **storage_options)
