From 7b00e41bdc2c571aa83f76ccacb56b9253f5254b Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 17:49:11 -0400 Subject: [PATCH 01/39] move vds_with_manifest_arrays fixture up --- virtualizarr/tests/test_writers/conftest.py | 26 ++++++++++++++++++++ virtualizarr/tests/test_writers/test_zarr.py | 25 +------------------ 2 files changed, 27 insertions(+), 24 deletions(-) create mode 100644 virtualizarr/tests/test_writers/conftest.py diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py new file mode 100644 index 0000000..9dc36fd --- /dev/null +++ b/virtualizarr/tests/test_writers/conftest.py @@ -0,0 +1,26 @@ +import pytest + +import numpy as np +from xarray import Dataset + +from virtualizarr.manifests import ManifestArray, ChunkManifest + + +@pytest.fixture +def vds_with_manifest_arrays() -> Dataset: + arr = ManifestArray( + chunkmanifest=ChunkManifest( + entries={"0.0": dict(path="test.nc", offset=6144, length=48)} + ), + zarray=dict( + shape=(2, 3), + dtype=np.dtype(" Dataset: - arr = ManifestArray( - chunkmanifest=ChunkManifest( - entries={"0.0": dict(path="test.nc", offset=6144, length=48)} - ), - zarray=dict( - shape=(2, 3), - dtype=np.dtype(" bool: """ Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict From c82221c4d38a4d0a852226b73e4768889251f144 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 17:49:54 -0400 Subject: [PATCH 02/39] sketch implementation --- virtualizarr/writers/icechunk.py | 67 ++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 virtualizarr/writers/icechunk.py diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py new file mode 100644 index 0000000..b351b2f --- /dev/null +++ b/virtualizarr/writers/icechunk.py @@ -0,0 +1,67 @@ +from typing import TYPE_CHECKING + +import numpy as np +from xarray import Dataset + +from virtualizarr.manifests import 
ManifestArray + +if TYPE_CHECKING: + import IcechunkStore + + +def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: + """ + Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. + + Currently requires all variables to be backed by ManifestArray objects. + + Parameters + ---------- + ds: xr.Dataset + store: IcechunkStore + """ + + # TODO write group metadata + + for name, var in ds.variables.items(): + if isinstance(var.data, ManifestArray): + write_manifestarray_to_icechunk( + store=store, + # TODO is this right? + group='root', + arr_name=name, + ma=var.data, + ) + else: + # TODO write loadable data as normal zarr chunks + raise NotImplementedError() + + return None + + +def write_manifestarray_to_icechunk( + store: "IcechunkStore", + group: str, + arr_name: str, + ma: ManifestArray, +) -> None: + + manifest = ma.manifest + + # TODO how do we set the other zarr attributes? i.e. the .zarray information? + + # loop over every reference in the ChunkManifest for that array + # TODO this should be replaced with something more efficient that sets all (new) references for the array at once + # but Icechunk need to expose a suitable API first + for entry in np.nditer( + [manifest._paths, manifest._offsets, manifest._lengths], + flags=['multi_index'], + ): + # set each reference individually + store.set_virtual_ref( + # TODO make sure this key is the correct format + key=f"{group}/{arr_name}/{entry.index}", # your (0,1,2) tuple + location=entry[0], # filepath for this element + offset=entry[1], # offset for this element + length=entry[2], # length for this element + ) From d29362b31fab1fd9c797e60d14f48aea0c7c0441 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 17:52:16 -0400 Subject: [PATCH 03/39] test that we can create an icechunk store --- .../tests/test_writers/test_icechunk.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 virtualizarr/tests/test_writers/test_icechunk.py diff 
--git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py new file mode 100644 index 0000000..3b9178c --- /dev/null +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -0,0 +1,23 @@ +import pytest + +pytest.importorskip("icechunk") + +from xarray import Dataset + + +from virtualizarr.writers.icechunk import dataset_to_icechunk + + +@pytest.mark.asyncio +async def test_write_to_icechunk(tmpdir, vds_with_manifest_arrays: Dataset): + from icechunk import IcechunkStore, StorageConfig + + storage = StorageConfig.filesystem(str(tmpdir)) + store = await IcechunkStore.open(storage=storage, mode='r+') + + print(store) + + raise + + dataset_to_icechunk() + ... From 2aa3cb5a6926c5165965a4cb2fd7336106b580fb Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 18:11:07 -0400 Subject: [PATCH 04/39] fixture to create icechunk filestore in temporary directory --- virtualizarr/tests/test_writers/conftest.py | 5 ++- .../tests/test_writers/test_icechunk.py | 31 +++++++++++++------ 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py index 9dc36fd..af80ea6 100644 --- a/virtualizarr/tests/test_writers/conftest.py +++ b/virtualizarr/tests/test_writers/conftest.py @@ -1,9 +1,8 @@ -import pytest - import numpy as np +import pytest from xarray import Dataset -from virtualizarr.manifests import ManifestArray, ChunkManifest +from virtualizarr.manifests import ChunkManifest, ManifestArray @pytest.fixture diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 3b9178c..ae0ca15 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -1,23 +1,34 @@ +from typing import TYPE_CHECKING, Any + import pytest pytest.importorskip("icechunk") from xarray import Dataset - from virtualizarr.writers.icechunk import 
dataset_to_icechunk +if TYPE_CHECKING: + try: + from icechunk import IcechunkStore + except ImportError: + IcechunkStore = Any -@pytest.mark.asyncio -async def test_write_to_icechunk(tmpdir, vds_with_manifest_arrays: Dataset): + +@pytest.fixture +async def icechunk_filestore(tmpdir) -> "IcechunkStore": from icechunk import IcechunkStore, StorageConfig - + storage = StorageConfig.filesystem(str(tmpdir)) - store = await IcechunkStore.open(storage=storage, mode='r+') + store = await IcechunkStore.open(storage=storage, mode="r+") + + return store - print(store) - raise - - dataset_to_icechunk() - ... +@pytest.mark.asyncio +async def test_write_to_icechunk( + icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset +): + dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) + + print(icechunk_filestore) From f2c095cba795419fe538ea4aaafc5fce61963a18 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 19:32:32 -0400 Subject: [PATCH 05/39] get the async fixture working properly --- virtualizarr/tests/test_writers/test_icechunk.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index ae0ca15..3a225aa 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -22,6 +22,7 @@ async def icechunk_filestore(tmpdir) -> "IcechunkStore": storage = StorageConfig.filesystem(str(tmpdir)) store = await IcechunkStore.open(storage=storage, mode="r+") + # TODO instead yield store then store.close() ?? 
return store @@ -29,6 +30,8 @@ async def icechunk_filestore(tmpdir) -> "IcechunkStore": async def test_write_to_icechunk( icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset ): - dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) + store = await icechunk_filestore - print(icechunk_filestore) + dataset_to_icechunk(vds_with_manifest_arrays, store) + + # TODO assert that arrays and references have been written From 6abe32d2b1080966aea5fd2b403ca5e87fdec121 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 19:33:18 -0400 Subject: [PATCH 06/39] split into more functions --- virtualizarr/writers/icechunk.py | 93 ++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 29 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index b351b2f..443ad98 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,12 +1,15 @@ +import asyncio from typing import TYPE_CHECKING import numpy as np from xarray import Dataset +from xarray.core.variable import Variable +import zarr -from virtualizarr.manifests import ManifestArray +from virtualizarr.manifests import ManifestArray, ChunkManifest if TYPE_CHECKING: - import IcechunkStore + from icechunk import IcechunkStore def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: @@ -20,48 +23,80 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: ds: xr.Dataset store: IcechunkStore """ + from icechunk import IcechunkStore + + if not isinstance(store, IcechunkStore): + raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") # TODO write group metadata for name, var in ds.variables.items(): - if isinstance(var.data, ManifestArray): - write_manifestarray_to_icechunk( - store=store, - # TODO is this right? 
- group='root', - arr_name=name, - ma=var.data, - ) - else: - # TODO write loadable data as normal zarr chunks - raise NotImplementedError() + write_variable_to_icechunk( + store=store, + # TODO is this right? + group="root", + name=name, + var=var, + ) return None -def write_manifestarray_to_icechunk( - store: "IcechunkStore", - group: str, - arr_name: str, - ma: ManifestArray, +def write_variable_to_icechunk( + store: "IcechunkStore", + group: str, + name: str, + var: Variable, ) -> None: + if not isinstance(var.data, ManifestArray): + # TODO is writing loadable_variables just normal xarray ds.to_zarr? + raise NotImplementedError() - manifest = ma.manifest + ma = var.data + zarray = ma.zarray # TODO how do we set the other zarr attributes? i.e. the .zarray information? + # Probably need to manually create the groups and arrays in the store... + # Would that just be re-implementing xarray's `.to_zarr()` though? + array = zarr.Array.create(store, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=zarray.dtype) + + + # TODO we also need to set zarr attributes, including DIMENSION_NAMES + + write_manifest_virtual_refs( + store=store, + group=group, + name=name, + manifest=ma.manifest, + ) + + +def write_manifest_virtual_refs( + store: "IcechunkStore", + group: str, + name: str, + manifest: ChunkManifest, +) -> None: + """Write all the virtual references for one array manifest at once.""" # loop over every reference in the ChunkManifest for that array - # TODO this should be replaced with something more efficient that sets all (new) references for the array at once + # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once # but Icechunk need to expose a suitable API first - for entry in np.nditer( - [manifest._paths, manifest._offsets, manifest._lengths], - flags=['multi_index'], - ): + it = np.nditer( + [manifest._paths, manifest._offsets, manifest._lengths], + flags=["refs_ok", "multi_index", "c_index"], # 
TODO is "c_index" correct? what's the convention for zarr chunk keys? + op_flags=[['readonly']] * 3 + ) + for (path, offset, length) in it: + index = it.multi_index + chunk_key = "/".join(str(i) for i in index) + + # TODO again this async stuff should be handled at the rust level, not here # set each reference individually store.set_virtual_ref( - # TODO make sure this key is the correct format - key=f"{group}/{arr_name}/{entry.index}", # your (0,1,2) tuple - location=entry[0], # filepath for this element - offset=entry[1], # offset for this element - length=entry[2], # length for this element + # TODO it would be marginally neater if I could pass the group and name as separate args + key=f"{group}/{name}/{chunk_key}", # should be of form '/group/name/0/1/2' + location=path.item(), + offset=offset.item(), + length=length.item(), ) From 93080b301480e25294fe6202c719a2f7514afe1f Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 19:44:11 -0400 Subject: [PATCH 07/39] change mode --- virtualizarr/tests/test_writers/test_icechunk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 3a225aa..2e1e1ab 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -20,7 +20,7 @@ async def icechunk_filestore(tmpdir) -> "IcechunkStore": from icechunk import IcechunkStore, StorageConfig storage = StorageConfig.filesystem(str(tmpdir)) - store = await IcechunkStore.open(storage=storage, mode="r+") + store = await IcechunkStore.open(storage=storage, mode="w") # TODO instead yield store then store.close() ?? 
return store From bebf3704bd07c4ccd8a921ed2eb00d468d5609d3 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Fri, 27 Sep 2024 19:44:31 -0400 Subject: [PATCH 08/39] try creating zarr group and arrays explicitly --- virtualizarr/writers/icechunk.py | 49 +++++++++++++++++++------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 443ad98..e3b1361 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,12 +1,11 @@ -import asyncio from typing import TYPE_CHECKING import numpy as np +import zarr from xarray import Dataset from xarray.core.variable import Variable -import zarr -from virtualizarr.manifests import ManifestArray, ChunkManifest +from virtualizarr.manifests import ChunkManifest, ManifestArray if TYPE_CHECKING: from icechunk import IcechunkStore @@ -28,13 +27,16 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: if not isinstance(store, IcechunkStore): raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") - # TODO write group metadata + # TODO only supports writing to the root group currently + root_group = zarr.group(store=store, overwrite=True) + + # TODO this is Frozen, the API for setting attributes must be something else + # root_group.attrs = ds.attrs for name, var in ds.variables.items(): write_variable_to_icechunk( store=store, - # TODO is this right? - group="root", + group=root_group, name=name, var=var, ) @@ -44,7 +46,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: def write_variable_to_icechunk( store: "IcechunkStore", - group: str, + group: zarr.Group, name: str, var: Variable, ) -> None: @@ -55,25 +57,28 @@ def write_variable_to_icechunk( ma = var.data zarray = ma.zarray - # TODO how do we set the other zarr attributes? i.e. the .zarray information? - # Probably need to manually create the groups and arrays in the store... 
- # Would that just be re-implementing xarray's `.to_zarr()` though? - array = zarr.Array.create(store, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=zarray.dtype) - + # TODO when I try to create this array I get an AssertionError from inside Zarr v3 + # TODO do I need this array object for anything after ensuring the array has been created? + # array = group.create_array( + # store, + # shape=zarray.shape, + # chunk_shape=zarray.chunks, + # dtype=zarray.dtype, + # ) # TODO we also need to set zarr attributes, including DIMENSION_NAMES write_manifest_virtual_refs( - store=store, - group=group, - name=name, + store=store, + group=group, + name=name, manifest=ma.manifest, ) def write_manifest_virtual_refs( - store: "IcechunkStore", - group: str, + store: "IcechunkStore", + group: zarr.Group, name: str, manifest: ChunkManifest, ) -> None: @@ -84,10 +89,14 @@ def write_manifest_virtual_refs( # but Icechunk need to expose a suitable API first it = np.nditer( [manifest._paths, manifest._offsets, manifest._lengths], - flags=["refs_ok", "multi_index", "c_index"], # TODO is "c_index" correct? what's the convention for zarr chunk keys? - op_flags=[['readonly']] * 3 + flags=[ + "refs_ok", + "multi_index", + "c_index", + ], # TODO is "c_index" correct? what's the convention for zarr chunk keys? 
+ op_flags=[["readonly"]] * 3, ) - for (path, offset, length) in it: + for path, offset, length in it: index = it.multi_index chunk_key = "/".join(str(i) for i in index) From 833e5f09fc2446f9ad6f9ef13d0b557d8ff65211 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 13:37:22 -0400 Subject: [PATCH 09/39] create root group from store --- virtualizarr/writers/icechunk.py | 33 +++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index e3b1361..eb620e1 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,11 +1,12 @@ from typing import TYPE_CHECKING import numpy as np -import zarr from xarray import Dataset from xarray.core.variable import Variable +from zarr import Group from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.zarr import encode_dtype if TYPE_CHECKING: from icechunk import IcechunkStore @@ -28,7 +29,8 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") # TODO only supports writing to the root group currently - root_group = zarr.group(store=store, overwrite=True) + # TODO pass zarr_format kwarg? 
+ root = Group.from_store(store=store) # TODO this is Frozen, the API for setting attributes must be something else # root_group.attrs = ds.attrs @@ -36,7 +38,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: for name, var in ds.variables.items(): write_variable_to_icechunk( store=store, - group=root_group, + group=root, name=name, var=var, ) @@ -46,7 +48,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: def write_variable_to_icechunk( store: "IcechunkStore", - group: zarr.Group, + group: Group, name: str, var: Variable, ) -> None: @@ -57,16 +59,17 @@ def write_variable_to_icechunk( ma = var.data zarray = ma.zarray - # TODO when I try to create this array I get an AssertionError from inside Zarr v3 - # TODO do I need this array object for anything after ensuring the array has been created? - # array = group.create_array( - # store, - # shape=zarray.shape, - # chunk_shape=zarray.chunks, - # dtype=zarray.dtype, - # ) + # TODO do I need the returned zarr.array object for anything after ensuring the array has been created? + # TODO should I be checking that this array doesn't already exist? Or is that icechunks' job? + group.create_array( + name, + shape=zarray.shape, + chunk_shape=zarray.chunks, + dtype=encode_dtype(zarray.dtype), + ) # TODO we also need to set zarr attributes, including DIMENSION_NAMES + # TODO we should probably be doing some encoding of those attributes? write_manifest_virtual_refs( store=store, @@ -78,7 +81,7 @@ def write_variable_to_icechunk( def write_manifest_virtual_refs( store: "IcechunkStore", - group: zarr.Group, + group: Group, name: str, manifest: ChunkManifest, ) -> None: @@ -92,8 +95,8 @@ def write_manifest_virtual_refs( flags=[ "refs_ok", "multi_index", - "c_index", - ], # TODO is "c_index" correct? what's the convention for zarr chunk keys? + "c_index", # TODO is "c_index" correct? what's the convention for zarr chunk keys? 
+ ], op_flags=[["readonly"]] * 3, ) for path, offset, length in it: From 9853140828cdc577cf5209172a4ccf5ce440be99 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 13:39:31 -0400 Subject: [PATCH 10/39] todos --- virtualizarr/tests/test_writers/test_icechunk.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 2e1e1ab..40af4e3 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import pytest @@ -9,10 +9,7 @@ from virtualizarr.writers.icechunk import dataset_to_icechunk if TYPE_CHECKING: - try: - from icechunk import IcechunkStore - except ImportError: - IcechunkStore = Any + from icechunk import IcechunkStore @pytest.fixture @@ -20,7 +17,8 @@ async def icechunk_filestore(tmpdir) -> "IcechunkStore": from icechunk import IcechunkStore, StorageConfig storage = StorageConfig.filesystem(str(tmpdir)) - store = await IcechunkStore.open(storage=storage, mode="w") + # TODO if I use asyncio.run can I avoid this fixture and tests being async functions? + store = await IcechunkStore.open(storage=storage, mode="r+") # TODO instead yield store then store.close() ?? 
return store @@ -35,3 +33,6 @@ async def test_write_to_icechunk( dataset_to_icechunk(vds_with_manifest_arrays, store) # TODO assert that arrays and references have been written + + +# TODO roundtripping tests - requires icechunk compatibility with xarray From 030a96ebbd3ade3a8f5c9b08d67d5867a7967dae Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 13:44:30 -0400 Subject: [PATCH 11/39] do away with the async pytest fixtures/functions --- virtualizarr/tests/test_writers/test_icechunk.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 40af4e3..e2199b5 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -1,3 +1,4 @@ +import asyncio from typing import TYPE_CHECKING import pytest @@ -13,24 +14,22 @@ @pytest.fixture -async def icechunk_filestore(tmpdir) -> "IcechunkStore": +def icechunk_filestore(tmpdir) -> "IcechunkStore": from icechunk import IcechunkStore, StorageConfig storage = StorageConfig.filesystem(str(tmpdir)) - # TODO if I use asyncio.run can I avoid this fixture and tests being async functions? - store = await IcechunkStore.open(storage=storage, mode="r+") + + # TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here + store = asyncio.run(IcechunkStore.open(storage=storage, mode="r+")) # TODO instead yield store then store.close() ?? 
return store -@pytest.mark.asyncio -async def test_write_to_icechunk( +def test_write_to_icechunk( icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset ): - store = await icechunk_filestore - - dataset_to_icechunk(vds_with_manifest_arrays, store) + dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) # TODO assert that arrays and references have been written From 90ca5cfa1bd4ed9f56dffa7fccb5c23c12c100c6 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 14:37:04 -0400 Subject: [PATCH 12/39] successfully writes root group attrs --- .../tests/test_writers/test_icechunk.py | 21 ++++++++++++++----- virtualizarr/writers/icechunk.py | 8 +++++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index e2199b5..f64781a 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -6,6 +6,7 @@ pytest.importorskip("icechunk") from xarray import Dataset +from zarr import group from virtualizarr.writers.icechunk import dataset_to_icechunk @@ -20,18 +21,28 @@ def icechunk_filestore(tmpdir) -> "IcechunkStore": storage = StorageConfig.filesystem(str(tmpdir)) # TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here + # TODO is this the correct mode to use? store = asyncio.run(IcechunkStore.open(storage=storage, mode="r+")) # TODO instead yield store then store.close() ?? 
return store -def test_write_to_icechunk( - icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset -): - dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) +class TestWriteVirtualRefs: + def test_write_new_variable( + self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset + ): + dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) - # TODO assert that arrays and references have been written + root_group = group(store=icechunk_filestore) + assert root_group.attrs == {"something": 0} + # TODO assert that arrays, array attrs, and references have been written + + # note: we don't need to test that committing actually works, because now we have confirmed + # the refs are in the store (even uncommitted) it's icechunk's problem to manage now. + + +# TODO test writing loadable variables # TODO roundtripping tests - requires icechunk compatibility with xarray diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index eb620e1..b3c8014 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -28,17 +28,21 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: if not isinstance(store, IcechunkStore): raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") + # TODO should we check that the store supports writes at this point? + # TODO only supports writing to the root group currently # TODO pass zarr_format kwarg? 
- root = Group.from_store(store=store) + root_group = Group.from_store(store=store) # TODO this is Frozen, the API for setting attributes must be something else # root_group.attrs = ds.attrs + for k, v in ds.attrs.items(): + root_group.attrs[k] = v for name, var in ds.variables.items(): write_variable_to_icechunk( store=store, - group=root, + group=root_group, name=name, var=var, ) From b138ddeab84d5488172f3bedd285773f7a5d2bc7 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 14:51:33 -0400 Subject: [PATCH 13/39] check array metadata is correct --- .../tests/test_writers/test_icechunk.py | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index f64781a..a00da25 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -5,8 +5,9 @@ pytest.importorskip("icechunk") +import numpy as np from xarray import Dataset -from zarr import group +from zarr import Array, Group, group from virtualizarr.writers.icechunk import dataset_to_icechunk @@ -32,13 +33,38 @@ class TestWriteVirtualRefs: def test_write_new_variable( self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset ): - dataset_to_icechunk(vds_with_manifest_arrays, icechunk_filestore) + vds = vds_with_manifest_arrays + dataset_to_icechunk(vds, icechunk_filestore) + + # check attrs root_group = group(store=icechunk_filestore) + assert isinstance(root_group, Group) assert root_group.attrs == {"something": 0} # TODO assert that arrays, array attrs, and references have been written + # TODO check against vds, then perhaps parametrize? + + # check array exists + assert "a" in root_group + arr = root_group["a"] + assert isinstance(arr, Array) + + # check array metadata + # TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array? 
+ # assert arr.zarr_format == 3 + assert arr.shape == (2, 3) + assert arr.chunks == (2, 3) + assert arr.dtype == np.dtype(" Date: Sat, 28 Sep 2024 15:11:27 -0400 Subject: [PATCH 14/39] try to write array attributes --- virtualizarr/tests/test_writers/conftest.py | 4 +++- virtualizarr/tests/test_writers/test_icechunk.py | 7 +++++-- virtualizarr/writers/icechunk.py | 7 +++++-- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py index af80ea6..27f4666 100644 --- a/virtualizarr/tests/test_writers/conftest.py +++ b/virtualizarr/tests/test_writers/conftest.py @@ -1,6 +1,7 @@ import numpy as np import pytest from xarray import Dataset +from xarray.core.variable import Variable from virtualizarr.manifests import ChunkManifest, ManifestArray @@ -22,4 +23,5 @@ def vds_with_manifest_arrays() -> Dataset: zarr_format=3, ), ) - return Dataset({"a": (["x", "y"], arr)}, attrs={"something": 0}) + var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"}) + return Dataset({"a": var}, attrs={"something": 0}) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index a00da25..2db90fb 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -42,8 +42,6 @@ def test_write_new_variable( assert isinstance(root_group, Group) assert root_group.attrs == {"something": 0} - # TODO assert that arrays, array attrs, and references have been written - # TODO check against vds, then perhaps parametrize? # check array exists @@ -62,6 +60,11 @@ def test_write_new_variable( # TODO check compressor, filters? # check array attrs + # TODO somehow this is broken by setting the dimension names??? 
+ # assert dict(arr.attrs) == {"units": "km"} + + # check dimensions + assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"] # check chunk references diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index b3c8014..69f9694 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -65,15 +65,18 @@ def write_variable_to_icechunk( # TODO do I need the returned zarr.array object for anything after ensuring the array has been created? # TODO should I be checking that this array doesn't already exist? Or is that icechunks' job? - group.create_array( + arr = group.create_array( name, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), ) - # TODO we also need to set zarr attributes, including DIMENSION_NAMES + # TODO it would be nice if we could assign directly to the .attrs property + for k, v in var.attrs.items(): + arr.attrs[k] = v # TODO we should probably be doing some encoding of those attributes? + arr.attrs["DIMENSION_NAMES"] = var.dims write_manifest_virtual_refs( store=store, From e92b56c5a9cabcd5376663e74b463272417772bd Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Sat, 28 Sep 2024 15:26:21 -0400 Subject: [PATCH 15/39] sketch test for checking virtual references have been set correctly --- .../tests/test_writers/test_icechunk.py | 21 ++++++++++++++++--- virtualizarr/writers/icechunk.py | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 2db90fb..5ccc165 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -30,7 +30,7 @@ def icechunk_filestore(tmpdir) -> "IcechunkStore": class TestWriteVirtualRefs: - def test_write_new_variable( + def test_write_new_virtual_variable( self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset ): vds = vds_with_manifest_arrays @@ -66,10 
+66,25 @@ def test_write_new_variable( # check dimensions assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"] + def test_set_virtual_refs( + self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset + ): + vds = vds_with_manifest_arrays + + dataset_to_icechunk(vds, icechunk_filestore) + + root_group = group(store=icechunk_filestore) + arr = root_group["a"] + # check chunk references + # TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method + + # TODO we can't check the chunk contents if the reference doesn't actually point to any real location + # TODO so we could use our netCDF file fixtures again? And use minio to test that this works with cloud urls? + assert arr[0, 0] == ... - # note: we don't need to test that committing actually works, because now we have confirmed - # the refs are in the store (even uncommitted) it's icechunk's problem to manage now. + # note: we don't need to test that committing works, because now we have confirmed + # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. 
# TODO test writing loadable variables diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 69f9694..5b842b1 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -110,7 +110,7 @@ def write_manifest_virtual_refs( index = it.multi_index chunk_key = "/".join(str(i) for i in index) - # TODO again this async stuff should be handled at the rust level, not here + # TODO this needs to be awaited or something # set each reference individually store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args From 2c8c0ee663031f9fc99aa0bfbc0b0936bc06d8c6 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 30 Sep 2024 11:52:11 -0400 Subject: [PATCH 16/39] test setting single virtual ref --- .../tests/test_writers/test_icechunk.py | 44 +++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 5ccc165..c4cd002 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -1,4 +1,5 @@ import asyncio +from pathlib import Path from typing import TYPE_CHECKING import pytest @@ -6,10 +7,14 @@ pytest.importorskip("icechunk") import numpy as np -from xarray import Dataset +import numpy.testing as npt +from xarray import Dataset, open_dataset +from xarray.core.variable import Variable from zarr import Array, Group, group +from virtualizarr.manifests import ChunkManifest, ManifestArray from virtualizarr.writers.icechunk import dataset_to_icechunk +from virtualizarr.zarr import ZArray if TYPE_CHECKING: from icechunk import IcechunkStore @@ -66,22 +71,45 @@ def test_write_new_virtual_variable( # check dimensions assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"] - def test_set_virtual_refs( - self, icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset + def 
test_set_single_virtual_ref( + self, icechunk_filestore: "IcechunkStore", netcdf4_file: Path ): - vds = vds_with_manifest_arrays + # TODO kerchunk doesn't work with zarr-python v3 yet so we can't use open_virtual_dataset and icechunk together! + # vds = open_virtual_dataset(netcdf4_file, indexes={}) + + # instead for now just write out byte ranges explicitly + manifest = ChunkManifest( + {"0.0": {"path": netcdf4_file, "offset": 15419, "length": 7738000}} + ) + zarray = ZArray( + shape=(2920, 25, 53), + chunks=(2920, 25, 53), + dtype=np.dtype("int16"), + compressor=None, + filters=None, + fill_value=None, + ) + ma = ManifestArray( + chunkmanifest=manifest, + zarray=zarray, + ) + air = Variable(data=ma, dims=["time", "lat", "lon"]) + vds = Dataset( + {"air": air}, + ) dataset_to_icechunk(vds, icechunk_filestore) root_group = group(store=icechunk_filestore) - arr = root_group["a"] + air_array = root_group["air"] + print(air_array) # check chunk references # TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method - # TODO we can't check the chunk contents if the reference doesn't actually point to any real location - # TODO so we could use our netCDF file fixtures again? And use minio to test that this works with cloud urls? - assert arr[0, 0] == ... + expected_ds = open_dataset(netcdf4_file) + expected_air_array = expected_ds["air"].to_numpy() + npt.assert_equal(air_array, expected_air_array) # note: we don't need to test that committing works, because now we have confirmed # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. 
From a2ce1edc128ff987c9af00152a6189976a30f033 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 30 Sep 2024 14:29:44 -0400 Subject: [PATCH 17/39] use async properly --- virtualizarr/writers/icechunk.py | 61 ++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 5b842b1..99eea7c 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,3 +1,4 @@ +import asyncio from typing import TYPE_CHECKING import numpy as np @@ -39,21 +40,38 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: for k, v in ds.attrs.items(): root_group.attrs[k] = v - for name, var in ds.variables.items(): - write_variable_to_icechunk( + # we should be able to write references for each variable concurrently + asyncio.run( + write_variables_to_icechunk_group( + ds.variables, store=store, group=root_group, - name=name, - var=var, ) + ) + - return None +async def write_variables_to_icechunk_group( + variables, + store, + group, +): + await asyncio.gather( + *( + write_variable_to_icechunk( + store=store, + group=group, + arr_name=name, + var=var, + ) + for name, var in variables.items() + ) + ) -def write_variable_to_icechunk( +async def write_variable_to_icechunk( store: "IcechunkStore", group: Group, - name: str, + arr_name: str, var: Variable, ) -> None: if not isinstance(var.data, ManifestArray): @@ -63,13 +81,16 @@ def write_variable_to_icechunk( ma = var.data zarray = ma.zarray - # TODO do I need the returned zarr.array object for anything after ensuring the array has been created? # TODO should I be checking that this array doesn't already exist? Or is that icechunks' job? arr = group.create_array( - name, + arr_name, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), + # TODO fill_value? + # TODO order? + # TODO zarr format? + # TODO compressors? 
) # TODO it would be nice if we could assign directly to the .attrs property @@ -78,22 +99,28 @@ def write_variable_to_icechunk( # TODO we should probably be doing some encoding of those attributes? arr.attrs["DIMENSION_NAMES"] = var.dims - write_manifest_virtual_refs( + await write_manifest_virtual_refs( store=store, group=group, - name=name, + arr_name=arr_name, manifest=ma.manifest, ) -def write_manifest_virtual_refs( +async def write_manifest_virtual_refs( store: "IcechunkStore", group: Group, - name: str, + arr_name: str, manifest: ChunkManifest, ) -> None: """Write all the virtual references for one array manifest at once.""" + key_prefix = f"{group.name}{arr_name}" + if key_prefix.startswith("/"): + # remove preceding / character + # TODO unsure if this is correct + key_prefix = key_prefix[1:] + # loop over every reference in the ChunkManifest for that array # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once # but Icechunk need to expose a suitable API first @@ -110,11 +137,15 @@ def write_manifest_virtual_refs( index = it.multi_index chunk_key = "/".join(str(i) for i in index) + key = f"{key_prefix}/{chunk_key}" # should be of form 'group/name/0/1/2' + + print(key) + # TODO this needs to be awaited or something # set each reference individually - store.set_virtual_ref( + await store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args - key=f"{group}/{name}/{chunk_key}", # should be of form '/group/name/0/1/2' + key=key, location=path.item(), offset=offset.item(), length=length.item(), From 93939959aa742a514cf09b7e920fb3ac9d85b493 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 30 Sep 2024 23:33:56 -0400 Subject: [PATCH 18/39] better separation of handling of loadable variables --- virtualizarr/writers/icechunk.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git 
a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 99eea7c..add7e67 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -40,7 +40,6 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: for k, v in ds.attrs.items(): root_group.attrs[k] = v - # we should be able to write references for each variable concurrently asyncio.run( write_variables_to_icechunk_group( ds.variables, @@ -55,12 +54,14 @@ async def write_variables_to_icechunk_group( store, group, ): + # we should be able to write references for each variable concurrently + # TODO we could also write to multiple groups concurrently, i.e. in a future DataTree.to_zarr(icechunkstore) await asyncio.gather( *( write_variable_to_icechunk( store=store, group=group, - arr_name=name, + name=name, var=var, ) for name, var in variables.items() @@ -71,19 +72,37 @@ async def write_variables_to_icechunk_group( async def write_variable_to_icechunk( store: "IcechunkStore", group: Group, - arr_name: str, + name: str, var: Variable, ) -> None: - if not isinstance(var.data, ManifestArray): + """Write a single (possibly virtual) variable into an icechunk store""" + + if isinstance(var.data, ManifestArray): + await write_virtual_variable_to_icechunk( + store=store, + group=group, + name=name, + var=var, + ) + else: # TODO is writing loadable_variables just normal xarray ds.to_zarr? raise NotImplementedError() + +async def write_virtual_variable_to_icechunk( + store: "IcechunkStore", + group: Group, + name: str, + var: Variable, +) -> None: + """Write a single virtual variable into an icechunk store""" + ma = var.data zarray = ma.zarray # TODO should I be checking that this array doesn't already exist? Or is that icechunks' job? 
arr = group.create_array( - arr_name, + name=name, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), @@ -102,7 +121,7 @@ async def write_variable_to_icechunk( await write_manifest_virtual_refs( store=store, group=group, - arr_name=arr_name, + arr_name=name, manifest=ma.manifest, ) From 956e32452b96edc44ad70869abae6cd93b9d5ca1 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 30 Sep 2024 23:42:19 -0400 Subject: [PATCH 19/39] fix chunk key format --- virtualizarr/writers/icechunk.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index add7e67..2e793f8 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -135,10 +135,6 @@ async def write_manifest_virtual_refs( """Write all the virtual references for one array manifest at once.""" key_prefix = f"{group.name}{arr_name}" - if key_prefix.startswith("/"): - # remove preceding / character - # TODO unsure if this is correct - key_prefix = key_prefix[1:] # loop over every reference in the ChunkManifest for that array # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once @@ -156,15 +152,11 @@ async def write_manifest_virtual_refs( index = it.multi_index chunk_key = "/".join(str(i) for i in index) - key = f"{key_prefix}/{chunk_key}" # should be of form 'group/name/0/1/2' - - print(key) - # TODO this needs to be awaited or something # set each reference individually await store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args - key=key, + key=f"{key_prefix}/c/{chunk_key}", # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks location=path.item(), offset=offset.item(), length=length.item(), From 2d7d5f63a74d93203ce6988eaab1c2ed2c323d74 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Mon, 30 Sep 2024 23:44:00 -0400 Subject: [PATCH 
20/39] use require_array --- virtualizarr/writers/icechunk.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 2e793f8..36e681c 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -100,8 +100,8 @@ async def write_virtual_variable_to_icechunk( ma = var.data zarray = ma.zarray - # TODO should I be checking that this array doesn't already exist? Or is that icechunks' job? - arr = group.create_array( + # creates array if it doesn't already exist + arr = group.require_array( name=name, shape=zarray.shape, chunk_shape=zarray.chunks, From 8726e23612ae563d0a7b2c1bde160999b719f540 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 1 Oct 2024 00:00:51 -0400 Subject: [PATCH 21/39] check that store supports writes --- virtualizarr/writers/icechunk.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 36e681c..99290c6 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -29,7 +29,8 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: if not isinstance(store, IcechunkStore): raise TypeError(f"expected type IcechunkStore, but got type {type(store)}") - # TODO should we check that the store supports writes at this point? + if not store.supports_writes: + raise ValueError("supplied store does not support writes") # TODO only supports writing to the root group currently # TODO pass zarr_format kwarg? 
From 387f345641a6aef2c9ddfb01a2f96cd20b22ad87 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 1 Oct 2024 00:06:43 -0400 Subject: [PATCH 22/39] removed outdated note about awaiting --- virtualizarr/writers/icechunk.py | 1 - 1 file changed, 1 deletion(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 99290c6..80b8daf 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -153,7 +153,6 @@ async def write_manifest_virtual_refs( index = it.multi_index chunk_key = "/".join(str(i) for i in index) - # TODO this needs to be awaited or something # set each reference individually await store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args From b2a07009aee0dba206fe8970736ef666ae6c432e Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 1 Oct 2024 21:05:53 -0400 Subject: [PATCH 23/39] fix incorrect chunk key in test --- virtualizarr/tests/test_writers/test_icechunk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index c4cd002..3861e00 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -79,7 +79,7 @@ def test_set_single_virtual_ref( # instead for now just write out byte ranges explicitly manifest = ChunkManifest( - {"0.0": {"path": netcdf4_file, "offset": 15419, "length": 7738000}} + {"0.0.0": {"path": netcdf4_file, "offset": 15419, "length": 7738000}} ) zarray = ZArray( shape=(2920, 25, 53), From 4ffb55e4b86cc0cc941d00fc69f10fdfa4c7e132 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 1 Oct 2024 21:20:14 -0400 Subject: [PATCH 24/39] absolute path --- virtualizarr/tests/test_writers/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py 
index 27f4666..28c5b3d 100644 --- a/virtualizarr/tests/test_writers/conftest.py +++ b/virtualizarr/tests/test_writers/conftest.py @@ -10,7 +10,7 @@ def vds_with_manifest_arrays() -> Dataset: arr = ManifestArray( chunkmanifest=ChunkManifest( - entries={"0.0": dict(path="test.nc", offset=6144, length=48)} + entries={"0.0": dict(path="/test.nc", offset=6144, length=48)} ), zarray=dict( shape=(2, 3), From f929fcb885b00c318a9e2b9c17a86ca4fa7960ba Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Tue, 1 Oct 2024 23:50:02 -0400 Subject: [PATCH 25/39] convert to file URI before handing to icechunk --- virtualizarr/writers/icechunk.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 80b8daf..fea3e26 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -13,6 +13,17 @@ from icechunk import IcechunkStore +VALID_URI_PREFIXES = { + "s3://", + "gs://", + "azure://", + "r2://", + "cos://", + "minio://", + "file:///", +} + + def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: """ Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. 
@@ -157,7 +168,16 @@ async def write_manifest_virtual_refs( await store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args key=f"{key_prefix}/c/{chunk_key}", # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks - location=path.item(), + location=as_file_uri(path.item()), offset=offset.item(), length=length.item(), ) + + +def as_file_uri(path): + # TODO a more robust solution to this requirement exists in https://github.com/zarr-developers/VirtualiZarr/pull/243 + if not any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES) and path != "": + # assume path is local + return f"file://{path}" + else: + return path From e9c12871fcdd58b5da18e5f9345790b367e112c8 Mon Sep 17 00:00:00 2001 From: TomNicholas Date: Wed, 2 Oct 2024 11:30:56 -0400 Subject: [PATCH 26/39] test that without encoding we can definitely read one chunk --- conftest.py | 15 ++++++ .../tests/test_writers/test_icechunk.py | 53 ++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/conftest.py b/conftest.py index 32b3581..58906de 100644 --- a/conftest.py +++ b/conftest.py @@ -1,6 +1,8 @@ import h5py +import numpy as np import pytest import xarray as xr +from xarray.core.variable import Variable def pytest_addoption(parser): @@ -82,3 +84,16 @@ def hdf5_scalar(tmpdir): dataset = f.create_dataset("scalar", data=0.1, dtype="float32") dataset.attrs["scalar"] = "true" return filepath + + +@pytest.fixture +def simple_netcdf4(tmpdir): + filepath = f"{tmpdir}/simple.nc" + + arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4) + var = Variable(data=arr, dims=["x", "y"]) + ds = xr.Dataset({"foo": var}) + + ds.to_netcdf(filepath) + + return filepath diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 3861e00..6c905f0 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -71,7 
+71,50 @@ def test_write_new_virtual_variable( # check dimensions assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"] - def test_set_single_virtual_ref( + def test_set_single_virtual_ref_without_encoding( + self, icechunk_filestore: "IcechunkStore", simple_netcdf4: Path + ): + # TODO kerchunk doesn't work with zarr-python v3 yet so we can't use open_virtual_dataset and icechunk together! + # vds = open_virtual_dataset(netcdf4_file, indexes={}) + + # instead for now just write out byte ranges explicitly + manifest = ChunkManifest( + {"0.0": {"path": simple_netcdf4, "offset": 6144, "length": 48}} + ) + zarray = ZArray( + shape=(3, 4), + chunks=(3, 4), + dtype=np.dtype("int32"), + compressor=None, + filters=None, + fill_value=None, + ) + ma = ManifestArray( + chunkmanifest=manifest, + zarray=zarray, + ) + foo = Variable(data=ma, dims=["x", "y"]) + vds = Dataset( + {"foo": foo}, + ) + + dataset_to_icechunk(vds, icechunk_filestore) + + root_group = group(store=icechunk_filestore) + array = root_group["foo"] + + # check chunk references + # TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method + + expected_ds = open_dataset(simple_netcdf4) + expected_array = expected_ds["foo"].to_numpy() + npt.assert_equal(array, expected_array) + + # note: we don't need to test that committing works, because now we have confirmed + # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. + + @pytest.mark.xfail(reason="Test doesn't account for scale factor encoding yet") + def test_set_single_virtual_ref_with_encoding( self, icechunk_filestore: "IcechunkStore", netcdf4_file: Path ): # TODO kerchunk doesn't work with zarr-python v3 yet so we can't use open_virtual_dataset and icechunk together! @@ -115,6 +158,14 @@ def test_set_single_virtual_ref( # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. 
+# TODO get test with encoding working + +# TODO test writing grids of multiple chunks + +# TODO test writing to a group that isn't the root group + # TODO test writing loadable variables # TODO roundtripping tests - requires icechunk compatibility with xarray + +# TODO test with S3 / minio From 2fe301279c4d0ed08ea651180aa26dac83ac3f45 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 2 Oct 2024 12:07:29 -0400 Subject: [PATCH 27/39] Work on encoding test --- virtualizarr/tests/test_writers/test_icechunk.py | 14 +++++++++++--- virtualizarr/writers/icechunk.py | 6 ++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 3861e00..53f4288 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -63,6 +63,7 @@ def test_write_new_virtual_variable( assert arr.order == "C" assert arr.fill_value == 0 # TODO check compressor, filters? + # # check array attrs # TODO somehow this is broken by setting the dimension names??? 
@@ -93,7 +94,7 @@ def test_set_single_virtual_ref( chunkmanifest=manifest, zarray=zarray, ) - air = Variable(data=ma, dims=["time", "lat", "lon"]) + air = Variable(data=ma, dims=["time", "lat", "lon"], encoding={"scale_factor": 0.01}) vds = Dataset( {"air": air}, ) @@ -102,13 +103,20 @@ def test_set_single_virtual_ref( root_group = group(store=icechunk_filestore) air_array = root_group["air"] - print(air_array) + + # check array metadata + assert air_array.shape == (2920, 25, 53) + assert air_array.chunks == (2920, 25, 53) + assert air_array.dtype == np.dtype("int16") + assert air_array.attrs['scale_factor'] == 0.01 + + scale_factor = air_array.attrs['scale_factor'] # check chunk references # TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method expected_ds = open_dataset(netcdf4_file) - expected_air_array = expected_ds["air"].to_numpy() + expected_air_array = expected_ds["air"].to_numpy() / scale_factor npt.assert_equal(air_array, expected_air_array) # note: we don't need to test that committing works, because now we have confirmed diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index fea3e26..f1c2f4f 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -118,6 +118,7 @@ async def write_virtual_variable_to_icechunk( shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), + codecs=zarray._v3_codec_pipeline(), # TODO fill_value? # TODO order? # TODO zarr format? @@ -130,6 +131,11 @@ async def write_virtual_variable_to_icechunk( # TODO we should probably be doing some encoding of those attributes? 
arr.attrs["DIMENSION_NAMES"] = var.dims + _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} + for k, v in var.encoding.items(): + if k in _encoding_keys: + arr.attrs[k] = v + await write_manifest_virtual_refs( store=store, group=group, From 8aa6034cd39d657984d25c29104eb177a93606c2 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 2 Oct 2024 13:09:28 -0400 Subject: [PATCH 28/39] Update test to match --- virtualizarr/tests/test_writers/test_icechunk.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 03bc8b6..ae6837e 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -114,7 +114,6 @@ def test_set_single_virtual_ref_without_encoding( # note: we don't need to test that committing works, because now we have confirmed # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. 
- @pytest.mark.xfail(reason="Test doesn't account for scale factor encoding yet") def test_set_single_virtual_ref_with_encoding( self, icechunk_filestore: "IcechunkStore", netcdf4_file: Path ): @@ -152,15 +151,15 @@ def test_set_single_virtual_ref_with_encoding( assert air_array.chunks == (2920, 25, 53) assert air_array.dtype == np.dtype("int16") assert air_array.attrs['scale_factor'] == 0.01 - scale_factor = air_array.attrs['scale_factor'] + scaled_air_array = air_array[:] * scale_factor # check chunk references # TODO we can't explicitly check that the path/offset/length is correct because icechunk doesn't yet expose any get_virtual_refs method expected_ds = open_dataset(netcdf4_file) - expected_air_array = expected_ds["air"].to_numpy() / scale_factor - npt.assert_equal(air_array, expected_air_array) + expected_air_array = expected_ds["air"].to_numpy() + npt.assert_equal(scaled_air_array, expected_air_array) # note: we don't need to test that committing works, because now we have confirmed # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. From aa2d415cfeab567859987cbfd44f463833801fad Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 2 Oct 2024 13:16:45 -0400 Subject: [PATCH 29/39] Quick comment --- virtualizarr/tests/test_writers/test_icechunk.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index ae6837e..72c4fe6 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -151,6 +151,9 @@ def test_set_single_virtual_ref_with_encoding( assert air_array.chunks == (2920, 25, 53) assert air_array.dtype == np.dtype("int16") assert air_array.attrs['scale_factor'] == 0.01 + + # xarray performs this when cf_decoding is True, but we are not loading + # with xarray here so we scale it manually. 
scale_factor = air_array.attrs['scale_factor'] scaled_air_array = air_array[:] * scale_factor @@ -165,8 +168,6 @@ def test_set_single_virtual_ref_with_encoding( # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. -# TODO get test with encoding working - # TODO test writing grids of multiple chunks # TODO test writing to a group that isn't the root group From 7e4e2ce0ebc864b8421e07c59d48eba317df94fe Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 2 Oct 2024 14:44:08 -0400 Subject: [PATCH 30/39] more comprehensive --- virtualizarr/writers/icechunk.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index f1c2f4f..2b1454e 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -119,10 +119,9 @@ async def write_virtual_variable_to_icechunk( chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), codecs=zarray._v3_codec_pipeline(), + dimension_names=var.dims, + fill_value=zarray.fill_value, # TODO fill_value? - # TODO order? - # TODO zarr format? - # TODO compressors? 
) # TODO it would be nice if we could assign directly to the .attrs property From 9a03245991f86acd9fffb45b84aa902077ca86d3 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 3 Oct 2024 13:53:32 -0400 Subject: [PATCH 31/39] add attribute encoding --- virtualizarr/writers/icechunk.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 2b1454e..84fbe1b 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -2,7 +2,8 @@ from typing import TYPE_CHECKING import numpy as np -from xarray import Dataset +from xarray import Dataset, conventions +from xarray.backends.zarr import encode_zarr_attr_value from xarray.core.variable import Variable from zarr import Group @@ -15,11 +16,11 @@ VALID_URI_PREFIXES = { "s3://", - "gs://", - "azure://", - "r2://", - "cos://", - "minio://", + # "gs://", + # "azure://", + # "r2://", + # "cos://", + # "minio://", "file:///", } @@ -50,7 +51,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: # TODO this is Frozen, the API for setting attributes must be something else # root_group.attrs = ds.attrs for k, v in ds.attrs.items(): - root_group.attrs[k] = v + root_group.attrs[k] = encode_zarr_attr_value(v) asyncio.run( write_variables_to_icechunk_group( @@ -108,7 +109,6 @@ async def write_virtual_variable_to_icechunk( var: Variable, ) -> None: """Write a single virtual variable into an icechunk store""" - ma = var.data zarray = ma.zarray @@ -126,14 +126,13 @@ async def write_virtual_variable_to_icechunk( # TODO it would be nice if we could assign directly to the .attrs property for k, v in var.attrs.items(): - arr.attrs[k] = v - # TODO we should probably be doing some encoding of those attributes?
- arr.attrs["DIMENSION_NAMES"] = var.dims + arr.attrs[k] = encode_zarr_attr_value(v) + arr.attrs["DIMENSION_NAMES"] = encode_zarr_attr_value(var.dims) _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} for k, v in var.encoding.items(): if k in _encoding_keys: - arr.attrs[k] = v + arr.attrs[k] = encode_zarr_attr_value(v) await write_manifest_virtual_refs( store=store, From bbaf3baae31a04e52d957142862157c550aa77f8 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 16:37:42 -0400 Subject: [PATCH 32/39] Fix array dimensions --- .../tests/test_writers/test_icechunk.py | 6 ++++-- virtualizarr/writers/icechunk.py | 21 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py index 72c4fe6..bce95a7 100644 --- a/virtualizarr/tests/test_writers/test_icechunk.py +++ b/virtualizarr/tests/test_writers/test_icechunk.py @@ -8,7 +8,7 @@ import numpy as np import numpy.testing as npt -from xarray import Dataset, open_dataset +from xarray import Dataset, open_dataset, open_zarr from xarray.core.variable import Variable from zarr import Array, Group, group @@ -70,7 +70,7 @@ def test_write_new_virtual_variable( # assert dict(arr.attrs) == {"units": "km"} # check dimensions - assert arr.attrs["DIMENSION_NAMES"] == ["x", "y"] + assert arr.attrs["_ARRAY_DIMENSIONS"] == ["x", "y"] def test_set_single_virtual_ref_without_encoding( self, icechunk_filestore: "IcechunkStore", simple_netcdf4: Path @@ -111,6 +111,8 @@ def test_set_single_virtual_ref_without_encoding( expected_array = expected_ds["foo"].to_numpy() npt.assert_equal(array, expected_array) + #ds = open_zarr(store=icechunk_filestore, group='foo', zarr_format=3, consolidated=False) + # note: we don't need to test that committing works, because now we have confirmed # the refs are in the store (even uncommitted) it's icechunk's problem to manage them now. 
diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 84fbe1b..091835b 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -25,7 +25,7 @@ } -def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: +async def dataset_to_icechunk_async(ds: Dataset, store: "IcechunkStore") -> None: """ Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. @@ -52,13 +52,17 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: # root_group.attrs = ds.attrs for k, v in ds.attrs.items(): root_group.attrs[k] = encode_zarr_attr_value(v) + + return await write_variables_to_icechunk_group( + ds.variables, + store=store, + group=root_group, + ) + +def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: asyncio.run( - write_variables_to_icechunk_group( - ds.variables, - store=store, - group=root_group, - ) + dataset_to_icechunk_async(ds=ds, store=store) ) @@ -113,12 +117,13 @@ async def write_virtual_variable_to_icechunk( zarray = ma.zarray # creates array if it doesn't already exist + codecs = zarray._v3_codec_pipeline() arr = group.require_array( name=name, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), - codecs=zarray._v3_codec_pipeline(), + #codecs=codecs, dimension_names=var.dims, fill_value=zarray.fill_value, # TODO fill_value? 
@@ -127,7 +132,7 @@ async def write_virtual_variable_to_icechunk( # TODO it would be nice if we could assign directly to the .attrs property for k, v in var.attrs.items(): arr.attrs[k] = encode_zarr_attr_value(v) - arr.attrs["DIMENSION_NAMES"] = encode_zarr_attr_value(var.dims) + arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims) _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} for k, v in var.encoding.items(): From 49daa7e75a10a0aa1a5d0f4012fff8276271281a Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Fri, 11 Oct 2024 13:00:29 -0400 Subject: [PATCH 33/39] Fix v3 codec pipeline --- pyproject.toml | 2 +- virtualizarr/writers/icechunk.py | 3 +-- virtualizarr/zarr.py | 9 +++++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6b0efe8..ec3a6b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ - "xarray>=2024.06.0", + "xarray", "kerchunk>=0.2.5", "h5netcdf", "numpy>=2.0.0", diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 091835b..3fca1c7 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -117,13 +117,12 @@ async def write_virtual_variable_to_icechunk( zarray = ma.zarray # creates array if it doesn't already exist - codecs = zarray._v3_codec_pipeline() arr = group.require_array( name=name, shape=zarray.shape, chunk_shape=zarray.chunks, dtype=encode_dtype(zarray.dtype), - #codecs=codecs, + codecs=zarray._v3_codec_pipeline(), dimension_names=var.dims, fill_value=zarray.fill_value, # TODO fill_value? 
diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index f62b126..20788e8 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -4,6 +4,7 @@ import numcodecs import numpy as np import ujson # type: ignore +from zarr.core.metadata.v3 import parse_codecs if TYPE_CHECKING: pass @@ -140,7 +141,7 @@ def replace( replacements["zarr_format"] = zarr_format return dataclasses.replace(self, **replacements) - def _v3_codec_pipeline(self) -> list: + def _v3_codec_pipeline(self) -> Any: """ VirtualiZarr internally uses the `filters`, `compressor`, and `order` attributes from zarr v2, but to create conformant zarr v3 metadata those 3 must be turned into `codecs` objects. @@ -190,7 +191,11 @@ def _v3_codec_pipeline(self) -> list: # The order here is significant! # [ArrayArray] -> ArrayBytes -> [BytesBytes] - codec_pipeline = [transpose, bytes] + compressor + filters + raw_codec_pipeline = [transpose, bytes] + compressor + filters + + # convert the pipeline repr into actual v3 codec objects + codec_pipeline = parse_codecs(raw_codec_pipeline) + return codec_pipeline From 756ff9257e02e27a02190a286eb3c45399cffed2 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Fri, 11 Oct 2024 13:01:47 -0400 Subject: [PATCH 34/39] Put xarray dep back --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ec3a6b5..6b0efe8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ classifiers = [ requires-python = ">=3.10" dynamic = ["version"] dependencies = [ - "xarray", + "xarray>=2024.06.0", "kerchunk>=0.2.5", "h5netcdf", "numpy>=2.0.0", From 8c7242e9a2af221382a6ce78922b56ea0a54e7e4 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Fri, 11 Oct 2024 22:00:02 -0400 Subject: [PATCH 35/39] Handle codecs, but get bad results --- virtualizarr/writers/icechunk.py | 4 ++- virtualizarr/zarr.py | 52 +++++++++++++++++--------------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git 
a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index 3fca1c7..f789ebf 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -103,7 +103,8 @@ async def write_variable_to_icechunk( ) else: # TODO is writing loadable_variables just normal xarray ds.to_zarr? - raise NotImplementedError() + # raise NotImplementedError() + print("skipping non-virtual variable", name) async def write_virtual_variable_to_icechunk( @@ -117,6 +118,7 @@ async def write_virtual_variable_to_icechunk( zarray = ma.zarray # creates array if it doesn't already exist + print(name, zarray.fill_value) arr = group.require_array( name=name, shape=zarray.shape, diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 20788e8..16e1a9a 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -4,7 +4,6 @@ import numcodecs import numpy as np import ujson # type: ignore -from zarr.core.metadata.v3 import parse_codecs if TYPE_CHECKING: pass @@ -75,8 +74,9 @@ def codec(self) -> Codec: @classmethod def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": # coerce type of fill_value as kerchunk can be inconsistent with this + dtype = np.dtype(decoded_arr_refs_zarray["dtype"]) fill_value = decoded_arr_refs_zarray["fill_value"] - if fill_value is None or fill_value == "NaN" or fill_value == "nan": + if np.issubdtype(dtype, np.floating) and (fill_value is None or fill_value == "NaN" or fill_value == "nan"): fill_value = np.nan compressor = decoded_arr_refs_zarray["compressor"] @@ -87,7 +87,7 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": return ZArray( chunks=tuple(decoded_arr_refs_zarray["chunks"]), compressor=compressor, - dtype=np.dtype(decoded_arr_refs_zarray["dtype"]), + dtype=dtype, fill_value=fill_value, filters=decoded_arr_refs_zarray["filters"], order=decoded_arr_refs_zarray["order"], @@ -154,47 +154,46 @@ def _v3_codec_pipeline(self) -> Any: post_compressor: Iterable[BytesBytesCodec] #optional ``` """ - if 
self.filters: - filter_codecs_configs = [ - numcodecs.get_codec(filter).get_config() for filter in self.filters - ] - filters = [ - dict(name=codec.pop("id"), configuration=codec) - for codec in filter_codecs_configs - ] - else: - filters = [] + try: + from zarr.core.metadata.v3 import parse_codecs + except ImportError: + raise ImportError( + "zarr v3 is required to generate v3 codec pipelines" + ) # Noting here that zarr v3 has very few codecs specificed in the official spec, # and that there are far more codecs in `numcodecs`. We take a gamble and assume # that the codec names and configuration are simply mapped into zarrv3 "configurables". - if self.compressor: - compressor = [_num_codec_config_to_configurable(self.compressor)] + if self.filters: + codec_configs = [ + _num_codec_config_to_configurable(filter) for filter in self.filters + ] else: - compressor = [] + codec_configs = [] + if self.compressor: + codec_configs.append(_num_codec_config_to_configurable(self.compressor)) # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1 # Either "C" or "F", defining the layout of bytes within each chunk of the array. # "C" means row-major order, i.e., the last dimension varies fastest; # "F" means column-major order, i.e., the first dimension varies fastest. 
- if self.order == "C": - order = tuple(range(len(self.shape))) - elif self.order == "F": + # For now, we only need transpose if the order is not "C" + if self.order == "F": order = tuple(reversed(range(len(self.shape)))) - - transpose = dict(name="transpose", configuration=dict(order=order)) + transpose = dict(name="transpose", configuration=dict(order=order)) + codec_configs.append(transpose) # https://github.com/zarr-developers/zarr-python/pull/1944#issuecomment-2151994097 # "If no ArrayBytesCodec is supplied, we can auto-add a BytesCodec" bytes = dict( name="bytes", configuration={} ) # TODO need to handle endianess configuration - + codec_configs.append(bytes) # The order here is significant! # [ArrayArray] -> ArrayBytes -> [BytesBytes] - raw_codec_pipeline = [transpose, bytes] + compressor + filters + codec_configs.reverse() # convert the pipeline repr into actual v3 codec objects - codec_pipeline = parse_codecs(raw_codec_pipeline) + codec_pipeline = parse_codecs(codec_configs) return codec_pipeline @@ -218,4 +217,7 @@ def _num_codec_config_to_configurable(num_codec: dict) -> dict: Convert a numcodecs codec into a zarr v3 configurable. 
""" num_codec_copy = num_codec.copy() - return {"name": num_codec_copy.pop("id"), "configuration": num_codec_copy} + name = num_codec_copy.pop("id") + if name == 'zlib': + name = 'gzip' + return {"name": name, "configuration": num_codec_copy} From 666b6769e6bd735dc3f8340e9b2fe2dd0c903b9e Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sat, 12 Oct 2024 07:01:47 -0400 Subject: [PATCH 36/39] Gzip an d zlib are not directly working --- virtualizarr/zarr.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 16e1a9a..550e083 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -218,6 +218,4 @@ def _num_codec_config_to_configurable(num_codec: dict) -> dict: """ num_codec_copy = num_codec.copy() name = num_codec_copy.pop("id") - if name == 'zlib': - name = 'gzip' return {"name": name, "configuration": num_codec_copy} From 9076ad76e3f6d65e6d7b161a906ce651182d75ee Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sun, 13 Oct 2024 14:26:09 -0400 Subject: [PATCH 37/39] Get up working with numcodecs zarr 3 codecs --- virtualizarr/writers/icechunk.py | 1 - virtualizarr/zarr.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index f789ebf..965a766 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -118,7 +118,6 @@ async def write_virtual_variable_to_icechunk( zarray = ma.zarray # creates array if it doesn't already exist - print(name, zarray.fill_value) arr = group.require_array( name=name, shape=zarray.shape, diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 550e083..3dd2055 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -217,5 +217,5 @@ def _num_codec_config_to_configurable(num_codec: dict) -> dict: Convert a numcodecs codec into a zarr v3 configurable. """ num_codec_copy = num_codec.copy() - name = num_codec_copy.pop("id") + name = "numcodecs." 
+ num_codec_copy.pop("id") return {"name": name, "configuration": num_codec_copy} From 7a160fd15c11b77a9462c6a2af98eae5d57f96df Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Mon, 14 Oct 2024 13:48:34 -0400 Subject: [PATCH 38/39] Update codec pipeline --- virtualizarr/zarr.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index 3dd2055..d047f91 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -160,18 +160,8 @@ def _v3_codec_pipeline(self) -> Any: raise ImportError( "zarr v3 is required to generate v3 codec pipelines" ) - - # Noting here that zarr v3 has very few codecs specificed in the official spec, - # and that there are far more codecs in `numcodecs`. We take a gamble and assume - # that the codec names and configuration are simply mapped into zarrv3 "configurables". - if self.filters: - codec_configs = [ - _num_codec_config_to_configurable(filter) for filter in self.filters - ] - else: - codec_configs = [] - if self.compressor: - codec_configs.append(_num_codec_config_to_configurable(self.compressor)) + + codec_configs = [] # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1 # Either "C" or "F", defining the layout of bytes within each chunk of the array. @@ -182,15 +172,24 @@ def _v3_codec_pipeline(self) -> Any: order = tuple(reversed(range(len(self.shape)))) transpose = dict(name="transpose", configuration=dict(order=order)) codec_configs.append(transpose) + # https://github.com/zarr-developers/zarr-python/pull/1944#issuecomment-2151994097 # "If no ArrayBytesCodec is supplied, we can auto-add a BytesCodec" bytes = dict( name="bytes", configuration={} ) # TODO need to handle endianess configuration codec_configs.append(bytes) - # The order here is significant! 
-        # [ArrayArray] -> ArrayBytes -> [BytesBytes]
-        codec_configs.reverse()
+
+        # Noting here that zarr v3 has very few codecs specificed in the official spec,
+        # and that there are far more codecs in `numcodecs`. We take a gamble and assume
+        # that the codec names and configuration are simply mapped into zarrv3 "configurables".
+        if self.filters:
+            codec_configs.extend([
+                _num_codec_config_to_configurable(filter) for filter in self.filters
+            ])
+
+        if self.compressor:
+            codec_configs.append(_num_codec_config_to_configurable(self.compressor))
 
         # convert the pipeline repr into actual v3 codec objects
         codec_pipeline = parse_codecs(codec_configs)

From 8f1f96e1f75a1080e80e60e17e8137902162df7e Mon Sep 17 00:00:00 2001
From: Matthew Iannucci
Date: Tue, 15 Oct 2024 10:10:27 -0400
Subject: [PATCH 39/39] Update to latest icechunk using sync api

---
 virtualizarr/writers/icechunk.py | 42 +++++++++++++---------------------------
 1 file changed, 15 insertions(+), 27 deletions(-)

diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py
index 965a766..b7a5982 100644
--- a/virtualizarr/writers/icechunk.py
+++ b/virtualizarr/writers/icechunk.py
@@ -25,7 +25,7 @@
 }
 
 
-async def dataset_to_icechunk_async(ds: Dataset, store: "IcechunkStore") -> None:
+def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
     """
     Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.
 
@@ -53,40 +53,28 @@ async def dataset_to_icechunk_async(ds: Dataset, store: "IcechunkStore") -> None for k, v in ds.attrs.items(): root_group.attrs[k] = encode_zarr_attr_value(v) - return await write_variables_to_icechunk_group( + return write_variables_to_icechunk_group( ds.variables, store=store, group=root_group, ) -def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: - asyncio.run( - dataset_to_icechunk_async(ds=ds, store=store) - ) - - -async def write_variables_to_icechunk_group( +def write_variables_to_icechunk_group( variables, store, group, ): - # we should be able to write references for each variable concurrently - # TODO we could also write to multiple groups concurrently, i.e. in a future DataTree.to_zarr(icechunkstore) - await asyncio.gather( - *( - write_variable_to_icechunk( - store=store, - group=group, - name=name, - var=var, - ) - for name, var in variables.items() + for name, var in variables.items(): + write_variable_to_icechunk( + store=store, + group=group, + name=name, + var=var, ) - ) -async def write_variable_to_icechunk( +def write_variable_to_icechunk( store: "IcechunkStore", group: Group, name: str, @@ -95,7 +83,7 @@ async def write_variable_to_icechunk( """Write a single (possibly virtual) variable into an icechunk store""" if isinstance(var.data, ManifestArray): - await write_virtual_variable_to_icechunk( + write_virtual_variable_to_icechunk( store=store, group=group, name=name, @@ -107,7 +95,7 @@ async def write_variable_to_icechunk( print("skipping non-virtual variable", name) -async def write_virtual_variable_to_icechunk( +def write_virtual_variable_to_icechunk( store: "IcechunkStore", group: Group, name: str, @@ -139,7 +127,7 @@ async def write_virtual_variable_to_icechunk( if k in _encoding_keys: arr.attrs[k] = encode_zarr_attr_value(v) - await write_manifest_virtual_refs( + write_manifest_virtual_refs( store=store, group=group, arr_name=name, @@ -147,7 +135,7 @@ async def write_virtual_variable_to_icechunk( ) 
-async def write_manifest_virtual_refs( +def write_manifest_virtual_refs( store: "IcechunkStore", group: Group, arr_name: str, @@ -174,7 +162,7 @@ async def write_manifest_virtual_refs( chunk_key = "/".join(str(i) for i in index) # set each reference individually - await store.set_virtual_ref( + store.set_virtual_ref( # TODO it would be marginally neater if I could pass the group and name as separate args key=f"{key_prefix}/c/{chunk_key}", # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks location=as_file_uri(path.item()),