diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index 15d3ad0..dee24df 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -1,10 +1,12 @@ # -*- coding: utf-8 -*- """Contains various weighting schemes used in pybaselines.""" +import warnings + import numpy as np from scipy.special import expit -from .utils import _MIN_FLOAT +from .utils import _MIN_FLOAT, ParameterWarning def _asls(y, baseline, p): @@ -22,7 +24,7 @@ def _asls(y, baseline, p): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. Returns ------- @@ -38,11 +40,83 @@ def _asls(y, baseline, p): asymmetric least squares method, Analytical Methods, 2014, 6(12), 4402-4407. """ - mask = y > baseline - weights = p * mask + (1 - p) * (~mask) + weights = np.where(y > baseline, p, 1 - p) return weights +def _airpls(y, baseline, iteration): + """ + The weighting for adaptive iteratively reweighted penalized least squares (airPLS). + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + iteration : int + The iteration number. Should be 1-based, such that the first iteration is 1 + instead of 0. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + residual_l1_norm : float + The L1 norm of the negative residuals, used to calculate the exit criteria + for the airPLS algorithm. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. + + References + ---------- + Zhang, Z.M., et al. Baseline correction using adaptive iteratively + reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146. + + Notes + ----- + Equation 9 in the original algorithm was misprinted according to the author + (https://github.com/zmzhang/airPLS/issues/8), so the correct weighting is used here. + + The pybaselines weighting differs from the original airPLS algorithm by normalizing the + weights by their maximum value to ensure that weights are within [0, 1]; the original + algorithm is defined such that all weights are greater than or equal to 1 for negative + residuals, which would differ from other Whittaker-smoothing based algorithms and could + potentially cause issues if combining with optimizer functions. 
+ + """ + residual = y - baseline + neg_mask = residual < 0 + neg_residual = residual[neg_mask] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), 0.0, exit_early + else: + exit_early = False + + residual_l1_norm = abs(neg_residual.sum()) + # clip from [0, log(max dtype)] since the positive residuals (negative values) do not matter + log_max = np.log(np.finfo(y.dtype).max) + inner = np.clip( + (-iteration / residual_l1_norm) * neg_residual, + a_min=0, + a_max=log_max - np.spacing(log_max) + ) + new_weights = np.exp(inner) + # Not stated in the paper, but without dividing by the maximum weight, the + # calculated weights are all greater than or equal to 1 + weights = np.zeros_like(y) + weights[neg_mask] = new_weights / new_weights.max() + + return weights, residual_l1_norm, exit_early + + def _safe_std(array, **kwargs): """ Calculates the standard deviation and protects against nan and 0. @@ -96,6 +170,9 @@ def _arpls(y, baseline): ------- weights : numpy.ndarray, shape (N,) The calculated weights. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. References ---------- @@ -105,10 +182,20 @@ def _arpls(y, baseline): """ residual = y - baseline neg_residual = residual[residual < 0] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), exit_early + else: + exit_early = False std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset # add a negative sign since expit performs 1/(1+exp(-input)) weights = expit(-(2 / std) * (residual - (2 * std - np.mean(neg_residual)))) - return weights + return weights, exit_early def _drpls(y, baseline, iteration): @@ -129,6 +216,9 @@ def _drpls(y, baseline, iteration): ------- weights : numpy.ndarray, shape (N,) The calculated weights. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. 
     References
     ----------
@@ -138,10 +228,25 @@
     """
     residual = y - baseline
     neg_residual = residual[residual < 0]
+    if neg_residual.size < 2:
+        exit_early = True
+        warnings.warn(
+            ('almost all baseline points are below the data, indicating that "tol"'
+             ' is too low and/or "max_iter" is too high'), ParameterWarning,
+            stacklevel=2
+        )
+        return np.zeros_like(y), exit_early
+    else:
+        exit_early = False
+
     std = _safe_std(neg_residual, ddof=1)  # use dof=1 since only sampling a subset
-    inner = (np.exp(iteration) / std) * (residual - (2 * std - np.mean(neg_residual)))
+    # the exponential term is used to change the shape of the weighting from a logistic curve
+    # at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
+    # the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
+    # overflow for high iterations
+    inner = (np.exp(min(iteration, 100)) / std) * (residual - (2 * std - np.mean(neg_residual)))
     weights = 0.5 * (1 - (inner / (1 + np.abs(inner))))
-    return weights
+    return weights, exit_early
 
 
 def _iarpls(y, baseline, iteration):
@@ -162,6 +267,9 @@
     -------
     weights : numpy.ndarray, shape (N,)
         The calculated weights.
+    exit_early : bool
+        Designates if there is a potential error with the calculation such that no further
+        iterations should be performed.
 
     References
     ----------
@@ -171,13 +279,29 @@
     """
     residual = y - baseline
-    std = _safe_std(residual[residual < 0], ddof=1)  # dof=1 since sampling a subset
-    inner = (np.exp(iteration) / std) * (residual - 2 * std)
+    neg_residual = residual[residual < 0]
+    if neg_residual.size < 2:
+        exit_early = True
+        warnings.warn(
+            ('almost all baseline points are below the data, indicating that "tol"'
+             ' is too low and/or "max_iter" is too high'), ParameterWarning,
+            stacklevel=2
+        )
+        return np.zeros_like(y), exit_early
+    else:
+        exit_early = False
+
+    std = _safe_std(neg_residual, ddof=1)  # use dof=1 since only sampling a subset
+    # the exponential term is used to change the shape of the weighting from a logistic curve
+    # at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
+    # the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
+    # overflow for high iterations
+    inner = (np.exp(min(iteration, 100)) / std) * (residual - 2 * std)
     weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2)))
-    return weights
+    return weights, exit_early
 
 
-def _aspls(y, baseline):
+def _aspls(y, baseline, assymetric_coef):
     """
     Weighting for the adaptive smoothness penalized least squares smoothing (aspls).
@@ -187,6 +311,9 @@
         The measured data.
     baseline : numpy.ndarray, shape (N,)
         The calculated baseline.
+    assymetric_coef : float
+        The assymetric coefficient for the weighting. Higher values lead to a steeper
+        weighting curve (i.e., more step-like).
 
     Returns
     -------
@@ -194,10 +321,13 @@
         The calculated weights.
     residual : numpy.ndarray, shape (N,)
         The residual, ``y - baseline``.
+    exit_early : bool
+        Designates if there is a potential error with the calculation such that no further
+        iterations should be performed.
 
     Notes
     -----
-    The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead
+    The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead
     of the 2 listed in the asPLS paper.
pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -209,10 +339,22 @@ def _aspls(y, baseline): """ residual = y - baseline - std = _safe_std(residual[residual < 0], ddof=1) # use dof=1 since sampling a subset + neg_residual = residual[residual < 0] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), residual, exit_early + else: + exit_early = False + std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset + # add a negative sign since expit performs 1/(1+exp(-input)) - weights = expit(-(0.5 / std) * (residual - std)) - return weights, residual + weights = expit(-(assymetric_coef / std) * (residual - std)) + return weights, residual, exit_early def _psalsa(y, baseline, p, k, shape_y): @@ -228,7 +370,7 @@ def _psalsa(y, baseline, p, k, shape_y): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. k : float A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -270,7 +412,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. k : float A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -306,7 +448,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights): # since it's faster than performing the square and exp on the full residual weights = np.full(shape_y, 1 - p, dtype=float) mask = residual > 0 - weights[mask] = p * np.exp(-((residual[mask] / k)**2) / 2) + weights[mask] = p * np.exp(-0.5 * ((residual[mask] / k)**2)) weights *= partial_weights return weights diff --git a/pybaselines/spline.py b/pybaselines/spline.py index d205692..5b29f18 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -16,7 +16,7 @@ from ._banded_utils import _add_diagonals, _shift_rows, diff_penalty_diagonals from ._compat import dia_object, jit, trapezoid from ._spline_utils import _basis_midpoints -from ._validation import _check_lam, _check_optional_array +from ._validation import _check_lam, _check_optional_array, _check_scalar_variable from .utils import ( ParameterWarning, _mollifier_kernel, _sort_array, gaussian, pad_edges, padded_convolve, relative_difference @@ -46,7 +46,7 @@ def mixture_model(self, data, lam=1e5, p=1e-2, num_knots=100, spline_degree=3, d p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. 
@@ -381,7 +381,7 @@ def pspline_asls(self, data, lam=1e3, p=1e-2, num_knots=100, spline_degree=3, di p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. spline_degree : int, optional @@ -469,7 +469,7 @@ def pspline_iasls(self, data, lam=1e1, p=1e-2, lam_1=1e-4, num_knots=100, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -621,47 +621,22 @@ def pspline_airpls(self, data, lam=1e3, num_knots=100, spline_degree=3, """ y, weight_array = self._setup_spline( - data, weights, spline_degree, num_knots, True, diff_order, lam, copy_weights=True + data, weights, spline_degree, num_knots, True, diff_order, lam ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): - try: - output = self.pspline.solve_pspline(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, stacklevel=2 - ) + baseline = self.pspline.solve_pspline(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -730,7 +705,10 @@ def pspline_arpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_orde tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve_pspline(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -834,23 +812,14 @@ def pspline_drpls(self, data, lam=1e3, eta=0.5, num_knots=100, spline_degree=3, y, weight_array, 
penalty=_add_diagonals(self.pspline.penalty, diff_n_w_diagonals, lower_only=False) ) - new_weights = _weighting._drpls(y, baseline, i) + new_weights, exit_early = _weighting._drpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break + calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -922,23 +891,13 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_ord tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): baseline = self.pspline.solve_pspline(y, weight_array) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -948,7 +907,7 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_ord @_Algorithm._register(sort_keys=('weights', 'alpha'), dtype=float, order='C') def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, - max_iter=100, tol=1e-3, weights=None, alpha=None): + max_iter=100, tol=1e-3, weights=None, alpha=None, assymetric_coef=0.5): """ A penalized spline version of the asPLS algorithm. @@ -978,6 +937,9 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with size equal to N and all values set to 1. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -996,13 +958,19 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde completed. 
If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + See Also -------- Baseline.aspls Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -1026,6 +994,7 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') interp_pts = _basis_midpoints(self.pspline.knots, self.pspline.spline_degree) tol_history = np.empty(max_iter + 1) @@ -1036,7 +1005,10 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde self.pspline.num_bands, self.pspline.num_bands ) baseline = self.pspline.solve_pspline(y, weight_array, alpha_penalty) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -1068,7 +1040,7 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1108,7 +1080,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -1132,6 +1105,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve_pspline(y, weight_array) @@ -1164,7 +1139,7 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1213,7 +1188,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. 
Also raised if `k` is not greater + than 0. See Also -------- @@ -1236,6 +1212,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 @@ -1414,7 +1392,7 @@ def mixture_model(data, lam=1e5, p=1e-2, num_knots=100, spline_degree=3, diff_or p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. @@ -1743,7 +1721,7 @@ def pspline_asls(data, lam=1e3, p=1e-2, num_knots=100, spline_degree=3, diff_ord p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. spline_degree : int, optional @@ -1819,7 +1797,7 @@ def pspline_iasls(data, x_data=None, lam=1e1, p=1e-2, lam_1=1e-4, num_knots=100, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -2132,7 +2110,8 @@ def pspline_iarpls(data, lam=1e3, num_knots=100, spline_degree=3, diff_order=2, @_spline_wrapper def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, - max_iter=100, tol=1e-3, weights=None, alpha=None, x_data=None): + max_iter=100, tol=1e-3, weights=None, alpha=None, x_data=None, + assymetric_coef=0.5): """ A penalized spline version of the asPLS algorithm. @@ -2165,6 +2144,9 @@ def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -2183,13 +2165,19 @@ def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + See Also -------- pybaselines.whittaker.aspls Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. 
@@ -2223,7 +2211,7 @@ def pspline_psalsa(data, lam=1e3, p=0.5, k=None, num_knots=100, spline_degree=3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -2266,7 +2254,8 @@ def pspline_psalsa(data, lam=1e3, p=0.5, k=None, num_knots=100, spline_degree=3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -2302,7 +2291,7 @@ def pspline_derpsalsa(data, lam=1e2, p=1e-2, k=None, num_knots=100, spline_degre p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -2354,7 +2343,8 @@ def pspline_derpsalsa(data, lam=1e2, p=1e-2, k=None, num_knots=100, spline_degre Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py index ccfca8f..ed5c799 100644 --- a/pybaselines/two_d/spline.py +++ b/pybaselines/two_d/spline.py @@ -11,6 +11,7 @@ import numpy as np from .. import _weighting +from .._validation import _check_scalar_variable from ..utils import ParameterWarning, gaussian, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -39,7 +40,7 @@ def mixture_model(self, data, lam=1e3, p=1e-2, num_knots=25, spline_degree=3, di p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int or Sequence[int, int], optional The number of knots for the splines along the rows and columns, respectively. If a @@ -292,7 +293,7 @@ def pspline_asls(self, data, lam=1e3, p=1e-2, num_knots=25, spline_degree=3, dif p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int or Sequence[int, int], optional The number of knots for the splines along the rows and columns, respectively. If a single value is given, both will use the same value. Default is 25. @@ -383,7 +384,7 @@ def pspline_iasls(self, data, lam=1e3, p=1e-2, lam_1=1e-4, num_knots=25, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. 
+ will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -529,48 +530,22 @@ def pspline_airpls(self, data, lam=1e3, num_knots=25, spline_degree=3, """ y, weight_array = self._setup_spline( - data, weights, spline_degree, num_knots, True, diff_order, lam, copy_weights=True + data, weights, spline_degree, num_knots, True, diff_order, lam ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): - try: - output = self.pspline.solve(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) + baseline = self.pspline.solve(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -639,7 +614,10 @@ def pspline_arpls(self, data, lam=1e3, num_knots=25, spline_degree=3, diff_order tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -717,23 +695,13 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=25, spline_degree=3, diff_orde tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): baseline = self.pspline.solve(y, weight_array) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar 
is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -758,7 +726,7 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -798,7 +766,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -822,6 +791,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve(y, weight_array) diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index 87e8d70..b669db1 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -6,14 +6,12 @@ """ -import warnings - import numpy as np from .. import _weighting from .._compat import diags -from .._validation import _check_optional_array -from ..utils import _MIN_FLOAT, ParameterWarning, relative_difference +from .._validation import _check_optional_array, _check_scalar_variable +from ..utils import _MIN_FLOAT, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -38,7 +36,7 @@ def asls(self, data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weigh p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. diff_order : int or Sequence[int, int], optional The order of the differential matrix for the rows and columns, respectively. If a single value is given, both will use the same value. Must be greater than 0. @@ -143,7 +141,7 @@ def iasls(self, data, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float or Sequence[float, float], optional The smoothing parameter for the rows and columns, respectively, of the first derivative of the residual. Default is 1e-4. 
@@ -282,50 +280,22 @@ def airpls(self, data, lam=1e6, diff_order=2, max_iter=50, tol=1e-3, weights=Non """ y, weight_array = self._setup_whittaker( - data, lam, diff_order, weights, copy_weights=True, num_eigens=num_eigens + data, lam, diff_order, weights, num_eigens=num_eigens ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) - # Have to have extensive error handling since the weights can all become - # very small due to the exp(i) term if too many iterations are performed; - # checking the negative residual length usually prevents any errors, but - # sometimes not so have to also catch any errors from the solvers for i in range(1, max_iter + 2): - try: - output = self.whittaker_system.solve(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if neg_residual.size < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, stacklevel=2 - ) + baseline = self.whittaker_system.solve(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'tol_history': tol_history[:i]} if self.whittaker_system._using_svd: @@ -408,7 +378,10 @@ def arpls(self, data, lam=1e3, diff_order=2, max_iter=50, tol=1e-3, weights=None tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.whittaker_system.solve(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -500,23 +473,14 @@ def drpls(self, data, lam=1e5, eta=0.5, max_iter=50, tol=1e-3, weights=None, dif baseline = self.whittaker_system.direct_solve( partial_penalty + weight_matrix @ partial_penalty_2, weight_array * y ) - new_weights = _weighting._drpls(y, baseline, i) + new_weights, exit_early = _weighting._drpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break + calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * 
residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights weight_matrix.setdiag(weight_array) @@ -596,23 +560,13 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): baseline = self.whittaker_system.solve(y, weight_array) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -631,7 +585,7 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non sort_keys=('weights', 'alpha'), reshape_keys=('weights', 'alpha'), reshape_baseline=True ) def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, - weights=None, alpha=None): + weights=None, alpha=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). @@ -658,6 +612,9 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with shape equal to (M, N) and all values set to 1. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -676,9 +633,15 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. 
@@ -697,6 +660,7 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') # use a sparse matrix to maintain sparsity after multiplication; implementation note: # could skip making an alpha matrix and just use alpha_array[:, None] * penalty once @@ -706,7 +670,10 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, for i in range(max_iter + 1): penalty = alpha_matrix @ self.whittaker_system.penalty baseline = self.whittaker_system.solve(y, weight_array, penalty=penalty) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -745,7 +712,7 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -796,7 +763,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -822,6 +790,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') shape = self._shape if self.whittaker_system._using_svd else self._size tol_history = np.empty(max_iter + 1) diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 4ee45ad..720f6b5 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -6,17 +6,13 @@ """ -import warnings - import numpy as np from . import _weighting from ._algorithm_setup import _Algorithm, _class_wrapper from ._banded_utils import _shift_rows, diff_penalty_diagonals -from ._validation import _check_lam, _check_optional_array -from .utils import ( - ParameterWarning, _mollifier_kernel, pad_edges, padded_convolve, relative_difference -) +from ._validation import _check_lam, _check_optional_array, _check_scalar_variable +from .utils import _mollifier_kernel, pad_edges, padded_convolve, relative_difference class _Whittaker(_Algorithm): @@ -38,7 +34,7 @@ def asls(self, data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weigh p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. 
@@ -117,7 +113,7 @@ def iasls(self, data, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. max_iter : int, optional @@ -245,54 +241,23 @@ def airpls(self, data, lam=1e6, diff_order=2, max_iter=50, tol=1e-3, weights=Non reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146. """ - y, weight_array = self._setup_whittaker( - data, lam, diff_order, weights, copy_weights=True - ) + y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) - # Have to have extensive error handling since the weights can all become - # very small due to the exp(i) term if too many iterations are performed; - # checking the negative residual length usually prevents any errors, but - # sometimes not so have to also catch any errors from the solvers for i in range(1, max_iter + 2): - try: - output = self.whittaker_system.solve( - self.whittaker_system.add_diagonal(weight_array), weight_array * y, - overwrite_b=True, check_output=True - ) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) + baseline = self.whittaker_system.solve( + self.whittaker_system.add_diagonal(weight_array), weight_array * y, + overwrite_b=True, check_output=True + ) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -350,7 +315,10 @@ def arpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=None self.whittaker_system.add_diagonal(weight_array), weight_array * y, overwrite_b=True ) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -442,23 +410,14 @@ def drpls(self, data, lam=1e5, eta=0.5, max_iter=50, tol=1e-3, 
weights=None, dif self.whittaker_system.penalty + penalty_with_weights, weight_array * y, overwrite_ab=True, overwrite_b=True, l_and_u=lower_upper_bands ) - new_weights = _weighting._drpls(y, baseline, i) + new_weights, exit_early = _weighting._drpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break + calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -519,23 +478,13 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non self.whittaker_system.add_diagonal(weight_array), weight_array * y, overwrite_b=True ) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights @@ -545,7 +494,7 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non @_Algorithm._register(sort_keys=('weights', 'alpha')) def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, - weights=None, alpha=None): + weights=None, alpha=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). @@ -571,6 +520,9 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with size equal to N and all values set to 1. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -589,9 +541,15 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. 
+ Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -611,6 +569,7 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') main_diag_idx = self.whittaker_system.main_diagonal_index lower_upper_bands = (diff_order, diff_order) @@ -624,7 +583,10 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, lhs, weight_array * y, overwrite_ab=True, overwrite_b=True, l_and_u=lower_upper_bands ) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -660,7 +622,7 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -696,7 +658,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -717,6 +680,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.whittaker_system.solve( @@ -751,7 +716,7 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -796,7 +761,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. 
References ---------- @@ -810,6 +776,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 # could pad the data every iteration, but it is ~2-3 times slower and only affects @@ -868,7 +836,7 @@ def asls(data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weights=Non p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. @@ -935,7 +903,7 @@ def iasls(data, x_data=None, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. max_iter : int, optional @@ -1189,7 +1157,7 @@ def iarpls(data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=None, x_d @_whittaker_wrapper def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, - alpha=None, x_data=None): + alpha=None, x_data=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). @@ -1218,6 +1186,9 @@ def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, x_data : array-like, optional The x-values. Not used by this function, but input is allowed for consistency with other functions. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -1239,11 +1210,12 @@ def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, Raises ------ ValueError - Raised if `alpha` and `data` do not have the same shape. + Raised if `alpha` and `data` do not have the same shape. Also raised if `assymetric_coef` + is not greater than 0. Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -1278,7 +1250,7 @@ def psalsa(data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. 
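For intuition on why `k` must be positive, the psalsa-style weighting decays exponentially with residual / k (a minimal sketch with assumed values, mirroring the expected_psalsa helper added to tests/test_weighting.py later in this diff; derpsalsa uses a Gaussian, exp(-(residual / k)**2 / 2), in the same role):

    import numpy as np

    y = np.array([0.0, 0.2, 5.0, 0.3, 0.1])   # one peak of height ~5
    baseline = np.zeros_like(y)
    p, k = 0.5, 5.0                            # k ~ height at which a point counts as a peak

    residual = y - baseline
    # points above the baseline decay exponentially towards zero weight as the
    # residual approaches k; points below the baseline keep a constant 1 - p weight
    weights = np.where(residual > 0, p * np.exp(-residual / k), 1 - p)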
Should be approximately the height at which @@ -1317,7 +1289,8 @@ def psalsa(data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e-3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -1352,7 +1325,7 @@ def derpsalsa(data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, tol=1e-3 p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1400,7 +1373,8 @@ def derpsalsa(data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, tol=1e-3 Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. References ---------- diff --git a/tests/conftest.py b/tests/conftest.py index 600dec5..d248f3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -82,7 +82,7 @@ def gaussian2d(x, z, height=1.0, center_x=0.0, center_z=0.0, sigma_x=1.0, sigma_ Notes ----- - This is the same code as in pybaselines.utils.gaussian, but + This is the same code as in pybaselines.utils.gaussian2d, but this removes the dependence on pybaselines so that if an error with pybaselines occurs, this will be unaffected. diff --git a/tests/test_spline.py b/tests/test_spline.py index 298a9f0..4530d17 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -232,15 +232,12 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture): Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. """ x, y = no_noise_data_fixture with pytest.warns(utils.ParameterWarning): baseline = self.class_func(y, tol=-1, max_iter=7000)[0] - assert np.isfinite(baseline.dot(baseline)) + assert np.isfinite(baseline).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('diff_order', (1, 2, 3)) @@ -260,7 +257,6 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - @pytest.mark.skip(reason='overflow will be addressed next version') def test_avoid_overflow_warning(self, no_noise_data_fixture): """ Ensures no warning is emitted for exponential overflow. 
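The behaviour this re-enabled test guards against can be reproduced in isolation (a small sketch; scipy.special.expit is shown only as one way to saturate cleanly and is not claimed to be what every algorithm uses internally):

    import numpy as np
    from scipy.special import expit

    values = np.array([0.0, 50.0, 1000.0])
    with np.errstate(over='warn'):
        naive = 1 / (1 + np.exp(values))   # RuntimeWarning: exp(1000) overflows to inf...
    stable = expit(-values)                # ...but expit saturates to 0 without a warning
    # both give [0.5, ~2e-22, 0.0]; only the warning differs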
@@ -277,7 +273,7 @@ def test_avoid_overflow_warning(self, no_noise_data_fixture): with np.errstate(over='raise'): baseline = self.class_func(y, tol=-1, max_iter=1000)[0] - assert np.isfinite(baseline.dot(baseline)) + assert np.isfinite(baseline).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('diff_order', (1, 2, 3)) @@ -297,32 +293,26 @@ def test_diff_orders(self, diff_order): lam = {2: 1e6, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + drpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. """ x, y = no_noise_data_fixture - with pytest.warns(utils.ParameterWarning): - baseline, params = self.class_func(y, tol=-1, max_iter=1000) + baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('eta', (0.2, 0.8)) @@ -361,16 +351,13 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + iarpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. 
Set tol to -1 so that it is never reached, and set max_iter to a high value. @@ -380,13 +367,14 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ x, y = no_noise_data_fixture - with pytest.warns(utils.ParameterWarning): + with np.errstate(over='raise'): baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('diff_order', (1, 2, 3)) @@ -449,6 +437,12 @@ def test_whittaker_comparison(self, lam, diff_order): self, whittaker.aspls, self.y, lam=lam, diff_order=diff_order, test_rtol=rtol ) + @pytest.mark.parametrize('assymetric_coef', (0, -1)) + def test_outside_assymetric_coef_fails(self, assymetric_coef): + """Ensures assymetric_coef values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, assymetric_coef=assymetric_coef) + class TestPsplinePsalsa(IterativeSplineTester): """Class for testing pspline_psalsa baseline.""" @@ -476,6 +470,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.psalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineDerpsalsa(IterativeSplineTester): """Class for testing pspline_derpsalsa baseline.""" @@ -503,6 +503,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.derpsalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineMPLS(SplineTester, InputWeightsMixin): """Class for testing pspline_mpls baseline.""" diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 6ba2e84..5e56d75 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -5,7 +5,67 @@ from numpy.testing import assert_allclose import pytest -from pybaselines import _weighting, utils +from pybaselines import _weighting, utils, Baseline2D + +from .conftest import get_data, get_data2d + + +def baseline_1d_normal(): + """ + Values for testing weights for a normally fit baseline. + + Approximates the baseline as a first order polynomial fit, which is the + approximate fit for the first iteration of all Whittaker-smoothing-based + algorithms with a `diff_order` of 2 and a sufficiently high `lam` value. 
+ + """ + x_data, y_data = get_data() + baseline = np.polynomial.Polynomial.fit(x_data, y_data, deg=1)(x_data) + return y_data, baseline + + +def baseline_1d_all_above(): + """Values for testing weights when all baseline points are above the data.""" + x_data, y_data = get_data() + baseline = np.full_like(y_data, y_data.max() + 10) + return y_data, baseline + + +def baseline_1d_all_below(): + """Values for testing weights when all baseline points are below the data.""" + x_data, y_data = get_data() + baseline = np.full_like(y_data, y_data.min() - 10) + return y_data, baseline + + +def baseline_2d_normal(): + """ + Values for testing weights for a normally fit baseline. + + Approximates the baseline as a first order polynomial fit, which is the + approximate fit for the first iteration of all Whittaker-smoothing-based + algorithms with a `diff_order` of 2 and a sufficiently high `lam` value. + + """ + x_data, z_data, y_data = get_data2d() + baseline = Baseline2D(x_data, z_data, check_finite=False, assume_sorted=True).poly( + y_data, poly_order=1 + )[0] + return y_data, baseline + + +def baseline_2d_all_above(): + """Values for testing weights when all baseline points are above the data.""" + x_data, z_data, y_data = get_data2d() + baseline = np.full_like(y_data, y_data.max() + 10) + return y_data, baseline + + +def baseline_2d_all_below(): + """Values for testing weights when all baseline points are below the data.""" + x_data, z_data, y_data = get_data2d() + baseline = np.full_like(y_data, y_data.min() - 10) + return y_data, baseline def test_safe_std(): @@ -63,10 +123,14 @@ def test_safe_std_allow_nan(run_enum): @pytest.mark.parametrize('quantile', np.linspace(0, 1, 21)) -def test_quantile_weighting(quantile): +@pytest.mark.parametrize('one_d', (True, False)) +def test_quantile_weighting(quantile, one_d): """Ensures the quantile weighting calculation is correct.""" - y = np.linspace(-1, 1) - fit = np.zeros(y.shape[0]) + if one_d: + y, fit = baseline_1d_normal() + else: + y, fit = baseline_2d_normal() + residual = y - fit eps = 1e-10 calc_loss = _weighting._quantile(y, fit, quantile, eps) @@ -77,3 +141,971 @@ def test_quantile_weighting(quantile): expected_loss = numerator / denominator assert_allclose(calc_loss, expected_loss) + + +@pytest.mark.parametrize('p', (0.01, 0.99)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_asls_normal(p, one_d): + """Ensures asls weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights = _weighting._asls(y_data, baseline, p) + expected_weights = np.where(y_data > baseline, p, 1 - p) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('p', (0.01, 0.99)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_asls_all_above(p, one_d): + """Ensures asls weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + weights = _weighting._asls(y_data, baseline, p) + expected_weights = np.full_like(y_data, 1 - p) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + 
+@pytest.mark.parametrize('p', (0.01, 0.99))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_asls_all_below(p, one_d):
+    """Ensures asls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+    weights = _weighting._asls(y_data, baseline, p)
+    expected_weights = np.full_like(y_data, p)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+def expected_airpls(y, baseline, iteration):
+    """
+    The weighting for adaptive iteratively reweighted penalized least squares (airPLS).
+
+    Does not perform error checking since this is just used for simple weighting cases.
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+    iteration : int
+        The iteration number. Should be 1-based, such that the first iteration is 1
+        instead of 0.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    References
+    ----------
+    Zhang, Z.M., et al. Baseline correction using adaptive iteratively
+    reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146.
+
+    Notes
+    -----
+    Equation 9 in the original algorithm was misprinted according to the author
+    (https://github.com/zmzhang/airPLS/issues/8), so the correct weighting is used here.
+
+    The pybaselines weighting differs from the original airPLS algorithm by dividing the
+    weights by their maximum value to ensure that weights are within [0, 1]; the original
+    algorithm allowed weights to be greater than 1 which does not make sense mathematically
+    and does not match any other Whittaker-smoothing based algorithms.
+ + """ + residual = y - baseline + neg_mask = residual < 0 + neg_residual = residual[neg_mask] + residual_l1_norm = abs(neg_residual).sum() + + new_weights = np.exp((-iteration / residual_l1_norm) * neg_residual) + # Not stated in the paper, but without dividing by the maximum weight, the + # calculated weights can be greater than 1 which does not make sense mathematically + weights = np.zeros_like(y) + weights[neg_mask] = new_weights / new_weights.max() + + return weights + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_airpls_normal(iteration, one_d): + """Ensures airpls weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration) + expected_weights = expected_airpls(y_data, baseline, iteration) + + residual = y_data - baseline + expected_residual_l1_norm = abs(residual[residual < 0].sum()) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12) + assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_airpls_all_above(iteration, one_d): + """Ensures airpls weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration) + expected_weights = expected_airpls(y_data, baseline, iteration) + expected_residual_l1_norm = abs(y_data - baseline).sum() # all residuals should be negative + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12) + assert not exit_early + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_airpls_all_below(iteration, one_d): + """Ensures airpls weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + with pytest.warns(utils.ParameterWarning): + weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration) + expected_weights = np.zeros_like(y_data) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert_allclose(residual_l1_norm, 0.0) + assert exit_early + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('dtype', (float, np.float16, np.float32)) +def test_airpls_overflow(one_d, dtype): + """Ensures exponential overflow does not occur from airpls weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + y_data = y_data.astype(dtype) + baseline = baseline.astype(dtype) + + residual = y_data - baseline + neg_mask = residual < 0 + neg_residual = residual[neg_mask] + 
expected_residual_l1_norm = abs(neg_residual).sum() + + # lowest iteration needed to cause exponential overflow for airpls weighting, plus 1 + iteration = ( + 1 + + np.log(np.finfo(y_data.dtype).max) * expected_residual_l1_norm / abs(neg_residual).max() + ) + + # sanity check to ensure overflow actually should occur and that it produces nan weights + with pytest.warns(RuntimeWarning): + expected_weights = expected_airpls(y_data, baseline, iteration) + assert (~np.isfinite(expected_weights)).any() + + with np.errstate(over='raise'): + weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration) + + assert np.isfinite(weights).all() + assert not exit_early + assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12) + + +def expected_arpls(y, baseline): + """ + The weighting for asymmetrically reweighted penalized least squares smoothing (arpls). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Baek, S.J., et al. Baseline correction using asymmetrically reweighted + penalized least squares smoothing. Analyst, 2015, 140, 250-257. + + """ + residual = y - baseline + neg_residual = residual[residual < 0] + std = _weighting._safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset + weights = 1 / (1 + np.exp((2 / std) * (residual - (2 * std - np.mean(neg_residual))))) + return weights + + +@pytest.mark.parametrize('one_d', (True, False)) +def test_arpls_normal(one_d): + """Ensures arpls weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights, exit_early = _weighting._arpls(y_data, baseline) + expected_weights = expected_arpls(y_data, baseline) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('one_d', (True, False)) +def test_arpls_all_above(one_d): + """Ensures arpls weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + weights, exit_early = _weighting._arpls(y_data, baseline) + expected_weights = expected_arpls(y_data, baseline) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + + +@pytest.mark.parametrize('one_d', (True, False)) +def test_arpls_all_below(one_d): + """Ensures arpls weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + with pytest.warns(utils.ParameterWarning): + weights, exit_early = _weighting._arpls(y_data, baseline) + expected_weights = np.zeros_like(y_data) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert exit_early + + +@pytest.mark.parametrize('one_d', (True, 
False)) +def test_arpls_overflow(one_d): + """Ensures exponential overflow does not occur from arpls weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + log_max = np.log(np.finfo(y_data.dtype).max) + residual = y_data - baseline + neg_residual = residual[residual < 0] + std = np.std(neg_residual, ddof=1) + mean = np.mean(neg_residual) + # for exponential overlow, (residual + mean) / std > 0.5 * log_max + 2 + # changing one value in the baseline to cause overflow will not cause the mean and + # standard deviation to change since the residual value will be positive at that index + overflow_index = 10 + overflow_value = (0.5 * log_max + 2) * std - mean + 10 # add 10 for good measure + if one_d: + baseline[overflow_index] = y_data[overflow_index] - overflow_value + else: + baseline[overflow_index, overflow_index] = ( + y_data[overflow_index, overflow_index] - overflow_value + ) + + # sanity check to ensure overflow actually should occur + with pytest.warns(RuntimeWarning): + expected_weights = expected_arpls(y_data, baseline) + # the resulting weights should still be finite since 1 / (1 + inf) == 0 + assert np.isfinite(expected_weights).all() + + with np.errstate(over='raise'): + weights, exit_early = _weighting._arpls(y_data, baseline) + + assert np.isfinite(weights).all() + assert not exit_early + + # the actual weight where overflow should have occurred should be 0 + if one_d: + assert_allclose(weights[overflow_index], 0.0, atol=1e-14) + else: + assert_allclose(weights[overflow_index, overflow_index], 0.0, atol=1e-14) + + # weights should still be the same as the nieve calculation regardless of exponential overflow + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +def expected_drpls(y, baseline, iteration): + """ + The weighting for doubly reweighted penalized least squares (drpls). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + iteration : int + The iteration number. Should be 1-based, such that the first iteration is 1 + instead of 0. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Xu, D. et al. Baseline correction method based on doubly reweighted + penalized least squares, Applied Optics, 2019, 58, 3913-3920. 
+ + """ + residual = y - baseline + neg_residual = residual[residual < 0] + std = _weighting._safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset + inner = (np.exp(iteration) / std) * (residual - (2 * std - np.mean(neg_residual))) + weights = 0.5 * (1 - (inner / (1 + np.abs(inner)))) + return weights + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_drpls_normal(iteration, one_d): + """Ensures drpls weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights, exit_early = _weighting._drpls(y_data, baseline, iteration) + expected_weights = expected_drpls(y_data, baseline, iteration) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_drpls_all_above(iteration, one_d): + """Ensures drpls weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + weights, exit_early = _weighting._drpls(y_data, baseline, iteration) + expected_weights = expected_drpls(y_data, baseline, iteration) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_drpls_all_below(iteration, one_d): + """Ensures drpls weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + with pytest.warns(utils.ParameterWarning): + weights, exit_early = _weighting._drpls(y_data, baseline, iteration) + expected_weights = np.zeros_like(y_data) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert exit_early + + +@pytest.mark.parametrize('one_d', (True, False)) +def test_drpls_overflow(one_d): + """Ensures exponential overflow does not occur from drpls weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + iteration = 1000 + # sanity check to ensure overflow actually should occur and that it produces nan weights + with pytest.warns(RuntimeWarning): + expected_weights = expected_drpls(y_data, baseline, iteration) + assert (~np.isfinite(expected_weights)).any() + + with np.errstate(over='raise'): + weights, exit_early = _weighting._drpls(y_data, baseline, iteration) + + assert np.isfinite(weights).all() + assert not exit_early + + +def expected_iarpls(y, baseline, iteration): + """ + The weighting for doubly reweighted penalized least squares (drpls). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + iteration : int + The iteration number. 
Should be 1-based, such that the first iteration is 1 + instead of 0. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Ye, J., et al. Baseline correction method based on improved asymmetrically + reweighted penalized least squares for Raman spectrum. Applied Optics, 2020, + 59, 10933-10943. + + """ + residual = y - baseline + neg_residual = residual[residual < 0] + + std = np.std(neg_residual, ddof=1) # use dof=1 since only sampling a subset + # the exponential term is used to change the shape of the weighting from a logistic curve + # at low iterations to a step curve at higher iterations (figure 1 in the paper); setting + # the maximum iteration to 100 still acheives this purpose while avoiding unnecesarry + # overflow for high iterations + inner = (np.exp(iteration) / std) * (residual - 2 * std) + weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2))) + return weights + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_iarpls_normal(iteration, one_d): + """Ensures iarpls weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights, exit_early = _weighting._iarpls(y_data, baseline, iteration) + expected_weights = expected_iarpls(y_data, baseline, iteration) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_iarpls_all_above(iteration, one_d): + """Ensures iarpls weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + weights, exit_early = _weighting._iarpls(y_data, baseline, iteration) + expected_weights = expected_iarpls(y_data, baseline, iteration) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert not exit_early + + +@pytest.mark.parametrize('iteration', (1, 10)) +@pytest.mark.parametrize('one_d', (True, False)) +def test_iarpls_all_below(iteration, one_d): + """Ensures iarpls weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + with pytest.warns(utils.ParameterWarning): + weights, exit_early = _weighting._iarpls(y_data, baseline, iteration) + expected_weights = np.zeros_like(y_data) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert exit_early + + +@pytest.mark.parametrize('one_d', (True, False)) +def test_iarpls_overflow(one_d): + """Ensures exponential overflow does not occur from iarpls weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + iteration = 1000 + # sanity check to ensure overflow actually should occur and that it produces nan weights + with pytest.warns(RuntimeWarning): + expected_weights = expected_iarpls(y_data, baseline, iteration) + assert 
(~np.isfinite(expected_weights)).any()
+
+    with np.errstate(over='raise'):
+        weights, exit_early = _weighting._iarpls(y_data, baseline, iteration)
+
+    assert np.isfinite(weights).all()
+    assert not exit_early
+
+
+def expected_aspls(y, baseline, assymetric_coef=0.5):
+    """
+    The weighting for adaptive smoothness penalized least squares smoothing (aspls).
+
+    Does not perform error checking since this is just used for simple weighting cases.
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+    assymetric_coef : float
+        The asymmetric coefficient for the weighting. Higher values lead to a steeper
+        weighting curve (i.e. more step-like). Default is 0.5.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+    residual : numpy.ndarray, shape (N,)
+        The residual, ``y - baseline``.
+
+    References
+    ----------
+    Zhang, F., et al. Baseline correction for infrared spectra using adaptive smoothness
+    parameter penalized least squares method. Spectroscopy Letters, 2020, 53(3), 222-233.
+
+    """
+    residual = y - baseline
+    std = np.std(residual[residual < 0], ddof=1)  # use dof=1 since sampling subset
+
+    weights = 1 / (1 + np.exp(assymetric_coef * (residual - std) / std))
+
+    return weights, residual
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_normal(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+    # ensure all weights are between 0 and 1
+    assert ((weights >= 0) & (weights <= 1)).all()
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_all_above(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+
+    weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_all_below(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+
assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert_allclose(residual, y_data - baseline, rtol=1e-12, atol=1e-12) + assert exit_early + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('assymetric_coef', (0.5, 2)) +def test_aspls_overflow(one_d, assymetric_coef): + """Ensures exponential overflow does not occur from aspls weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + log_max = np.log(np.finfo(y_data.dtype).max) + residual = y_data - baseline + std = np.std(residual[residual < 0], ddof=1) + # for exponential overlow, (residual / std) > (log_max / assymetric_coef) + 1 + # changing one value in the baseline to cause overflow will not cause the + # standard deviation to change since the residual value will be positive at that index + overflow_index = 10 + overflow_value = ((log_max / assymetric_coef) + 1) * std + 10 # add 10 for good measure + if one_d: + baseline[overflow_index] = y_data[overflow_index] - overflow_value + else: + baseline[overflow_index, overflow_index] = ( + y_data[overflow_index, overflow_index] - overflow_value + ) + + # sanity check to ensure overflow actually should occur + with pytest.warns(RuntimeWarning): + expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef) + # the resulting weights should still be finite since 1 / (1 + inf) == 0 + assert np.isfinite(expected_weights).all() + + with np.errstate(over='raise'): + weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef) + + assert np.isfinite(weights).all() + assert not exit_early + + # the actual weight where overflow should have occurred should be 0 + if one_d: + assert_allclose(weights[overflow_index], 0.0, atol=1e-14) + else: + assert_allclose(weights[overflow_index, overflow_index], 0.0, atol=1e-14) + + # weights should still be the same as the nieve calculation regardless of exponential overflow + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12) + + +def expected_psalsa(y, baseline, p, k, shape_y): + """ + Weighting for the peaked signal's asymmetric least squares algorithm (psalsa). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + p : float + The penalizing weighting factor. Must be between 0 and 1. Values greater + than the baseline will be given `p` weight, and values less than the baseline + will be given `1 - p` weight. + k : float + A factor that controls the exponential decay of the weights for baseline + values greater than the data. Should be approximately the height at which + a value could be considered a peak. + shape_y : int or (int,) or (int, int) + The length of `y`, `N`. Precomputed to avoid repeated calculations. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Oller-Moreno, S., et al. Adaptive Asymmetric Least Squares baseline estimation + for analytical instruments. 2014 IEEE 11th International Multi-Conference on + Systems, Signals, and Devices, 2014, 1-5. 
+ + """ + residual = y - baseline + weights = np.where(residual > 0, p * np.exp(-residual / k), 1 - p) + return weights + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_normal(one_d, k, p): + """Ensures psalsa weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_all_above(one_d, k, p): + """Ensures psalsa weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = np.full_like(y_data, 1 - p) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_all_below(one_d, k, p): + """Ensures psalsa weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = p * np.exp(-(y_data - baseline) / k) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_overflow(one_d, k, p): + """Ensures exponential overflow does not occur from psalsa weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + log_max = np.log(np.finfo(y_data.dtype).max) + # for exponential overlow, -(residual / k) > log_max + overflow_index = 10 + overflow_value = k * log_max + 10 # add 10 for good measure + if one_d: + baseline[overflow_index] = y_data[overflow_index] + overflow_value + else: + baseline[overflow_index, overflow_index] = ( + y_data[overflow_index, overflow_index] + overflow_value + ) + + # sanity check to ensure overflow actually should occur + with pytest.warns(RuntimeWarning): + expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape) + # weights in nieve approach should still be finite since overflow only occurs in regions + # where the exponential value is not actually used + assert np.isfinite(expected_weights).all() + + with np.errstate(over='raise'): + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + + assert np.isfinite(weights).all() + + # the actual weight where overflow should have occurred should be 1 - p + if one_d: + 
assert_allclose(weights[overflow_index], 1 - p, atol=1e-14) + else: + assert_allclose(weights[overflow_index, overflow_index], 1 - p, atol=1e-14) + # weights should still be the same as the nieve calculation regardless of exponential overflow + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +def expected_derpsalsa(y, baseline, p, k, shape_y, partial_weights): + """ + Weights for derivative peak-screening asymmetric least squares algorithm (derpsalsa). + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + p : float + The penalizing weighting factor. Must be between 0 and 1. Values greater + than the baseline will be given `p` weight, and values less than the baseline + will be given `1 - p` weight. + k : float + A factor that controls the exponential decay of the weights for baseline + values greater than the data. Should be approximately the height at which + a value could be considered a peak. + shape_y : int or (int,) or (int, int) + The length of `y`, `N`. Precomputed to avoid repeated calculations. + partial_weights : numpy.ndarray, shape (N,) + The weights associated with the first and second derivatives of the data. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + Notes + ----- + The reference is not clear as to how `p` and `1-p` are applied. An alternative could + be that `partial_weights` are multiplied only where the residual is greater than + 0 and that all other weights are `1-p`, but based on Figure 1c in the reference, the + total weights are never greater than `partial_weights`, so that must mean the non-peak + regions have a weight of `1-p` times `partial_weights` rather than just `1-p`; + both weighting systems give near identical results, so it is not a big deal. + + References + ---------- + Korepanov, V. Asymmetric least-squares baseline algorithm with peak screening for + automatic processing of the Raman spectra. Journal of Raman Spectroscopy. 2020, + 51(10), 2061-2065. 
+ + """ + residual = y - baseline + weights = partial_weights * np.where(residual > 0, p * np.exp(-((residual / k)**2) / 2), 1 - p) + return weights + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_normal(k, p): + """Ensures derpsalsa weighting works as intented for a normal baseline.""" + y_data, baseline = baseline_1d_normal() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = expected_derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_all_above(k, p): + """Ensures derpsalsa weighting works as intented for a baseline completely above the data.""" + y_data, baseline = baseline_1d_all_above() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = np.full_like(y_data, partial_weights * (1 - p)) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_all_below(k, p): + """Ensures derpsalsa weighting works as intented for a baseline completely below the data.""" + y_data, baseline = baseline_1d_all_below() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = partial_weights * p * np.exp(-0.5 * ((y_data - baseline) / k)**2) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py index 28297b2..917d26c 100644 --- a/tests/test_whittaker.py +++ b/tests/test_whittaker.py @@ -150,7 +150,7 @@ def test_avoid_overflow_warning(self, no_noise_data_fixture): with np.errstate(over='raise'): baseline = self.class_func(y, tol=-1, max_iter=1000)[0] - assert np.isfinite(baseline.dot(baseline)) + assert np.isfinite(baseline).all() class TestDrPLS(WhittakerTester): @@ -175,32 
+175,27 @@ def test_diff_order_one_fails(self): with pytest.raises(ValueError): self.class_func(self.y, lam=1e2, diff_order=1) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + drpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. """ x, y = no_noise_data_fixture - with pytest.warns(ParameterWarning): + with np.errstate(over='raise'): baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() class TestIArPLS(WhittakerTester): @@ -214,16 +209,13 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + iarpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. 
@@ -233,13 +225,14 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ x, y = no_noise_data_fixture - with pytest.warns(ParameterWarning): + with np.errstate(over='raise'): baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() class TestAsPLS(WhittakerTester): @@ -296,6 +289,12 @@ def test_alpha_multiplication(self, diff_order): result = _banded_utils._shift_rows(result, diff_order, diff_order) assert_allclose(result, expected_result, rtol=1e-13, atol=1e-13) + @pytest.mark.parametrize('assymetric_coef', (0, -1)) + def test_outside_assymetric_coef_fails(self, assymetric_coef): + """Ensures assymetric_coef values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, assymetric_coef=assymetric_coef) + class TestPsalsa(WhittakerTester): """Class for testing psalsa baseline.""" @@ -314,6 +313,12 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestDerpsalsa(WhittakerTester): """Class for testing derpsalsa baseline.""" @@ -331,3 +336,9 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_spline.py b/tests/two_d/test_spline.py index e952bd3..a359e19 100644 --- a/tests/two_d/test_spline.py +++ b/tests/two_d/test_spline.py @@ -10,7 +10,6 @@ from numpy.testing import assert_allclose import pytest -from pybaselines import utils from pybaselines.two_d import spline, whittaker from ..conftest import BaseTester2D, InputWeightsMixin @@ -211,32 +210,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when errors occur. - - When there are no negative residuals, which occurs when a low tol value is used with - a high max_iter value, the weighting function would produce values all ~0, which - can fail the solvers. The returned baseline should be the last iteration that was - successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. 
-        Uses np.isfinite on the dot product of the baseline since the dot product is fast,
-        would propogate the nan or inf, and will create only a single value to check
-        for finite-ness.
-
-        """
-        x, z, y = no_noise_data_fixture2d
-        with pytest.warns(utils.ParameterWarning):
-            baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)(
-                y, tol=-1, max_iter=7000
-            )
-        assert np.isfinite(baseline).all()
-
     @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5]))
     @pytest.mark.parametrize('diff_order', (1, 3, [2, 3]))
     def test_whittaker_comparison(self, lam, diff_order):
@@ -254,27 +227,6 @@ def test_diff_orders(self, diff_order):
         """Ensure that other difference orders work."""
         self.class_func(self.y, diff_order=diff_order)

-    @pytest.mark.skip(reason='test is too slow')
-    def test_avoid_overflow_warning(self, no_noise_data_fixture2d):
-        """
-        Ensures no warning is emitted for exponential overflow.
-
-        The weighting is 1 / (1 + exp(values)), so if values is too high,
-        exp(values) is inf, which should usually emit an overflow warning.
-        However, the resulting weight is 0, which is fine, so the warning is
-        not needed and should be avoided. This test ensures the overflow warning
-        is not emitted, and also ensures that the output is all finite, just in
-        case the weighting was not actually stable.
-
-        """
-        x, z, y = no_noise_data_fixture2d
-        with np.errstate(over='raise'):
-            baseline, params = getattr(self.algorithm_base(x, z), self.func_name)(
-                y, tol=-1, max_iter=1000
-            )
-
-        assert np.isfinite(baseline).all()
-
     @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5]))
     @pytest.mark.parametrize('diff_order', (1, 3, [2, 3]))
     def test_whittaker_comparison(self, lam, diff_order):
@@ -292,36 +244,6 @@ def test_diff_orders(self, diff_order):
         """Ensure that other difference orders work."""
         self.class_func(self.y, diff_order=diff_order)

-    @pytest.mark.skip(reason='test is too slow')
-    # ignore the RuntimeWarning that occurs from using +/- inf or nan
-    @pytest.mark.filterwarnings('ignore::RuntimeWarning')
-    def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d):
-        """
-        Ensures that the function gracefully exits when non-finite weights are created.
-
-        When there are no negative residuals or exp(iterations) / std is very high, both
-        of which occur when a low tol value is used with a high max_iter value, the
-        weighting function would produce non-finite values. The returned baseline should
-        be the last iteration that was successful, and thus should not contain nan or +/- inf.
-
-        Use data without noise since the lack of noise makes it easier to induce failure.
-        Set tol to -1 so that it is never reached, and set max_iter to a high value.
-        Uses np.isfinite on the dot product of the baseline since the dot product is fast,
-        would propogate the nan or inf, and will create only a single value to check
-        for finite-ness.
- - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(utils.ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5])) @pytest.mark.parametrize('diff_order', (1, 3, [2, 3])) def test_whittaker_comparison(self, lam, diff_order): @@ -354,3 +276,8 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, 'psalsa', self.y, lam=lam, p=p, diff_order=diff_order, test_rtol=1e5 ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_whittaker.py b/tests/two_d/test_whittaker.py index c5fd229..1b685f6 100644 --- a/tests/two_d/test_whittaker.py +++ b/tests/two_d/test_whittaker.py @@ -10,7 +10,6 @@ import pytest from pybaselines.two_d import whittaker -from pybaselines.utils import ParameterWarning from ..conftest import BaseTester2D, InputWeightsMixin @@ -108,33 +107,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when errors occur. - - When there are no negative residuals, which occurs when a low tol value is used with - a high max_iter value, the weighting function would produce values all ~0, which - can fail the solvers. The returned baseline should be the last iteration that was - successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. - - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=3000 - ) - - assert np.isfinite(baseline).all() - class TestArPLS(WhittakerTester, EigenvalueMixin): """Class for testing arpls baseline.""" @@ -146,27 +118,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - def test_avoid_overflow_warning(self, no_noise_data_fixture2d): - """ - Ensures no warning is emitted for exponential overflow. - - The weighting is 1 / (1 + exp(values)), so if values is too high, - exp(values) is inf, which should usually emit an overflow warning. - However, the resulting weight is 0, which is fine, so the warning is - not needed and should be avoided. This test ensures the overflow warning - is not emitted, and also ensures that the output is all finite, just in - case the weighting was not actually stable. 
- - """ - x, z, y = no_noise_data_fixture2d - with np.errstate(over='raise'): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - class TestDrPLS(WhittakerTester): """Class for testing drpls baseline.""" @@ -195,36 +146,6 @@ def test_diff_order_one_fails(self): with pytest.raises(ValueError): self.class_func(self.y, lam=1e2, diff_order=[2, 1]) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when non-finite weights are created. - - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. - - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - class TestIArPLS(WhittakerTester, EigenvalueMixin): """Class for testing iarpls baseline.""" @@ -236,36 +157,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when non-finite weights are created. - - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. 
- - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - class TestAsPLS(WhittakerTester): """Class for testing aspls baseline.""" @@ -289,26 +180,11 @@ def test_wrong_alpha_shape(self, alpha_enum): with pytest.raises(ValueError): self.class_func(self.y, alpha=alpha) - @pytest.mark.skip(reason='test is too slow') - def test_avoid_overflow_warning(self, no_noise_data_fixture2d): - """ - Ensures no warning is emitted for exponential overflow. - - The weighting is 1 / (1 + exp(values)), so if values is too high, - exp(values) is inf, which should usually emit an overflow warning. - However, the resulting weight is 0, which is fine, so the warning is - not needed and should be avoided. This test ensures the overflow warning - is not emitted, and also ensures that the output is all finite, just in - case the weighting was not actually stable. - - """ - x, z, y = no_noise_data_fixture2d - with np.errstate(over='raise'): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() + @pytest.mark.parametrize('assymetric_coef', (0, -1)) + def test_outside_assymetric_coef_fails(self, assymetric_coef): + """Ensures assymetric_coef values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, assymetric_coef=assymetric_coef) class TestPsalsa(WhittakerTester, EigenvalueMixin): @@ -326,3 +202,9 @@ def test_outside_p_fails(self, p): def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k)