You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
I'm trying to create a Zarr Store in S3, but running into what appear to be asyncio issues.
Reproducible example
import zarr
import s3fs
import logging
import os
# Minimal reproduction: creating a Zarr group in S3 through s3fs fails with
# "MissingContentLength" on PutObject when a recent boto3/botocore is installed.
# ID = "elevation-test"
path = "test-datacube/elevation-test.zarr"

# Connection settings come from the environment. The endpoint is passed through
# unchanged so that an unset AWS_ENDPOINT_URL stays None instead of being
# formatted into the literal string "None".
fs = s3fs.S3FileSystem(
    endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
    key=os.getenv("AWS_ACCESS_KEY_ID"),
    secret=os.getenv("AWS_SECRET_ACCESS_KEY"),
    asynchronous=True,
)
store = zarr.storage.FsspecStore(fs, path=path)

# Workaround: this call only succeeded after rolling boto3/botocore back
# (boto3 < 1.36.0), following
# https://stackoverflow.com/questions/79375793/s3uploadfailederror-due-to-missingcontentlength-when-calling-putobject-in-mlflow
zarr.create_group(store=store, overwrite=True)
I can create the group when I roll boto3 back to a version below 1.36.0.
Expected behavior: the Zarr store is created successfully.
The error message is:
---------------------------------------------------------------------------
ClientError Traceback (most recent call last)
File /opt/conda/lib/python3.12/site-packages/s3fs/core.py:114, in _error_wrapper(func, args, kwargs, retries)
113 try:
--> 114 return await func(*args, **kwargs)
115 except S3_RETRYABLE_ERRORS as e:
File /opt/conda/lib/python3.12/site-packages/aiobotocore/client.py:412, in AioBaseClient._make_api_call(self, operation_name, api_params)
411 error_class = self.exceptions.from_code(error_code)
--> 412 raise error_class(parsed_response, operation_name)
413 else:
ClientError: An error occurred (MissingContentLength) when calling the PutObject operation: Unknown
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
Cell In[2], line 33
18 return (
19 (
20 xr_zeros(
(...)
29 .to_dataset("var")
30 )
32 ds = get_data_layout(["t1", "t2"])
---> 33 ds.to_zarr(store, mode="w", compute=False)
File /opt/conda/lib/python3.12/site-packages/xarray/core/dataset.py:2629, in Dataset.to_zarr(self, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, zarr_format, write_empty_chunks, chunkmanager_store_kwargs)
2461 """Write dataset contents to a zarr group.
2462
2463 Zarr chunks are determined in the following way:
(...)
2625 The I/O user guide, with more details and examples.
2626 """
2627 from xarray.backends.api import to_zarr
-> 2629 return to_zarr( # type: ignore[call-overload,misc]
2630 self,
2631 store=store,
2632 chunk_store=chunk_store,
2633 storage_options=storage_options,
2634 mode=mode,
2635 synchronizer=synchronizer,
2636 group=group,
2637 encoding=encoding,
2638 compute=compute,
2639 consolidated=consolidated,
2640 append_dim=append_dim,
2641 region=region,
2642 safe_chunks=safe_chunks,
2643 zarr_version=zarr_version,
2644 zarr_format=zarr_format,
2645 write_empty_chunks=write_empty_chunks,
2646 chunkmanager_store_kwargs=chunkmanager_store_kwargs,
2647 )
File /opt/conda/lib/python3.12/site-packages/xarray/backends/api.py:2217, in to_zarr(dataset, store, chunk_store, mode, synchronizer, group, encoding, compute, consolidated, append_dim, region, safe_chunks, storage_options, zarr_version, zarr_format, write_empty_chunks, chunkmanager_store_kwargs)
2214 already_consolidated = False
2215 consolidate_on_close = consolidated or consolidated is None
-> 2217 zstore = backends.ZarrStore.open_group(
2218 store=mapper,
2219 mode=mode,
2220 synchronizer=synchronizer,
2221 group=group,
2222 consolidated=already_consolidated,
2223 consolidate_on_close=consolidate_on_close,
2224 chunk_store=chunk_mapper,
2225 append_dim=append_dim,
2226 write_region=region,
2227 safe_chunks=safe_chunks,
2228 zarr_version=zarr_version,
2229 zarr_format=zarr_format,
2230 write_empty=write_empty_chunks,
2231 **kwargs,
2232 )
2234 dataset = zstore._validate_and_autodetect_region(dataset)
2235 zstore._validate_encoding(encoding)
File /opt/conda/lib/python3.12/site-packages/xarray/backends/zarr.py:700, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, zarr_version, zarr_format, use_zarr_fill_value_as_mask, write_empty, cache_members)
675 @classmethod
676 def open_group(
677 cls,
(...)
693 cache_members: bool = True,
694 ):
695 (
696 zarr_group,
697 consolidate_on_close,
698 close_store_on_close,
699 use_zarr_fill_value_as_mask,
--> 700 ) = _get_open_params(
701 store=store,
702 mode=mode,
703 synchronizer=synchronizer,
704 group=group,
705 consolidated=consolidated,
706 consolidate_on_close=consolidate_on_close,
707 chunk_store=chunk_store,
708 storage_options=storage_options,
709 zarr_version=zarr_version,
710 use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
711 zarr_format=zarr_format,
712 )
714 return cls(
715 zarr_group,
716 mode,
(...)
724 cache_members,
725 )
File /opt/conda/lib/python3.12/site-packages/xarray/backends/zarr.py:1791, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, zarr_version, use_zarr_fill_value_as_mask, zarr_format)
1787 if _zarr_v3():
1788 # we have determined that we don't want to use consolidated metadata
1789 # so we set that to False to avoid trying to read it
1790 open_kwargs["use_consolidated"] = False
-> 1791 zarr_group = zarr.open_group(store, **open_kwargs)
1792 close_store_on_close = zarr_group.store is not store
1794 # we use this to determine how to handle fill_value
File /opt/conda/lib/python3.12/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
41 extra_args = len(args) - len(all_args)
42 if extra_args <= 0:
---> 43 return f(*args, **kwargs)
45 # extra_args > 0
46 args_msg = [
47 f"{name}={arg}"
48 for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
49 ]
File /opt/conda/lib/python3.12/site-packages/zarr/api/synchronous.py:525, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
447 @_deprecate_positional_args
448 def open_group(
449 store: StoreLike | None = None,
(...)
461 use_consolidated: bool | str | None = None,
462 ) -> Group:
463 """Open a group using file-mode-like semantics.
464
465 Parameters
(...)
522 The new group.
523 """
524 return Group(
--> 525 sync(
526 async_api.open_group(
527 store=store,
528 mode=mode,
529 cache_attrs=cache_attrs,
530 synchronizer=synchronizer,
531 path=path,
532 chunk_store=chunk_store,
533 storage_options=storage_options,
534 zarr_version=zarr_version,
535 zarr_format=zarr_format,
536 meta_array=meta_array,
537 attributes=attributes,
538 use_consolidated=use_consolidated,
539 )
540 )
541 )
File /opt/conda/lib/python3.12/site-packages/zarr/core/sync.py:142, in sync(coro, loop, timeout)
139 return_result = next(iter(finished)).result()
141 if isinstance(return_result, BaseException):
--> 142 raise return_result
143 else:
144 return return_result
File /opt/conda/lib/python3.12/site-packages/zarr/core/sync.py:98, in _runner(coro)
93 """
94 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
95 exception, the exception will be returned.
96 """
97 try:
---> 98 return await coro
99 except Exception as ex:
100 return ex
File /opt/conda/lib/python3.12/site-packages/zarr/api/asynchronous.py:815, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
813 overwrite = _infer_overwrite(mode)
814 _zarr_format = zarr_format or _default_zarr_format()
--> 815 return await AsyncGroup.from_store(
816 store_path,
817 zarr_format=_zarr_format,
818 overwrite=overwrite,
819 attributes=attributes,
820 )
821 raise FileNotFoundError(f"Unable to find group: {store_path}")
File /opt/conda/lib/python3.12/site-packages/zarr/core/group.py:442, in AsyncGroup.from_store(cls, store, attributes, overwrite, zarr_format)
437 attributes = attributes or {}
438 group = cls(
439 metadata=GroupMetadata(attributes=attributes, zarr_format=zarr_format),
440 store_path=store_path,
441 )
--> 442 await group._save_metadata(ensure_parents=True)
443 return group
File /opt/conda/lib/python3.12/site-packages/zarr/core/group.py:816, in AsyncGroup._save_metadata(self, ensure_parents)
806 for parent in parents:
807 awaitables.extend(
808 [
809 (parent.store_path / key).set_if_not_exists(value)
(...)
813 ]
814 )
--> 816 await asyncio.gather(*awaitables)
File /opt/conda/lib/python3.12/site-packages/zarr/abc/store.py:487, in set_or_delete(byte_setter, value)
485 await byte_setter.delete()
486 else:
--> 487 await byte_setter.set(value)
File /opt/conda/lib/python3.12/site-packages/zarr/storage/_common.py:144, in StorePath.set(self, value, byte_range)
142 if byte_range is not None:
143 raise NotImplementedError("Store.set does not have partial writes yet")
--> 144 await self.store.set(self.path, value)
File /opt/conda/lib/python3.12/site-packages/zarr/storage/_fsspec.py:276, in FsspecStore.set(self, key, value, byte_range)
274 if byte_range:
275 raise NotImplementedError
--> 276 await self.fs._pipe_file(path, value.to_bytes())
File /opt/conda/lib/python3.12/site-packages/s3fs/core.py:1164, in S3FileSystem._pipe_file(self, path, data, chunksize, max_concurrency, mode, **kwargs)
1162 # 5 GB is the limit for an S3 PUT
1163 if size < min(5 * 2**30, 2 * chunksize):
-> 1164 out = await self._call_s3(
1165 "put_object", Bucket=bucket, Key=key, Body=data, **kwargs, **match
1166 )
1167 self.invalidate_cache(path)
1168 return out
File /opt/conda/lib/python3.12/site-packages/s3fs/core.py:371, in S3FileSystem._call_s3(self, method, *akwarglist, **kwargs)
369 logger.debug("CALL: %s - %s - %s", method.__name__, akwarglist, kw2)
370 additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist, **kwargs)
--> 371 return await _error_wrapper(
372 method, kwargs=additional_kwargs, retries=self.retries
373 )
File /opt/conda/lib/python3.12/site-packages/s3fs/core.py:146, in _error_wrapper(func, args, kwargs, retries)
144 err = e
145 err = translate_boto_error(err)
--> 146 raise err
OSError: [Errno 22] An error occurred (MissingContentLength) when calling the PutObject operation: Unknown
Zarr version
v3.0.2
Numcodecs version
v0.15.0
Python Version
3.12
Operating System
Linux
Installation
conda
Description
I'm trying to create a Zarr Store in S3, but running into what appear to be asyncio issues.
Reproducible example
I can create the group when I roll boto3 back to <1.36.0.
Expected behavior would be to be able to create the Zarr store.
The error message is:
Steps to reproduce
Minimum environment
Which results in the following versions:
Additional output
No response
The text was updated successfully, but these errors were encountered: