Add runner integration test to check padding is applied correctly
Note that this test currently makes some assumptions about the internals
of the task runner in order to verify that the result of processing the
padded blocks is as expected. See the code comments marked `NOTE` for
more details.
yousefmoazzam committed Sep 17, 2024
1 parent 2ace489 commit 930e0da
Showing 1 changed file with 198 additions and 0 deletions: tests/runner/test_task_runner.py
@@ -1,13 +1,21 @@
from os import PathLike
from pathlib import Path
from typing import List, Tuple
from unittest.mock import ANY, call

import h5py
import pytest
import numpy as np
from mpi4py import MPI
from pytest_mock import MockerFixture

from httomo.darks_flats import DarksFlatsFileConfig
from httomo.data.dataset_store import DataSetStoreWriter
from httomo.loaders import make_loader
from httomo.loaders.types import RawAngles
from httomo.method_wrappers import make_method_wrapper
from httomo.methods_database.query import MethodDatabaseRepository, MethodsDatabaseQuery
from httomo.preview import PreviewConfig, PreviewDimConfig
from httomo.runner.auxiliary_data import AuxiliaryData
from httomo.runner.dataset import DataSetBlock
from httomo.runner.dataset_store_backing import DataSetStoreBacking
@@ -488,3 +496,193 @@ def test_warns_with_multiple_stores_from_side_outputs(
    spy.assert_called()
    args, _ = spy.call_args
    assert "Data saving or/and reslicing operation will be performed 4 times" in args[0]


def test_execute_section_with_padding_produces_correct_result(
    mocker: MockerFixture,
    standard_data_path: str,
    standard_image_key_path: str,
    tmp_path: PathLike,
):
    # Define loader wrapper to load standard test data
    IN_FILE_PATH = Path(__file__).parent.parent / "test_data/tomo_standard.nxs"
    DARKS_FLATS_CONFIG = DarksFlatsFileConfig(
        file=IN_FILE_PATH,
        data_path=standard_data_path,
        image_key_path=standard_image_key_path,
    )
    ANGLES_CONFIG = RawAngles(data_path="/entry1/tomo_entry/data/rotation_angle")
    COMM = MPI.COMM_WORLD
    DATA_SHAPE = (180, 128, 160)
    PREVIEW_CONFIG = PreviewConfig(
        # Use only a small number of projections, to keep the test runtime short (the
        # dummy 3D method below is inefficiently implemented)
        angles=PreviewDimConfig(start=0, stop=10),
        detector_y=PreviewDimConfig(start=0, stop=DATA_SHAPE[1]),
        detector_x=PreviewDimConfig(start=0, stop=DATA_SHAPE[2]),
    )
    loader_wrapper = make_loader(
        repo=MethodDatabaseRepository(),
        module_path="httomo.data.hdf.loaders",
        method_name="standard_tomo",
        in_file=IN_FILE_PATH,
        data_path=standard_data_path,
        image_key_path=standard_image_key_path,
        angles=ANGLES_CONFIG,
        comm=COMM,
        darks=DARKS_FLATS_CONFIG,
        flats=DARKS_FLATS_CONFIG,
        preview=PREVIEW_CONFIG,
    )

    # Define dummy method function that depends on padded slices/neighbourhood
    class FakeMethodsModule:
        def method_using_padding_slices(data: np.ndarray, kernel_size: int):
            """
            A very simple 3D method that depends on the padding slices/neighbourhood,
            used to detect whether padding is being applied correctly by the runner in
            this test.

            Note that the implementation is not intended to be efficient, but to make
            clear that it depends on the padding slices providing the required
            neighbourhood. Its performance is therefore rather poor!
            """
            out = np.empty_like(data)
            # For each pixel, get the 3D neighbourhood around it and sum over the
            # pixels in the neighbourhood. The size of the neighbourhood is defined by
            # the `kernel_size` parameter.
            radius = kernel_size // 2
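            # e.g. `kernel_size=3` gives `radius=1`, i.e. a symmetric 3x3x3 window at
            # interior pixels (truncated at the array edges)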
            for i in range(data.shape[0]):
                angles_start = max(0, i - radius)
                angles_stop = min(data.shape[0], i + radius + 1)
                for j in range(data.shape[1]):
                    det_y_start = max(0, j - radius)
                    det_y_stop = min(data.shape[1], j + radius + 1)
                    for k in range(data.shape[2]):
                        det_x_start = max(0, k - radius)
                        det_x_stop = min(data.shape[2], k + radius + 1)
                        neighbourhood = data[
                            angles_start:angles_stop,
                            det_y_start:det_y_stop,
                            det_x_start:det_x_stop,
                        ]
                        out[i, j, k] = neighbourhood.sum()
            return out

    # Define padding calculator for dummy 3D method
    class FakeSupportingFunctionsModule:
        def _calc_padding_method_using_padding_slices(**kwargs) -> Tuple[int, int]:
            return (kwargs["kernel_size"] // 2, kwargs["kernel_size"] // 2)
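    # e.g. for the `KERNEL_SIZE = 3` used below, this calculator returns (1, 1): one
    # padding slice before and one after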

    # Create method wrapper
    KERNEL_SIZE = 3
    MODULE_PATH = "module_path"
    METHOD_NAME = "method_using_padding_slices"
    mocker.patch(
        "httomo.method_wrappers.generic.import_module", return_value=FakeMethodsModule
    )
    mock_repo = mocker.MagicMock()
    method_query = MethodsDatabaseQuery(MODULE_PATH, METHOD_NAME)
    mocker.patch.object(target=method_query, attribute="padding", return_value=True)
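    # NOTE: patching `padding` to return True presumably makes the wrapper treat the
    # dummy method as one requiring padded blocks (an assumption about the query's
    # role, based on how it's used here)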
    mocker.patch(
        "httomo.methods_database.query.import_module",
        return_value=FakeSupportingFunctionsModule,
    )
    mocker.patch.object(
        target=method_query, attribute="get_pattern", return_value=Pattern.projection
    )
    mocker.patch.object(
        target=method_query, attribute="get_output_dims_change", return_value=False
    )
    mocker.patch.object(
        target=method_query, attribute="get_implementation", return_value="cpu"
    )
    mocker.patch.object(
        target=method_query, attribute="get_memory_gpu_params", return_value=None
    )
    mocker.patch.object(
        target=method_query, attribute="save_result_default", return_value=False
    )
    mocker.patch.object(target=mock_repo, attribute="query", return_value=method_query)
    wrapper = make_method_wrapper(
        method_repository=mock_repo,
        module_path=MODULE_PATH,
        method_name=METHOD_NAME,
        comm=COMM,
        kernel_size=KERNEL_SIZE,
    )

    # Define a pipeline containing one padding method
    pipeline = Pipeline(loader=loader_wrapper, methods=[wrapper])

    # Create task runner object + prepare
    runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=COMM)
    runner._prepare()

    # Patch `determine_max_slices()` to do nothing (so that the max slices can be set
    # manually on the first section later)
    mocker.patch.object(target=runner, attribute="determine_max_slices")

    sections = sectionize(pipeline)

    # Force the chunk to be split into multiple blocks for processing (so that the
    # presence of padding slices will be detectable if padding is incorrectly applied
    # by the runner)
    MAX_SLICES = PREVIEW_CONFIG.angles.stop // 2
    sections[0].max_slices = MAX_SLICES
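    # With the 10 previewed projections and `max_slices=5`, the chunk should be split
    # into two padded blocks of 5 slices each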

    # Set `is_last=False` on the section object to force writing to a
    # `DataSetStoreWriter` instead of a `DummySink`
    #
    # NOTE: This makes an assumption about the internals of the task runner, which
    # isn't great. See the note further below, where `runner.sink` is asserted to be
    # an instance of `DataSetStoreWriter` (in order to inspect the output data), for
    # more info.
    sections[0].is_last = False

    # Execute the single section that contains the wrapper with the dummy 3D method
    runner._execute_section(sections[0])

    # Get a subset of the projection data (checking that padding is applied correctly
    # doesn't require much data, and the dummy 3D method implementation is very
    # inefficient, so processing lots of data would make the test slow)
    with h5py.File(IN_FILE_PATH, "r") as f:
        dataset: h5py.Dataset = f[standard_data_path]
        projections = dataset[
            PREVIEW_CONFIG.angles.start : PREVIEW_CONFIG.angles.stop,
            PREVIEW_CONFIG.detector_y.start : PREVIEW_CONFIG.detector_y.stop,
            PREVIEW_CONFIG.detector_x.start : PREVIEW_CONFIG.detector_x.stop,
        ]

    # Pad the projection data by however much is needed by the dummy 3D method
    padding = wrapper.calculate_padding()
    projections = np.pad(
        projections,
        pad_width=(padding, (0, 0), (0, 0)),
        mode="edge",
    )
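    # With padding of (1, 1) and mode="edge", the first and last projections are
    # repeated, growing the subset from 10 to 12 slices along the angles dim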

    # Execute the dummy method with the full projection data
    expected_output = FakeMethodsModule.method_using_padding_slices(
        data=projections,
        kernel_size=KERNEL_SIZE,
    )

    # NOTE: Asserting that `runner.sink` is a `DataSetStoreWriter`, in conjunction
    # with setting `section.is_last = False` earlier, assumes that the `DataSetSink`
    # implementor stored in `runner.sink` is `DataSetStoreWriter` for any section that
    # has `section.is_last = False`.
    #
    # It's not good to assume which `DataSetSink` implementor is stored in
    # `runner.sink`, but currently there seems to be no way to check the data without
    # digging into the internals.
    assert isinstance(runner.sink, DataSetStoreWriter)
    reader = runner.sink.make_reader()
    runner_result = reader.read_block(
        start=0, length=PREVIEW_CONFIG.angles.stop - PREVIEW_CONFIG.angles.start
    )

    # Compare the contents of the reader (the result of processing the multiple padded
    # blocks that form the chunk) to the expected result (the result of processing the
    # entire chunk in one go)
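    # NOTE: the slice `padding[0] : PREVIEW_CONFIG.angles.stop + padding[1]` trims the
    # padding slices from the expected output; this relies on the preview starting at
    # angle index 0 and on the padding being symmetric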
    np.testing.assert_array_equal(
        runner_result.data_unpadded,
        expected_output[padding[0] : PREVIEW_CONFIG.angles.stop + padding[1], :, :],
    )
)
