diff --git a/mala/datahandling/data_shuffler.py b/mala/datahandling/data_shuffler.py index 9303b0ee..96fd9ebc 100644 --- a/mala/datahandling/data_shuffler.py +++ b/mala/datahandling/data_shuffler.py @@ -288,6 +288,187 @@ def __init__(self): self.rank = 0 self.size = 1 + # Takes the `extent` of an n-dimensional grid, the coordinates of a start + # point `x` and of an end point `y`. + # Interpreting the grid as a row-major n-dimensional array yields a + # contiguous slice from `x` to `y`. + # !!! Both `x` and `y` are **inclusive** ends as the semantics of what would + # be the next exclusive upper limit are unnecessarily complex in + # n dimensions, especially since the function recurses over the number of + # dimensions. !!! + # This slice is not in general an n-dimensional cuboid within the grid, but + # may be a non-convex form composed of multiple cuboid chunks. + # + # This function returns a list of cuboid sub-chunks of the n-dimensional + # grid such that: + # + # 1. The union of those chunks is equal to the contiguous slice spanned + # between `x` and `y`, both ends inclusive. + # 2. The chunks do not overlap. + # 3. Each single chunk is a contiguous slice within the row-major grid. + # 4. The chunks are sorted in ascending order of their position within the + # row-major grid. + # + # Row-major within the context of this function means that the indexes go + # from the slowest-running dimension at the left end to the fastest-running + # dimension at the right end. + # + # The output is a list of chunks as defined in openPMD, i.e. a start offset + # and the extent, counted from this offset (not from the grid origin!). + # + # Example: From the below 2D grid with extent [8, 10], the slice marked by + # the X-es should be loaded. The slice is defined by its first item + # at [2, 3] and its last item at [6, 6]. + # The result would be a list of three chunks: + # + # 1. ([2, 3], [1, 7]) + # 2. ([3, 0], [3, 10]) + # 3. ([6, 0], [1, 7]) + # + # OOOOOOOOOO + # OOOOOOOOOO + # OOOXXXXXXX + # XXXXXXXXXX + # XXXXXXXXXX + # XXXXXXXXXX + # XXXXXXXOOO + # OOOOOOOOOO + + def __contiguous_slice_within_ndim_grid_as_blocks( + extent: list[int], x: list[int], y: list[int] + ): + # Used for converting a block defined by inclusive lower and upper + # coordinate to a chunk as defined by openPMD. + # The openPMD extent is defined by (to-from)+1 (to make up for the + # inclusive upper end). + def get_extent(from_, to_): + res = [upper + 1 - lower for (lower, upper) in zip(from_, to_)] + if any(x < 1 for x in res): + raise RuntimeError( + f"Cannot compute block extent from {from_} to {to_}." + ) + return res + + if len(extent) != len(x): + raise RuntimeError( + f"__shuffle_openpmd: Internal indexing error extent={extent} and x={x} must have the same length. This is a bug." + ) + if len(extent) != len(y): + raise RuntimeError( + f"__shuffle_openpmd: Internal indexing error extent={extent} and y={y} must have the same length. This is a bug." + ) + + # Recursive bottom cases + # 1. No items left + if y < x: + return [] + # 2. No distinct dimensions left + elif x[0:-1] == y[0:-1]: + return [(x.copy(), get_extent(x, y))] + + # Take the frontside and backside one-dimensional slices with extent + # defined by the last dimension, process the remainder recursively. + + # The offset of the front slice is equal to x. + front_slice = (x.copy(), [1 for _ in range(len(x))]) + # The chunk extent in the last dimension is the distance from the x + # coordinate to the grid extent in the last dimension. + # As the slice is one-dimensional, the extent is 1 in all other + # dimensions. + front_slice[1][-1] = extent[-1] - x[-1] + + # Similar for the back slice. + # The offset of the back slice is 0 in the last dimension, equal to y + # in all other dimensions. + back_slice = (y.copy(), [1 for _ in range(len(y))]) + back_slice[0][-1] = 0 + # The extent is equal to the coordinate of y+1 (to convert inclusive to + # exclusive upper end) in the last dimension, 1 otherwise. + back_slice[1][-1] = y[-1] + 1 + + # Strip the last (i.e. fast-running) dimension for a recursive call with + # one dimensionality reduced. + recursive_x = x[0:-1] + recursive_y = y[0:-1] + + # Since the first and last row are dealt with above, those rows are now + # stripped from the second-to-last dimension for the recursive call + # (in which they will be the last dimension) + recursive_x[-1] += 1 + recursive_y[-1] -= 1 + + rec_res = DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks( + extent[0:-1], recursive_x, recursive_y + ) + + # Add the last dimension again. The last dimension is covered from start + # to end since the leftovers have been dealt with by the front and back + # slice. + rec_res = [(xx + [0], yy + [extent[-1]]) for (xx, yy) in rec_res] + + return [front_slice] + rec_res + [back_slice] + + # Interpreting `ndim_extent` as the extents of an n-dimensional grid, + # returns the n-dimensional coordinate of the `idx`th item in the row-major + # representation of the grid. + def __resolve_flattened_index_into_ndim(idx: int, ndim_extent: list[int]): + if not ndim_extent: + raise RuntimeError("Cannot index into a zero-dimensional array.") + strides = [] + current_stride = 1 + for ext in reversed(ndim_extent): + strides = [current_stride] + strides + # sic!, the last stride is ignored, as it's just the entire grid + # extent + current_stride *= ext + + def worker(inner_idx, inner_strides): + if not inner_strides: + if inner_idx != 0: + raise RuntimeError( + "This cannot happen. There is bug somewhere.") + else: + return [] + div, mod = divmod(inner_idx, inner_strides[0]) + return [div] + worker(mod, inner_strides[1:]) + + return worker(idx, strides) + + # Load a chunk from the openPMD `record` into the buffer at `arr`. + # The indexes `offset` and `extent` are scalar 1-dimensional coordinates, + # and apply to the n-dimensional record by reinterpreting (reshaped) it as + # a one-dimensional array. + # The function deals internally with splitting the 1-dimensional slice into + # a sequence of n-dimensional block load operations. + def __load_chunk_1D(mesh, arr, offset, extent): + start_idx = DataShuffler.__resolve_flattened_index_into_ndim( + offset, mesh.shape + ) + # Inclusive upper end. See the documentation in + # __contiguous_slice_within_ndim_grid_as_blocks() for the reason why + # we work with inclusive upper ends. + end_idx = DataShuffler.__resolve_flattened_index_into_ndim( + offset + extent - 1, mesh.shape + ) + blocks_to_load = ( + DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks( + mesh.shape, start_idx, end_idx + ) + ) + # print(f"\n\nLOADING {offset}\t+{extent}\tFROM {mesh.shape}") + current_offset = 0 # offset within arr + for nd_offset, nd_extent in blocks_to_load: + flat_extent = np.prod(nd_extent) + # print( + # f"\t{nd_offset}\t-{nd_extent}\t->[{current_offset}:{current_offset + flat_extent}]" + # ) + mesh.load_chunk( + arr[current_offset : current_offset + flat_extent], + nd_offset, + nd_extent, + ) + current_offset += flat_extent + def __shuffle_openpmd( self, dot: __DescriptorOrTarget, @@ -369,23 +550,12 @@ def __shuffle_openpmd( # This gets the offset and extent of the i'th such slice. # The extent is given as in openPMD, i.e. the size of the block # (not its upper coordinate). - def from_chunk_i(i, n, dset, slice_dimension=0): + def from_chunk_i(i, n, dset): if isinstance(dset, io.Dataset): dset = dset.extent - dset = list(dset) - offset = [0 for _ in dset] - extent = dset - extent_dim_0 = dset[slice_dimension] - if extent_dim_0 % n != 0: - raise Exception( - "Dataset {} cannot be split into {} chunks on dimension {}.".format( - dset, n, slice_dimension - ) - ) - single_chunk_len = extent_dim_0 // n - offset[slice_dimension] = i * single_chunk_len - extent[slice_dimension] = single_chunk_len - return offset, extent + flat_extent = np.prod(dset) + one_slice_extent = flat_extent // n + return i * one_slice_extent, one_slice_extent import json @@ -446,9 +616,10 @@ def from_chunk_i(i, n, dset, slice_dimension=0): i, number_of_new_snapshots, extent_in ) to_chunk_offset = to_chunk_extent - to_chunk_extent = to_chunk_offset + np.prod(from_chunk_extent) + to_chunk_extent = to_chunk_offset + from_chunk_extent for dimension in range(len(mesh_in)): - mesh_in[str(dimension)].load_chunk( + DataShuffler.__load_chunk_1D( + mesh_in[str(dimension)], new_array[dimension, to_chunk_offset:to_chunk_extent], from_chunk_offset, from_chunk_extent, @@ -552,24 +723,6 @@ def shuffle_snapshots( if number_of_shuffled_snapshots is None: number_of_shuffled_snapshots = self.nr_snapshots - # Currently, the openPMD interface is not feature-complete. - if snapshot_type == "openpmd" and np.any( - np.array( - [ - snapshot.grid_dimension[0] % number_of_shuffled_snapshots - for snapshot in self.parameters.snapshot_directories_list - ] - ) - != 0 - ): - raise ValueError( - "Shuffling from OpenPMD files currently only " - "supported if first dimension of all snapshots " - "can evenly be divided by number of snapshots. " - "Please select a different number of shuffled " - "snapshots or use the numpy interface. " - ) - shuffled_gridsizes = snapshot_size_list // number_of_shuffled_snapshots if np.any( diff --git a/test/shuffling_test.py b/test/shuffling_test.py index 2ac09801..40e34544 100644 --- a/test/shuffling_test.py +++ b/test/shuffling_test.py @@ -327,7 +327,9 @@ def test_training_openpmd(self): new_loss = test_trainer.final_validation_loss assert old_loss > new_loss - def test_arbitrary_number_snapshots(self): + def worker_arbitrary_number_snapshots( + self, ext_in, ext_out, snapshot_type + ): parameters = mala.Parameters() # This ensures reproducibility of the created data sets. @@ -337,20 +339,46 @@ def test_arbitrary_number_snapshots(self): for i in range(5): data_shuffler.add_snapshot( - "Be_snapshot0.in.npy", + f"Be_snapshot0.in.{ext_in}", data_path, - "Be_snapshot0.out.npy", + f"Be_snapshot0.out.{ext_in}", data_path, + snapshot_type=snapshot_type, ) data_shuffler.shuffle_snapshots( complete_save_path=".", - save_name="Be_shuffled*", + save_name=f"Be_shuffled*{ext_out}", number_of_shuffled_snapshots=5, ) - for i in range(4): + + def test_arbitrary_number_snapshots(self): + self.worker_arbitrary_number_snapshots("npy", "", "numpy") + for i in range(5): bispectrum = np.load("Be_shuffled" + str(i) + ".in.npy") ldos = np.load("Be_shuffled" + str(i) + ".out.npy") assert not np.any(np.where(np.all(ldos == 0, axis=-1).squeeze())) assert not np.any( np.where(np.all(bispectrum == 0, axis=-1).squeeze()) ) + self.worker_arbitrary_number_snapshots("h5", ".h5", "openpmd") + import openpmd_api as opmd + + bispectrum_series = opmd.Series( + "Be_shuffled%T.in.h5", opmd.Access.read_only + ) + ldos_series = opmd.Series( + "Be_shuffled%T.out.h5", opmd.Access.read_only + ) + for i in range(5): + for name, series in [("Bispectrum", bispectrum_series), ("LDOS", ldos_series)]: + loaded_array = [ + component.load_chunk().squeeze() + for _, component in series.iterations[i] + .meshes[name] + .items() + ] + series.flush() + loaded_array = np.array(loaded_array) + assert not np.any( + np.where(np.all(loaded_array == 0, axis=0).squeeze()) + )