Zarr example broken #738
Open · ianhi opened this issue Feb 17, 2025 · 4 comments

ianhi (Contributor) commented Feb 17, 2025

When I follow the Zarr example page in the Python docs and run this code:

icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])

or with pickling explicitly allowed:

with icechunk_session.allow_pickling():
    icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])

I get this confusing error:

2025-02-17 10:24:27,769 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 1 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x15e93a030>
 0. 5936991360
>.
Traceback (most recent call last):
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py", line 69, in __getstate__
    raise ValueError(
ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py", line 69, in __getstate__
    raise ValueError(
ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/Users/ian/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py", line 69, in __getstate__
    raise ValueError(
ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py:60, in dumps(x, buffer_callback, protocol)
     59 try:
---> 60     result = pickle.dumps(x, **dump_kwargs)
     61 except Exception:

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py:69, in IcechunkStore.__getstate__(self)
     68 if not self._allow_pickling and not self._store.read_only:
---> 69     raise ValueError(
     70         "You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager"
     71     )
     72 d = self.__dict__.copy()

ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py:65, in dumps(x, buffer_callback, protocol)
     64 buffers.clear()
---> 65 pickler.dump(x)
     66 result = f.getvalue()

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py:69, in IcechunkStore.__getstate__(self)
     68 if not self._allow_pickling and not self._store.read_only:
---> 69     raise ValueError(
     70         "You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager"
     71     )
     72 d = self.__dict__.copy()

ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/serialize.py:366, in serialize(x, serializers, on_error, context, iterate_collection)
    365 try:
--> 366     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    367     header["serializer"] = name

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
     76     writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
     79     x,
     80     buffer_callback=buffer_callback,
     81     protocol=context.get("pickle-protocol", None) if context else None,
     82 )
     83 header = {
     84     "serializer": "pickle",
     85     "writeable": tuple(writeable),
     86 }

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/pickle.py:77, in dumps(x, buffer_callback, protocol)
     76     buffers.clear()
---> 77     result = cloudpickle.dumps(x, **dump_kwargs)
     78 except Exception:

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/cloudpickle/cloudpickle.py:1537, in dumps(obj, protocol, buffer_callback)
   1536 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1537 cp.dump(obj)
   1538 return file.getvalue()

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/cloudpickle/cloudpickle.py:1303, in Pickler.dump(self, obj)
   1302 try:
-> 1303     return super().dump(obj)
   1304 except RuntimeError as e:

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/store.py:69, in IcechunkStore.__getstate__(self)
     68 if not self._allow_pickling and not self._store.read_only:
---> 69     raise ValueError(
     70         "You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager"
     71     )
     72 d = self.__dict__.copy()

ValueError: You must opt in to pickling this *writable* store by using `Session.allow_pickling` context manager

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[8], line 4
      1 import icechunk.dask
      3 with icechunk_session.allow_pickling():
----> 4     icechunk.dask.store_dask(icechunk_session, sources=[dask_array], targets=[zarray])

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/dask.py:82, in store_dask(session, sources, targets, regions, split_every, **store_kwargs)
     71 stored_arrays = dask.array.store(  # type: ignore[attr-defined]
     72     sources=sources,
     73     targets=targets,  # type: ignore[arg-type]
   (...)
     79     **store_kwargs,
     80 )
     81 # Now we tree-reduce all changesets
---> 82 merged_session = stateful_store_reduce(
     83     stored_arrays,
     84     prefix="ice-changeset",
     85     chunk=extract_session,
     86     aggregate=merge_sessions,
     87     split_every=split_every,
     88     compute=True,
     89     **store_kwargs,
     90 )
     91 session.merge(merged_session)

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/icechunk/dask.py:206, in stateful_store_reduce(stored_arrays, chunk, aggregate, compute, prefix, split_every, **kwargs)
    200 store_dsk = HighLevelGraph.merge(
    201     HighLevelGraph(layers, dependencies),  # type: ignore[arg-type]
    202     *[array.__dask_graph__() for array in stored_arrays],
    203 )
    204 if compute:
    205     # copied from dask.array.store
--> 206     merged_session, *_ = compute_as_if_collection(  # type: ignore[no-untyped-call]
    207         Array, store_dsk, list(layers[latest_layer].keys()), **kwargs
    208     )
    209     if TYPE_CHECKING:
    210         assert isinstance(merged_session, Session)

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/dask/base.py:399, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    397 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    398 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 399 return schedule(dsk2, keys, **kwargs)

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/client.py:3471, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3398 def get(
   3399     self,
   3400     dsk,
   (...)
   3412     **kwargs,
   3413 ):
   3414     """Compute dask graph
   3415 
   3416     Parameters
   (...)
   3469     Client.compute : Compute asynchronous collections
   3470     """
-> 3471     futures = self._graph_to_futures(
   3472         dsk,
   3473         keys=set(flatten([keys])),
   3474         workers=workers,
   3475         allow_other_workers=allow_other_workers,
   3476         resources=resources,
   3477         fifo_timeout=fifo_timeout,
   3478         retries=retries,
   3479         user_priority=priority,
   3480         actors=actors,
   3481         span_metadata=SpanMetadata(collections=[{"type": "low-level-graph"}]),
   3482     )
   3483     packed = pack_data(keys, futures)
   3484     if sync:

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/client.py:3364, in Client._graph_to_futures(self, dsk, keys, span_metadata, workers, allow_other_workers, internal_priority, user_priority, resources, retries, fifo_timeout, actors)
   3361 from distributed.protocol import serialize
   3362 from distributed.protocol.serialize import ToPickle
-> 3364 header, frames = serialize(ToPickle(dsk), on_error="raise")
   3366 pickled_size = sum(map(nbytes, [header] + frames))
   3367 if pickled_size > parse_bytes(
   3368     dask.config.get("distributed.admin.large-graph-warning-threshold")
   3369 ):

File ~/miniforge3/envs/icechunk/lib/python3.12/site-packages/distributed/protocol/serialize.py:392, in serialize(x, serializers, on_error, context, iterate_collection)
    390     except Exception:
    391         raise TypeError(msg) from exc
--> 392     raise TypeError(msg, str_x) from exc
    393 else:  # pragma: nocover
    394     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x15e93a030>\n 0. 5936991360\n>')
dcherian (Contributor) commented:

A gotcha here is that you need to allow pickling before creating the zarray, which embeds a copy of the store and therefore the session.
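A minimal sketch of why the ordering matters, using invented toy classes (ToyStore, ToySession, ToyArray) rather than icechunk's actual implementation: if the array embeds a snapshot of the store's pickling flag at creation time, then an array created before entering the context keeps the flag off, even while the context is active.

```python
import pickle
from contextlib import contextmanager

class ToyStore:
    """Toy stand-in for a writable store that must opt in to pickling."""
    def __init__(self, allow_pickling=False):
        self._allow_pickling = allow_pickling

    def __getstate__(self):
        if not self._allow_pickling:
            raise ValueError("opt in to pickling via allow_pickling()")
        return self.__dict__.copy()

class ToyArray:
    """Like a zarr array: embeds a snapshot of the store at creation time."""
    def __init__(self, store):
        self.store = ToyStore(store._allow_pickling)

class ToySession:
    def __init__(self):
        self.store = ToyStore()

    @contextmanager
    def allow_pickling(self):
        self.store._allow_pickling = True
        try:
            yield
        finally:
            self.store._allow_pickling = False

session = ToySession()
early = ToyArray(session.store)      # created BEFORE entering the context

with session.allow_pickling():
    late = ToyArray(session.store)   # created INSIDE the context
    pickle.dumps(late)               # works: its snapshot has the flag set
    try:
        pickle.dumps(early)          # still fails: its snapshot predates the flag
    except ValueError:
        print("array created before allow_pickling() cannot be pickled")
```

In this toy model the fix is exactly what the comment describes: move the array (and group) creation inside the context so the embedded snapshot carries the opt-in.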

paraseba (Collaborator) commented:

ahh great catch @dcherian

ianhi (Contributor, Author) commented Feb 17, 2025

I also had to wrap the creation of the group in allow_pickling. Based on the distributed writes example, I would have thought that pickling isn't necessary when threads are used, but I also get a similar error when I start a Dask cluster with processes=False.

dcherian (Contributor) commented Feb 17, 2025

So that's a Dask gotcha. If you use distributed, you have at least one worker (so at least one process). If you don't create a cluster, or if you use dask.config.set(scheduler="threads"), it will work.
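A rough way to see the distinction, using plain concurrent.futures and an invented WritableStore class rather than dask or icechunk themselves: a thread pool shares the interpreter's memory and passes task arguments by reference, while any process-based worker (such as a distributed worker) has to pickle them first.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

class WritableStore:
    """Toy stand-in for a writable store that refuses to be pickled."""
    def __getstate__(self):
        raise ValueError("refusing to pickle a writable store")

store = WritableStore()

# Threads share memory: the object is passed by reference, never pickled.
with ThreadPoolExecutor(max_workers=2) as pool:
    name = pool.submit(lambda s: type(s).__name__, store).result()
print(name)  # WritableStore

# Any process-based worker must serialize its arguments, which fails here:
try:
    pickle.dumps(store)
except ValueError as exc:
    print("serialization error:", exc)
```

This mirrors the comment above: a distributed cluster always involves at least one worker process, so the graph is serialized even with processes=False, whereas the purely threaded scheduler never needs to pickle the store.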

> I also had to wrap the creation of the group into allow pickling.

This also embeds a copy of the store and session.
