Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zarr-python v3 compatibility #516

Merged
merged 54 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
39722e7
Save progress for next week
mpiannucci Oct 4, 2024
d3c7e37
Bump zarr python version
mpiannucci Oct 5, 2024
25d7d14
Get some tests working others failing
mpiannucci Oct 5, 2024
ffe5f9d
get through single hdf to zarr
mpiannucci Oct 8, 2024
5aef233
Save progress
mpiannucci Oct 8, 2024
b9323d2
Cleanup, almost working with hdf
mpiannucci Oct 9, 2024
0f17119
Closer...
mpiannucci Oct 9, 2024
5c8806b
Updating tests
mpiannucci Oct 9, 2024
80fedcd
reorganize
mpiannucci Oct 10, 2024
1f69a0b
Save progress
mpiannucci Oct 10, 2024
d556e52
Refactor to clean things up
mpiannucci Oct 10, 2024
b27e64c
Fix circular import
mpiannucci Oct 10, 2024
41d6e8e
Iterate
mpiannucci Oct 10, 2024
7ade1a6
Change zarr dep
mpiannucci Oct 10, 2024
492ddee
More conversion
mpiannucci Oct 10, 2024
6e5741c
Specify zarr version
mpiannucci Oct 15, 2024
c0316ac
Working remote hdf tests
mpiannucci Oct 23, 2024
59bd36c
Working grib impl
mpiannucci Oct 23, 2024
187ced2
Add back commented out code
mpiannucci Oct 23, 2024
690ed21
Make grib codec a compressor since its bytes to array
mpiannucci Oct 23, 2024
5019b15
Switch back
mpiannucci Oct 23, 2024
d96cf46
Add first pass at grib zarr 3 codec
mpiannucci Oct 26, 2024
cbcb720
Fix typing
mpiannucci Oct 29, 2024
b88655f
Fix some broken tests; use async filesystem wrapper
moradology Nov 6, 2024
73eaf33
Implement zarr3 compatibility for grib
moradology Nov 20, 2024
3757199
Use zarr3 stores directly; avoid use of internal fs
moradology Nov 21, 2024
9444ff8
Merge pull request #4 from moradology/fix/zarr3-grib-tests
mpiannucci Nov 26, 2024
d8848ce
Forward
mpiannucci Nov 26, 2024
1fa294e
More
mpiannucci Nov 26, 2024
543178d
Figure out async wrapper
mpiannucci Nov 26, 2024
96b56cd
Closer on hdf5
mpiannucci Nov 26, 2024
0808b05
netcdf but failing
mpiannucci Nov 26, 2024
aef006e
grib passing
mpiannucci Nov 26, 2024
d9bf0dd
Fix inline test
mpiannucci Nov 26, 2024
884fc68
More
mpiannucci Nov 26, 2024
1145f45
standardize compressor name
mpiannucci Nov 27, 2024
94ec479
Fix one more hdf test
mpiannucci Nov 27, 2024
a9693d1
Small tweaks
mpiannucci Nov 27, 2024
7e9112a
Hide fsspec import where necessary
mpiannucci Nov 27, 2024
a7af691
Update with many fixes - but stioll not complete
martindurant Jan 16, 2025
f7b87de
Merge branch 'main' into v3
martindurant Jan 16, 2025
95f340f
min python
martindurant Jan 16, 2025
fa364a7
Loads of changes
martindurant Jan 22, 2025
53922a2
Merge branch 'main' into v3
martindurant Jan 23, 2025
0486ac1
more improvements (slowly slowly)
martindurant Jan 23, 2025
c522a52
grib and combine
martindurant Jan 29, 2025
9b96d8c
more fix!
martindurant Jan 29, 2025
17478bd
env typo
martindurant Jan 29, 2025
2d5033c
Add HDF4 simple test
martindurant Jan 29, 2025
9066360
fix other runs
martindurant Jan 29, 2025
4750f8e
comma
martindurant Jan 29, 2025
73385f5
lint
martindurant Jan 29, 2025
1a79a5c
CI deps
martindurant Jan 29, 2025
f51604f
skip tests needing datatree
martindurant Jan 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions kerchunk/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def append(
ds = xr.open_dataset(
fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
)
z = zarr.open(fs.get_mapper())
z = zarr.open(fs.get_mapper(), zarr_format=2)
mzz = MultiZarrToZarr(
path,
out=fs.references, # dict or parquet/lazy
Expand Down Expand Up @@ -360,7 +360,7 @@ def first_pass(self):
fs._dircache_from_items()

logger.debug("First pass: %s", i)
z = zarr.open_group(fs.get_mapper(""))
z = zarr.open_group(fs.get_mapper(""), zarr_format=2)
for var in self.concat_dims:
value = self._get_value(i, z, var, fn=self._paths[i])
if isinstance(value, np.ndarray):
Expand All @@ -387,7 +387,7 @@ def store_coords(self):
"""
kv = {}
store = zarr.storage.KVStore(kv)
group = zarr.open(store)
group = zarr.open(store, zarr_format=2)
m = self.fss[0].get_mapper("")
z = zarr.open(m)
for k, v in self.coos.items():
Expand Down Expand Up @@ -461,7 +461,7 @@ def second_pass(self):
for i, fs in enumerate(self.fss):
to_download = {}
m = fs.get_mapper("")
z = zarr.open(m)
z = zarr.open(m, zarr_format=2)

if no_deps is None:
# done first time only
Expand Down
2 changes: 1 addition & 1 deletion kerchunk/fits.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def process_file(

storage_options = storage_options or {}
out = out or {}
g = zarr.open(out)
g = zarr.open(out, zarr_format=2)

with fsspec.open(url, mode="rb", **storage_options) as f:
infile = fits.open(f, do_not_scale_image_data=True)
Expand Down
4 changes: 2 additions & 2 deletions kerchunk/grib2.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def scan_grib(
if good is False:
continue

z = zarr.open_group(store)
z = zarr.open_group(store, zarr_format=2)
global_attrs = {
f"GRIB_{k}": m[k]
for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS
Expand Down Expand Up @@ -398,7 +398,7 @@ def grib_tree(

# TODO allow passing a LazyReferenceMapper as output?
zarr_store = {}
zroot = zarr.open_group(store=zarr_store)
zroot = zarr.open_group(store=zarr_store, zarr_format=2)

aggregations: Dict[str, List] = defaultdict(list)
aggregation_dims: Dict[str, Set] = defaultdict(set)
Expand Down
104 changes: 84 additions & 20 deletions kerchunk/hdf.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import base64
import io
import logging
from typing import Union, BinaryIO
from typing import Union, BinaryIO, Any, cast
from packaging.version import Version

import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
Expand All @@ -21,11 +22,11 @@
"for more details."
)

try:
from zarr.meta import encode_fill_value
except ModuleNotFoundError:
# https://github.com/zarr-developers/zarr-python/issues/2021
from zarr.v2.meta import encode_fill_value
# try:
# from zarr.meta import encode_fill_value
# except ModuleNotFoundError:
# # https://github.com/zarr-developers/zarr-python/issues/2021
# from zarr.v2.meta import encode_fill_value

lggr = logging.getLogger("h5-to-zarr")
_HIDDEN_ATTRS = { # from h5netcdf.attrs
Expand Down Expand Up @@ -111,9 +112,14 @@ def __init__(
if vlen_encode not in ["embed", "null", "leave", "encode"]:
raise NotImplementedError
self.vlen = vlen_encode
self.store = out or {}
self._zroot = zarr.group(store=self.store, overwrite=True)

self.store_dict = out or {}
if Version(zarr.__version__) < Version("3.0.0.a0"):
self.store = zarr.storage.KVStore(self.store_dict)
self._zroot = zarr.group(store=self.store, overwrite=True)
else:
self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict)
self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True)

self._uri = url
self.error = error
lggr.debug(f"HDF5 file URI: {self._uri}")
Expand All @@ -140,7 +146,6 @@ def translate(self, preserve_linked_dsets=False):
"""
lggr.debug("Translation begins")
self._transfer_attrs(self._h5f, self._zroot)

self._h5f.visititems(self._translator)

if preserve_linked_dsets:
Expand All @@ -157,7 +162,17 @@ def translate(self, preserve_linked_dsets=False):
self.store.flush()
return self.store
else:
store = _encode_for_JSON(self.store)
keys_to_remove = []
new_keys = {}
for k, v in self.store_dict.items():
if isinstance(v, zarr.core.buffer.cpu.Buffer):
key = str.removeprefix(k, "/")
new_keys[key] = v.to_bytes()
keys_to_remove.append(k)
for k in keys_to_remove:
del self.store_dict[k]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the hacky bit and could use some explanations. Even when requesting "v2", zarr makes Buffer objects, and the keys are also wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so two issues here:

  1. the keys we get from hdf are for example /depth/.zarray when then need to be depth/.zarray
  2. we cant jsonify buffers, which is how the internal MemoryStore in v3 stores its data. So we need to convert the buffers to bytes to be serialized

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - would appreciate comments on the code saying this.

self.store_dict.update(new_keys)
store = _encode_for_JSON(self.store_dict)
return {"version": 1, "refs": store}

def _unref(self, ref):
Expand Down Expand Up @@ -465,26 +480,31 @@ def _translator(
if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
return
if h5obj.attrs.get("_FillValue") is not None:
fill = h5obj.attrs.get("_FillValue")
fill = encode_fill_value(
h5obj.attrs.get("_FillValue"), dt or h5obj.dtype
)

# Create a Zarr array equivalent to this HDF5 dataset...
za = self._zroot.require_dataset(
h5obj.name,
adims = self._get_array_dims(h5obj)

# Create a Zarr array equivalent to this HDF5 dataset..
za = self._zroot.require_array(
name=h5obj.name,
shape=h5obj.shape,
dtype=dt or h5obj.dtype,
chunks=h5obj.chunks or False,
fill_value=fill,
compression=None,
compressor=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here, you could reintroduce the compressor

filters = filters[:-1]
compressor = filters[-1]

but obviously it depends on whether there are indeed any filters at all.

It would still need back compat, since filters-only datasts definitely exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the big issue is that v3 cares about what type of operation it is, and v2w doesnt so moving them around doesnt necessarily fix that bug

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there needs to be a change upstream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filters=filters,
overwrite=True,
attributes={
"_ARRAY_DIMENSIONS": adims,
},
**kwargs,
)
lggr.debug(f"Created Zarr array: {za}")
self._transfer_attrs(h5obj, za)
adims = self._get_array_dims(h5obj)
za.attrs["_ARRAY_DIMENSIONS"] = adims

# za.attrs["_ARRAY_DIMENSIONS"] = adims
lggr.debug(f"_ARRAY_DIMENSIONS = {adims}")

if "data" in kwargs:
Expand All @@ -496,6 +516,8 @@ def _translator(
if h5obj.fletcher32:
logging.info("Discarding fletcher32 checksum")
v["size"] -= 4
key = str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as what _chunk_key did? Maybe make it a function with a comment saying it's a copy/reimplementation.

By the way, is h5obj.name not actually a string, so you could have done h5obj.name.removeprefix()?


if (
self.inline
and isinstance(v, dict)
Expand All @@ -508,9 +530,10 @@ def _translator(
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
self.store[za._chunk_key(k)] = data

self.store_dict[key] = data
else:
self.store[za._chunk_key(k)] = [
self.store_dict[key] = [
self._uri,
v["offset"],
v["size"],
Expand Down Expand Up @@ -681,3 +704,44 @@ def _is_netcdf_variable(dataset: h5py.Dataset):

def has_visititems_links():
return hasattr(h5py.Group, "visititems_links")


def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any:
# early out
if v is None:
return v
if dtype.kind == "V" and dtype.hasobject:
if object_codec is None:
raise ValueError("missing object_codec for object array")
v = object_codec.encode(v)
v = str(base64.standard_b64encode(v), "ascii")
return v
if dtype.kind == "f":
if np.isnan(v):
return "NaN"
elif np.isposinf(v):
return "Infinity"
elif np.isneginf(v):
return "-Infinity"
else:
return float(v)
elif dtype.kind in "ui":
return int(v)
elif dtype.kind == "b":
return bool(v)
elif dtype.kind in "c":
c = cast(np.complex128, np.dtype(complex).type())
v = (
encode_fill_value(v.real, c.real.dtype, object_codec),
encode_fill_value(v.imag, c.imag.dtype, object_codec),
)
return v
elif dtype.kind in "SV":
v = str(base64.standard_b64encode(v), "ascii")
return v
elif dtype.kind == "U":
return v
elif dtype.kind in "mM":
return int(v.view("i8"))
else:
return v
2 changes: 1 addition & 1 deletion kerchunk/hdf4.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def translate(self, filename=None, storage_options=None):
remote_protocol=prot,
remote_options=self.st,
)
g = zarr.open_group("reference://", storage_options=dict(fs=fs))
g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_format=2)
refs = {}
for k, v in output.items():
if isinstance(v, dict):
Expand Down
2 changes: 1 addition & 1 deletion kerchunk/netCDF3.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def translate(self):
import zarr

out = self.out
z = zarr.open(out, mode="w")
z = zarr.open(out, mode="w", zarr_format=2)
for dim, var in self.variables.items():
if dim in self.chunks:
shape = self.chunks[dim][-1]
Expand Down
6 changes: 3 additions & 3 deletions kerchunk/tests/test_combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@

# simple time arrays - xarray can't make these!
m = fs.get_mapper("time1.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]})

m = fs.get_mapper("time2.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
Expand Down Expand Up @@ -272,7 +272,7 @@ def test_get_coos(refs, selector, expected):
mzz.first_pass()
assert mzz.coos["time"].tolist() == expected
mzz.store_coords()
g = zarr.open(mzz.out)
g = zarr.open(mzz.out, zarr_format=2)
assert g["time"][:].tolist() == expected
assert dict(g.attrs)

Expand Down
20 changes: 10 additions & 10 deletions kerchunk/tests/test_combine_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
refs = []
for i, x in enumerate(arrays):
fn = f"{tmpdir}/out{i}.zarr"
g = zarr.open(fn)
g = zarr.open(fn, zarr_format=2)
g.create_dataset("x", data=x, chunks=chunks)
fns.append(fn)
ref = kerchunk.zarr.single_zarr(fn, inline=0)
Expand All @@ -62,7 +62,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
)

mapper = fsspec.get_mapper("reference://", fo=out)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

try:
Expand All @@ -76,7 +76,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1)
Expand All @@ -86,7 +86,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()


Expand All @@ -95,9 +95,9 @@ def test_fail_chunks(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 20)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(3,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
Expand All @@ -112,9 +112,9 @@ def test_fail_shape(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(12).reshape(6, 2)
x2 = np.arange(12, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(2,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
Expand All @@ -129,9 +129,9 @@ def test_fail_irregular_chunk_boundaries(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(4,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(4,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
Expand Down
Loading
Loading