(fix): fall back for unsupported codecs
ilan-gold committed Feb 4, 2025
1 parent 1a6dc77 commit 008fd6a
Showing 1 changed file with 31 additions and 14 deletions.
python/zarrs/pipeline.py: 45 changes (31 additions & 14 deletions)
@@ -2,6 +2,7 @@
 
 import asyncio
 import json
+import re
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, TypedDict
 
@@ -35,19 +36,29 @@ class UnsupportedDataTypeError(Exception):
     pass
 
 
-def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl:
-    return CodecPipelineImpl(
-        codec_metadata_json,
-        validate_checksums=config.get("codec_pipeline.validate_checksums", None),
-        store_empty_chunks=config.get("array.write_empty_chunks", None),
-        chunk_concurrent_minimum=config.get(
-            "codec_pipeline.chunk_concurrent_minimum", None
-        ),
-        chunk_concurrent_maximum=config.get(
-            "codec_pipeline.chunk_concurrent_maximum", None
-        ),
-        num_threads=config.get("threading.max_workers", None),
-    )
+class UnsupportedMetadataError(Exception):
+    pass
+
+
+def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
+    try:
+        return CodecPipelineImpl(
+            codec_metadata_json,
+            validate_checksums=config.get("codec_pipeline.validate_checksums", None),
+            store_empty_chunks=config.get("array.write_empty_chunks", None),
+            chunk_concurrent_minimum=config.get(
+                "codec_pipeline.chunk_concurrent_minimum", None
+            ),
+            chunk_concurrent_maximum=config.get(
+                "codec_pipeline.chunk_concurrent_maximum", None
+            ),
+            num_threads=config.get("threading.max_workers", None),
+        )
+    except TypeError as e:
+        if re.match(r"codec (delta|zlib) is not supported", str(e)):
+            return None
+        else:
+            raise e
 
 
 def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
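
Not part of the diff: the hunk above makes the Rust-backed constructor degrade gracefully when it rejects a codec. CodecPipelineImpl raises a TypeError such as "codec zlib is not supported" for Zarr v2 codecs it does not implement, and the wrapper now returns None instead of propagating the error. A minimal, self-contained sketch of that translation, with fake_pipeline_ctor standing in for CodecPipelineImpl and get_impl_or_none mirroring the new function:

import re


def fake_pipeline_ctor(codec_metadata_json: str):
    # Stand-in for CodecPipelineImpl: reject the v2 zlib codec the way the
    # Rust extension does, with a TypeError carrying a recognisable message.
    if "zlib" in codec_metadata_json:
        raise TypeError("codec zlib is not supported")
    return object()


def get_impl_or_none(codec_metadata_json: str):
    try:
        return fake_pipeline_ctor(codec_metadata_json)
    except TypeError as e:
        # Only the "unsupported codec" errors become None; anything else is
        # still a real failure and is re-raised.
        if re.match(r"codec (delta|zlib) is not supported", str(e)):
            return None
        raise


print(get_impl_or_none('[{"name": "zlib"}]'))   # None -> use the Python fallback
print(get_impl_or_none('[{"name": "blosc"}]'))  # an object -> use the Rust pipeline
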
@@ -120,7 +131,7 @@ class ZarrsCodecPipelineState(TypedDict):
 @dataclass
 class ZarrsCodecPipeline(CodecPipeline):
     codecs: tuple[Codec, ...]
-    impl: CodecPipelineImpl
+    impl: CodecPipelineImpl | None
     codec_metadata_json: str
     python_impl: BatchedCodecPipeline
 
@@ -194,11 +205,14 @@ async def read(
         if not out.dtype.isnative:
             raise RuntimeError("Non-native byte order not supported")
         try:
+            if self.impl is None:
+                raise UnsupportedMetadataError()
             self._raise_error_on_unsupported_batch_dtype(batch_info)
             chunks_desc = make_chunk_info_for_rust_with_indices(
                 batch_info, drop_axes, out.shape
             )
         except (
+            UnsupportedMetadataError,
             DiscontiguousArrayError,
             CollapsedDimensionError,
             UnsupportedDataTypeError,
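
Not part of the diff: in read (and symmetrically in write below), a missing impl is surfaced as UnsupportedMetadataError, which joins the existing exception tuple that already routes a batch to the pure-Python pipeline stored in python_impl; the except body itself lies outside the hunk. A self-contained toy model of that dispatch pattern, using illustrative class names that are not the zarrs-python API:

import asyncio


class UnsupportedMetadataError(Exception):
    pass


class PythonBackend:
    # Stands in for zarr-python's BatchedCodecPipeline: always available.
    async def read(self, key: str) -> bytes:
        return f"python:{key}".encode()


class HybridPipeline:
    # Stands in for ZarrsCodecPipeline: impl is None when the accelerated
    # pipeline could not be built for this array's codec metadata.
    def __init__(self, impl, python_impl: PythonBackend):
        self.impl = impl
        self.python_impl = python_impl

    async def read(self, key: str) -> bytes:
        try:
            if self.impl is None:
                raise UnsupportedMetadataError()
            return await self.impl.read(key)
        except UnsupportedMetadataError:
            # Fall back to the pure-Python backend for this request.
            return await self.python_impl.read(key)


print(asyncio.run(HybridPipeline(None, PythonBackend()).read("c/0/0")))
# -> b'python:c/0/0'
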
@@ -224,11 +238,14 @@ async def write(
         drop_axes: tuple[int, ...] = (),
     ) -> None:
         try:
+            if self.impl is None:
+                raise UnsupportedMetadataError()
             self._raise_error_on_unsupported_batch_dtype(batch_info)
             chunks_desc = make_chunk_info_for_rust_with_indices(
                 batch_info, drop_axes, value.shape
             )
         except (
+            UnsupportedMetadataError,
             DiscontiguousArrayError,
             CollapsedDimensionError,
             UnsupportedDataTypeError,
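
Not part of the diff: end to end, the change means that registering the Rust pipeline no longer makes arrays with unsupported codec metadata unreadable. A rough usage sketch, assuming the pipeline is registered through zarr-python's config as described in the project README; the store path is a placeholder for a pre-existing Zarr v2 array compressed with zlib:

import zarr
import zarrs  # noqa: F401

zarr.config.set({"codec_pipeline": {"path": "zarrs.ZarrsCodecPipeline"}})

# Placeholder path: a v2 array whose zlib codec the Rust pipeline rejects.
arr = zarr.open("example_v2_zlib.zarr")
data = arr[:]  # served by the BatchedCodecPipeline fallback instead of erroring
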
