diff --git a/indica/converters/bin_interp.py b/indica/converters/bin_interp.py
deleted file mode 100644
index 5782bc69..00000000
--- a/indica/converters/bin_interp.py
+++ /dev/null
@@ -1,306 +0,0 @@
-"""Routines for averaging or interpolate along an axis given start, stop and bin size"""
-
-import numpy as np
-from xarray import DataArray
-
-
-def convert(
-    start: float,
-    stop: float,
-    step: float,
-    data: DataArray,
-    dim: str,
-    method: str = "linear",
-) -> DataArray:
-    """Bin or interpolate data along specified dimension, discarding data before
-    or after the limits.
-
-    Parameters
-    ----------
-    start
-        Start of interval. The interval includes this value.
-    stop
-        End of interval. The interval includes this value.
-    step
-        Spacing between values. For any output `out`, this is the distance
-        between two adjacent values, ``out[i+1] - out[i]``.
-    data
-        Data to be binned.
-    dim
-        Dimension along which data is to be binned
-
-    Returns
-    -------
-    :
-        Array like the input, but binned/interpolated along the desired dimension
-
-    """
-
-    coords = data.coords[dim]
-    data_step = coords[1] - coords[0]
-    if data_step <= step / 2:
-        return bin_to_dim(start, stop, step, data, dim)
-    else:
-        return interpolate_to_dim(start, stop, step, data, dim, method=method)
-
-
-def interpolate_to_labels(
-    labels: np.ndarray,
-    data: DataArray,
-    dim: str,
-    method: str = "linear",
-) -> DataArray:
-    """Interpolate data to sit on the specified dimension labels.
-
-    Parameters
-    ----------
-    labels
-        The values at which the data should be binned.
-    data
-        Data to be binned.
-
-    Returns
-    -------
-    :
-        Array like the input, but binned onto the dimension labels.
-
-    """
-    if data.coords[dim].shape == labels.shape and np.all(data.coords[dim] == labels):
-        return data
-
-    interpolated = data.interp(dict([(dim, labels)]), method=method)
-    if "error" in data.attrs:
-        interpolated.attrs["error"] = interpolated.attrs["error"].interp(
-            dict([(dim, labels)]), method=method
-        )
-    if "dropped" in data.attrs:
-        dropped = interpolated.attrs["dropped"].interp(
-            dict([(dim, labels)]), method=method
-        )
-        if "error" in dropped.attrs:
-            dropped.attrs["error"] = dropped.attrs["error"].interp(
-                dict([(dim, labels)]), method=method
-            )
-        interpolated.attrs["dropped"] = dropped
-    if "provenance" in data.attrs:
-        del interpolated.attrs["partial_provenance"]
-        del interpolated.attrs["provenance"]
-
-    return interpolated
-
-
-def bin_to_labels(labels: np.ndarray, data: DataArray, dim: str) -> DataArray:
-    """Bin data to sit on the specified dimension labels.
-
-    Parameters
-    ----------
-    labels
-        The values at which the data should be binned.
-    data
-        Data to be binned.
-
-    Returns
-    -------
-    :
-        Array like the input, but binned onto the dimension labels.
- - """ - if data.coords[dim].shape == labels.shape and np.all(data.coords[dim] == labels): - return data - npoints = len(labels) - half_interval = 0.5 * (labels[1] - labels[0]) - bins = np.empty(npoints + 1) - bins[0] = labels[0] - half_interval - bins[1:] = labels + half_interval - grouped = data.sel(dict([(dim, slice(bins[0], bins[-1]))])).groupby_bins( - dim, bins, labels=labels - ) - averaged = grouped.mean(dim, keep_attrs=True) - stdev = grouped.std(dim, keep_attrs=True) - - if "error" in data.attrs: - grouped = ( - data.attrs["error"] - .sel(dict([(dim, slice(bins[0], bins[-1]))])) - .groupby_bins(dim, bins, labels=labels) - ) - uncertainty = np.sqrt( - grouped.reduce( - lambda x, axis: np.sum(x**2, axis) / np.size(x, axis) ** 2, dim - ) - ) - error = np.sqrt(uncertainty**2 + stdev**2) - averaged.attrs["error"] = error.rename(dict([(f"{dim}_bins", dim)])) - if "dropped" in data.attrs: - grouped = ( - data.attrs["dropped"] - .sel(dict([(dim, slice(bins[0], bins[-1]))])) - .groupby_bins(dim, bins, labels=labels) - ) - dropped = grouped.mean(dim) - stdev = grouped.std(dim) - averaged.attrs["dropped"] = dropped.rename(dict([(f"{dim}_bins", dim)])) - if "error" in data.attrs["dropped"].attrs: - grouped = ( - data.attrs["dropped"] - .attrs["error"] - .sel(dict([(dim, slice(bins[0], bins[-1]))])) - .groupby_bins(dim, bins, labels=labels) - ) - uncertainty = np.sqrt( - grouped.reduce( - lambda x, axis: np.sum(x**2, axis) / np.size(x, axis) ** 2, dim - ) - ) - error = np.sqrt(uncertainty**2 + stdev**2) - averaged.attrs["dropped"].attrs["error"] = error.rename( - dict([(f"{dim}_bins", dim)]) - ) - if "provenance" in data.attrs: - del averaged.attrs["partial_provenance"] - del averaged.attrs["provenance"] - - return averaged.rename(dict([(f"{dim}_bins", dim)])) - - -def interpolate_to_dim( - start: float, - stop: float, - step: float, - data: DataArray, - dim: str, - method: str = "linear", -) -> DataArray: - """Bin given data along specified dimension, discarding data before or after - the limits. - - Parameters - ---------- - start - Start of interval. The interval includes this value. - stop - End of interval. The interval includes this value. - step - Spacing between values. For any output `out`, this is the distance - between two adjacent values, ``out[i+1] - out[i]``. - data - Data to be binned. - dim - Dimension along which data is to be binned - method - Interpolation method to use. Must be a value accepted by - :py:class:`scipy.interpolate.interp1d`. - - Returns - ------- - : - Array like the input, but binned along the desired dimension - - """ - - check_bounds(start, stop, step, data, dim) - labels = get_labels(start, stop, step) - - return interpolate_to_labels(labels, data, dim, method=method) - - -def bin_to_dim( - start: float, stop: float, step: float, data: DataArray, dim: str -) -> DataArray: - """Bin given data along the dim axis, discarding data before or after - the limits. - - Parameters - ---------- - start - Start of interval. The interval includes this value. - stop - End of interval. The interval includes this value. - step - Spacing between values. For any output `out`, this is the distance - between two adjacent values, ``out[i+1] - out[i]``. - data - Data to be binned. - dim - Dimension along which data is to be binned - - Returns - ------- - : - Array like the input, but binned along the dim axis. 
- - """ - check_bounds(start, stop, step, data, dim) - labels = get_labels(start, stop, step) - return bin_to_labels(labels, data, dim) - - -def get_labels(start: float, stop: float, step: float): - """ - Build array given start, stop and bin step - - Parameters - ---------- - start - Start of interval. The interval includes this value. - stop - End of interval. The interval includes this value. - step - Spacing between values. For any output `out`, this is the distance - between two adjacent values, ``out[i+1] - out[i]``. - - Returns - ------- - labels - Binned dimension array - - """ - labels = np.arange(start, stop + step, step) - return labels - - -def check_bounds(start: float, stop: float, step: float, data: DataArray, dim: str): - """ - Check necessary bounds for binning/interpolating data in time - - Parameters - ---------- - start - Start of interval. The interval includes this value. - stop - End of interval. The interval includes this value. - step - Spacing between values. For any output `out`, this is the distance - between two adjacent values, ``out[i+1] - out[i]``. - data - Data to be binned. - dim - Dimension along which data is to be binned - """ - - coords = data.coords[dim] - data_step = coords[1] - coords[0] - half_interval = step / 2 - - # For both binning and interpolating - if start < coords.min(): - raise ValueError("Start {} not in range of provided data.".format(start)) - if stop > coords.max(): - raise ValueError("End {} not in range of provided data.".format(stop)) - - # For binning only - if data_step <= half_interval: - if coords[0] > start + half_interval: - raise ValueError( - "No data falls within first bin {}.".format( - (start - half_interval, start + half_interval) - ) - ) - if coords[-1] < stop - half_interval: - raise ValueError( - "No data falls within last bin {}.".format( - (stop - half_interval, stop + half_interval) - ) - ) - - return diff --git a/indica/converters/time.py b/indica/converters/time.py index b0b033ef..7ff704c6 100644 --- a/indica/converters/time.py +++ b/indica/converters/time.py @@ -103,17 +103,6 @@ def interpolate_to_time_labels( return data interpolated = data.interp(t=tlabels, method=method) - if "error" in data.attrs: - interpolated.attrs["error"] = interpolated.attrs["error"].interp( - t=tlabels, method=method - ) - if "dropped" in data.attrs: - dropped = interpolated.attrs["dropped"].interp(t=tlabels, method=method) - if "error" in dropped.attrs: - dropped.attrs["error"] = dropped.attrs["error"].interp( - t=tlabels, method=method - ) - interpolated.attrs["dropped"] = dropped return interpolated @@ -145,15 +134,16 @@ def bin_to_time_labels(tlabels: np.ndarray, data: DataArray) -> DataArray: "t", tbins, labels=tlabels ) averaged = grouped.mean("t", keep_attrs=True, skipna=True) + averaged = averaged.rename(t_bins="t") + stdev = grouped.std("t", keep_attrs=True, skipna=True) stdev = np.sqrt(stdev**2) - averaged.attrs["stdev"] = stdev.rename(t_bins="t") + stdev = stdev.rename(t_bins="t") + averaged = averaged.assign_coords(stdev=(data.dims, stdev.data)) - if "error" in data.attrs: - grouped = ( - data.attrs["error"] - .sel(t=slice(tbins[0], tbins[-1])) - .groupby_bins("t", tbins, labels=tlabels) + if "error" in data.coords: + grouped = data.error.sel(t=slice(tbins[0], tbins[-1])).groupby_bins( + "t", tbins, labels=tlabels ) uncertainty = np.sqrt( grouped.reduce( @@ -161,35 +151,10 @@ def bin_to_time_labels(tlabels: np.ndarray, data: DataArray) -> DataArray: ) ) error = np.sqrt(uncertainty**2) - averaged.attrs["error"] = 
error.rename(t_bins="t") - - if "dropped" in data.attrs: - grouped = ( - data.attrs["dropped"] - .sel(t=slice(tbins[0], tbins[-1])) - .groupby_bins("t", tbins, labels=tlabels) - ) - dropped = grouped.mean("t") - stdev = grouped.std("t") - averaged.attrs["dropped"] = dropped.rename(t_bins="t") - if "error" in data.attrs["dropped"].attrs: - grouped = ( - data.attrs["dropped"] - .attrs["error"] - .sel(t=slice(tbins[0], tbins[-1])) - .groupby_bins("t", tbins, labels=tlabels) - ) - uncertainty = np.sqrt( - grouped.reduce( - lambda x, axis: np.sum(x**2, axis) / np.size(x, axis) ** 2, "t" - ) - ) - error = np.sqrt(uncertainty**2) - averaged.attrs["dropped"].attrs["error"] = error.rename(t_bins="t") - stdev = np.sqrt(stdev**2) - averaged.attrs["dropped"].attrs["stdev"] = stdev.rename(t_bins="t") + error = error.rename(t_bins="t") + averaged = averaged.assign_coords(error=(data.dims, error.data)) - return averaged.rename(t_bins="t") + return averaged def interpolate_in_time( diff --git a/indica/data.py b/indica/data.py index e2fb8878..f28a4cea 100644 --- a/indica/data.py +++ b/indica/data.py @@ -8,7 +8,6 @@ """ -from itertools import filterfalse from numbers import Number from typing import Any from typing import Callable @@ -715,43 +714,6 @@ def equilibrium(self): if hasattr(self._obj.attrs["transform"], "equilibrium"): del self._obj.attrs["transform"].equilibrium - @property - def with_ignored_data(self) -> xr.DataArray: - """The full version of this data, including the channels which were - dropped at read-in. - - """ - if "dropped" in self._obj.attrs: - ddim = self.drop_dim - dropped = self._obj.attrs["dropped"] - result = self._obj.copy() - result.loc[{ddim: dropped.coords[ddim]}] = dropped - if "error" in self._obj.attrs: - result.attrs["error"] = result.attrs["error"].copy() - result.attrs["error"].loc[{ddim: dropped.coords[ddim]}] = dropped.attrs[ - "error" - ] - del result.attrs["dropped"] - return result - else: - return self._obj - - @property - def drop_dim(self) -> Optional[str]: - """The dimension, if any, which contains dropped channels.""" - if "dropped" in self._obj.attrs: - return str( - next( - filterfalse( - lambda dim: self._obj.coords[dim].equals( - self._obj.attrs["dropped"].coords[dim] - ), - self._obj.dims, - ) - ) - ) - return None - @xr.register_dataset_accessor("indica") class InDiCADatasetAccessor: diff --git a/indica/models/diode_filters.py b/indica/models/diode_filters.py index 2ab61056..51bf952e 100644 --- a/indica/models/diode_filters.py +++ b/indica/models/diode_filters.py @@ -132,13 +132,15 @@ def integrate_spectra(self, spectra: DataArray, fit_background: bool = True): spectra_to_integrate = _spectra spectra_to_integrate_err = _spectra_err - spectra_to_integrate.attrs["error"] = spectra_to_integrate_err + spectra_to_integrate = spectra_to_integrate.assign_coords( + error=(spectra_to_integrate.dims, spectra_to_integrate_err.data) + ) integral = (spectra_to_integrate * transmission).sum("wavelength") integral_err = (np.sqrt((spectra_to_integrate_err * transmission) ** 2)).sum( "wavelength" ) - integral.attrs["error"] = integral_err + integral = integral.assign_coords(error=(integral.dims, integral_err.data)) return spectra_to_integrate, integral diff --git a/indica/plotters/plot_results.py b/indica/plotters/plot_results.py index 2c62aa8a..52556415 100644 --- a/indica/plotters/plot_results.py +++ b/indica/plotters/plot_results.py @@ -589,14 +589,18 @@ def compare_pulses( _error = raw_data["cxff_pi"]["ti"].error.sel(channel=chan) _error = xr.where(_error > 0, _error, 
diff --git a/indica/plotters/plot_results.py b/indica/plotters/plot_results.py
index 2c62aa8a..52556415 100644
--- a/indica/plotters/plot_results.py
+++ b/indica/plotters/plot_results.py
@@ -589,14 +589,18 @@ def compare_pulses(
         _error = raw_data["cxff_pi"]["ti"].error.sel(channel=chan)
         _error = xr.where(_error > 0, _error, np.nan)
         raw_data["cxff_pi"]["ti"] = _data
-        raw_data["cxff_pi"]["ti"].attrs["error"] = _error
+        raw_data["cxff_pi"]["ti"] = raw_data["cxff_pi"]["ti"].assign_coords(
+            error=(raw_data["cxff_pi"]["ti"].dims, _error.data)
+        )
 
         _data = raw_data["cxff_pi"]["vtor"].sel(channel=chan)
         _data = xr.where(_data > 0, _data, np.nan)
         _error = raw_data["cxff_pi"]["vtor"].error.sel(channel=chan)
         _error = xr.where(_error > 0, _error, np.nan)
         raw_data["cxff_pi"]["vtor"] = _data
-        raw_data["cxff_pi"]["vtor"].attrs["error"] = _error
+        raw_data["cxff_pi"]["vtor"] = raw_data["cxff_pi"]["vtor"].assign_coords(
+            error=(raw_data["cxff_pi"]["vtor"].dims, _error.data)
+        )
 
     if "hnbi1" in raw_data.keys():
         raw_data["nbi"] = {
diff --git a/indica/plotters/plot_time_evolution.py b/indica/plotters/plot_time_evolution.py
index da2662b9..722df526 100644
--- a/indica/plotters/plot_time_evolution.py
+++ b/indica/plotters/plot_time_evolution.py
@@ -120,9 +120,9 @@ def plot_data(data, quantity: str, pulse: int, tplot: float, key="raw", color=No
     _data = data[key][quantity][pulse]
     tslice = slice(_data.t.min().values, _data.t.max().values)
     if "error" not in _data.attrs:
-        _data.attrs["error"] = xr.full_like(_data, 0.0)
+        _data = _data.assign_coords(error=(_data.dims, xr.full_like(_data, 0.0).data))
     if "stdev" not in _data.attrs:
-        _data.attrs["stdev"] = xr.full_like(_data, 0.0)
+        _data = _data.assign_coords(stdev=(_data.dims, xr.full_like(_data, 0.0).data))
     _err = np.sqrt(_data.error**2 + _data.stdev**2)
     _err = xr.where(_err / _data.values < 1.0, _err, 0.0)
     if len(_data.dims) > 1:
diff --git a/indica/utilities.py b/indica/utilities.py
index 2cb94b64..d481942f 100644
--- a/indica/utilities.py
+++ b/indica/utilities.py
@@ -276,16 +276,7 @@ def input_check(
     ) or isinstance(var_to_check, bool):
         return
 
-    # Handles dropped channels, if present
     sliced_var_to_check = deepcopy(var_to_check)
-    if (
-        isinstance(var_to_check, (DataArray, Dataset))
-        and "dropped" in var_to_check.attrs
-    ):
-        dropped_coords = var_to_check.attrs["dropped"].coords
-        for icoord in dropped_coords.keys():
-            dropped_coord = dropped_coords[icoord]
-            sliced_var_to_check = var_to_check.drop_sel({icoord: dropped_coord})
 
     if np.any(np.isnan(sliced_var_to_check)):
         raise ValueError(f"{var_name} cannot contain any NaNs.")
diff --git a/indica/workflows/load_modelling_plasma.py b/indica/workflows/load_modelling_plasma.py
index c678f7fd..c9b7f273 100644
--- a/indica/workflows/load_modelling_plasma.py
+++ b/indica/workflows/load_modelling_plasma.py
@@ -554,9 +554,13 @@ def plot_data_bckc_comparison(
             tslice_binned = tslice
 
         if "error" not in _binned.attrs:
-            _binned.attrs["error"] = xr.full_like(_binned, 0.0)
+            _binned = _binned.assign_coords(
+                error=(_binned.dims, xr.full_like(_binned, 0.0).data)
+            )
         if "stdev" not in _binned.attrs:
-            _binned.attrs["stdev"] = xr.full_like(_binned, 0.0)
+            _binned = _binned.assign_coords(
+                stdev=(_binned.dims, xr.full_like(_binned, 0.0).data)
+            )
 
         err = np.sqrt(_binned.error**2 + _binned.stdev**2)
         err = xr.where(err / _binned.values < 1.0, err, 0.0)
diff --git a/indica/workflows/run_tomo_1d.py b/indica/workflows/run_tomo_1d.py
index b6fcdeba..a64132c1 100644
--- a/indica/workflows/run_tomo_1d.py
+++ b/indica/workflows/run_tomo_1d.py
@@ -85,7 +85,9 @@ def example_tomo(
         tomo.emiss_err,
         coords=[("t", tomo.tvec), ("rho_poloidal", tomo.rho_grid_centers)],
     )
-    inverted_emissivity.attrs["error"] = inverted_error
+    inverted_emissivity = inverted_emissivity.assign_coords(
+        error=(inverted_emissivity.dims, inverted_error.data)
+    )
 
     data_tomo = brightness
    bckc_tomo = DataArray(tomo.backprojection, coords=data_tomo.coords)
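One detail worth noting in the plotting and workflow files above: the guards still test `"error" not in _data.attrs` (and likewise for `_binned`), while the zero-filled default is now assigned to coords. Once data arrive with the new layout, that guard appears to keep firing and would replace an existing `error` coordinate with zeros. A coords-based guard, shown here as a hypothetical variant rather than what this patch does, avoids that:

```python
import xarray as xr


def ensure_error_coord(da: xr.DataArray) -> xr.DataArray:
    # Guard on the coordinate itself so an existing error is not clobbered.
    if "error" not in da.coords:
        da = da.assign_coords(error=(da.dims, xr.full_like(da, 0.0).data))
    return da
```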
diff --git a/indica/workflows/zeff_workflows.py b/indica/workflows/zeff_workflows.py
index be295f7d..6c03ffd8 100644
--- a/indica/workflows/zeff_workflows.py
+++ b/indica/workflows/zeff_workflows.py
@@ -121,7 +121,9 @@ def calculate_zeff(
         spectra_to_integrate = None
 
     if not hasattr(filter_data, "error"):
-        filter_data.attrs["error"] = filter_data * default_perc_err
+        filter_data = filter_data.assign_coords(
+            error=(filter_data.dims, (filter_data * default_perc_err).data)
+        )
 
     print("Calculate LOS-averaged Zeff")
     zeff_los_avrg = calculate_zeff_los_averaged(
@@ -325,7 +327,7 @@ def calculate_zeff_profile(
         tomo_result["profile"]["sym_emissivity_err"],
         coords=coords,
     )
-    emissivity.attrs["error"] = _error
+    emissivity = emissivity.assign_coords(error=(emissivity.dims, _error.data))
 
     wlnght = filter_wavelength
     _te = te_fit.interp(rho_poloidal=emissivity.rho_poloidal)
@@ -351,8 +353,9 @@ def calculate_zeff_profile(
         bremsstrahlung=emissivity + emissivity.error,
         gaunt_approx="callahan",
    )
-    zeff_profile.attrs["error"] = np.abs(zeff_up - zeff_lo)
-
+    zeff_profile = zeff_profile.assign_coords(
+        error=(zeff_profile.dims, np.abs(zeff_up - zeff_lo).data)
+    )
 
     return zeff_profile, tomo
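The Zeff uncertainty above is propagated by re-evaluation: the profile is computed at `emissivity ± error` and the spread between the two results becomes the error coordinate. The generic pattern, with a hypothetical model function `f` standing in for the bremsstrahlung inversion:

```python
import numpy as np
import xarray as xr


def propagate_error(f, x: xr.DataArray) -> xr.DataArray:
    # Evaluate at the bounds of x and take the spread as the uncertainty,
    # mirroring how calculate_zeff_profile treats the emissivity error.
    result = f(x)
    lo = f(x - x.error)
    up = f(x + x.error)
    return result.assign_coords(error=(result.dims, np.abs(up - lo).data))
```

This is a cheap finite-difference style estimate; it works well for monotonic responses but understates the error where `f` is strongly nonlinear over the interval.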
diff --git a/tests/unit/_test_utilities.py b/tests/unit/_test_utilities.py
deleted file mode 100644
index 7cd3f45a..00000000
--- a/tests/unit/_test_utilities.py
+++ /dev/null
@@ -1,303 +0,0 @@
-"""Test the contents of the utilities module."""
-
-
-# TODO - modernize this to not use hypothesis but pytest in the way the other tests do.
-import re
-import unittest
-
-from hypothesis import assume
-from hypothesis import example
-from hypothesis import given
-from hypothesis.extra.numpy import array_shapes
-from hypothesis.extra.numpy import arrays
-from hypothesis.strategies import dictionaries
-from hypothesis.strategies import from_regex
-from hypothesis.strategies import integers
-from hypothesis.strategies import none
-from hypothesis.strategies import sampled_from
-from hypothesis.strategies import text
-import numpy as np
-import pytest
-import scipy
-from xarray import DataArray
-from xarray.testing import assert_allclose
-
-from indica import utilities
-
-VALID_FILENAME = re.compile(r"^[a-zA-Z0-9_\-().]+$")
-
-
-def test_positional_parameters1():
-    def example(a, b, c=None, d=5):
-        pass
-
-    params, varpos = utilities.positional_parameters(example)
-    assert params == ["a", "b", "c", "d"]
-    assert varpos is None
-
-
-def test_positional_parameters2():
-    def example():
-        pass
-
-    params, varpos = utilities.positional_parameters(example)
-    assert params == []
-    assert varpos is None
-
-
-def test_positional_parameters3():
-    def example(a, /, b, c, *, d):
-        pass
-
-    params, varpos = utilities.positional_parameters(example)
-    assert params == ["a", "b", "c"]
-    assert varpos is None
-
-
-def test_positional_parameters4():
-    def example(a, b, c, *args, **kwargs):
-        pass
-
-    params, varpos = utilities.positional_parameters(example)
-    assert params == ["a", "b", "c"]
-    assert varpos == "args"
-
-
-@given(arrays(np.float64, array_shapes(), elements=sampled_from([1.0, -1.0])))
-def test_sum_squares_ones(a):
-    """Test summing arrays made up only of +/- 1."""
-    print(a)
-    print(a.shape)
-    for i, l in enumerate(a.shape):
-        assert np.all(utilities.sum_squares(a, i) == l)
-
-
-@given(dictionaries(from_regex("[_a-zA-Z0-9]+", fullmatch=True), none()))
-def test_sum_squares_known(kwargs):
-    assume("x" not in kwargs)
-    assume("axis" not in kwargs)
-    a = np.array([0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
utilities.input_check("Ne", Ne, np.ndarray, strictly_positive=False) - - Ne = np.nan * self.Ne - with self.assertRaises(ValueError): - utilities.input_check("Ne", Ne, np.ndarray) - - Ne = np.inf * self.Ne - with self.assertRaises(ValueError): - utilities.input_check("Ne", Ne, np.ndarray) - - Ne = -np.inf * self.Ne - with self.assertRaises(ValueError): - utilities.input_check("Ne", Ne, np.ndarray) - - Ne = self.Ne[:, np.newaxis] - with self.assertRaises(ValueError): - utilities.input_check("Ne", Ne, np.ndarray, ndim_to_check=1) - - # Check dropped channel handling - t = np.array([78.5, 80.5, 82.5]) - rho = np.linspace(0, 1, 11) - Ne = np.logspace(19.0, 16.0, 11) - Ne = np.tile(Ne, [3, 1]) - Ne[1, :] /= 10.0 - Ne[2, :] *= 10.0 - - dropped_t_coord = np.array([80.5]) - dropped_rho_coord = np.array([rho[3], rho[7]]) - - Ne = DataArray( - data=Ne, - coords=[("t", t), ("rho_poloidal", rho)], - dims=["t", "rho_poloidal"], - ) - - dropped = Ne.sel({"t": dropped_t_coord}) - dropped = dropped.sel({"rho_poloidal": dropped_rho_coord}) - - Ne.loc[{"t": dropped_t_coord, "rho_poloidal": dropped_rho_coord}] = np.nan - - Ne.attrs["dropped"] = dropped - - try: - utilities.input_check("Ne", Ne, DataArray, ndim_to_check=2) - except Exception as e: - raise e - - -def test_compatible_input_type(): - compatible_input_type = Compatible_Input_Type_Test_Case() - compatible_input_type.type_check() - compatible_input_type.value_check() - - -def test_input_check_float_passes(): - var_name = "test_var" - utilities.input_check(var_name, 5.0, float) - - -def test_input_check_nan(): - var_name = "test_var" - with pytest.raises(ValueError, match=f"{var_name} cannot contain any NaNs."): - utilities.input_check(var_name, float("nan"), float) - - -def test_input_check_inf(): - var_name = "test_var" - with pytest.raises(ValueError, match=f"{var_name} cannot contain any infinities."): - utilities.input_check(var_name, float("inf"), float) - - -def test_input_check_neg_inf(): - var_name = "test_var" - with pytest.raises(ValueError, match=f"{var_name} cannot contain any infinities."): - utilities.input_check(var_name, -float("inf"), float) - - -def test_input_check_strictly_positive(): - var_name = "test_var" - with pytest.raises( - ValueError, match=f"Cannot have any negative or zero values in {var_name}" - ): - utilities.input_check( - var_name, 0.0, float, positive=True, strictly_positive=True - ) - - -def test_input_check_positive_passes(): - var_name = "test_var" - utilities.input_check(var_name, 0.0, float, positive=True, strictly_positive=False) - - -def test_input_check_positive(): - var_name = "test_var" - with pytest.raises( - ValueError, match=f"Cannot have any negative values in {var_name}" - ): - utilities.input_check( - var_name, -1.0, float, positive=True, strictly_positive=False - ) - - -# Test script for intersections -def test_intersections(): - """Test script for intersections""" - - # Test parallel lines -> should return an empty list - line_1_x = np.array([0.0, 1.0]) - line_1_y = np.array([1.0, 2.0]) - line_2_x = np.array([0.0, 1.0]) - line_2_y = np.array([2.0, 3.0]) - - rx, zx, _, _ = utilities.intersection(line_1_x, line_1_y, line_2_x, line_2_y) - assert len(rx) == 0 - assert len(zx) == 0 - - # Test intersecting lines - should return list of len=1 - line_3_x = np.array([0.0, 1.0]) - line_3_y = np.array([2.0, 1.0]) - rx, zx, _, _ = utilities.intersection(line_1_x, line_1_y, line_3_x, line_3_y) - assert len(rx) != 0 - assert len(zx) != 0 diff --git a/tests/unit/converters/test_bin_interp.py 
diff --git a/tests/unit/converters/test_bin_interp.py b/tests/unit/converters/test_bin_interp.py
deleted file mode 100644
index b3345bc1..00000000
--- a/tests/unit/converters/test_bin_interp.py
+++ /dev/null
@@ -1,236 +0,0 @@
-from copy import deepcopy
-
-import numpy as np
-from pytest import approx
-import xarray as xr
-from xarray import DataArray
-
-from indica.converters.time import convert_in_time_dt
-
-
-class Test_bin_interp:
-    """Provides unit tests for the binning/interpolation converter"""
-
-    nt = 50
-    time = np.linspace(0, 0.1, nt)
-    values = np.sin(np.linspace(0, np.pi * 3, nt)) + np.random.random(nt) - 0.5
-    data = DataArray(values, coords=[("t", time)])
-    channels = np.array([0, 1, 2, 3])
-    d = []
-    for c in channels:
-        d.append(deepcopy(data))
-
-    data = xr.concat(d, "chan").assign_coords(chan=channels)
-    error = deepcopy(data)
-    error.values = np.sqrt(np.abs(data.values))
-    dropped = xr.full_like(data, np.nan)
-    provenance = {"none": None}
-    partial_provenance = {"none": None}
-    data.attrs = {
-        "error": error,
-        "provenance": provenance,
-        "partial_provenance": partial_provenance,
-    }
-
-    dt_data = (data.t[1] - data.t[0]).values
-
-    def test_identity(self):
-        """Checks identity"""
-        dt = self.dt_data * 1.0
-
-        tstart = self.data.t[0].values
-        tend = self.data.t[-1].values
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, self.data)
-        except Exception as e:
-            raise e
-
-        assert np.all(_data == self.data)
-
-    def test_identity_dt(self):
-        """Checks identity for dt = dt_data"""
-        dt = self.dt_data * 1.0
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, self.data)
-        except Exception as e:
-            raise e
-
-        for t in _data.t:
-            delta = np.min(np.abs(t - self.data.t))
-            try:
-                assert approx(delta) == 0
-            except AssertionError as e:
-                print("Original and new time axis aren't identical")
-                raise e
-
-            delta = np.min(
-                np.abs(_data.sel(t=t) - self.data.sel(t=t, method="nearest"))
-            )
-            try:
-                assert approx(delta) == 0
-            except AssertionError as e:
-                print("Original and new data aren't identical")
-                raise e
-
-        for t in _data.error.t:
-            delta = np.min(np.abs(t - self.data.error.t))
-            try:
-                assert approx(delta) == 0
-            except AssertionError as e:
-                print("Original and new time axis of error aren't identical")
-                raise e
-
-            delta = np.min(
-                np.abs(
-                    _data.error.sel(t=t) - self.data.error.sel(t=t, method="nearest")
-                )
-            )
-            try:
-                assert approx(delta) == 0
-            except AssertionError as e:
-                print("Original and new error aren't identical")
-                raise e
-
-    def test_binning(self):
-        """Checks binning works as expected and returned data is withing limits"""
-        dt = self.dt_data * 3.0
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, self.data)
-        except Exception as e:
-            raise e
-
-        _dt = (_data.t[1] - _data.t[0]).values
-        assert np.all(_data.t <= self.data.t.max())
-        assert np.all(_data.t >= self.data.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.error.t[1] - _data.error.t[0]).values
-        assert np.all(_data.error.t <= self.data.error.t.max())
-        assert np.all(_data.error.t >= self.data.error.t.min())
-        assert _dt == approx(dt)
-
-    def test_interpolation(self):
-        """Checks interpolation works as expected and returned data is withing limits"""
-        dt = self.dt_data / 3.0
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, self.data)
-        except Exception as e:
-            raise e
-
-        _dt = (_data.t[1] - _data.t[0]).values
-        assert np.all(_data.t <= self.data.t.max())
-        assert np.all(_data.t >= self.data.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.error.t[1] - _data.error.t[0]).values
-        assert np.all(_data.error.t <= self.data.error.t.max())
-        assert np.all(_data.error.t >= self.data.error.t.min())
-        assert _dt == approx(dt)
-
-    def test_binning_dropped(self):
-        """Checks binning including dropped channels"""
-        dt = self.dt_data * 3.0
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        chan_to_drop = 1
-        data = deepcopy(self.data)
-        data.attrs["dropped"] = self.dropped
-        data.dropped.loc[dict(chan=chan_to_drop)] = data.sel(chan=chan_to_drop)
-        data.loc[dict(chan=chan_to_drop)] = np.full_like(
-            data.sel(chan=chan_to_drop), np.nan
-        )
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, data)
-        except Exception as e:
-            raise e
-
-        _dt = (_data.t[1] - _data.t[0]).values
-        assert np.all(_data.t <= data.t.max())
-        assert np.all(_data.t >= data.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.error.t[1] - _data.error.t[0]).values
-        assert np.all(_data.error.t <= data.error.t.max())
-        assert np.all(_data.error.t >= data.error.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.dropped.t[1] - _data.dropped.t[0]).values
-        assert np.all(_data.dropped.t <= data.dropped.t.max())
-        assert np.all(_data.dropped.t >= data.dropped.t.min())
-        assert _dt == approx(dt)
-
-    def test_interpolation_dropped(self):
-        """Dropped channels are correctly interpolated"""
-
-        dt = self.dt_data / 3.0
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        chan_to_drop = 1
-        data = deepcopy(self.data)
-        data.attrs["dropped"] = self.dropped
-        data.dropped.loc[dict(chan=chan_to_drop)] = data.sel(chan=chan_to_drop)
-        data.loc[dict(chan=chan_to_drop)] = np.full_like(
-            data.sel(chan=chan_to_drop), np.nan
-        )
-
-        try:
-            _data = convert_in_time_dt(tstart, tend, dt, data)
-        except Exception as e:
-            raise e
-
-        _dt = (_data.t[1] - _data.t[0]).values
-        assert np.all(_data.t <= data.t.max())
-        assert np.all(_data.t >= data.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.error.t[1] - _data.error.t[0]).values
-        assert np.all(_data.error.t <= data.error.t.max())
-        assert np.all(_data.error.t >= data.error.t.min())
-        assert _dt == approx(dt)
-
-        _dt = (_data.dropped.t[1] - _data.dropped.t[0]).values
-        assert np.all(_data.dropped.t <= data.dropped.t.max())
-        assert np.all(_data.dropped.t >= data.dropped.t.min())
-        assert _dt == approx(dt)
-
-    def test_wrong_start_time(self):
-        """Checks start time wrongly set"""
-        dt = self.dt_data
-
-        tstart = (self.data.t[0] - 5 * self.dt_data).values
-        tend = (self.data.t[-1] - 10 * self.dt_data).values
-
-        try:
-            _ = convert_in_time_dt(tstart, tend, dt, self.data)
-        except ValueError as e:
-            assert e
-
-    def test_wrong_end_time(self):
-        """Checks end time wrongly set"""
-        dt = self.dt_data
-
-        tstart = (self.data.t[0] + 5 * self.dt_data).values
-        tend = (self.data.t[-1] + 10 * self.dt_data).values
-
-        try:
-            _ = convert_in_time_dt(tstart, tend, dt, self.data)
-        except ValueError as e:
-            assert e
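The tests that remain (next file) rebuild their fixture with the new error-as-coordinate layout. For orientation, a minimal round trip through `convert_in_time_dt`, with synthetic values chosen arbitrarily and the same edge offsets the deleted tests used:

```python
import numpy as np
import xarray as xr

from indica.converters.time import convert_in_time_dt

nt = 50
time = np.linspace(0, 0.1, nt)
data = xr.DataArray(np.random.random(nt), coords=[("t", time)])
data = data.assign_coords(error=(data.dims, np.sqrt(np.abs(data)).data))
dt_data = float(data.t[1] - data.t[0])

# Request a grid three times coarser than the data: this takes the binning path.
tstart = float(data.t[0] + 5 * dt_data)
tend = float(data.t[-1] - 10 * dt_data)
binned = convert_in_time_dt(tstart, tend, 3 * dt_data, data)
assert np.all(binned.t >= data.t.min()) and np.all(binned.t <= data.t.max())
```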
diff --git a/tests/unit/converters/test_time_dt.py b/tests/unit/converters/test_time_dt.py
index 2a1e61ab..d574060c 100644
--- a/tests/unit/converters/test_time_dt.py
+++ b/tests/unit/converters/test_time_dt.py
@@ -23,14 +23,7 @@ class Test_time:
     data = xr.concat(d, "chan").assign_coords(chan=channels)
     error = deepcopy(data)
     error.values = np.sqrt(np.abs(data.values))
-    dropped = xr.full_like(data, np.nan)
-    provenance = {"none": None}
-    partial_provenance = {"none": None}
-    data.attrs = {
-        "error": error,
-        "provenance": provenance,
-        "partial_provenance": partial_provenance,
-    }
+    data = data.assign_coords(error=(data.dims, error.data))
 
     dt_data = (data.t[1] - data.t[0]).values
 
xr.concat(d, "chan").assign_coords(chan=channels) error = deepcopy(data) error.values = np.sqrt(np.abs(data.values)) - dropped = xr.full_like(data, np.nan) - provenance = {"none": None} - partial_provenance = {"none": None} - data.attrs = { - "error": error, - "provenance": provenance, - "partial_provenance": partial_provenance, - } + data = data.assign_coords(error=(data.dims, error.data)) dt_data = (data.t[1] - data.t[0]).values @@ -140,77 +133,6 @@ def test_interpolation(self): assert np.all(_data.error.t >= self.data.error.t.min()) assert _dt == approx(dt) - def test_binning_dropped(self): - """Checks binning including dropped channels""" - dt = self.dt_data * 3.0 - - tstart = (self.data.t[0] + 5 * self.dt_data).values - tend = (self.data.t[-1] - 10 * self.dt_data).values - - chan_to_drop = 1 - data = deepcopy(self.data) - data.attrs["dropped"] = self.dropped - data.dropped.loc[dict(chan=chan_to_drop)] = data.sel(chan=chan_to_drop) - data.loc[dict(chan=chan_to_drop)] = np.full_like( - data.sel(chan=chan_to_drop), np.nan - ) - - try: - _data = convert_in_time_dt(tstart, tend, dt, data) - except Exception as e: - raise e - - _dt = (_data.t[1] - _data.t[0]).values - assert np.all(_data.t <= data.t.max()) - assert np.all(_data.t >= data.t.min()) - assert _dt == approx(dt) - - _dt = (_data.error.t[1] - _data.error.t[0]).values - assert np.all(_data.error.t <= data.error.t.max()) - assert np.all(_data.error.t >= data.error.t.min()) - assert _dt == approx(dt) - - _dt = (_data.dropped.t[1] - _data.dropped.t[0]).values - assert np.all(_data.dropped.t <= data.dropped.t.max()) - assert np.all(_data.dropped.t >= data.dropped.t.min()) - assert _dt == approx(dt) - - def test_interpolation_dropped(self): - """Dropped channels are correctly interpolated""" - - dt = self.dt_data / 3.0 - - tstart = (self.data.t[0] + 5 * self.dt_data).values - tend = (self.data.t[-1] - 10 * self.dt_data).values - - chan_to_drop = 1 - data = deepcopy(self.data) - data.attrs["dropped"] = self.dropped - data.dropped.loc[dict(chan=chan_to_drop)] = data.sel(chan=chan_to_drop) - data.loc[dict(chan=chan_to_drop)] = np.full_like( - data.sel(chan=chan_to_drop), np.nan - ) - - try: - _data = convert_in_time_dt(tstart, tend, dt, data) - except Exception as e: - raise e - - _dt = (_data.t[1] - _data.t[0]).values - assert np.all(_data.t <= data.t.max()) - assert np.all(_data.t >= data.t.min()) - assert _dt == approx(dt) - - _dt = (_data.error.t[1] - _data.error.t[0]).values - assert np.all(_data.error.t <= data.error.t.max()) - assert np.all(_data.error.t >= data.error.t.min()) - assert _dt == approx(dt) - - _dt = (_data.dropped.t[1] - _data.dropped.t[0]).values - assert np.all(_data.dropped.t <= data.dropped.t.max()) - assert np.all(_data.dropped.t >= data.dropped.t.min()) - assert _dt == approx(dt) - def test_wrong_start_time(self): """Checks start time wrongly set""" dt = self.dt_data diff --git a/tests/unit/readers/test_surf.py b/tests/unit/readers/test_surf.py index 0a7c6ba5..3146111f 100644 --- a/tests/unit/readers/test_surf.py +++ b/tests/unit/readers/test_surf.py @@ -65,7 +65,8 @@ def assert_read_sxr_los( @given(integers(28792, 63899), lists(booleans(), min_size=1)) def test_read_sxr_t_los_1(pulse, upper_case): - """Test reading of lines of sight for SXR camera T""" + """Test reading of lines of sight for SXR camera T + TODO: check why this test sometimes fails in CI""" assert_read_sxr_los(pulse, upper_case, "t", 2.848, 2.172, 265, -1)