Skip to content

Commit

Permalink
Merge pull request #622 from franzpoeschel/arbitrary-shuffling-openpmd
Browse files Browse the repository at this point in the history
Add arbitrary shuffling for openPMD input
  • Loading branch information
RandomDefaultUser authored Dec 6, 2024
2 parents a7dc2cb + 9e14e5e commit 74b56f5
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 40 deletions.
223 changes: 188 additions & 35 deletions mala/datahandling/data_shuffler.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,187 @@ def __init__(self):
self.rank = 0
self.size = 1

# Takes the `extent` of an n-dimensional grid, the coordinates of a start
# point `x` and of an end point `y`.
# Interpreting the grid as a row-major n-dimensional array yields a
# contiguous slice from `x` to `y`.
# !!! Both `x` and `y` are **inclusive** ends as the semantics of what would
# be the next exclusive upper limit are unnecessarily complex in
# n dimensions, especially since the function recurses over the number of
# dimensions. !!!
# This slice is not in general an n-dimensional cuboid within the grid, but
# may be a non-convex form composed of multiple cuboid chunks.
#
# This function returns a list of cuboid sub-chunks of the n-dimensional
# grid such that:
#
# 1. The union of those chunks is equal to the contiguous slice spanned
# between `x` and `y`, both ends inclusive.
# 2. The chunks do not overlap.
# 3. Each single chunk is a contiguous slice within the row-major grid.
# 4. The chunks are sorted in ascending order of their position within the
# row-major grid.
#
# Row-major within the context of this function means that the indexes go
# from the slowest-running dimension at the left end to the fastest-running
# dimension at the right end.
#
# The output is a list of chunks as defined in openPMD, i.e. a start offset
# and the extent, counted from this offset (not from the grid origin!).
#
# Example: From the below 2D grid with extent [8, 10], the slice marked by
# the X-es should be loaded. The slice is defined by its first item
# at [2, 3] and its last item at [6, 6].
# The result would be a list of three chunks:
#
# 1. ([2, 3], [1, 7])
# 2. ([3, 0], [3, 10])
# 3. ([6, 0], [1, 7])
#
# OOOOOOOOOO
# OOOOOOOOOO
# OOOXXXXXXX
# XXXXXXXXXX
# XXXXXXXXXX
# XXXXXXXXXX
# XXXXXXXOOO
# OOOOOOOOOO

def __contiguous_slice_within_ndim_grid_as_blocks(
extent: list[int], x: list[int], y: list[int]
):
# Used for converting a block defined by inclusive lower and upper
# coordinate to a chunk as defined by openPMD.
# The openPMD extent is defined by (to-from)+1 (to make up for the
# inclusive upper end).
def get_extent(from_, to_):
res = [upper + 1 - lower for (lower, upper) in zip(from_, to_)]
if any(x < 1 for x in res):
raise RuntimeError(
f"Cannot compute block extent from {from_} to {to_}."
)
return res

if len(extent) != len(x):
raise RuntimeError(
f"__shuffle_openpmd: Internal indexing error extent={extent} and x={x} must have the same length. This is a bug."
)
if len(extent) != len(y):
raise RuntimeError(
f"__shuffle_openpmd: Internal indexing error extent={extent} and y={y} must have the same length. This is a bug."
)

# Recursive bottom cases
# 1. No items left
if y < x:
return []
# 2. No distinct dimensions left
elif x[0:-1] == y[0:-1]:
return [(x.copy(), get_extent(x, y))]

# Take the frontside and backside one-dimensional slices with extent
# defined by the last dimension, process the remainder recursively.

# The offset of the front slice is equal to x.
front_slice = (x.copy(), [1 for _ in range(len(x))])
# The chunk extent in the last dimension is the distance from the x
# coordinate to the grid extent in the last dimension.
# As the slice is one-dimensional, the extent is 1 in all other
# dimensions.
front_slice[1][-1] = extent[-1] - x[-1]

# Similar for the back slice.
# The offset of the back slice is 0 in the last dimension, equal to y
# in all other dimensions.
back_slice = (y.copy(), [1 for _ in range(len(y))])
back_slice[0][-1] = 0
# The extent is equal to the coordinate of y+1 (to convert inclusive to
# exclusive upper end) in the last dimension, 1 otherwise.
back_slice[1][-1] = y[-1] + 1

# Strip the last (i.e. fast-running) dimension for a recursive call with
# one dimensionality reduced.
recursive_x = x[0:-1]
recursive_y = y[0:-1]

# Since the first and last row are dealt with above, those rows are now
# stripped from the second-to-last dimension for the recursive call
# (in which they will be the last dimension)
recursive_x[-1] += 1
recursive_y[-1] -= 1

rec_res = DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks(
extent[0:-1], recursive_x, recursive_y
)

# Add the last dimension again. The last dimension is covered from start
# to end since the leftovers have been dealt with by the front and back
# slice.
rec_res = [(xx + [0], yy + [extent[-1]]) for (xx, yy) in rec_res]

return [front_slice] + rec_res + [back_slice]

# Interpreting `ndim_extent` as the extents of an n-dimensional grid,
# returns the n-dimensional coordinate of the `idx`th item in the row-major
# representation of the grid.
def __resolve_flattened_index_into_ndim(idx: int, ndim_extent: list[int]):
if not ndim_extent:
raise RuntimeError("Cannot index into a zero-dimensional array.")
strides = []
current_stride = 1
for ext in reversed(ndim_extent):
strides = [current_stride] + strides
# sic!, the last stride is ignored, as it's just the entire grid
# extent
current_stride *= ext

def worker(inner_idx, inner_strides):
if not inner_strides:
if inner_idx != 0:
raise RuntimeError(
"This cannot happen. There is bug somewhere.")
else:
return []
div, mod = divmod(inner_idx, inner_strides[0])
return [div] + worker(mod, inner_strides[1:])

return worker(idx, strides)

# Load a chunk from the openPMD `record` into the buffer at `arr`.
# The indexes `offset` and `extent` are scalar 1-dimensional coordinates,
# and apply to the n-dimensional record by reinterpreting (reshaped) it as
# a one-dimensional array.
# The function deals internally with splitting the 1-dimensional slice into
# a sequence of n-dimensional block load operations.
def __load_chunk_1D(mesh, arr, offset, extent):
start_idx = DataShuffler.__resolve_flattened_index_into_ndim(
offset, mesh.shape
)
# Inclusive upper end. See the documentation in
# __contiguous_slice_within_ndim_grid_as_blocks() for the reason why
# we work with inclusive upper ends.
end_idx = DataShuffler.__resolve_flattened_index_into_ndim(
offset + extent - 1, mesh.shape
)
blocks_to_load = (
DataShuffler.__contiguous_slice_within_ndim_grid_as_blocks(
mesh.shape, start_idx, end_idx
)
)
# print(f"\n\nLOADING {offset}\t+{extent}\tFROM {mesh.shape}")
current_offset = 0 # offset within arr
for nd_offset, nd_extent in blocks_to_load:
flat_extent = np.prod(nd_extent)
# print(
# f"\t{nd_offset}\t-{nd_extent}\t->[{current_offset}:{current_offset + flat_extent}]"
# )
mesh.load_chunk(
arr[current_offset : current_offset + flat_extent],
nd_offset,
nd_extent,
)
current_offset += flat_extent

def __shuffle_openpmd(
self,
dot: __DescriptorOrTarget,
Expand Down Expand Up @@ -369,23 +550,12 @@ def __shuffle_openpmd(
# This gets the offset and extent of the i'th such slice.
# The extent is given as in openPMD, i.e. the size of the block
# (not its upper coordinate).
def from_chunk_i(i, n, dset, slice_dimension=0):
def from_chunk_i(i, n, dset):
if isinstance(dset, io.Dataset):
dset = dset.extent
dset = list(dset)
offset = [0 for _ in dset]
extent = dset
extent_dim_0 = dset[slice_dimension]
if extent_dim_0 % n != 0:
raise Exception(
"Dataset {} cannot be split into {} chunks on dimension {}.".format(
dset, n, slice_dimension
)
)
single_chunk_len = extent_dim_0 // n
offset[slice_dimension] = i * single_chunk_len
extent[slice_dimension] = single_chunk_len
return offset, extent
flat_extent = np.prod(dset)
one_slice_extent = flat_extent // n
return i * one_slice_extent, one_slice_extent

import json

Expand Down Expand Up @@ -446,9 +616,10 @@ def from_chunk_i(i, n, dset, slice_dimension=0):
i, number_of_new_snapshots, extent_in
)
to_chunk_offset = to_chunk_extent
to_chunk_extent = to_chunk_offset + np.prod(from_chunk_extent)
to_chunk_extent = to_chunk_offset + from_chunk_extent
for dimension in range(len(mesh_in)):
mesh_in[str(dimension)].load_chunk(
DataShuffler.__load_chunk_1D(
mesh_in[str(dimension)],
new_array[dimension, to_chunk_offset:to_chunk_extent],
from_chunk_offset,
from_chunk_extent,
Expand Down Expand Up @@ -552,24 +723,6 @@ def shuffle_snapshots(
if number_of_shuffled_snapshots is None:
number_of_shuffled_snapshots = self.nr_snapshots

# Currently, the openPMD interface is not feature-complete.
if snapshot_type == "openpmd" and np.any(
np.array(
[
snapshot.grid_dimension[0] % number_of_shuffled_snapshots
for snapshot in self.parameters.snapshot_directories_list
]
)
!= 0
):
raise ValueError(
"Shuffling from OpenPMD files currently only "
"supported if first dimension of all snapshots "
"can evenly be divided by number of snapshots. "
"Please select a different number of shuffled "
"snapshots or use the numpy interface. "
)

shuffled_gridsizes = snapshot_size_list // number_of_shuffled_snapshots

if np.any(
Expand Down
38 changes: 33 additions & 5 deletions test/shuffling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ def test_training_openpmd(self):
new_loss = test_trainer.final_validation_loss
assert old_loss > new_loss

def test_arbitrary_number_snapshots(self):
def worker_arbitrary_number_snapshots(
self, ext_in, ext_out, snapshot_type
):
parameters = mala.Parameters()

# This ensures reproducibility of the created data sets.
Expand All @@ -337,20 +339,46 @@ def test_arbitrary_number_snapshots(self):

for i in range(5):
data_shuffler.add_snapshot(
"Be_snapshot0.in.npy",
f"Be_snapshot0.in.{ext_in}",
data_path,
"Be_snapshot0.out.npy",
f"Be_snapshot0.out.{ext_in}",
data_path,
snapshot_type=snapshot_type,
)
data_shuffler.shuffle_snapshots(
complete_save_path=".",
save_name="Be_shuffled*",
save_name=f"Be_shuffled*{ext_out}",
number_of_shuffled_snapshots=5,
)
for i in range(4):

def test_arbitrary_number_snapshots(self):
self.worker_arbitrary_number_snapshots("npy", "", "numpy")
for i in range(5):
bispectrum = np.load("Be_shuffled" + str(i) + ".in.npy")
ldos = np.load("Be_shuffled" + str(i) + ".out.npy")
assert not np.any(np.where(np.all(ldos == 0, axis=-1).squeeze()))
assert not np.any(
np.where(np.all(bispectrum == 0, axis=-1).squeeze())
)
self.worker_arbitrary_number_snapshots("h5", ".h5", "openpmd")
import openpmd_api as opmd

bispectrum_series = opmd.Series(
"Be_shuffled%T.in.h5", opmd.Access.read_only
)
ldos_series = opmd.Series(
"Be_shuffled%T.out.h5", opmd.Access.read_only
)
for i in range(5):
for name, series in [("Bispectrum", bispectrum_series), ("LDOS", ldos_series)]:
loaded_array = [
component.load_chunk().squeeze()
for _, component in series.iterations[i]
.meshes[name]
.items()
]
series.flush()
loaded_array = np.array(loaded_array)
assert not np.any(
np.where(np.all(loaded_array == 0, axis=0).squeeze())
)

0 comments on commit 74b56f5

Please sign in to comment.