-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Constrained chunk shape estimation #997
Draft
CodyCBakerPhD
wants to merge
1
commit into
dev
Choose a base branch
from
partial_chunk_size
base: dev
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -182,6 +182,12 @@ class GenericDataChunkIterator(AbstractDataChunkIterator): | |
doc="Dictionary of keyword arguments to be passed directly to tqdm.", | ||
default=None, | ||
), | ||
dict( | ||
name="chunking_strategy", | ||
type=str, | ||
doc="One of ['axis_ratio', 'uniform'].", | ||
default=None, | ||
), | ||
) | ||
|
||
@docval(*__docval_init) | ||
|
@@ -196,8 +202,23 @@ def __init__(self, **kwargs): | |
HDF5 recommends chunk size in the range of 2 to 16 MB for optimal cloud performance. | ||
https://youtu.be/rcS5vt-mKok?t=621 | ||
""" | ||
buffer_gb, buffer_shape, chunk_mb, chunk_shape, self.display_progress, progress_bar_options = getargs( | ||
"buffer_gb", "buffer_shape", "chunk_mb", "chunk_shape", "display_progress", "progress_bar_options", kwargs | ||
( | ||
buffer_gb, | ||
buffer_shape, | ||
chunk_mb, | ||
chunk_shape, | ||
self.display_progress, | ||
progress_bar_options, | ||
chunking_strategy, | ||
) = getargs( | ||
"buffer_gb", | ||
"buffer_shape", | ||
"chunk_mb", | ||
"chunk_shape", | ||
"display_progress", | ||
"progress_bar_options", | ||
"chunking_strategy", | ||
kwargs | ||
) | ||
self.progress_bar_options = progress_bar_options or dict() | ||
|
||
|
@@ -208,16 +229,28 @@ def __init__(self, **kwargs): | |
assert (buffer_gb is not None) != ( | ||
buffer_shape is not None | ||
), "Only one of 'buffer_gb' or 'buffer_shape' can be specified!" | ||
assert (chunk_mb is not None) != ( | ||
chunk_shape is not None | ||
), "Only one of 'chunk_mb' or 'chunk_shape' can be specified!" | ||
|
||
self._dtype = self._get_dtype() | ||
self._maxshape = tuple(int(x) for x in self._get_maxshape()) | ||
chunk_shape = tuple(int(x) for x in chunk_shape) if chunk_shape else chunk_shape | ||
self.chunk_shape = chunk_shape or self._get_default_chunk_shape(chunk_mb=chunk_mb) | ||
|
||
if chunk_shape is None or any(axis is None for axis in chunk_shape): | ||
self.chunk_shape = self.estimate_chunk_shape( | ||
maxshape=self._maxshape, | ||
itemsize=self._dtype.itemsize, | ||
chunk_mb=chunk_mb, | ||
chunk_shape_constraints=chunk_shape, | ||
strategy=chunking_strategy, | ||
) | ||
else: | ||
self.chunk_shape = tuple(int(x) for x in chunk_shape) | ||
|
||
buffer_shape = tuple(int(x) for x in buffer_shape) if buffer_shape else buffer_shape | ||
self.buffer_shape = buffer_shape or self._get_default_buffer_shape(buffer_gb=buffer_gb) | ||
self.buffer_shape = buffer_shape or self.estimate_buffer_shape( | ||
maxshape=self._maxshape, | ||
itemsize=self._dtype.itemsize, | ||
chunk_shape=self.chunk_shape, | ||
buffer_gb=buffer_gb | ||
) | ||
|
||
# Shape assertions | ||
assert all( | ||
|
@@ -248,12 +281,7 @@ def __init__(self, **kwargs): | |
], | ||
) | ||
self.buffer_selection_generator = ( | ||
tuple( | ||
[ | ||
slice(lower_bound, upper_bound) | ||
for lower_bound, upper_bound in zip(lower_bounds, upper_bounds) | ||
] | ||
) | ||
tuple([slice(lower_bound, upper_bound) for lower_bound, upper_bound in zip(lower_bounds, upper_bounds)]) | ||
for lower_bounds, upper_bounds in zip( | ||
product( | ||
*[ | ||
|
@@ -286,69 +314,140 @@ def __init__(self, **kwargs): | |
) | ||
self.display_progress = False | ||
|
||
@staticmethod | ||
@docval( | ||
dict( | ||
name="maxshape", | ||
type=tuple, | ||
doc="The maxshape of the data array.", | ||
), | ||
dict( | ||
name="itemsize", | ||
type=int, | ||
doc="The itemsize of the data dtype.", | ||
), | ||
dict( | ||
name="chunk_mb", | ||
type=(float, int), | ||
doc="Size of the HDF5 chunk in megabytes.", | ||
default=10.0, | ||
), | ||
dict( | ||
name="chunk_shape_constraints", | ||
type=tuple, | ||
doc="A tuple of pre-constrained lengths for each axis; set an axis to `None` to estimate it.", | ||
default=None, | ||
) | ||
), | ||
dict( | ||
name="chunking_strategy", | ||
type=str, | ||
doc="Either 'axis_ratio' or 'uniform'.", | ||
default="uniform", | ||
), | ||
) | ||
def _get_default_chunk_shape(self, **kwargs) -> Tuple[int, ...]: | ||
""" | ||
Select chunk shape with size in MB less than the threshold of chunk_mb. | ||
|
||
Keeps the dimensional ratios of the original data. | ||
""" | ||
chunk_mb = getargs("chunk_mb", kwargs) | ||
def estimate_chunk_shape(**kwargs) -> Tuple[int, ...]: | ||
"""Select chunk shape with size in MB less than the threshold of chunk_mb.""" | ||
maxshape, itemsize, chunk_mb, chunk_shape_constraints, chunking_strategy = getargs( | ||
"maxshape", "itemsize", "chunk_mb", "chunk_shape_constraints", "chunking_strategy", kwargs | ||
) | ||
assert chunk_mb > 0, f"chunk_mb ({chunk_mb}) must be greater than zero!" | ||
assert chunking_strategy in ['axis_ratio', 'uniform'], ( | ||
"Unrecognized `chunking_strategy` selected! Please select either 'axis_ratio' or 'uniform'." | ||
) | ||
|
||
maxshape = np.array(maxshape) | ||
number_of_dimensions = len(maxshape) | ||
if chunk_shape_constraints is not None: | ||
assert len(chunk_shape_constraints) == len(maxshape) | ||
chunk_shape_constraints = np.array(chunk_shape_constraints) | ||
else: | ||
chunk_shape_constraints = np.array([None for _ in range(number_of_dimensions)]) | ||
|
||
n_dims = len(self.maxshape) | ||
itemsize = self.dtype.itemsize | ||
chunk_bytes = chunk_mb * 1e6 | ||
|
||
min_maxshape = min(self.maxshape) | ||
v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in self.maxshape) | ||
prod_v = math.prod(v) | ||
while prod_v * itemsize > chunk_bytes and prod_v != 1: | ||
non_unit_min_v = min(x for x in v if x != 1) | ||
v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v) | ||
none_axes = np.where([x is None for x in chunk_shape_constraints])[0] | ||
constrained_axes = np.where([x is not None for x in chunk_shape_constraints])[0] | ||
if chunking_strategy == "uniform": | ||
estimated_chunk_shape = np.array(chunk_shape_constraints) | ||
capped_axes = none_axes | ||
while any(capped_axes): | ||
number_of_free_axes = len(none_axes) | ||
|
||
# Estimate the amount to fill uniformly across the unconstrained axes | ||
# Note that math.prod is 1 if all axes are None | ||
constrained_bytes = math.prod(estimated_chunk_shape[constrained_axes]) * itemsize | ||
estimated_fill_factor = math.floor((chunk_bytes / constrained_bytes) ** (1 / number_of_free_axes)) | ||
|
||
# Update axes and constraints in the event that the fill factor pushed some axes beyond their maximum | ||
capped_axes = none_axes[np.where(maxshape[none_axes] <= estimated_fill_factor)[0]] | ||
estimated_chunk_shape[capped_axes] = maxshape[capped_axes] # Cap the estimate at the max | ||
chunk_shape_constraints[capped_axes] = maxshape[capped_axes] # Consider capped axis a constraint | ||
none_axes = np.where([x is None for x in chunk_shape_constraints])[0] | ||
constrained_axes = np.where([x is not None for x in chunk_shape_constraints])[0] | ||
estimated_chunk_shape[none_axes] = estimated_fill_factor | ||
if chunking_strategy == "axis_ratio": | ||
if any(constrained_axes): | ||
raise NotImplementedError( | ||
"`chunking_strategy='axis_ratio'` does not yet support axis constraints! " | ||
"Please use the 'uniform' strategy instead." | ||
) | ||
|
||
min_maxshape = min(maxshape) | ||
v = tuple(math.floor(maxshape_axis / min_maxshape) for maxshape_axis in maxshape) | ||
prod_v = math.prod(v) | ||
k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / n_dims)) | ||
return tuple([min(k * x, self.maxshape[dim]) for dim, x in enumerate(v)]) | ||
while prod_v * itemsize > chunk_bytes and prod_v != 1: | ||
non_unit_min_v = min(x for x in v if x != 1) | ||
v = tuple(math.floor(x / non_unit_min_v) if x != 1 else x for x in v) | ||
prod_v = math.prod(v) | ||
k = math.floor((chunk_bytes / (prod_v * itemsize)) ** (1 / number_of_dimensions)) | ||
estimated_chunk_shape = [min(k * x, maxshape[dim]) for dim, x in enumerate(v)] | ||
|
||
return tuple(int(x) for x in estimated_chunk_shape) | ||
|
||
@staticmethod | ||
@docval( | ||
dict( | ||
name="maxshape", | ||
type=tuple, | ||
doc="The maxshape of the data array.", | ||
), | ||
dict( | ||
name="itemsize", | ||
type=int, | ||
doc="The itemsize of the data dtype.", | ||
), | ||
dict( | ||
name="chunk_shape", | ||
type=tuple, | ||
doc="Manually defined shape of the chunks.", | ||
default=None, | ||
), | ||
dict( | ||
name="buffer_gb", | ||
type=(float, int), | ||
doc="Size of the data buffer in gigabytes. Recommended to be as much free RAM as safely available.", | ||
default=None, | ||
) | ||
), | ||
) | ||
def _get_default_buffer_shape(self, **kwargs) -> Tuple[int, ...]: | ||
def estimate_buffer_shape(**kwargs) -> Tuple[int, ...]: | ||
""" | ||
Select buffer shape with size in GB less than the threshold of buffer_gb. | ||
|
||
Keeps the dimensional ratios of the original data. | ||
Assumes the chunk_shape has already been set. | ||
""" | ||
buffer_gb = getargs("buffer_gb", kwargs) | ||
assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!" | ||
assert all(chunk_axis > 0 for chunk_axis in self.chunk_shape), ( | ||
f"Some dimensions of chunk_shape ({self.chunk_shape}) are less than zero!" | ||
maxshape, itemsize, chunk_shape, buffer_gb = getargs( | ||
"maxshape", "chunk_shape", "itemsize", "buffer_gb", kwargs | ||
) | ||
assert buffer_gb > 0, f"buffer_gb ({buffer_gb}) must be greater than zero!" | ||
assert all( | ||
chunk_axis > 0 for chunk_axis in chunk_shape | ||
), f"Some dimensions of chunk_shape ({chunk_shape}) are less than zero!" | ||
|
||
k = math.floor( | ||
( | ||
buffer_gb * 1e9 / (math.prod(self.chunk_shape) * self.dtype.itemsize) | ||
) ** (1 / len(self.chunk_shape)) | ||
) | ||
return tuple( | ||
[ | ||
min(max(k * x, self.chunk_shape[j]), self.maxshape[j]) | ||
for j, x in enumerate(self.chunk_shape) | ||
] | ||
(buffer_gb * 1e9 / (math.prod(chunk_shape) * itemsize)) ** (1 / len(chunk_shape)) | ||
) | ||
return tuple([min(max(k * x, chunk_shape[j]), maxshape[j]) for j, x in enumerate(chunk_shape)]) | ||
Comment on lines +439 to +450
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice, pulling these attributes out made the code much more readable. |
||
|
||
def __iter__(self): | ||
return self | ||
|
@@ -427,11 +526,13 @@ def recommended_data_shape(self) -> Tuple[int, ...]: | |
@property | ||
def maxshape(self) -> Tuple[int, ...]: | ||
return self._maxshape | ||
|
||
@property | ||
def dtype(self) -> np.dtype: | ||
return self._dtype | ||
|
||
|
||
|
||
class DataChunkIterator(AbstractDataChunkIterator): | ||
""" | ||
Custom iterator class used to iterate over chunks of data. | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I think this seems like a good alternative solution, which should achieve the same answer in far fewer loops, but I agree with @oruebel that it could go in a separate function. I liked the way I broke it apart where the math was in a separate function that was just trying to achieve a vector with a certain product (not knowing that it was related to the size and shape of an array).
Also, a minor point: I would remove the `np.where` calls. If you are going this way, you can just index with the boolean NumPy arrays directly, which will make the syntax a bit cleaner.