[Backend Config II]: Add dataset configuration tools #555

Closed

Commits (25)
de97c31
add first placeholders
CodyCBakerPhD Aug 28, 2023
1b16b7c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
cb38e7d
reorganize tests
CodyCBakerPhD Aug 29, 2023
1af33f3
fix conflict
CodyCBakerPhD Aug 29, 2023
bf09db3
peel out models; fix conflicts
CodyCBakerPhD Aug 29, 2023
45f2be3
fix conflict
CodyCBakerPhD Aug 29, 2023
9017d6d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 29, 2023
d67ef30
Merge branch 'add_dataset_config_helper' into add_dataset_configuration
CodyCBakerPhD Aug 29, 2023
e265529
Merge branch 'add_dataset_config_helper' into add_dataset_configuration
CodyCBakerPhD Aug 29, 2023
e0cb578
add complex validation
CodyCBakerPhD Aug 29, 2023
2cfea13
Merge branch 'add_dataset_configuration' of https://github.com/cataly…
CodyCBakerPhD Aug 29, 2023
ab22452
add backend models exposing parallelization; enhance models; finish p…
CodyCBakerPhD Aug 31, 2023
bfa9aa0
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2023
a45e1c0
debug previous tests
CodyCBakerPhD Aug 31, 2023
907fc0d
debugs and first working tests
CodyCBakerPhD Aug 31, 2023
400893d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 31, 2023
2c0713f
add chunk and buffer shape inference; force staticmethods in neuroconv…
CodyCBakerPhD Sep 1, 2023
db3fe9a
resolve conflict
CodyCBakerPhD Sep 1, 2023
246614a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
5e95d4c
extensive refinement of models and workflow; many tests; many debugs
CodyCBakerPhD Sep 1, 2023
111797d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
f69daf3
backend config printout and integration tests
CodyCBakerPhD Sep 1, 2023
8cd9cec
merge conflict
CodyCBakerPhD Sep 1, 2023
23b531c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 1, 2023
721e09a
remove unused equality override
CodyCBakerPhD Sep 1, 2023
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,10 @@

* Add tool function `nwb_helpers.get_configurable_datasets` and corresponding private methods and dataclass for detecting datasets from an in-memory `NWBFile` that can be wrapped in an H5DataIO before being written to a new or existing file. [PR #549](https://github.com/catalystneuro/neuroconv/pull/549)

* Add tool function `nwb_helpers.get_default_dataset_configurations(nwbfile: NWBFile) -> Dict[Dataset, DatasetConfiguration]` along with Pydantic models: `BackendConfiguration`, representing the top-level backend configuration, and `nwb_helpers.DatasetConfiguration`, representing the backend-dependent configurable properties of each dataset (chunking and compression options) to be applied before writing to disk.

* Add tool function `nwb_helpers.configure_datasets(nwbfile: NWBFile, dataset_configurations: Dict[Dataset, DatasetConfiguration])` for applying backend and dataset options to an `NWBFile` before writing to disk.



# v0.4.1
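The changelog entries above sketch the intended workflow end to end. Below is a rough usage sketch based only on the signatures quoted in those entries (`get_configurable_datasets` lands in PR #549, and `configure_datasets` may still change), so exact names and keyword arguments are assumptions rather than the final merged API.

```python
from datetime import datetime

import numpy as np
from pynwb import NWBFile, TimeSeries

from neuroconv.tools import nwb_helpers

# Build a small in-memory NWBFile with one dataset worth configuring.
nwbfile = NWBFile(
    session_description="demo session",
    identifier="demo-identifier",
    session_start_time=datetime.now().astimezone(),
)
nwbfile.add_acquisition(
    TimeSeries(name="DemoSeries", data=np.random.rand(1_000, 8), unit="a.u.", rate=30.0)
)

# Detect datasets that could be wrapped in an H5DataIO before writing (PR #549).
configurable_datasets = nwb_helpers.get_configurable_datasets(nwbfile=nwbfile)

# Build default per-dataset chunking/compression settings.
dataset_configurations = nwb_helpers.get_default_dataset_configurations(nwbfile=nwbfile)

# Apply the (possibly user-adjusted) configurations prior to writing to disk.
nwb_helpers.configure_datasets(nwbfile=nwbfile, dataset_configurations=dataset_configurations)
```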
2 changes: 1 addition & 1 deletion requirements-minimal.txt
@@ -3,7 +3,7 @@ jsonschema>=3.2.0
PyYAML>=5.4
scipy>=1.4.1
h5py>=2.10.0
hdmf>=3.4.7
hdmf @ git+https://github.com/hdmf-dev/hdmf.git@master
hdmf_zarr>=0.3.0
pynwb>=2.3.2;python_version>='3.8'
psutil>=5.8.0
80 changes: 61 additions & 19 deletions src/neuroconv/tools/hdmf.py
@@ -1,35 +1,77 @@
"""Collection of modifications of HDMF functions that are to be tested/used on this repo until propagation upstream."""
import math
from typing import Tuple

import numpy as np
from hdmf.data_utils import GenericDataChunkIterator as HDMFGenericDataChunkIterator
from pydantic import Field
from typing_extensions import Annotated


class GenericDataChunkIterator(HDMFGenericDataChunkIterator):
def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]:
num_axes = len(self.maxshape)
chunk_bytes = np.prod(self.chunk_shape) * self.dtype.itemsize
return self.estimate_default_buffer_shape(
buffer_gb=buffer_gb, chunk_shape=self.chunk_shape, maxshape=self.maxshape, dtype=self.dtype
)

# TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own
@staticmethod
def estimate_default_chunk_shape(
chunk_mb: Annotated[float, Field(gt=0.0)],
maxshape: Tuple[int, ...],
dtype: np.dtype,
) -> Tuple[int, ...]:
"""
Select chunk shape with size in MB less than the threshold of chunk_mb.

Keeps the dimensional ratios of the original data.
"""
assert chunk_mb > 0.0, f"chunk_mb ({chunk_mb}) must be greater than zero!"
# Eventually, Pydantic validation can handle this validation for us

n_dims = len(maxshape)
itemsize = dtype.itemsize
chunk_bytes = chunk_mb * 1e6

min_maxshape = min(maxshape)
v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in maxshape)
prod_v = math.prod(v)
while prod_v * itemsize > chunk_bytes and prod_v != 1:
non_unit_min_v = min(x for x in v if x != 1)
v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v)
prod_v = math.prod(v)
k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / n_dims))
return tuple([min(k * x, maxshape[dim]) for dim, x in enumerate(v)])

# TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own
@staticmethod
def estimate_default_buffer_shape(
buffer_gb: Annotated[float, Field(gt=0.0)],
chunk_shape: Tuple[int, ...],
maxshape: Tuple[int, ...],
dtype: np.dtype,
) -> Tuple[int]:
num_axes = len(maxshape)
chunk_bytes = math.prod(chunk_shape) * dtype.itemsize
assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!"
assert (
buffer_gb >= chunk_bytes / 1e9
), f"buffer_gb ({buffer_gb}) must be greater than the chunk size ({chunk_bytes / 1e9})!"
assert all(
np.array(self.chunk_shape) > 0
), f"Some dimensions of chunk_shape ({self.chunk_shape}) are less than zero!"
assert all(np.array(chunk_shape) > 0), f"Some dimensions of chunk_shape ({chunk_shape}) are less than zero!"

maxshape = np.array(self.maxshape)
maxshape = np.array(maxshape)

# Early termination condition
if np.prod(maxshape) * self.dtype.itemsize / 1e9 < buffer_gb:
return tuple(self.maxshape)
if math.prod(maxshape) * dtype.itemsize / 1e9 < buffer_gb:
return tuple(maxshape)

buffer_bytes = chunk_bytes
axis_sizes_bytes = maxshape * self.dtype.itemsize
smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(self.chunk_shape)
axis_sizes_bytes = maxshape * dtype.itemsize
smallest_chunk_axis, second_smallest_chunk_axis, *_ = np.argsort(chunk_shape)
target_buffer_bytes = buffer_gb * 1e9

# If the smallest full axis does not fit within the buffer size, form a square along the two smallest axes
sub_square_buffer_shape = np.array(self.chunk_shape)
sub_square_buffer_shape = np.array(chunk_shape)
if min(axis_sizes_bytes) > target_buffer_bytes:
k1 = np.floor((target_buffer_bytes / chunk_bytes) ** 0.5)
for axis in [smallest_chunk_axis, second_smallest_chunk_axis]:
Expand All @@ -40,32 +82,32 @@ def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]:
chunk_to_buffer_ratio = buffer_gb * 1e9 / chunk_bytes
chunk_scaling_factor = np.floor(chunk_to_buffer_ratio ** (1 / num_axes))
unpadded_buffer_shape = [
np.clip(a=int(x), a_min=self.chunk_shape[j], a_max=self.maxshape[j])
for j, x in enumerate(chunk_scaling_factor * np.array(self.chunk_shape))
np.clip(a=int(x), a_min=chunk_shape[j], a_max=maxshape[j])
for j, x in enumerate(chunk_scaling_factor * np.array(chunk_shape))
]

unpadded_buffer_bytes = np.prod(unpadded_buffer_shape) * self.dtype.itemsize
unpadded_buffer_bytes = math.prod(unpadded_buffer_shape) * dtype.itemsize

# Method that starts by filling the smallest axis completely or calculates best partial fill
padded_buffer_shape = np.array(self.chunk_shape)
chunks_per_axis = np.ceil(maxshape / self.chunk_shape)
padded_buffer_shape = np.array(chunk_shape)
chunks_per_axis = np.ceil(maxshape / chunk_shape)
small_axis_fill_size = chunk_bytes * min(chunks_per_axis)
full_axes_used = np.zeros(shape=num_axes, dtype=bool)
if small_axis_fill_size <= target_buffer_bytes:
buffer_bytes = small_axis_fill_size
padded_buffer_shape[smallest_chunk_axis] = self.maxshape[smallest_chunk_axis]
padded_buffer_shape[smallest_chunk_axis] = maxshape[smallest_chunk_axis]
full_axes_used[smallest_chunk_axis] = True
for axis, chunks_on_axis in enumerate(chunks_per_axis):
if full_axes_used[axis]: # If the smallest axis, skip since already used
continue
if chunks_on_axis * buffer_bytes <= target_buffer_bytes: # If multiple axes can be used together
buffer_bytes *= chunks_on_axis
padded_buffer_shape[axis] = self.maxshape[axis]
padded_buffer_shape[axis] = maxshape[axis]
else: # Found an axis that is too large to use with the rest of the buffer; calculate how much can be used
k3 = np.floor(target_buffer_bytes / buffer_bytes)
padded_buffer_shape[axis] *= k3
break
padded_buffer_bytes = np.prod(padded_buffer_shape) * self.dtype.itemsize
padded_buffer_bytes = math.prod(padded_buffer_shape) * dtype.itemsize

if padded_buffer_bytes >= unpadded_buffer_bytes:
return tuple(padded_buffer_shape)
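As a minimal sketch (not part of the diff), the two static estimators defined above can be called on their own without constructing an iterator; the array shape and dtype below are hypothetical.

```python
import numpy as np

from neuroconv.tools.hdmf import GenericDataChunkIterator

maxshape = (100_000, 384)  # hypothetical frames x channels
dtype = np.dtype("int16")

# Choose a chunk shape whose size stays under ~10 MB while preserving the axis ratios of the data.
chunk_shape = GenericDataChunkIterator.estimate_default_chunk_shape(
    chunk_mb=10.0, maxshape=maxshape, dtype=dtype
)

# Choose a buffer shape under ~1 GB, bounded below by the chunk shape and above by maxshape.
buffer_shape = GenericDataChunkIterator.estimate_default_buffer_shape(
    buffer_gb=1.0, chunk_shape=chunk_shape, maxshape=maxshape, dtype=dtype
)

print(chunk_shape, buffer_shape)
```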
16 changes: 15 additions & 1 deletion src/neuroconv/tools/nwb_helpers/__init__.py
@@ -1,4 +1,18 @@
from ._dataset_configuration import ConfigurableDataset, get_configurable_datasets
from ._dataset_and_backend_models import (
BACKEND_TO_CONFIGURATION,
BACKEND_TO_DATASET_CONFIGURATION,
BackendConfiguration,
DatasetConfiguration,
DatasetInfo,
HDF5BackendConfiguration,
HDF5DatasetConfiguration,
ZarrBackendConfiguration,
ZarrDatasetConfiguration,
)
from ._dataset_configuration import (
get_default_backend_configuration,
get_default_dataset_configurations,
)
from ._metadata_and_file_helpers import (
add_device_from_metadata,
get_default_nwbfile_metadata,
204 changes: 204 additions & 0 deletions src/neuroconv/tools/nwb_helpers/_dataset_and_backend_models.py
@@ -0,0 +1,204 @@
"""Collection of helper functions related to configuration of datasets dependent on backend."""
from typing import Any, Dict, Literal, Tuple, Type, Union

import h5py
import hdf5plugin
import psutil
import zarr
from hdmf.backends.hdf5 import H5DataIO
from hdmf.container import DataIO
from hdmf_zarr import ZarrDataIO
from nwbinspector.utils import is_module_installed
from pydantic import BaseModel, Field, root_validator


class DatasetInfo(BaseModel):
object_id: str
location: str
maxshape: Tuple[int, ...]
dtype: str # Think about how to constrain/specify this more

class Config: # noqa: D106
allow_mutation = False

def __hash__(self):
"""To allow instances of this class to be used as keys in dictionaries."""
return hash((type(self),) + tuple(self.__dict__.values()))


class DatasetConfiguration(BaseModel):
"""A data model for configruing options about an object that will become a HDF5 or Zarr Dataset in the file."""

dataset_info: DatasetInfo
chunk_shape: Tuple[int, ...]
buffer_shape: Tuple[int, ...]
compression_method: Union[str, None] # Backend configurations should specify Literals; None means no compression
compression_options: Union[Dict[str, Any], None] = None

def __str__(self) -> str:
"""Not overriding __repr__ as this is intended to render only when wrapped in print()."""
string = (
f"{self.object_name} of {self.parent}\n"
+ f"{'-' * (len(self.object_name) + 4 + len(self.parent))}\n"
+ f" {self.field}\n"
+ f" maxshape: {self.maxshape}\n"
+ f" dtype: {self.dtype}"
)
return string


_available_hdf5_filters = set(h5py.filters.decode) - set(("shuffle", "fletcher32", "scaleoffset"))
if is_module_installed(module_name="hdf5plugin"):
_available_hdf5_filters = _available_hdf5_filters | set(
(filter_.filter_name for filter_ in hdf5plugin.get_filters())
)
AVAILABLE_HDF5_COMPRESSION_METHODS = Literal[tuple(_available_hdf5_filters)]


class HDF5DatasetConfiguration(DatasetConfiguration):
"""A data model for configruing options about an object that will become a HDF5 Dataset in the file."""

compression_method: Union[AVAILABLE_HDF5_COMPRESSION_METHODS, None] = "gzip"
# TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now
# Looks like they'll have to be hand-typed however... Can try parsing the google docstrings but no annotation typing
compression_options: Union[Dict[str, Any], None] = None


_available_zarr_filters = (
set(zarr.codec_registry.keys())
- set(
# These filters do nothing for us, or are things that ought to be implemented at lower HDMF levels
# or indirectly using HDMF data structures
(
"json2", # no data savings
"pickle", # no data savings
"vlen-utf8", # enforced by HDMF
"vlen-array", # enforced by HDMF
"vlen-bytes", # enforced by HDMF
"adler32", # checksum
"crc32", # checksum
"fixedscaleoffset", # enforced indrectly by HDMF/PyNWB data types
"base64", # unsure what this would ever be used for
"n5_wrapper", # different data format
)
)
- set( # Forbidding lossy codecs for now, but they could be allowed in the future with warnings
("astype", "bitround", "quantize")
)
)
# TODO: would like to eventually (as separate feature) add an 'auto' method to Zarr
# to harness the wider range of potential methods that are ideal for certain dtypes or structures
# E.g., 'packbits' for boolean (logical) VectorData columns
# | set(("auto",))
AVAILABLE_ZARR_COMPRESSION_METHODS = Literal[tuple(_available_zarr_filters)]


class ZarrDatasetConfiguration(DatasetConfiguration):
"""A data model for configruing options about an object that will become a Zarr Dataset in the file."""

filter_methods: Union[Tuple[AVAILABLE_ZARR_COMPRESSION_METHODS, ...], None] = None
filter_options: Union[Tuple[Dict[str, Any], ...], None] = None
compression_method: Union[AVAILABLE_ZARR_COMPRESSION_METHODS, None] = "gzip" # TODO: would like this to be 'auto'
# TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now
# Looks like they'll have to be hand-typed however... Can try parsing the google docstrings but no annotation typing
compression_options: Union[Dict[str, Any], None] = None

@root_validator()
def validate_filter_methods_and_options_match(cls, values: Dict[str, Any]):
filter_methods = values["filter_methods"]
filter_options = values["filter_options"]

if filter_methods is None and filter_options is not None:
raise ValueError(f"`filter_methods` is `None` but `filter_options` is not ({filter_options})!")
elif filter_methods is None and filter_options is None:
return values

len_filter_methods = len(filter_methods)
len_filter_options = len(filter_options)
if len_filter_methods != len_filter_options:
raise ValueError(
f"Length mismatch between `filter_methods` ({len_filter_methods} methods specified) and "
f"`filter_options` ({len_filter_options} options found)! These two must match one-to-one."
)

return values

# think about extra validation that msgpack2 compression is only ideal for datasets of vlen strings


class BackendConfiguration(BaseModel):
"""A model for matching collections of DatasetConfigurations specific to the HDF5 backend."""

backend: Literal["hdf5", "zarr"]
data_io: Type[DataIO]
dataset_configurations: Dict[str, DatasetConfiguration] # str key is the location field of each DatasetInfo

def __str__(self) -> str:
"""Not overriding __repr__ as this is intended to render only when wrapped in print()."""
string = (
f"Configurable datasets identified using the {self.backend} backend\n"
f"{'-' * (43 + len(self.backend) + 8)}\n"
)

for dataset_configuration in self.dataset_configurations.values():
dataset_info = dataset_configuration.dataset_info
string += (
f"{dataset_info.location}\n"
f" maxshape : {dataset_info.maxshape}\n"
f" dtype : {dataset_info.dtype}\n\n"
f" chunk shape : {dataset_configuration.chunk_shape}\n"
f" buffer shape : {dataset_configuration.buffer_shape}\n"
f" compression method : {dataset_configuration.compression_method}\n"
f" compression options : {dataset_configuration.compression_options}\n\n\n"
)

return string


class HDF5BackendConfiguration(BackendConfiguration):
"""A model for matching collections of DatasetConfigurations specific to the HDF5 backend."""

backend: Literal["hdf5"] = "hdf5"
data_io: Type[H5DataIO] = H5DataIO
dataset_configurations: Dict[str, HDF5DatasetConfiguration] # str key is the location field of each DatasetInfo


class ZarrBackendConfiguration(BackendConfiguration):
"""A model for matching collections of DatasetConfigurations specific to the Zarr backend."""

backend: Literal["zarr"] = "zarr"
data_io: Type[ZarrDataIO] = ZarrDataIO
dataset_configurations: Dict[str, ZarrDatasetConfiguration] # str key is the location field of each DatasetInfo
number_of_jobs: int = Field(
description="Number of jobs to use in parallel during write.",
ge=-psutil.cpu_count(), # TODO: should we specify logical=False in cpu_count?
le=psutil.cpu_count(),
default=-2, # -2 translates to 'all CPUs except one'
)

def __str__(self) -> str:
"""Not overriding __repr__ as this is intended to render only when wrapped in print()."""
string = (
f"Configurable datasets identified using the {self.backend} backend\n"
f"{'-' * (43 + len(self.backend) + 8)}\n"
)

for dataset_configuration in self.dataset_configurations.values():
dataset_info = dataset_configuration.dataset_info
string += (
f"{dataset_info.location}\n"
f" maxshape : {dataset_info.maxshape}\n"
f" dtype : {dataset_info.dtype}\n\n"
f" chunk shape : {dataset_configuration.chunk_shape}\n"
f" buffer shape : {dataset_configuration.buffer_shape}\n"
f" compression method : {dataset_configuration.compression_method}\n"
f" compression options : {dataset_configuration.compression_options}\n"
f" filter methods : {dataset_configuration.filter_methods}\n"
f" filter options : {dataset_configuration.filter_options}\n\n\n"
)

return string


BACKEND_TO_DATASET_CONFIGURATION = dict(hdf5=HDF5DatasetConfiguration, zarr=ZarrDatasetConfiguration)
BACKEND_TO_CONFIGURATION = dict(hdf5=HDF5BackendConfiguration, zarr=ZarrBackendConfiguration)
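For illustration only, here is a hand-built instance of the models above; the object id, location, shapes, and dtype are made up, and the `backend`/`data_io` fields fall back to their defaults.

```python
from neuroconv.tools.nwb_helpers import (
    DatasetInfo,
    HDF5BackendConfiguration,
    HDF5DatasetConfiguration,
)

dataset_info = DatasetInfo(
    object_id="0f66e9ef-0000-0000-0000-000000000000",  # hypothetical NWB object id
    location="acquisition/DemoSeries/data",
    maxshape=(100_000, 384),
    dtype="int16",
)
dataset_configuration = HDF5DatasetConfiguration(
    dataset_info=dataset_info,
    chunk_shape=(10_000, 64),
    buffer_shape=(100_000, 384),
    # compression_method defaults to "gzip"; compression_options stays None
)
backend_configuration = HDF5BackendConfiguration(
    dataset_configurations={dataset_info.location: dataset_configuration},
)
print(backend_configuration)  # renders the per-dataset summary from __str__ above
```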