From 8a6d4e8fc7604e85ba2b3d9b669f093e6901ec2e Mon Sep 17 00:00:00 2001
From: Cody Baker
Date: Wed, 8 Nov 2023 18:38:48 -0500
Subject: [PATCH] refactor of pr 996

---
 src/hdmf/data_utils.py | 195 +++++++++++++++++++++++++++++++----------
 1 file changed, 148 insertions(+), 47 deletions(-)

diff --git a/src/hdmf/data_utils.py b/src/hdmf/data_utils.py
index 941c3f8c7..61e1314d8 100644
--- a/src/hdmf/data_utils.py
+++ b/src/hdmf/data_utils.py
@@ -182,6 +182,12 @@ class GenericDataChunkIterator(AbstractDataChunkIterator):
             doc="Dictionary of keyword arguments to be passed directly to tqdm.",
             default=None,
         ),
+        dict(
+            name="chunking_strategy",
+            type=str,
+            doc="One of ['axis_ratio', 'uniform'].",
+            default="uniform",
+        ),
     )
 
     @docval(*__docval_init)
@@ -196,8 +202,23 @@ def __init__(self, **kwargs):
         HDF5 recommends chunk size in the range of 2 to 16 MB for optimal cloud performance.
         https://youtu.be/rcS5vt-mKok?t=621
         """
-        buffer_gb, buffer_shape, chunk_mb, chunk_shape, self.display_progress, progress_bar_options = getargs(
-            "buffer_gb", "buffer_shape", "chunk_mb", "chunk_shape", "display_progress", "progress_bar_options", kwargs
+        (
+            buffer_gb,
+            buffer_shape,
+            chunk_mb,
+            chunk_shape,
+            self.display_progress,
+            progress_bar_options,
+            chunking_strategy,
+        ) = getargs(
+            "buffer_gb",
+            "buffer_shape",
+            "chunk_mb",
+            "chunk_shape",
+            "display_progress",
+            "progress_bar_options",
+            "chunking_strategy",
+            kwargs
         )
 
         self.progress_bar_options = progress_bar_options or dict()
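
Note: the hunk above threads the new `chunking_strategy` option through `getargs`. As a usage sketch only — `ArrayChunkIterator` is a hypothetical subclass invented here for illustration, since `GenericDataChunkIterator` is abstract and requires `_get_data`, `_get_maxshape`, and `_get_dtype`:

    import numpy as np
    from hdmf.data_utils import GenericDataChunkIterator

    class ArrayChunkIterator(GenericDataChunkIterator):
        """Hypothetical minimal subclass wrapping an in-memory numpy array."""

        def __init__(self, array, **kwargs):
            self._array = array
            super().__init__(**kwargs)

        def _get_data(self, selection):
            return self._array[selection]

        def _get_maxshape(self):
            return self._array.shape

        def _get_dtype(self):
            return self._array.dtype

    iterator = ArrayChunkIterator(
        array=np.random.random(size=(10_000, 384)),
        chunk_mb=10.0,
        chunking_strategy="uniform",
    )
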
@@ -208,16 +229,28 @@ def __init__(self, **kwargs):
         assert (buffer_gb is not None) != (
             buffer_shape is not None
         ), "Only one of 'buffer_gb' or 'buffer_shape' can be specified!"
-        assert (chunk_mb is not None) != (
-            chunk_shape is not None
-        ), "Only one of 'chunk_mb' or 'chunk_shape' can be specified!"
 
         self._dtype = self._get_dtype()
         self._maxshape = tuple(int(x) for x in self._get_maxshape())
-        chunk_shape = tuple(int(x) for x in chunk_shape) if chunk_shape else chunk_shape
-        self.chunk_shape = chunk_shape or self._get_default_chunk_shape(chunk_mb=chunk_mb)
+
+        if chunk_shape is None or any(axis is None for axis in chunk_shape):
+            self.chunk_shape = self.estimate_chunk_shape(
+                maxshape=self._maxshape,
+                itemsize=self._dtype.itemsize,
+                chunk_mb=chunk_mb,
+                chunk_shape_constraints=chunk_shape,
+                chunking_strategy=chunking_strategy,
+            )
+        else:
+            self.chunk_shape = tuple(int(x) for x in chunk_shape)
+
         buffer_shape = tuple(int(x) for x in buffer_shape) if buffer_shape else buffer_shape
-        self.buffer_shape = buffer_shape or self._get_default_buffer_shape(buffer_gb=buffer_gb)
+        self.buffer_shape = buffer_shape or self.estimate_buffer_shape(
+            maxshape=self._maxshape,
+            itemsize=self._dtype.itemsize,
+            chunk_shape=self.chunk_shape,
+            buffer_gb=buffer_gb
+        )
 
         # Shape assertions
         assert all(
@@ -248,12 +281,7 @@ def __init__(self, **kwargs):
             ],
         )
         self.buffer_selection_generator = (
-            tuple(
-                [
-                    slice(lower_bound, upper_bound)
-                    for lower_bound, upper_bound in zip(lower_bounds, upper_bounds)
-                ]
-            )
+            tuple([slice(lower_bound, upper_bound) for lower_bound, upper_bound in zip(lower_bounds, upper_bounds)])
             for lower_bounds, upper_bounds in zip(
                 product(
                     *[
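
Note: the 'uniform' strategy introduced in the next hunk spreads the chunk byte budget evenly across all unconstrained axes. A minimal standalone sketch of that arithmetic, with all numbers illustrative (a (10_000, 384) float64 array, a 2 MB target, and axis 1 pre-constrained to 384):

    import math

    maxshape = (10_000, 384)
    itemsize = 8  # float64
    chunk_bytes = 2 * 1e6
    constrained_lengths = {1: 384}  # axis index -> fixed chunk length

    # Bytes already committed to the constrained axes
    constrained_bytes = math.prod(constrained_lengths.values()) * itemsize
    number_of_free_axes = len(maxshape) - len(constrained_lengths)

    # Spread the remaining byte budget evenly over the free axes
    fill_factor = math.floor((chunk_bytes / constrained_bytes) ** (1 / number_of_free_axes))
    chunk_shape = (min(fill_factor, maxshape[0]), constrained_lengths[1])

    assert chunk_shape == (651, 384)  # 651 * 384 * 8 bytes ~= 1.9999 MB
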
@@ -286,69 +314,140 @@ def __init__(self, **kwargs):
         )
         self.display_progress = False
 
+    @staticmethod
     @docval(
+        dict(
+            name="maxshape",
+            type=tuple,
+            doc="The maxshape of the data array.",
+        ),
+        dict(
+            name="itemsize",
+            type=int,
+            doc="The itemsize of the data dtype.",
+        ),
         dict(
             name="chunk_mb",
             type=(float, int),
             doc="Size of the HDF5 chunk in megabytes.",
+            default=10.0,
+        ),
+        dict(
+            name="chunk_shape_constraints",
+            type=tuple,
+            doc="A tuple of pre-constrained lengths for each axis; set an axis to `None` to estimate it.",
             default=None,
-        )
+        ),
+        dict(
+            name="chunking_strategy",
+            type=str,
+            doc="Either 'axis_ratio' or 'uniform'.",
+            default="uniform",
+        ),
     )
-    def _get_default_chunk_shape(self, **kwargs) -> Tuple[int, ...]:
-        """
-        Select chunk shape with size in MB less than the threshold of chunk_mb.
-
-        Keeps the dimensional ratios of the original data.
-        """
-        chunk_mb = getargs("chunk_mb", kwargs)
+    def estimate_chunk_shape(**kwargs) -> Tuple[int, ...]:
+        """Select chunk shape with size in MB less than the threshold of chunk_mb."""
+        maxshape, itemsize, chunk_mb, chunk_shape_constraints, chunking_strategy = getargs(
+            "maxshape", "itemsize", "chunk_mb", "chunk_shape_constraints", "chunking_strategy", kwargs
+        )
         assert chunk_mb > 0, f"chunk_mb ({chunk_mb}) must be greater than zero!"
+        assert chunking_strategy in ['axis_ratio', 'uniform'], (
+            "Unrecognized `chunking_strategy` selected! Please select either 'axis_ratio' or 'uniform'."
+        )
+
+        maxshape = np.array(maxshape)
+        number_of_dimensions = len(maxshape)
+        if chunk_shape_constraints is not None:
+            assert len(chunk_shape_constraints) == len(maxshape)
+            chunk_shape_constraints = np.array(chunk_shape_constraints)
+        else:
+            chunk_shape_constraints = np.array([None for _ in range(number_of_dimensions)])
 
-        n_dims = len(self.maxshape)
-        itemsize = self.dtype.itemsize
         chunk_bytes = chunk_mb * 1e6
 
-        min_maxshape = min(self.maxshape)
-        v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in self.maxshape)
-        prod_v = math.prod(v)
-        while prod_v * itemsize > chunk_bytes and prod_v != 1:
-            non_unit_min_v = min(x for x in v if x != 1)
-            v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v)
+        none_axes = np.where([x is None for x in chunk_shape_constraints])[0]
+        constrained_axes = np.where([x is not None for x in chunk_shape_constraints])[0]
+        if chunking_strategy == "uniform":
+            estimated_chunk_shape = np.array(chunk_shape_constraints)
+            capped_axes = none_axes
+            while len(capped_axes) > 0:
+                number_of_free_axes = len(none_axes)
+
+                # Estimate the amount to fill uniformly across the unconstrained axes
+                # Note that math.prod is 1 if all axes are None
+                constrained_bytes = math.prod(estimated_chunk_shape[constrained_axes]) * itemsize
+                estimated_fill_factor = math.floor((chunk_bytes / constrained_bytes) ** (1 / number_of_free_axes))
+
+                # Update axes and constraints in the event that the fill factor pushed some axes beyond their maximum
+                capped_axes = none_axes[np.where(maxshape[none_axes] <= estimated_fill_factor)[0]]
+                estimated_chunk_shape[capped_axes] = maxshape[capped_axes]  # Cap the estimate at the max
+                chunk_shape_constraints[capped_axes] = maxshape[capped_axes]  # Consider capped axis a constraint
+                none_axes = np.where([x is None for x in chunk_shape_constraints])[0]
+                constrained_axes = np.where([x is not None for x in chunk_shape_constraints])[0]
+                estimated_chunk_shape[none_axes] = estimated_fill_factor
+        if chunking_strategy == "axis_ratio":
+            if len(constrained_axes) > 0:
+                raise NotImplementedError(
+                    "`chunking_strategy='axis_ratio'` does not yet support axis constraints! "
+                    "Please use the 'uniform' strategy instead."
+                )
+
+            min_maxshape = min(maxshape)
+            v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in maxshape)
             prod_v = math.prod(v)
-        k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / n_dims))
-        return tuple([min(k * x, self.maxshape[dim]) for dim, x in enumerate(v)])
+            while prod_v * itemsize > chunk_bytes and prod_v != 1:
+                non_unit_min_v = min(x for x in v if x != 1)
+                v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v)
+                prod_v = math.prod(v)
+            k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / number_of_dimensions))
+            estimated_chunk_shape = [min(k * x, maxshape[dim]) for dim, x in enumerate(v)]
+
+        return tuple(int(x) for x in estimated_chunk_shape)
 
+    @staticmethod
     @docval(
+        dict(
+            name="maxshape",
+            type=tuple,
+            doc="The maxshape of the data array.",
+        ),
+        dict(
+            name="itemsize",
+            type=int,
+            doc="The itemsize of the data dtype.",
+        ),
+        dict(
+            name="chunk_shape",
+            type=tuple,
+            doc="Manually defined shape of the chunks.",
+            default=None,
+        ),
         dict(
             name="buffer_gb",
             type=(float, int),
             doc="Size of the data buffer in gigabytes. Recommended to be as much free RAM as safely available.",
             default=None,
-        )
+        ),
     )
-    def _get_default_buffer_shape(self, **kwargs) -> Tuple[int, ...]:
+    def estimate_buffer_shape(**kwargs) -> Tuple[int, ...]:
         """
         Select buffer shape with size in GB less than the threshold of buffer_gb.
 
         Keeps the dimensional ratios of the original data. Assumes the chunk_shape has already been set.
         """
-        buffer_gb = getargs("buffer_gb", kwargs)
-        assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!"
-        assert all(chunk_axis > 0 for chunk_axis in self.chunk_shape), (
-            f"Some dimensions of chunk_shape ({self.chunk_shape}) are less than zero!"
+        maxshape, itemsize, chunk_shape, buffer_gb = getargs(
+            "maxshape", "itemsize", "chunk_shape", "buffer_gb", kwargs
         )
+        assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!"
+        assert all(
+            chunk_axis > 0 for chunk_axis in chunk_shape
+        ), f"Some dimensions of chunk_shape ({chunk_shape}) are less than zero!"
 
         k = math.floor(
-            (
-                buffer_gb * 1e9 / (math.prod(self.chunk_shape) * self.dtype.itemsize)
-            ) ** (1 / len(self.chunk_shape))
+            (buffer_gb * 1e9 / (math.prod(chunk_shape) * itemsize)) ** (1 / len(chunk_shape))
         )
-        return tuple(
-            [
-                min(max(k * x, self.chunk_shape[j]), self.maxshape[j])
-                for j, x in enumerate(self.chunk_shape)
-            ]
-        )
+        return tuple([min(max(k * x, chunk_shape[j]), maxshape[j]) for j, x in enumerate(chunk_shape)])
 
     def __iter__(self):
         return self
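
Note: the buffer estimate above scales every chunk axis by a single factor k, then clips each axis to at least one chunk and at most the full axis length. A worked sketch continuing the illustrative numbers from the previous note:

    import math

    maxshape = (10_000, 384)
    chunk_shape = (651, 384)
    itemsize = 8
    buffer_gb = 1.0

    # Scale factor applied equally in every dimension
    k = math.floor((buffer_gb * 1e9 / (math.prod(chunk_shape) * itemsize)) ** (1 / len(chunk_shape)))

    # Never below one chunk, never above the full axis
    buffer_shape = tuple(
        min(max(k * chunk_axis, chunk_axis), max_axis)
        for chunk_axis, max_axis in zip(chunk_shape, maxshape)
    )
    assert k == 22 and buffer_shape == (10_000, 384)
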
@@ -427,11 +526,13 @@ def recommended_data_shape(self) -> Tuple[int, ...]:
     @property
     def maxshape(self) -> Tuple[int, ...]:
         return self._maxshape
+
     @property
     def dtype(self) -> np.dtype:
         return self._dtype
+
 
 class DataChunkIterator(AbstractDataChunkIterator):
     """
     Custom iterator class used to iterate over chunks of data.
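
Note: with both estimators now static methods, a chunk/buffer layout can be previewed without constructing an iterator. A usage sketch, assuming this patch is applied:

    import numpy as np
    from hdmf.data_utils import GenericDataChunkIterator

    itemsize = np.dtype("float64").itemsize
    chunk_shape = GenericDataChunkIterator.estimate_chunk_shape(
        maxshape=(10_000, 384),
        itemsize=itemsize,
        chunk_mb=10.0,
        chunking_strategy="uniform",
    )
    buffer_shape = GenericDataChunkIterator.estimate_buffer_shape(
        maxshape=(10_000, 384),
        itemsize=itemsize,
        chunk_shape=chunk_shape,
        buffer_gb=1.0,
    )
    print(chunk_shape, buffer_shape)  # -> (3255, 384) (10000, 384)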