From 4a0bf54c6ecd6cf633059458013ff08e2a78ea21 Mon Sep 17 00:00:00 2001 From: Donnie Erb <55961724+derb12@users.noreply.github.com> Date: Sun, 15 Dec 2024 09:02:51 -0500 Subject: [PATCH 1/6] FIX: Corrected airpls weighting The weighting equation for airpls was misprinted in its journal article, so changed to the correct weighting scheme. Also improved overflow avoidance for airpls so that try except blocks are no longer needed, and slightly sped up asls weighting. --- pybaselines/_weighting.py | 86 ++++++++++- pybaselines/spline.py | 55 ++----- pybaselines/two_d/spline.py | 44 ++---- pybaselines/two_d/whittaker.py | 44 +----- pybaselines/whittaker.py | 63 ++------ tests/conftest.py | 2 +- tests/test_weighting.py | 266 ++++++++++++++++++++++++++++++++- 7 files changed, 394 insertions(+), 166 deletions(-) diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index 15d3ad0..b07dad3 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -1,10 +1,12 @@ # -*- coding: utf-8 -*- """Contains various weighting schemes used in pybaselines.""" +import warnings + import numpy as np from scipy.special import expit -from .utils import _MIN_FLOAT +from .utils import _MIN_FLOAT, ParameterWarning def _asls(y, baseline, p): @@ -22,7 +24,7 @@ def _asls(y, baseline, p): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. Returns ------- @@ -38,11 +40,83 @@ def _asls(y, baseline, p): asymmetric least squares method, Analytical Methods, 2014, 6(12), 4402-4407. """ - mask = y > baseline - weights = p * mask + (1 - p) * (~mask) + weights = np.where(y > baseline, p, 1 - p) return weights +def _airpls(y, baseline, iteration): + """ + The weighting for adaptive iteratively reweighted penalized least squares (airPLS). + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + iteration : int + The iteration number. Should be 1-based, such that the first iteration is 1 + instead of 0. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + residual_l1_norm : float + The L1 norm of the negative residuals, used to calculate the exit criteria + for the airPLS algorithm. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. + + References + ---------- + Zhang, Z.M., et al. Baseline correction using adaptive iteratively + reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146. + + Notes + ----- + Equation 9 in the original algorithm was misprinted according to the author + (https://github.com/zmzhang/airPLS/issues/8), so the correct weighting is used here. + + The pybaselines weighting differs from the original airPLS algorithm by normalizing the + weights by their maximum value to ensure that weights are within [0, 1]; the original + algorithm is defined such that all weights are greater than or equal to 1 for negative + residuals, which would differ from other Whittaker-smoothing based algorithms and could + potentially cause issues if combining with optimizer functions. 
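In condensed form, the corrected weighting reduces to the short standalone sketch below; it mirrors the `_airpls` implementation in this patch but omits the early-exit handling, and the function name `airpls_weights` is illustrative rather than part of the pybaselines API:

    import numpy as np

    def airpls_weights(y, baseline, iteration):
        # only the negative residuals (points below the baseline) receive nonzero weight
        residual = y - baseline
        neg_mask = residual < 0
        neg_residual = residual[neg_mask]
        residual_l1_norm = abs(neg_residual.sum())
        # clip the exponent to [0, log(max dtype)) so that np.exp can never overflow
        log_max = np.log(np.finfo(y.dtype).max)
        inner = np.clip(
            (-iteration / residual_l1_norm) * neg_residual,
            a_min=0, a_max=log_max - np.spacing(log_max)
        )
        new_weights = np.exp(inner)
        # dividing by the maximum keeps the weights within [0, 1]
        weights = np.zeros_like(y)
        weights[neg_mask] = new_weights / new_weights.max()
        return weights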
+ + """ + residual = y - baseline + neg_mask = residual < 0 + neg_residual = residual[neg_mask] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), 0.0, exit_early + else: + exit_early = False + + residual_l1_norm = abs(neg_residual.sum()) + # clip from [0, log(max dtype)] since the positive residuals (negative values) do not matter + log_max = np.log(np.finfo(y.dtype).max) + inner = np.clip( + (-iteration / residual_l1_norm) * neg_residual, + a_min=0, + a_max=log_max - np.spacing(log_max) + ) + new_weights = np.exp(inner) + # Not stated in the paper, but without dividing by the maximum weight, the + # calculated weights are all greater than or equal to 1 + weights = np.zeros_like(y) + weights[neg_mask] = new_weights / new_weights.max() + + return weights, residual_l1_norm, exit_early + + def _safe_std(array, **kwargs): """ Calculates the standard deviation and protects against nan and 0. @@ -228,7 +302,7 @@ def _psalsa(y, baseline, p, k, shape_y): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. k : float A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -270,7 +344,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights): p : float The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. + will be given `1 - p` weight. k : float A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which diff --git a/pybaselines/spline.py b/pybaselines/spline.py index d205692..aa7098c 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -46,7 +46,7 @@ def mixture_model(self, data, lam=1e5, p=1e-2, num_knots=100, spline_degree=3, d p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. @@ -381,7 +381,7 @@ def pspline_asls(self, data, lam=1e3, p=1e-2, num_knots=100, spline_degree=3, di p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. spline_degree : int, optional @@ -469,7 +469,7 @@ def pspline_iasls(self, data, lam=1e1, p=1e-2, lam_1=1e-4, num_knots=100, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. 
lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -621,47 +621,22 @@ def pspline_airpls(self, data, lam=1e3, num_knots=100, spline_degree=3, """ y, weight_array = self._setup_spline( - data, weights, spline_degree, num_knots, True, diff_order, lam, copy_weights=True + data, weights, spline_degree, num_knots, True, diff_order, lam ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): - try: - output = self.pspline.solve_pspline(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, stacklevel=2 - ) + baseline = self.pspline.solve_pspline(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -1068,7 +1043,7 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1164,7 +1139,7 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1414,7 +1389,7 @@ def mixture_model(data, lam=1e5, p=1e-2, num_knots=100, spline_degree=3, diff_or p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. 
Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. @@ -1743,7 +1718,7 @@ def pspline_asls(data, lam=1e3, p=1e-2, num_knots=100, spline_degree=3, diff_ord p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int, optional The number of knots for the spline. Default is 100. spline_degree : int, optional @@ -1819,7 +1794,7 @@ def pspline_iasls(data, x_data=None, lam=1e1, p=1e-2, lam_1=1e-4, num_knots=100, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -2223,7 +2198,7 @@ def pspline_psalsa(data, lam=1e3, p=0.5, k=None, num_knots=100, spline_degree=3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -2302,7 +2277,7 @@ def pspline_derpsalsa(data, lam=1e2, p=1e-2, k=None, num_knots=100, spline_degre p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py index ccfca8f..c43bc80 100644 --- a/pybaselines/two_d/spline.py +++ b/pybaselines/two_d/spline.py @@ -39,7 +39,7 @@ def mixture_model(self, data, lam=1e3, p=1e-2, num_knots=25, spline_degree=3, di p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Used to set the initial weights before performing + will be given `1 - p` weight. Used to set the initial weights before performing expectation-maximization. Default is 1e-2. num_knots : int or Sequence[int, int], optional The number of knots for the splines along the rows and columns, respectively. If a @@ -292,7 +292,7 @@ def pspline_asls(self, data, lam=1e3, p=1e-2, num_knots=25, spline_degree=3, dif p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. num_knots : int or Sequence[int, int], optional The number of knots for the splines along the rows and columns, respectively. 
If a single value is given, both will use the same value. Default is 25. @@ -383,7 +383,7 @@ def pspline_iasls(self, data, lam=1e3, p=1e-2, lam_1=1e-4, num_knots=25, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. num_knots : int, optional @@ -529,48 +529,22 @@ def pspline_airpls(self, data, lam=1e3, num_knots=25, spline_degree=3, """ y, weight_array = self._setup_spline( - data, weights, spline_degree, num_knots, True, diff_order, lam, copy_weights=True + data, weights, spline_degree, num_knots, True, diff_order, lam ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): - try: - output = self.pspline.solve(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) + baseline = self.pspline.solve(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -758,7 +732,7 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index 87e8d70..eeac26a 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -38,7 +38,7 @@ def asls(self, data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weigh p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. 
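Every airPLS variant touched by this patch (1D and 2D, Whittaker and P-spline) now shares the same simplified loop; schematically it reduces to the sketch below, where `solve` is a hypothetical stand-in for the method-specific penalized solver rather than an actual pybaselines function:

    import numpy as np
    from pybaselines import _weighting

    def airpls_iterations(y, solve, max_iter=50, tol=1e-3):
        # a schematic of the refactored loop; error handling now lives in _weighting._airpls
        weight_array = np.ones_like(y)
        y_l1_norm = np.abs(y).sum()
        tol_history = np.empty(max_iter + 1)
        for i in range(1, max_iter + 2):
            baseline = solve(y, weight_array)  # e.g. a Whittaker or penalized spline solve
            new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i)
            if exit_early:
                i -= 1  # keep the tol_history indexing consistent
                break
            calc_difference = residual_l1_norm / y_l1_norm
            tol_history[i - 1] = calc_difference
            if calc_difference < tol:
                break
            weight_array = new_weights
        return baseline, {'weights': weight_array, 'tol_history': tol_history[:i]}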
diff_order : int or Sequence[int, int], optional The order of the differential matrix for the rows and columns, respectively. If a single value is given, both will use the same value. Must be greater than 0. @@ -143,7 +143,7 @@ def iasls(self, data, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float or Sequence[float, float], optional The smoothing parameter for the rows and columns, respectively, of the first derivative of the residual. Default is 1e-4. @@ -282,50 +282,22 @@ def airpls(self, data, lam=1e6, diff_order=2, max_iter=50, tol=1e-3, weights=Non """ y, weight_array = self._setup_whittaker( - data, lam, diff_order, weights, copy_weights=True, num_eigens=num_eigens + data, lam, diff_order, weights, num_eigens=num_eigens ) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) - # Have to have extensive error handling since the weights can all become - # very small due to the exp(i) term if too many iterations are performed; - # checking the negative residual length usually prevents any errors, but - # sometimes not so have to also catch any errors from the solvers for i in range(1, max_iter + 2): - try: - output = self.whittaker_system.solve(y, weight_array) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if neg_residual.size < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, stacklevel=2 - ) + baseline = self.whittaker_system.solve(y, weight_array) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'tol_history': tol_history[:i]} if self.whittaker_system._using_svd: @@ -745,7 +717,7 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. 
Should be approximately the height at which diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 4ee45ad..5473450 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -38,7 +38,7 @@ def asls(self, data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weigh p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. @@ -117,7 +117,7 @@ def iasls(self, data, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. max_iter : int, optional @@ -245,54 +245,23 @@ def airpls(self, data, lam=1e6, diff_order=2, max_iter=50, tol=1e-3, weights=Non reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146. """ - y, weight_array = self._setup_whittaker( - data, lam, diff_order, weights, copy_weights=True - ) + y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) y_l1_norm = np.abs(y).sum() tol_history = np.empty(max_iter + 1) - # Have to have extensive error handling since the weights can all become - # very small due to the exp(i) term if too many iterations are performed; - # checking the negative residual length usually prevents any errors, but - # sometimes not so have to also catch any errors from the solvers for i in range(1, max_iter + 2): - try: - output = self.whittaker_system.solve( - self.whittaker_system.add_diagonal(weight_array), weight_array * y, - overwrite_b=True, check_output=True - ) - except np.linalg.LinAlgError: - warnings.warn( - ('error occurred during fitting, indicating that "tol"' - ' is too low, "max_iter" is too high, or "lam" is too high'), - ParameterWarning, stacklevel=2 - ) - i -= 1 # reduce i so that output tol_history indexing is correct - break - else: - baseline = output - residual = y - baseline - neg_mask = residual < 0 - neg_residual = residual[neg_mask] - if len(neg_residual) < 2: - # exit if there are < 2 negative residuals since all points or all but one - # point would get a weight of 0, which fails the solver - warnings.warn( - ('almost all baseline points are below the data, indicating that "tol"' - ' is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) + baseline = self.whittaker_system.solve( + self.whittaker_system.add_diagonal(weight_array), weight_array * y, + overwrite_b=True, check_output=True + ) + new_weights, residual_l1_norm, exit_early = _weighting._airpls(y, baseline, i) + if exit_early: i -= 1 # reduce i so that output tol_history indexing is correct break - - residual_l1_norm = abs(neg_residual.sum()) calc_difference = residual_l1_norm / y_l1_norm tol_history[i - 1] = calc_difference if calc_difference < tol: break - # only use negative residual in exp to avoid exponential overflow warnings - # and accidently creating a weight of nan (inf * 0 = nan) - weight_array[neg_mask] = np.exp(i * neg_residual / 
residual_l1_norm) - weight_array[~neg_mask] = 0 + weight_array = new_weights params = {'weights': weight_array, 'tol_history': tol_history[:i]} @@ -660,7 +629,7 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -751,7 +720,7 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -868,7 +837,7 @@ def asls(data, lam=1e6, p=1e-2, diff_order=2, max_iter=50, tol=1e-3, weights=Non p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. diff_order : int, optional The order of the differential matrix. Must be greater than 0. Default is 2 (second order differential matrix). Typical values are 2 or 1. @@ -935,7 +904,7 @@ def iasls(data, x_data=None, lam=1e6, p=1e-2, lam_1=1e-4, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. lam_1 : float, optional The smoothing parameter for the first derivative of the residual. Default is 1e-4. max_iter : int, optional @@ -1278,7 +1247,7 @@ def psalsa(data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e-3, p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 0.5. + will be given `1 - p` weight. Default is 0.5. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. Should be approximately the height at which @@ -1352,7 +1321,7 @@ def derpsalsa(data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, tol=1e-3 p : float, optional The penalizing weighting factor. Must be between 0 and 1. Values greater than the baseline will be given `p` weight, and values less than the baseline - will be given `p - 1` weight. Default is 1e-2. + will be given `1 - p` weight. Default is 1e-2. k : float, optional A factor that controls the exponential decay of the weights for baseline values greater than the data. 
Should be approximately the height at which diff --git a/tests/conftest.py b/tests/conftest.py index 600dec5..d248f3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -82,7 +82,7 @@ def gaussian2d(x, z, height=1.0, center_x=0.0, center_z=0.0, sigma_x=1.0, sigma_ Notes ----- - This is the same code as in pybaselines.utils.gaussian, but + This is the same code as in pybaselines.utils.gaussian2d, but this removes the dependence on pybaselines so that if an error with pybaselines occurs, this will be unaffected. diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 6ba2e84..ad3f21f 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -5,7 +5,67 @@ from numpy.testing import assert_allclose import pytest -from pybaselines import _weighting, utils +from pybaselines import _weighting, utils, Baseline2D + +from .conftest import get_data, get_data2d + + +def baseline_1d_normal(): + """ + Values for testing weights for a normally fit baseline. + + Approximates the baseline as a first order polynomial fit, which is the + approximate fit for the first iteration of all Whittaker-smoothing-based + algorithms with a `diff_order` of 2 and a sufficiently high `lam` value. + + """ + x_data, y_data = get_data() + baseline = np.polynomial.Polynomial.fit(x_data, y_data, deg=1)(x_data) + return y_data, baseline + + +def baseline_1d_all_above(): + """Values for testing weights when all baseline points are above the data.""" + x_data, y_data = get_data() + baseline = np.full_like(y_data, y_data.max() + 10) + return y_data, baseline + + +def baseline_1d_all_below(): + """Values for testing weights when all baseline points are below the data.""" + x_data, y_data = get_data() + baseline = np.full_like(y_data, y_data.min() - 10) + return y_data, baseline + + +def baseline_2d_normal(): + """ + Values for testing weights for a normally fit baseline. + + Approximates the baseline as a first order polynomial fit, which is the + approximate fit for the first iteration of all Whittaker-smoothing-based + algorithms with a `diff_order` of 2 and a sufficiently high `lam` value. 
+
+    """
+    x_data, z_data, y_data = get_data2d()
+    baseline = Baseline2D(x_data, z_data, check_finite=False, assume_sorted=True).poly(
+        y_data, poly_order=1
+    )[0]
+    return y_data, baseline
+
+
+def baseline_2d_all_above():
+    """Values for testing weights when all baseline points are above the data."""
+    x_data, z_data, y_data = get_data2d()
+    baseline = np.full_like(y_data, y_data.max() + 10)
+    return y_data, baseline
+
+
+def baseline_2d_all_below():
+    """Values for testing weights when all baseline points are below the data."""
+    x_data, z_data, y_data = get_data2d()
+    baseline = np.full_like(y_data, y_data.min() - 10)
+    return y_data, baseline
 
 
 def test_safe_std():
@@ -77,3 +137,207 @@ def test_quantile_weighting(quantile):
         expected_loss = numerator / denominator
 
     assert_allclose(calc_loss, expected_loss)
+
+
+@pytest.mark.parametrize('p', (0.01, 0.99))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_asls_normal(p, two_d):
+    """Ensures asls weighting works as intended for a normal baseline."""
+    if two_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights = _weighting._asls(y_data, baseline, p)
+    expected_weights = np.where(y_data > baseline, p, 1 - p)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+@pytest.mark.parametrize('p', (0.01, 0.99))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_asls_all_above(p, two_d):
+    """Ensures asls weighting works as intended for a baseline with all points above the data."""
+    if two_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+    weights = _weighting._asls(y_data, baseline, p)
+    expected_weights = np.full_like(y_data, 1 - p)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+@pytest.mark.parametrize('p', (0.01, 0.99))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_asls_all_below(p, two_d):
+    """Ensures asls weighting works as intended for a baseline with all points below the data."""
+    if two_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+    weights = _weighting._asls(y_data, baseline, p)
+    expected_weights = np.full_like(y_data, p)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+def expected_airpls(y, baseline, iteration):
+    """
+    The weighting for adaptive iteratively reweighted penalized least squares (airPLS).
+
+    Does not perform error checking since this is just used for simple weighting cases.
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+    iteration : int
+        The iteration number. Should be 1-based, such that the first iteration is 1
+        instead of 0.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    References
+    ----------
+    Zhang, Z.M., et al. Baseline correction using adaptive iteratively
+    reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146.
+
+    Notes
+    -----
+    Equation 9 in the original algorithm was misprinted according to the author
+    (https://github.com/zmzhang/airPLS/issues/8), so the correct weighting is used here.
+
+    The pybaselines weighting differs from the original airPLS algorithm by dividing the
+    weights by their maximum value to ensure that weights are within [0, 1]; the original
+    algorithm allowed weights to be greater than 1 which does not make sense mathematically
+    and does not match any other Whittaker-smoothing based algorithms.
+
+    """
+    residual = y - baseline
+    neg_mask = residual < 0
+    neg_residual = residual[neg_mask]
+    residual_l1_norm = abs(neg_residual).sum()
+
+    new_weights = np.exp((-iteration / residual_l1_norm) * neg_residual)
+    # Not stated in the paper, but without dividing by the maximum weight, the
+    # calculated weights can be greater than 1 which does not make sense mathematically
+    weights = np.zeros_like(y)
+    weights[neg_mask] = new_weights / new_weights.max()
+
+    return weights
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_airpls_normal(iteration, two_d):
+    """Ensures airpls weighting works as intended for a normal baseline."""
+    if two_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration)
+    expected_weights = expected_airpls(y_data, baseline, iteration)
+
+    residual = y_data - baseline
+    expected_residual_l1_norm = abs(residual[residual < 0].sum())
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_airpls_all_above(iteration, two_d):
+    """Ensures airpls weighting works as intended for a baseline with all points above the data."""
+    if two_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+    weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration)
+    expected_weights = expected_airpls(y_data, baseline, iteration)
+    expected_residual_l1_norm = abs(y_data - baseline).sum()  # all residuals should be negative
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('two_d', (True, False))
+def test_airpls_all_below(iteration, two_d):
+    """Ensures airpls weighting works as intended for a baseline with all points below the data."""
+    if two_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual_l1_norm, 0.0)
+    assert exit_early
+
+
+@pytest.mark.parametrize('two_d', (True, False))
+@pytest.mark.parametrize('dtype', (float, np.float16, np.float32)) +def test_airpls_overflow(two_d, dtype): + """Ensures exponential overflow does not occur from airpls weighting.""" + if two_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + y_data = y_data.astype(dtype) + baseline = baseline.astype(dtype) + + residual = y_data - baseline + neg_mask = residual < 0 + neg_residual = residual[neg_mask] + expected_residual_l1_norm = abs(neg_residual).sum() + + # lowest iteration needed to cause exponential overflow for airpls weighting, plus 1 + iteration = ( + 1 + + np.log(np.finfo(y_data.dtype).max) * expected_residual_l1_norm / abs(neg_residual).max() + ) + + # sanity check to ensure overflow actually should occur and that it produces nan weights + with pytest.warns(RuntimeWarning): + expected_weights = expected_airpls(y_data, baseline, iteration) + assert (~np.isfinite(expected_weights)).any() + + with np.errstate(over='raise'): + weights, residual_l1_norm, exit_early = _weighting._airpls(y_data, baseline, iteration) + + assert np.isfinite(weights).all() + assert not exit_early + assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12) + From 40d2325563da45296b0dfab2a39fdbd5d57fdf35 Mon Sep 17 00:00:00 2001 From: Donnie Erb <55961724+derb12@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:00:37 -0500 Subject: [PATCH 2/6] OTHER: Improved arpls weighting error handling --- pybaselines/_weighting.py | 15 ++- pybaselines/spline.py | 5 +- pybaselines/two_d/spline.py | 5 +- pybaselines/two_d/whittaker.py | 5 +- pybaselines/whittaker.py | 5 +- tests/test_spline.py | 1 - tests/test_weighting.py | 178 ++++++++++++++++++++++++++++----- 7 files changed, 184 insertions(+), 30 deletions(-) diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index b07dad3..56af1d9 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -170,6 +170,9 @@ def _arpls(y, baseline): ------- weights : numpy.ndarray, shape (N,) The calculated weights. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. 
References ---------- @@ -179,10 +182,20 @@ def _arpls(y, baseline): """ residual = y - baseline neg_residual = residual[residual < 0] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), exit_early + else: + exit_early = False std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset # add a negative sign since expit performs 1/(1+exp(-input)) weights = expit(-(2 / std) * (residual - (2 * std - np.mean(neg_residual)))) - return weights + return weights, exit_early def _drpls(y, baseline, iteration): diff --git a/pybaselines/spline.py b/pybaselines/spline.py index aa7098c..c787359 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -705,7 +705,10 @@ def pspline_arpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_orde tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve_pspline(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py index c43bc80..4e8eb69 100644 --- a/pybaselines/two_d/spline.py +++ b/pybaselines/two_d/spline.py @@ -613,7 +613,10 @@ def pspline_arpls(self, data, lam=1e3, num_knots=25, spline_degree=3, diff_order tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index eeac26a..4819181 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -380,7 +380,10 @@ def arpls(self, data, lam=1e3, diff_order=2, max_iter=50, tol=1e-3, weights=None tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.whittaker_system.solve(y, weight_array) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 5473450..14f3e1d 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -319,7 +319,10 @@ def arpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=None self.whittaker_system.add_diagonal(weight_array), weight_array * y, overwrite_b=True ) - new_weights = _weighting._arpls(y, baseline) + new_weights, exit_early = _weighting._arpls(y, baseline) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: diff --git a/tests/test_spline.py 
b/tests/test_spline.py
index 298a9f0..da9da08 100644
--- a/tests/test_spline.py
+++ b/tests/test_spline.py
@@ -260,7 +260,6 @@ def test_diff_orders(self, diff_order):
         lam = {1: 1e2, 3: 1e10}[diff_order]
         self.class_func(self.y, lam=lam, diff_order=diff_order)
 
-    @pytest.mark.skip(reason='overflow will be addressed next version')
     def test_avoid_overflow_warning(self, no_noise_data_fixture):
         """
         Ensures no warning is emitted for exponential overflow.
diff --git a/tests/test_weighting.py b/tests/test_weighting.py
index ad3f21f..7cf39ba 100644
--- a/tests/test_weighting.py
+++ b/tests/test_weighting.py
@@ -123,10 +123,14 @@ def test_safe_std_allow_nan(run_enum):
 
 
 @pytest.mark.parametrize('quantile', np.linspace(0, 1, 21))
-def test_quantile_weighting(quantile):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_quantile_weighting(quantile, one_d):
     """Ensures the quantile weighting calculation is correct."""
-    y = np.linspace(-1, 1)
-    fit = np.zeros(y.shape[0])
+    if one_d:
+        y, fit = baseline_1d_normal()
+    else:
+        y, fit = baseline_2d_normal()
+
     residual = y - fit
     eps = 1e-10
     calc_loss = _weighting._quantile(y, fit, quantile, eps)
@@ -140,10 +144,10 @@ def test_quantile_weighting(quantile):
 
 
 @pytest.mark.parametrize('p', (0.01, 0.99))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_asls_normal(p, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_asls_normal(p, one_d):
     """Ensures asls weighting works as intended for a normal baseline."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_normal()
     else:
         y_data, baseline = baseline_2d_normal()
@@ -157,10 +161,10 @@ def test_asls_normal(p, two_d):
 
 
 @pytest.mark.parametrize('p', (0.01, 0.99))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_asls_all_above(p, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_asls_all_above(p, one_d):
     """Ensures asls weighting works as intended for a baseline with all points above the data."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_all_above()
     else:
         y_data, baseline = baseline_2d_all_above()
@@ -173,10 +177,10 @@ def test_asls_all_above(p, two_d):
 
 
 @pytest.mark.parametrize('p', (0.01, 0.99))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_asls_all_below(p, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_asls_all_below(p, one_d):
     """Ensures asls weighting works as intended for a baseline with all points below the data."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_all_below()
     else:
         y_data, baseline = baseline_2d_all_below()
@@ -246,10 +250,10 @@ def expected_airpls(y, baseline, iteration):
 
 
 @pytest.mark.parametrize('iteration', (1, 10))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_airpls_normal(iteration, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_airpls_normal(iteration, one_d):
     """Ensures airpls weighting works as intended for a normal baseline."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_normal()
     else:
         y_data, baseline = baseline_2d_normal()
@@ -268,10 +272,10 @@ def test_airpls_normal(iteration, two_d):
 
 
 @pytest.mark.parametrize('iteration', (1, 10))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_airpls_all_above(iteration, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_airpls_all_above(iteration, one_d):
     """Ensures airpls weighting works as intended for a baseline with all points above the data."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_all_above()
     else:
         y_data, baseline = baseline_2d_all_above()
@@ -287,10 +291,10 @@ def test_airpls_all_above(iteration, two_d):
 
 
 @pytest.mark.parametrize('iteration', (1, 10))
-@pytest.mark.parametrize('two_d', (True, False))
-def test_airpls_all_below(iteration, two_d):
+@pytest.mark.parametrize('one_d', (True, False))
+def test_airpls_all_below(iteration, one_d):
     """Ensures airpls weighting works as intended for a baseline with all points below the data."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_all_below()
     else:
         y_data, baseline = baseline_2d_all_below()
@@ -306,11 +310,11 @@ def test_airpls_all_below(iteration, two_d):
     assert exit_early
 
 
-@pytest.mark.parametrize('two_d', (True, False))
+@pytest.mark.parametrize('one_d', (True, False))
 @pytest.mark.parametrize('dtype', (float, np.float16, np.float32))
-def test_airpls_overflow(two_d, dtype):
+def test_airpls_overflow(one_d, dtype):
     """Ensures exponential overflow does not occur from airpls weighting."""
-    if two_d:
+    if one_d:
         y_data, baseline = baseline_1d_normal()
     else:
         y_data, baseline = baseline_2d_normal()
@@ -341,3 +345,129 @@ def test_airpls_overflow(two_d, dtype):
     assert not exit_early
     assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12)
+
+
+def expected_arpls(y, baseline):
+    """
+    The weighting for asymmetrically reweighted penalized least squares smoothing (arpls).
+
+    Does not perform error checking since this is just used for simple weighting cases.
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    References
+    ----------
+    Baek, S.J., et al. Baseline correction using asymmetrically reweighted
+    penalized least squares smoothing. Analyst, 2015, 140, 250-257.
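The reference implementation that follows deliberately uses the direct logistic form, which overflows inside np.exp for large residuals; `_arpls` itself computes the same quantity through scipy.special.expit, which saturates gracefully. A minimal, self-contained illustration of the difference:

    import numpy as np
    from scipy.special import expit

    x = np.array([1.0, 50.0, 1000.0])
    with np.errstate(over='ignore'):
        naive = 1 / (1 + np.exp(x))  # np.exp(1000.0) overflows float64 to inf
    stable = expit(-x)  # mathematically identical, but computed without overflow
    print(np.allclose(naive, stable))  # True; 1 / (1 + inf) still collapses to 0.0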
+
+    """
+    residual = y - baseline
+    neg_residual = residual[residual < 0]
+    std = _weighting._safe_std(neg_residual, ddof=1)  # use dof=1 since sampling subset
+    weights = 1 / (1 + np.exp((2 / std) * (residual - (2 * std - np.mean(neg_residual)))))
+    return weights
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_arpls_normal(one_d):
+    """Ensures arpls weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, exit_early = _weighting._arpls(y_data, baseline)
+    expected_weights = expected_arpls(y_data, baseline)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_arpls_all_above(one_d):
+    """Ensures arpls weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+    weights, exit_early = _weighting._arpls(y_data, baseline)
+    expected_weights = expected_arpls(y_data, baseline)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_arpls_all_below(one_d):
+    """Ensures arpls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, exit_early = _weighting._arpls(y_data, baseline)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_arpls_overflow(one_d):
+    """Ensures exponential overflow does not occur from arpls weighting."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    log_max = np.log(np.finfo(y_data.dtype).max)
+    residual = y_data - baseline
+    neg_residual = residual[residual < 0]
+    std = np.std(neg_residual, ddof=1)
+    mean = np.mean(neg_residual)
+    # for exponential overflow, (residual + mean) / std > 0.5 * log_max + 2
+    # changing one value in the baseline to cause overflow will not cause the mean and
+    # standard deviation to change since the residual value will be positive at that index
+    overflow_index = 10
+    overflow_value = (0.5 * log_max + 2) * std - mean + 10  # add 10 for good measure
+    if one_d:
+        baseline[overflow_index] = overflow_value
+    else:
+        baseline[overflow_index, overflow_index] = overflow_value
+
+    # sanity check to ensure overflow actually should occur
+    with pytest.warns(RuntimeWarning):
+        expected_weights = expected_arpls(y_data, baseline)
+    # the resulting weights should still be finite since 1 / (1 + inf) == 0
+    assert np.isfinite(expected_weights).all()
+
+    with np.errstate(over='raise'):
+        weights, exit_early = _weighting._arpls(y_data, baseline)
+
+    assert np.isfinite(weights).all()
+    assert not exit_early
+
+    # the actual weight where overflow should have occurred should be 0
+    if one_d:
+        assert_allclose(weights[overflow_index], 0.0, atol=1e-14)
+    else:
+        assert_allclose(weights[overflow_index, overflow_index], 0.0, atol=1e-14)
+
+    # weights should still be the same as the naive calculation regardless of exponential overflow
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)

From eb78ce55b92815bf91419d812eaf9073ee69cab5 Mon Sep 17 00:00:00 2001
From: Donnie Erb <55961724+derb12@users.noreply.github.com>
Date: Wed, 18 Dec 2024 14:13:01 -0500
Subject: [PATCH 3/6] OTHER: Made drpls weighting more robust

Also improved overflow avoidance for drpls so that try except blocks are
no longer needed.

---
 pybaselines/_weighting.py      |  22 ++++++-
 pybaselines/spline.py          |  21 ++-----
 pybaselines/two_d/whittaker.py |  21 ++-----
 pybaselines/whittaker.py       |  21 ++-----
 tests/test_spline.py           |  31 ++++------
 tests/test_weighting.py        | 110 +++++++++++++++++++++++++++++++++
 tests/test_whittaker.py        |  25 +++----
 7 files changed, 169 insertions(+), 82 deletions(-)

diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py
index 56af1d9..ef049dd 100644
--- a/pybaselines/_weighting.py
+++ b/pybaselines/_weighting.py
@@ -216,6 +216,9 @@ def _drpls(y, baseline, iteration):
     -------
     weights : numpy.ndarray, shape (N,)
         The calculated weights.
+    exit_early : bool
+        Designates if there is a potential error with the calculation such that no further
+        iterations should be performed.
 
     References
     ----------
@@ -225,10 +228,25 @@ def _drpls(y, baseline, iteration):
     """
     residual = y - baseline
    neg_residual = residual[residual < 0]
+    if neg_residual.size < 2:
+        exit_early = True
+        warnings.warn(
+            ('almost all baseline points are below the data, indicating that "tol"'
+             ' is too low and/or "max_iter" is too high'), ParameterWarning,
+            stacklevel=2
+        )
+        return np.zeros_like(y), exit_early
+    else:
+        exit_early = False
+
     std = _safe_std(neg_residual, ddof=1)  # use dof=1 since only sampling a subset
-    inner = (np.exp(iteration) / std) * (residual - (2 * std - np.mean(neg_residual)))
+    # the exponential term is used to change the shape of the weighting from a logistic curve
+    # at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
+    # the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
+    # overflow for high iterations
+    inner = (np.exp(min(iteration, 100)) / std) * (residual - (2 * std - np.mean(neg_residual)))
     weights = 0.5 * (1 - (inner / (1 + np.abs(inner))))
-    return weights
+    return weights, exit_early
 
 
 def _iarpls(y, baseline, iteration):
diff --git a/pybaselines/spline.py b/pybaselines/spline.py
index c787359..5dcf11c 100644
--- a/pybaselines/spline.py
+++ b/pybaselines/spline.py
@@ -812,23 +812,14 @@ def pspline_drpls(self, data, lam=1e3, eta=0.5, num_knots=100, spline_degree=3,
                 y, weight_array, penalty=_add_diagonals(self.pspline.penalty, diff_n_w_diagonals, lower_only=False)
             )
-            new_weights = _weighting._drpls(y, baseline, i)
+            new_weights, exit_early = _weighting._drpls(y, baseline, i)
+            if exit_early:
+                i -= 1  # reduce i so that output tol_history indexing is correct
+                break
+
             calc_difference = relative_difference(weight_array, new_weights)
             tol_history[i - 1] = calc_difference
-            if not np.isfinite(calc_difference):
-                # catches nan, inf and -inf due to exp(i) being too high or if there
-                # are too few negative residuals; no way to catch both conditions before
-                # new_weights calculation since it is hard to estimate if
-                # (exp(i) / std) * residual will overflow; check calc_difference rather
-                # than checking new_weights since non-finite values rarely occur and
-                # checking a scalar is
faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index 4819181..c237bae 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -475,23 +475,14 @@ def drpls(self, data, lam=1e5, eta=0.5, max_iter=50, tol=1e-3, weights=None, dif baseline = self.whittaker_system.direct_solve( partial_penalty + weight_matrix @ partial_penalty_2, weight_array * y ) - new_weights = _weighting._drpls(y, baseline, i) + new_weights, exit_early = _weighting._drpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break + calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights weight_matrix.setdiag(weight_array) diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 14f3e1d..8ccd171 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -414,23 +414,14 @@ def drpls(self, data, lam=1e5, eta=0.5, max_iter=50, tol=1e-3, weights=None, dif self.whittaker_system.penalty + penalty_with_weights, weight_array * y, overwrite_ab=True, overwrite_b=True, l_and_u=lower_upper_bands ) - new_weights = _weighting._drpls(y, baseline, i) + new_weights, exit_early = _weighting._drpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break + calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights diff --git a/tests/test_spline.py b/tests/test_spline.py index da9da08..fa53f6d 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -232,15 +232,12 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture): Use data without 
noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. """ x, y = no_noise_data_fixture with pytest.warns(utils.ParameterWarning): baseline = self.class_func(y, tol=-1, max_iter=7000)[0] - assert np.isfinite(baseline.dot(baseline)) + assert np.isfinite(baseline).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('diff_order', (1, 2, 3)) @@ -276,7 +273,7 @@ def test_avoid_overflow_warning(self, no_noise_data_fixture): with np.errstate(over='raise'): baseline = self.class_func(y, tol=-1, max_iter=1000)[0] - assert np.isfinite(baseline.dot(baseline)) + assert np.isfinite(baseline).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('diff_order', (1, 2, 3)) @@ -296,32 +293,26 @@ def test_diff_orders(self, diff_order): lam = {2: 1e6, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + drpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. 
""" x, y = no_noise_data_fixture - with pytest.warns(utils.ParameterWarning): - baseline, params = self.class_func(y, tol=-1, max_iter=1000) + baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() @pytest.mark.parametrize('lam', (1e1, 1e5)) @pytest.mark.parametrize('eta', (0.2, 0.8)) diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 7cf39ba..dcf1235 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -471,3 +471,113 @@ def test_arpls_overflow(one_d): # weights should still be the same as the nieve calculation regardless of exponential overflow assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +def expected_drpls(y, baseline, iteration): + """ + The weighting for doubly reweighted penalized least squares (drpls). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + iteration : int + The iteration number. Should be 1-based, such that the first iteration is 1 + instead of 0. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Xu, D. et al. Baseline correction method based on doubly reweighted + penalized least squares, Applied Optics, 2019, 58, 3913-3920. 
+
+    """
+    residual = y - baseline
+    neg_residual = residual[residual < 0]
+    std = _weighting._safe_std(neg_residual, ddof=1)  # use dof=1 since sampling subset
+    inner = (np.exp(iteration) / std) * (residual - (2 * std - np.mean(neg_residual)))
+    weights = 0.5 * (1 - (inner / (1 + np.abs(inner))))
+    return weights
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_drpls_normal(iteration, one_d):
+    """Ensures drpls weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, exit_early = _weighting._drpls(y_data, baseline, iteration)
+    expected_weights = expected_drpls(y_data, baseline, iteration)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_drpls_all_above(iteration, one_d):
+    """Ensures drpls weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+    weights, exit_early = _weighting._drpls(y_data, baseline, iteration)
+    expected_weights = expected_drpls(y_data, baseline, iteration)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_drpls_all_below(iteration, one_d):
+    """Ensures drpls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, exit_early = _weighting._drpls(y_data, baseline, iteration)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_drpls_overflow(one_d):
+    """Ensures exponential overflow does not occur from drpls weighting."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    iteration = 1000
+    # sanity check to ensure overflow actually should occur and that it produces nan weights
+    with pytest.warns(RuntimeWarning):
+        expected_weights = expected_drpls(y_data, baseline, iteration)
+    assert (~np.isfinite(expected_weights)).any()
+
+    with np.errstate(over='raise'):
+        weights, exit_early = _weighting._drpls(y_data, baseline, iteration)
+
+    assert np.isfinite(weights).all()
+    assert not exit_early
diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py
index 28297b2..2754661 100644
--- a/tests/test_whittaker.py
+++ b/tests/test_whittaker.py
@@ -150,7 +150,7 @@ def test_avoid_overflow_warning(self, no_noise_data_fixture):
         with np.errstate(over='raise'):
             baseline = self.class_func(y, tol=-1, max_iter=1000)[0]

-        assert np.isfinite(baseline.dot(baseline))
+        assert np.isfinite(baseline).all()


 class TestDrPLS(WhittakerTester):
@@ -175,32 +175,27 @@ def test_diff_order_one_fails(self):
         with pytest.raises(ValueError):
             self.class_func(self.y,
lam=1e2, diff_order=1) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + drpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. """ x, y = no_noise_data_fixture - with pytest.warns(ParameterWarning): + with np.errstate(over='raise'): baseline, params = self.class_func(y, tol=-1, max_iter=1000) - assert np.isfinite(baseline.dot(baseline)) - # ensure last tolerence calculation was non-finite as a double-check that + assert np.isfinite(baseline).all() + # ensure last tolerence calculation was finite as a double-check that # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['tol_history'][-1]) + assert np.isfinite(params['weights']).all() class TestIArPLS(WhittakerTester): From 23c6c5ba8d14517824fe8bc56e9bb331145bc296 Mon Sep 17 00:00:00 2001 From: Donnie Erb <55961724+derb12@users.noreply.github.com> Date: Wed, 18 Dec 2024 14:41:41 -0500 Subject: [PATCH 4/6] OTHER: Made iarpls weighting more robust Improved overflow avoidance for iarpls so that try except blocks are no longer needed. Also removed all skipped tests for 2D Whittaker and spline tests since they had no use. --- pybaselines/_weighting.py | 25 ++++++- pybaselines/spline.py | 20 ++--- pybaselines/two_d/spline.py | 20 ++--- pybaselines/two_d/whittaker.py | 24 ++---- pybaselines/whittaker.py | 26 ++----- tests/test_spline.py | 20 +++-- tests/test_weighting.py | 116 +++++++++++++++++++++++++++++ tests/test_whittaker.py | 20 +++-- tests/two_d/test_spline.py | 79 -------------------- tests/two_d/test_whittaker.py | 130 --------------------------------- 10 files changed, 178 insertions(+), 302 deletions(-) diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index ef049dd..e55b402 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -267,6 +267,9 @@ def _iarpls(y, baseline, iteration): ------- weights : numpy.ndarray, shape (N,) The calculated weights. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. 
References
     ----------
@@ -276,10 +279,26 @@ def _iarpls(y, baseline, iteration):

     """
     residual = y - baseline
-    std = _safe_std(residual[residual < 0], ddof=1)  # dof=1 since sampling a subset
-    inner = (np.exp(iteration) / std) * (residual - 2 * std)
+    neg_residual = residual[residual < 0]
+    if neg_residual.size < 2:
+        exit_early = True
+        warnings.warn(
+            ('almost all baseline points are below the data, indicating that "tol"'
+             ' is too low and/or "max_iter" is too high'), ParameterWarning,
+            stacklevel=2
+        )
+        return np.zeros_like(y), exit_early
+    else:
+        exit_early = False
+
+    std = _safe_std(neg_residual, ddof=1)  # use dof=1 since only sampling a subset
+    # the exponential term is used to change the shape of the weighting from a logistic curve
+    # at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
+    # the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
+    # overflow for high iterations
+    inner = (np.exp(min(iteration, 100)) / std) * (residual - 2 * std)
     weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2)))
-    return weights
+    return weights, exit_early


 def _aspls(y, baseline):
diff --git a/pybaselines/spline.py b/pybaselines/spline.py
index 5dcf11c..3de828e 100644
--- a/pybaselines/spline.py
+++ b/pybaselines/spline.py
@@ -891,23 +891,13 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_ord
         tol_history = np.empty(max_iter + 1)
         for i in range(1, max_iter + 2):
             baseline = self.pspline.solve_pspline(y, weight_array)
-            new_weights = _weighting._iarpls(y, baseline, i)
+            new_weights, exit_early = _weighting._iarpls(y, baseline, i)
+            if exit_early:
+                i -= 1  # reduce i so that output tol_history indexing is correct
+                break
             calc_difference = relative_difference(weight_array, new_weights)
             tol_history[i - 1] = calc_difference
-            if not np.isfinite(calc_difference):
-                # catches nan, inf and -inf due to exp(i) being too high or if there
-                # are too few negative residuals; no way to catch both conditions before
-                # new_weights calculation since it is hard to estimate if
-                # (exp(i) / std) * residual will overflow; check calc_difference rather
-                # than checking new_weights since non-finite values rarely occur and
-                # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable
-                warnings.warn(
-                    ('nan and/or +/- inf occurred in weighting calculation, likely meaning '
-                     '"tol" is too low and/or "max_iter" is too high'), ParameterWarning,
-                    stacklevel=2
-                )
-                break
-            elif calc_difference < tol:
+            if calc_difference < tol:
                 break
             weight_array = new_weights
diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py
index 4e8eb69..9f92697 100644
--- a/pybaselines/two_d/spline.py
+++ b/pybaselines/two_d/spline.py
@@ -694,23 +694,13 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=25, spline_degree=3, diff_orde
         tol_history = np.empty(max_iter + 1)
         for i in range(1, max_iter + 2):
             baseline = self.pspline.solve(y, weight_array)
-            new_weights = _weighting._iarpls(y, baseline, i)
+            new_weights, exit_early = _weighting._iarpls(y, baseline, i)
+            if exit_early:
+                i -= 1  # reduce i so that output tol_history indexing is correct
+                break
             calc_difference = relative_difference(weight_array, new_weights)
             tol_history[i - 1] = calc_difference
-            if not np.isfinite(calc_difference):
-                # catches nan, inf and -inf due to exp(i) being too high or if there
-                # are too few negative residuals; no way to catch both conditions before
-                # new_weights calculation since it is hard to estimate if
-                # (exp(i)
/ std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index c237bae..f049b68 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -6,14 +6,12 @@ """ -import warnings - import numpy as np from .. import _weighting from .._compat import diags from .._validation import _check_optional_array -from ..utils import _MIN_FLOAT, ParameterWarning, relative_difference +from ..utils import _MIN_FLOAT, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -562,23 +560,13 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non tol_history = np.empty(max_iter + 1) for i in range(1, max_iter + 2): baseline = self.whittaker_system.solve(y, weight_array) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 8ccd171..3cf9904 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -6,17 +6,13 @@ """ -import warnings - import numpy as np from . 
import _weighting from ._algorithm_setup import _Algorithm, _class_wrapper from ._banded_utils import _shift_rows, diff_penalty_diagonals from ._validation import _check_lam, _check_optional_array -from .utils import ( - ParameterWarning, _mollifier_kernel, pad_edges, padded_convolve, relative_difference -) +from .utils import _mollifier_kernel, pad_edges, padded_convolve, relative_difference class _Whittaker(_Algorithm): @@ -482,23 +478,13 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non self.whittaker_system.add_diagonal(weight_array), weight_array * y, overwrite_b=True ) - new_weights = _weighting._iarpls(y, baseline, i) + new_weights, exit_early = _weighting._iarpls(y, baseline, i) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i - 1] = calc_difference - if not np.isfinite(calc_difference): - # catches nan, inf and -inf due to exp(i) being too high or if there - # are too few negative residuals; no way to catch both conditions before - # new_weights calculation since it is hard to estimate if - # (exp(i) / std) * residual will overflow; check calc_difference rather - # than checking new_weights since non-finite values rarely occur and - # checking a scalar is faster; cannot use np.errstate since it is not 100% reliable - warnings.warn( - ('nan and/or +/- inf occurred in weighting calculation, likely meaning ' - '"tol" is too low and/or "max_iter" is too high'), ParameterWarning, - stacklevel=2 - ) - break - elif calc_difference < tol: + if calc_difference < tol: break weight_array = new_weights diff --git a/tests/test_spline.py b/tests/test_spline.py index fa53f6d..1884c5a 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -351,16 +351,13 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') def test_avoid_nonfinite_weights(self, no_noise_data_fixture): """ - Ensures that the function gracefully exits when non-finite weights are created. + Ensures that the function does not create non-finite weights. - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. + iarpls should not experience overflow since there is a cap on the iteration used + within the exponential, so no warnings or errors should be emitted even when using + a very high max_iter and low tol. Use data without noise since the lack of noise makes it easier to induce failure. Set tol to -1 so that it is never reached, and set max_iter to a high value. 
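[Editor's note] The iteration cap referenced in the updated test docstring above is easy to verify outside of the library. The following standalone sketch is illustrative only (it is not part of the patch, and the random data is made up); it applies the iarpls weighting formula with an uncapped and a capped exponent to show that only the capped version stays finite:

import numpy as np

rng = np.random.default_rng(0)
y = rng.normal(0.0, 1.0, 500)
baseline = np.zeros_like(y)

residual = y - baseline
std = np.std(residual[residual < 0], ddof=1)

# np.exp(1000) overflows to inf, which makes the weights nan; np.exp(100) is
# large but finite, so the capped weights remain finite and step-like
for label, exponent in (('uncapped', 1000), ('capped', min(1000, 100))):
    with np.errstate(over='ignore', invalid='ignore'):
        inner = (np.exp(exponent) / std) * (residual - 2 * std)
        weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2)))
    print(label, np.isfinite(weights).all())  # uncapped: False, capped: True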
@@ -370,13 +367,14 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture):

         """
         x, y = no_noise_data_fixture
-        with pytest.warns(utils.ParameterWarning):
+        with np.errstate(over='raise'):
             baseline, params = self.class_func(y, tol=-1, max_iter=1000)

-        assert np.isfinite(baseline.dot(baseline))
-        # ensure last tolerence calculation was non-finite as a double-check that
+        assert np.isfinite(baseline).all()
+        # ensure last tolerance calculation was finite as a double-check that
         # this test is actually doing what it should be doing
-        assert not np.isfinite(params['tol_history'][-1])
+        assert np.isfinite(params['tol_history'][-1])
+        assert np.isfinite(params['weights']).all()

     @pytest.mark.parametrize('lam', (1e1, 1e5))
     @pytest.mark.parametrize('diff_order', (1, 2, 3))
diff --git a/tests/test_weighting.py b/tests/test_weighting.py
index dcf1235..2028cb1 100644
--- a/tests/test_weighting.py
+++ b/tests/test_weighting.py
@@ -581,3 +581,119 @@ def test_drpls_overflow(one_d):

     assert np.isfinite(weights).all()
     assert not exit_early
+
+
+def expected_iarpls(y, baseline, iteration):
+    """
+    The weighting for improved asymmetrically reweighted penalized least squares (iarpls).
+
+    Does not perform error checking since this is just used for simple weighting cases.
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+    iteration : int
+        The iteration number. Should be 1-based, such that the first iteration is 1
+        instead of 0.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    References
+    ----------
+    Ye, J., et al. Baseline correction method based on improved asymmetrically
+    reweighted penalized least squares for Raman spectrum. Applied Optics, 2020,
+    59, 10933-10943.
+
+    """
+    residual = y - baseline
+    neg_residual = residual[residual < 0]
+
+    std = np.std(neg_residual, ddof=1)  # use dof=1 since only sampling a subset
+    # purposely use the uncapped exponential from the paper so that this helper can also
+    # demonstrate the overflow that occurs for high iteration numbers
+    inner = (np.exp(iteration) / std) * (residual - 2 * std)
+    weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2)))
+    return weights
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_iarpls_normal(iteration, one_d):
+    """Ensures iarpls weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, exit_early = _weighting._iarpls(y_data, baseline, iteration)
+    expected_weights = expected_iarpls(y_data, baseline, iteration)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_iarpls_all_above(iteration, one_d):
+    """Ensures iarpls weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+    weights, exit_early = _weighting._iarpls(y_data, baseline, iteration)
+    expected_weights = expected_iarpls(y_data, baseline, iteration)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('iteration', (1, 10))
+@pytest.mark.parametrize('one_d', (True, False))
+def test_iarpls_all_below(iteration, one_d):
+    """Ensures iarpls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, exit_early = _weighting._iarpls(y_data, baseline, iteration)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+def test_iarpls_overflow(one_d):
+    """Ensures exponential overflow does not occur from iarpls weighting."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    iteration = 1000
+    # sanity check to ensure overflow actually should occur and that it produces nan weights
+    with pytest.warns(RuntimeWarning):
+        expected_weights = expected_iarpls(y_data, baseline, iteration)
+    assert (~np.isfinite(expected_weights)).any()
+
+    with np.errstate(over='raise'):
+        weights, exit_early = _weighting._iarpls(y_data, baseline, iteration)
+
+    assert np.isfinite(weights).all()
+    assert not exit_early
diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py
index 2754661..2270c3f 100644
--- a/tests/test_whittaker.py
+++ b/tests/test_whittaker.py
@@ -209,16 +209,13 @@ def test_diff_orders(self, diff_order):
         lam = {1: 1e2, 3: 1e10}[diff_order]
self.class_func(self.y, lam=lam, diff_order=diff_order)

-    # ignore the RuntimeWarning that occurs from using +/- inf or nan
-    @pytest.mark.filterwarnings('ignore::RuntimeWarning')
     def test_avoid_nonfinite_weights(self, no_noise_data_fixture):
         """
-        Ensures that the function gracefully exits when non-finite weights are created.
+        Ensures that the function does not create non-finite weights.

-        When there are no negative residuals or exp(iterations) / std is very high, both
-        of which occur when a low tol value is used with a high max_iter value, the
-        weighting function would produce non-finite values. The returned baseline should
-        be the last iteration that was successful, and thus should not contain nan or +/- inf.
+        iarpls should not experience overflow since there is a cap on the iteration used
+        within the exponential, so no warnings or errors should be emitted even when using
+        a very high max_iter and low tol.

         Use data without noise since the lack of noise makes it easier to induce failure.
         Set tol to -1 so that it is never reached, and set max_iter to a high value.
@@ -228,13 +225,14 @@ def test_avoid_nonfinite_weights(self, no_noise_data_fixture):

         """
         x, y = no_noise_data_fixture
-        with pytest.warns(ParameterWarning):
+        with np.errstate(over='raise'):
            baseline, params = self.class_func(y, tol=-1, max_iter=1000)

-        assert np.isfinite(baseline.dot(baseline))
-        # ensure last tolerence calculation was non-finite as a double-check that
+        assert np.isfinite(baseline).all()
+        # ensure last tolerance calculation was finite as a double-check that
         # this test is actually doing what it should be doing
-        assert not np.isfinite(params['tol_history'][-1])
+        assert np.isfinite(params['tol_history'][-1])
+        assert np.isfinite(params['weights']).all()


 class TestAsPLS(WhittakerTester):
diff --git a/tests/two_d/test_spline.py b/tests/two_d/test_spline.py
index e952bd3..cd918b9 100644
--- a/tests/two_d/test_spline.py
+++ b/tests/two_d/test_spline.py
@@ -10,7 +10,6 @@
 from numpy.testing import assert_allclose
 import pytest

-from pybaselines import utils
 from pybaselines.two_d import spline, whittaker

 from ..conftest import BaseTester2D, InputWeightsMixin
@@ -211,32 +210,6 @@ def test_diff_orders(self, diff_order):
         """Ensure that other difference orders work."""
         self.class_func(self.y, diff_order=diff_order)

-    @pytest.mark.skip(reason='test is too slow')
-    # ignore the RuntimeWarning that occurs from using +/- inf or nan
-    @pytest.mark.filterwarnings('ignore::RuntimeWarning')
-    def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d):
-        """
-        Ensures that the function gracefully exits when errors occur.
-
-        When there are no negative residuals, which occurs when a low tol value is used with
-        a high max_iter value, the weighting function would produce values all ~0, which
-        can fail the solvers. The returned baseline should be the last iteration that was
-        successful, and thus should not contain nan or +/- inf.
-
-        Use data without noise since the lack of noise makes it easier to induce failure.
-        Set tol to -1 so that it is never reached, and set max_iter to a high value.
-        Uses np.isfinite on the dot product of the baseline since the dot product is fast,
-        would propogate the nan or inf, and will create only a single value to check
-        for finite-ness.
- - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(utils.ParameterWarning): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=7000 - ) - assert np.isfinite(baseline).all() - @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5])) @pytest.mark.parametrize('diff_order', (1, 3, [2, 3])) def test_whittaker_comparison(self, lam, diff_order): @@ -254,27 +227,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - def test_avoid_overflow_warning(self, no_noise_data_fixture2d): - """ - Ensures no warning is emitted for exponential overflow. - - The weighting is 1 / (1 + exp(values)), so if values is too high, - exp(values) is inf, which should usually emit an overflow warning. - However, the resulting weight is 0, which is fine, so the warning is - not needed and should be avoided. This test ensures the overflow warning - is not emitted, and also ensures that the output is all finite, just in - case the weighting was not actually stable. - - """ - x, z, y = no_noise_data_fixture2d - with np.errstate(over='raise'): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5])) @pytest.mark.parametrize('diff_order', (1, 3, [2, 3])) def test_whittaker_comparison(self, lam, diff_order): @@ -292,36 +244,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when non-finite weights are created. - - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. 
- - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(utils.ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - @pytest.mark.parametrize('lam', (1e1, 1e5, [1e1, 1e5])) @pytest.mark.parametrize('diff_order', (1, 3, [2, 3])) def test_whittaker_comparison(self, lam, diff_order): @@ -353,4 +275,3 @@ def test_whittaker_comparison(self, lam, p, diff_order): compare_pspline_whittaker( self, 'psalsa', self.y, lam=lam, p=p, diff_order=diff_order, test_rtol=1e5 ) - diff --git a/tests/two_d/test_whittaker.py b/tests/two_d/test_whittaker.py index c5fd229..dba8376 100644 --- a/tests/two_d/test_whittaker.py +++ b/tests/two_d/test_whittaker.py @@ -10,7 +10,6 @@ import pytest from pybaselines.two_d import whittaker -from pybaselines.utils import ParameterWarning from ..conftest import BaseTester2D, InputWeightsMixin @@ -108,33 +107,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when errors occur. - - When there are no negative residuals, which occurs when a low tol value is used with - a high max_iter value, the weighting function would produce values all ~0, which - can fail the solvers. The returned baseline should be the last iteration that was - successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. - - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=3000 - ) - - assert np.isfinite(baseline).all() - class TestArPLS(WhittakerTester, EigenvalueMixin): """Class for testing arpls baseline.""" @@ -146,27 +118,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - def test_avoid_overflow_warning(self, no_noise_data_fixture2d): - """ - Ensures no warning is emitted for exponential overflow. - - The weighting is 1 / (1 + exp(values)), so if values is too high, - exp(values) is inf, which should usually emit an overflow warning. - However, the resulting weight is 0, which is fine, so the warning is - not needed and should be avoided. This test ensures the overflow warning - is not emitted, and also ensures that the output is all finite, just in - case the weighting was not actually stable. 
- - """ - x, z, y = no_noise_data_fixture2d - with np.errstate(over='raise'): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - class TestDrPLS(WhittakerTester): """Class for testing drpls baseline.""" @@ -195,36 +146,6 @@ def test_diff_order_one_fails(self): with pytest.raises(ValueError): self.class_func(self.y, lam=1e2, diff_order=[2, 1]) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when non-finite weights are created. - - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. - - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - class TestIArPLS(WhittakerTester, EigenvalueMixin): """Class for testing iarpls baseline.""" @@ -236,36 +157,6 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) - @pytest.mark.skip(reason='test is too slow') - # ignore the RuntimeWarning that occurs from using +/- inf or nan - @pytest.mark.filterwarnings('ignore::RuntimeWarning') - def test_avoid_nonfinite_weights(self, no_noise_data_fixture2d): - """ - Ensures that the function gracefully exits when non-finite weights are created. - - When there are no negative residuals or exp(iterations) / std is very high, both - of which occur when a low tol value is used with a high max_iter value, the - weighting function would produce non-finite values. The returned baseline should - be the last iteration that was successful, and thus should not contain nan or +/- inf. - - Use data without noise since the lack of noise makes it easier to induce failure. - Set tol to -1 so that it is never reached, and set max_iter to a high value. - Uses np.isfinite on the dot product of the baseline since the dot product is fast, - would propogate the nan or inf, and will create only a single value to check - for finite-ness. 
- - """ - x, z, y = no_noise_data_fixture2d - with pytest.warns(ParameterWarning): - baseline, params = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - # ensure last tolerence calculation was non-finite as a double-check that - # this test is actually doing what it should be doing - assert not np.isfinite(params['tol_history'][-1]) - class TestAsPLS(WhittakerTester): """Class for testing aspls baseline.""" @@ -289,27 +180,6 @@ def test_wrong_alpha_shape(self, alpha_enum): with pytest.raises(ValueError): self.class_func(self.y, alpha=alpha) - @pytest.mark.skip(reason='test is too slow') - def test_avoid_overflow_warning(self, no_noise_data_fixture2d): - """ - Ensures no warning is emitted for exponential overflow. - - The weighting is 1 / (1 + exp(values)), so if values is too high, - exp(values) is inf, which should usually emit an overflow warning. - However, the resulting weight is 0, which is fine, so the warning is - not needed and should be avoided. This test ensures the overflow warning - is not emitted, and also ensures that the output is all finite, just in - case the weighting was not actually stable. - - """ - x, z, y = no_noise_data_fixture2d - with np.errstate(over='raise'): - baseline, _ = getattr(self.algorithm_base(x, z), self.func_name)( - y, tol=-1, max_iter=1000 - ) - - assert np.isfinite(baseline).all() - class TestPsalsa(WhittakerTester, EigenvalueMixin): """Class for testing psalsa baseline.""" From fcf4cfd5a19adef6682b2f0bd37e082910ab05c7 Mon Sep 17 00:00:00 2001 From: Donnie Erb <55961724+derb12@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:38:35 -0500 Subject: [PATCH 5/6] OTHER: Allow inputting assymetric_coef for aspls and pspline_aspls Also added tests for the aspls weighting. --- pybaselines/_weighting.py | 28 ++++-- pybaselines/spline.py | 35 ++++++-- pybaselines/two_d/whittaker.py | 21 ++++- pybaselines/whittaker.py | 31 +++++-- tests/test_spline.py | 6 ++ tests/test_weighting.py | 158 ++++++++++++++++++++++++++++++++- tests/test_whittaker.py | 6 ++ tests/two_d/test_whittaker.py | 6 ++ 8 files changed, 267 insertions(+), 24 deletions(-) diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index e55b402..8b663df 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -301,7 +301,7 @@ def _iarpls(y, baseline, iteration): return weights, exit_early -def _aspls(y, baseline): +def _aspls(y, baseline, assymetric_coef): """ Weighting for the adaptive smoothness penalized least squares smoothing (aspls). @@ -311,6 +311,9 @@ def _aspls(y, baseline): The measured data. baseline : numpy.ndarray, shape (N,) The calculated baseline. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Returns ------- @@ -318,10 +321,13 @@ def _aspls(y, baseline): The calculated weights. residual : numpy.ndarray, shape (N,) The residual, ``y - baseline``. + exit_early : bool + Designates if there is a potential error with the calculation such that no further + iterations should be performed. Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. 
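[Editor's note] A minimal standalone illustration (not from the patch; the residual grid is arbitrary) of how the asymmetric coefficient `k` discussed in the Notes above reshapes the aspls weighting, using the same expit form as the patched `_aspls`:

import numpy as np
from scipy.special import expit

std = 1.0
residual = np.linspace(-3.0, 3.0, 7)
for k in (0.5, 2.0):
    # expit(x) = 1 / (1 + exp(-x)), so this equals 1 / (1 + exp(k * (residual - std) / std))
    weights = expit(-(k / std) * (residual - std))
    print(k, np.round(weights, 3))
# a larger k pushes the weights toward a 0/1 step centered at residual = std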
@@ -333,10 +339,22 @@ def _aspls(y, baseline): """ residual = y - baseline - std = _safe_std(residual[residual < 0], ddof=1) # use dof=1 since sampling a subset + neg_residual = residual[residual < 0] + if neg_residual.size < 2: + exit_early = True + warnings.warn( + ('almost all baseline points are below the data, indicating that "tol"' + ' is too low and/or "max_iter" is too high'), ParameterWarning, + stacklevel=2 + ) + return np.zeros_like(y), residual, exit_early + else: + exit_early = False + std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset + # add a negative sign since expit performs 1/(1+exp(-input)) - weights = expit(-(0.5 / std) * (residual - std)) - return weights, residual + weights = expit(-(assymetric_coef / std) * (residual - std)) + return weights, residual, exit_early def _psalsa(y, baseline, p, k, shape_y): diff --git a/pybaselines/spline.py b/pybaselines/spline.py index 3de828e..dd1eccb 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -16,7 +16,7 @@ from ._banded_utils import _add_diagonals, _shift_rows, diff_penalty_diagonals from ._compat import dia_object, jit, trapezoid from ._spline_utils import _basis_midpoints -from ._validation import _check_lam, _check_optional_array +from ._validation import _check_lam, _check_optional_array, _check_scalar_variable from .utils import ( ParameterWarning, _mollifier_kernel, _sort_array, gaussian, pad_edges, padded_convolve, relative_difference @@ -907,7 +907,7 @@ def pspline_iarpls(self, data, lam=1e3, num_knots=100, spline_degree=3, diff_ord @_Algorithm._register(sort_keys=('weights', 'alpha'), dtype=float, order='C') def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, - max_iter=100, tol=1e-3, weights=None, alpha=None): + max_iter=100, tol=1e-3, weights=None, alpha=None, assymetric_coef=0.5): """ A penalized spline version of the asPLS algorithm. @@ -937,6 +937,9 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with size equal to N and all values set to 1. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -955,13 +958,19 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + See Also -------- Baseline.aspls Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. 
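[Editor's note] The expit call in the hunk above, rather than a literal 1 / (1 + exp(...)), is what keeps this weighting free of overflow warnings; a minimal standalone comparison (illustrative only):

import numpy as np
from scipy.special import expit

x = np.array([800.0])  # np.exp(800) overflows float64
with np.errstate(over='warn'):
    naive = 1 / (1 + np.exp(x))  # emits an overflow RuntimeWarning
stable = expit(-x)  # mathematically identical, but saturates to 0.0 silently
print(naive, stable)  # both print [0.]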
@@ -985,6 +994,7 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') interp_pts = _basis_midpoints(self.pspline.knots, self.pspline.spline_degree) tol_history = np.empty(max_iter + 1) @@ -995,7 +1005,10 @@ def pspline_aspls(self, data, lam=1e4, num_knots=100, spline_degree=3, diff_orde self.pspline.num_bands, self.pspline.num_bands ) baseline = self.pspline.solve_pspline(y, weight_array, alpha_penalty) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -2091,7 +2104,8 @@ def pspline_iarpls(data, lam=1e3, num_knots=100, spline_degree=3, diff_order=2, @_spline_wrapper def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, - max_iter=100, tol=1e-3, weights=None, alpha=None, x_data=None): + max_iter=100, tol=1e-3, weights=None, alpha=None, x_data=None, + assymetric_coef=0.5): """ A penalized spline version of the asPLS algorithm. @@ -2124,6 +2138,9 @@ def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, x_data : array-like, shape (N,), optional The x-values of the measured data. Default is None, which will create an array from -1 to 1 with N points. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -2142,13 +2159,19 @@ def pspline_aspls(data, lam=1e4, num_knots=100, spline_degree=3, diff_order=2, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + See Also -------- pybaselines.whittaker.aspls Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index f049b68..540ed1c 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -10,7 +10,7 @@ from .. import _weighting from .._compat import diags -from .._validation import _check_optional_array +from .._validation import _check_optional_array, _check_scalar_variable from ..utils import _MIN_FLOAT, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -585,7 +585,7 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non sort_keys=('weights', 'alpha'), reshape_keys=('weights', 'alpha'), reshape_baseline=True ) def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, - weights=None, alpha=None): + weights=None, alpha=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). 
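[Editor's note] `_check_scalar_variable` is a pybaselines internal; the sketch below is a hypothetical standalone equivalent of the check it performs here (the helper name is invented for illustration, and the real function does more validation):

def check_positive_scalar(value, variable_name='value'):
    # hypothetical stand-in: require a scalar strictly greater than 0
    value = float(value)
    if value <= 0:
        raise ValueError(f'{variable_name} must be greater than 0')
    return value

check_positive_scalar(0.5, 'assymetric_coef')  # returns 0.5
# check_positive_scalar(0, 'assymetric_coef')  # would raise ValueError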
@@ -612,6 +612,9 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with shape equal to (M, N) and all values set to 1. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -630,9 +633,15 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -651,6 +660,7 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') # use a sparse matrix to maintain sparsity after multiplication; implementation note: # could skip making an alpha matrix and just use alpha_array[:, None] * penalty once @@ -660,7 +670,10 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, for i in range(max_iter + 1): penalty = alpha_matrix @ self.whittaker_system.penalty baseline = self.whittaker_system.solve(y, weight_array, penalty=penalty) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index 3cf9904..d1f11e2 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -11,7 +11,7 @@ from . import _weighting from ._algorithm_setup import _Algorithm, _class_wrapper from ._banded_utils import _shift_rows, diff_penalty_diagonals -from ._validation import _check_lam, _check_optional_array +from ._validation import _check_lam, _check_optional_array, _check_scalar_variable from .utils import _mollifier_kernel, pad_edges, padded_convolve, relative_difference @@ -494,7 +494,7 @@ def iarpls(self, data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=Non @_Algorithm._register(sort_keys=('weights', 'alpha')) def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, - weights=None, alpha=None): + weights=None, alpha=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). @@ -520,6 +520,9 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, An array of values that control the local value of `lam` to better fit peak and non-peak regions. If None (default), then the initial values will be an array with size equal to N and all values set to 1. 
+ assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -538,9 +541,15 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, completed. If the last value in the array is greater than the input `tol` value, then the function did not converge. + Raises + ------ + ValueError + Raised if `alpha` and `data` do not have the same shape. Also raised if + `assymetric_coef` is not greater than 0. + Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. @@ -560,6 +569,7 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, ) if self._sort_order is not None and alpha is not None: alpha_array = alpha_array[self._sort_order] + assymetric_coef = _check_scalar_variable(assymetric_coef, variable_name='assymetric_coef') main_diag_idx = self.whittaker_system.main_diagonal_index lower_upper_bands = (diff_order, diff_order) @@ -573,7 +583,10 @@ def aspls(self, data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, lhs, weight_array * y, overwrite_ab=True, overwrite_b=True, l_and_u=lower_upper_bands ) - new_weights, residual = _weighting._aspls(y, baseline) + new_weights, residual, exit_early = _weighting._aspls(y, baseline, assymetric_coef) + if exit_early: + i -= 1 # reduce i so that output tol_history indexing is correct + break calc_difference = relative_difference(weight_array, new_weights) tol_history[i] = calc_difference if calc_difference < tol: @@ -1138,7 +1151,7 @@ def iarpls(data, lam=1e5, diff_order=2, max_iter=50, tol=1e-3, weights=None, x_d @_whittaker_wrapper def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, - alpha=None, x_data=None): + alpha=None, x_data=None, assymetric_coef=0.5): """ Adaptive smoothness penalized least squares smoothing (asPLS). @@ -1167,6 +1180,9 @@ def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, x_data : array-like, optional The x-values. Not used by this function, but input is allowed for consistency with other functions. + assymetric_coef : float + The assymetric coefficient for the weighting. Higher values leads to a steeper + weighting curve (ie. more step-like). Default is 0.5. Returns ------- @@ -1188,11 +1204,12 @@ def aspls(data, lam=1e5, diff_order=2, max_iter=100, tol=1e-3, weights=None, Raises ------ ValueError - Raised if `alpha` and `data` do not have the same shape. + Raised if `alpha` and `data` do not have the same shape. Also raised if `assymetric_coef` + is not greater than 0. Notes ----- - The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead + The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it matches the results in Table 2 and Figure 5 of the asPLS paper closer than the factor of 2 and fits noisy data much better. 
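[Editor's note] A hedged usage sketch of the new keyword on the 1D interface (the synthetic data is invented for illustration; exact results will differ):

import numpy as np
from pybaselines import Baseline

x = np.linspace(0, 1000, 1000)
signal = 100 * np.exp(-0.5 * ((x - 500) / 20)**2)  # a single Gaussian peak
y = signal + 5 + 0.01 * x  # peak plus a linear baseline

fitter = Baseline(x_data=x)
fit_default, params = fitter.aspls(y, lam=1e5)  # assymetric_coef defaults to 0.5
fit_steep, _ = fitter.aspls(y, lam=1e5, assymetric_coef=2)  # steeper, more step-like weighting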
diff --git a/tests/test_spline.py b/tests/test_spline.py index 1884c5a..0c8d303 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -437,6 +437,12 @@ def test_whittaker_comparison(self, lam, diff_order): self, whittaker.aspls, self.y, lam=lam, diff_order=diff_order, test_rtol=rtol ) + @pytest.mark.parametrize('assymetric_coef', (0, -1)) + def test_outside_assymetric_coef_fails(self, assymetric_coef): + """Ensures assymetric_coef values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, assymetric_coef=assymetric_coef) + class TestPsplinePsalsa(IterativeSplineTester): """Class for testing pspline_psalsa baseline.""" diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 2028cb1..50eb082 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -158,6 +158,8 @@ def test_asls_normal(p, one_d): assert isinstance(weights, np.ndarray) assert weights.shape == y_data.shape assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() @pytest.mark.parametrize('p', (0.01, 0.99)) @@ -269,6 +271,8 @@ def test_airpls_normal(iteration, one_d): assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert_allclose(residual_l1_norm, expected_residual_l1_norm, rtol=1e-12, atol=1e-12) assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() @pytest.mark.parametrize('iteration', (1, 10)) @@ -392,6 +396,8 @@ def test_arpls_normal(one_d): assert weights.shape == y_data.shape assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() @pytest.mark.parametrize('one_d', (True, False)) @@ -447,9 +453,11 @@ def test_arpls_overflow(one_d): overflow_index = 10 overflow_value = (0.5 * log_max + 2) * std - mean + 10 # add 10 for good measure if one_d: - baseline[overflow_index] = overflow_value + baseline[overflow_index] = y_data[overflow_index] - overflow_value else: - baseline[overflow_index, overflow_index] = overflow_value + baseline[overflow_index, overflow_index] = ( + y_data[overflow_index, overflow_index] - overflow_value + ) # sanity check to ensure overflow actually should occur with pytest.warns(RuntimeWarning): @@ -524,6 +532,8 @@ def test_drpls_normal(iteration, one_d): assert weights.shape == y_data.shape assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() @pytest.mark.parametrize('iteration', (1, 10)) @@ -640,6 +650,8 @@ def test_iarpls_normal(iteration, one_d): assert weights.shape == y_data.shape assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert not exit_early + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() @pytest.mark.parametrize('iteration', (1, 10)) @@ -697,3 +709,145 @@ def test_iarpls_overflow(one_d): assert np.isfinite(weights).all() assert not exit_early + + +def expected_aspls(y, baseline, assymetric_coef=0.5): + """ + The weighting for adaptive smoothness penalized least squares smoothing (aspls). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. 
+    assymetric_coef : float
+        The asymmetric coefficient for the weighting. Higher values lead to a steeper
+        weighting curve (i.e., more step-like). Default is 0.5.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    References
+    ----------
+    Zhang, F., et al. Baseline correction for infrared spectra using adaptive smoothness
+    parameter penalized least squares method. Spectroscopy Letters, 2020, 53(3), 222-233.
+
+    """
+    residual = y - baseline
+    std = np.std(residual[residual < 0], ddof=1)  # use ddof=1 since sampling a subset
+
+    weights = 1 / (1 + np.exp(assymetric_coef * (residual - std) / std))
+
+    return weights, residual
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_normal(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+    # ensure all weights are between 0 and 1
+    assert ((weights >= 0) & (weights <= 1)).all()
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_all_above(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+
+    weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12)
+    assert not exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_all_below(one_d, assymetric_coef):
+    """Ensures aspls weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    with pytest.warns(utils.ParameterWarning):
+        weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+    expected_weights = np.zeros_like(y_data)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, y_data - baseline, rtol=1e-12, atol=1e-12)
+    assert exit_early
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('assymetric_coef', (0.5, 2))
+def test_aspls_overflow(one_d, assymetric_coef):
+    """Ensures exponential overflow does not occur from aspls weighting."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    log_max = np.log(np.finfo(y_data.dtype).max)
+    residual = y_data - baseline
+    std = np.std(residual[residual < 0],
ddof=1)
+    # for exponential overflow, (residual / std) > (log_max / assymetric_coef) + 1
+    # changing one value in the baseline to cause overflow will not cause the
+    # standard deviation to change since the residual value will be positive at that index
+    overflow_index = 10
+    overflow_value = ((log_max / assymetric_coef) + 1) * std + 10  # add 10 for good measure
+    if one_d:
+        baseline[overflow_index] = y_data[overflow_index] - overflow_value
+    else:
+        baseline[overflow_index, overflow_index] = (
+            y_data[overflow_index, overflow_index] - overflow_value
+        )
+
+    # sanity check to ensure overflow actually should occur
+    with pytest.warns(RuntimeWarning):
+        expected_weights, expected_residual = expected_aspls(y_data, baseline, assymetric_coef)
+    # the resulting weights should still be finite since 1 / (1 + inf) == 0
+    assert np.isfinite(expected_weights).all()
+
+    with np.errstate(over='raise'):
+        weights, residual, exit_early = _weighting._aspls(y_data, baseline, assymetric_coef)
+
+    assert np.isfinite(weights).all()
+    assert not exit_early
+
+    # the actual weight where overflow should have occurred should be 0
+    if one_d:
+        assert_allclose(weights[overflow_index], 0.0, atol=1e-14)
+    else:
+        assert_allclose(weights[overflow_index, overflow_index], 0.0, atol=1e-14)
+
+    # weights should still be the same as the naive calculation regardless of exponential overflow
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12)
diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py
index 2270c3f..c6f2b6a 100644
--- a/tests/test_whittaker.py
+++ b/tests/test_whittaker.py
@@ -289,6 +289,12 @@ def test_alpha_multiplication(self, diff_order):
         result = _banded_utils._shift_rows(result, diff_order, diff_order)
         assert_allclose(result, expected_result, rtol=1e-13, atol=1e-13)
 
+    @pytest.mark.parametrize('assymetric_coef', (0, -1))
+    def test_outside_assymetric_coef_fails(self, assymetric_coef):
+        """Ensures assymetric_coef values not greater than 0 raise an exception."""
+        with pytest.raises(ValueError):
+            self.class_func(self.y, assymetric_coef=assymetric_coef)
+
 
 class TestPsalsa(WhittakerTester):
     """Class for testing psalsa baseline."""
diff --git a/tests/two_d/test_whittaker.py b/tests/two_d/test_whittaker.py
index dba8376..2f8804e 100644
--- a/tests/two_d/test_whittaker.py
+++ b/tests/two_d/test_whittaker.py
@@ -180,6 +180,12 @@ def test_wrong_alpha_shape(self, alpha_enum):
         with pytest.raises(ValueError):
             self.class_func(self.y, alpha=alpha)
 
+    @pytest.mark.parametrize('assymetric_coef', (0, -1))
+    def test_outside_assymetric_coef_fails(self, assymetric_coef):
+        """Ensures assymetric_coef values not greater than 0 raise an exception."""
+        with pytest.raises(ValueError):
+            self.class_func(self.y, assymetric_coef=assymetric_coef)
+
 
 class TestPsalsa(WhittakerTester, EigenvalueMixin):
     """Class for testing psalsa baseline."""

From a4413fddba59596f68adbe95cd861132b58e403a Mon Sep 17 00:00:00 2001
From: Donnie Erb <55961724+derb12@users.noreply.github.com>
Date: Thu, 19 Dec 2024 15:34:10 -0500
Subject: [PATCH 6/6] TEST: Added tests for psalsa and derpsalsa weightings

---
 pybaselines/_weighting.py      |   2 +-
 pybaselines/spline.py          |  16 +-
 pybaselines/two_d/spline.py    |   6 +-
 pybaselines/two_d/whittaker.py |   5 +-
 pybaselines/whittaker.py       |  16 +-
 tests/test_spline.py           |  12 ++
 tests/test_weighting.py        | 258 +++++++++++++++++++++++++++++++++
 tests/test_whittaker.py        |  12 ++
 tests/two_d/test_spline.py     |   6 +
 tests/two_d/test_whittaker.py  |   6 +
10 files changed, 328 insertions(+), 11 deletions(-) diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index 8b663df..dee24df 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -448,7 +448,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights): # since it's faster than performing the square and exp on the full residual weights = np.full(shape_y, 1 - p, dtype=float) mask = residual > 0 - weights[mask] = p * np.exp(-((residual[mask] / k)**2) / 2) + weights[mask] = p * np.exp(-0.5 * ((residual[mask] / k)**2)) weights *= partial_weights return weights diff --git a/pybaselines/spline.py b/pybaselines/spline.py index dd1eccb..5b29f18 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -1080,7 +1080,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -1104,6 +1105,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve_pspline(y, weight_array) @@ -1185,7 +1188,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -1208,6 +1212,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 @@ -2248,7 +2254,8 @@ def pspline_psalsa(data, lam=1e3, p=0.5, k=None, num_knots=100, spline_degree=3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -2336,7 +2343,8 @@ def pspline_derpsalsa(data, lam=1e2, p=1e-2, k=None, num_knots=100, spline_degre Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py index 9f92697..ed5c799 100644 --- a/pybaselines/two_d/spline.py +++ b/pybaselines/two_d/spline.py @@ -11,6 +11,7 @@ import numpy as np from .. import _weighting +from .._validation import _check_scalar_variable from ..utils import ParameterWarning, gaussian, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -765,7 +766,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. 
See Also -------- @@ -789,6 +791,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve(y, weight_array) diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index 540ed1c..b669db1 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -763,7 +763,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -789,6 +790,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') shape = self._shape if self.whittaker_system._using_svd else self._size tol_history = np.empty(max_iter + 1) diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index d1f11e2..720f6b5 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -658,7 +658,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -679,6 +680,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.whittaker_system.solve( @@ -758,7 +761,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. References ---------- @@ -772,6 +776,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 # could pad the data every iteration, but it is ~2-3 times slower and only affects @@ -1283,7 +1289,8 @@ def psalsa(data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e-3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -1366,7 +1373,8 @@ def derpsalsa(data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, tol=1e-3 Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. 
References ---------- diff --git a/tests/test_spline.py b/tests/test_spline.py index 0c8d303..4530d17 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -470,6 +470,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.psalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineDerpsalsa(IterativeSplineTester): """Class for testing pspline_derpsalsa baseline.""" @@ -497,6 +503,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.derpsalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineMPLS(SplineTester, InputWeightsMixin): """Class for testing pspline_mpls baseline.""" diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 50eb082..5e56d75 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -851,3 +851,261 @@ def test_aspls_overflow(one_d, assymetric_coef): # weights should still be the same as the nieve calculation regardless of exponential overflow assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12) + + +def expected_psalsa(y, baseline, p, k, shape_y): + """ + Weighting for the peaked signal's asymmetric least squares algorithm (psalsa). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + p : float + The penalizing weighting factor. Must be between 0 and 1. Values greater + than the baseline will be given `p` weight, and values less than the baseline + will be given `1 - p` weight. + k : float + A factor that controls the exponential decay of the weights for baseline + values greater than the data. Should be approximately the height at which + a value could be considered a peak. + shape_y : int or (int,) or (int, int) + The length of `y`, `N`. Precomputed to avoid repeated calculations. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Oller-Moreno, S., et al. Adaptive Asymmetric Least Squares baseline estimation + for analytical instruments. 2014 IEEE 11th International Multi-Conference on + Systems, Signals, and Devices, 2014, 1-5. 
+
+    """
+    residual = y - baseline
+    weights = np.where(residual > 0, p * np.exp(-residual / k), 1 - p)
+    return weights
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_psalsa_normal(one_d, k, p):
+    """Ensures psalsa weighting works as intended for a normal baseline."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape)
+    expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    # ensure all weights are between 0 and 1
+    assert ((weights >= 0) & (weights <= 1)).all()
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_psalsa_all_above(one_d, k, p):
+    """Ensures psalsa weighting works as intended for a baseline with all points above the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_above()
+    else:
+        y_data, baseline = baseline_2d_all_above()
+
+    weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape)
+    expected_weights = np.full_like(y_data, 1 - p)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_psalsa_all_below(one_d, k, p):
+    """Ensures psalsa weighting works as intended for a baseline with all points below the data."""
+    if one_d:
+        y_data, baseline = baseline_1d_all_below()
+    else:
+        y_data, baseline = baseline_2d_all_below()
+
+    weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape)
+    expected_weights = p * np.exp(-(y_data - baseline) / k)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+@pytest.mark.parametrize('one_d', (True, False))
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_psalsa_overflow(one_d, k, p):
+    """Ensures exponential overflow does not occur from psalsa weighting."""
+    if one_d:
+        y_data, baseline = baseline_1d_normal()
+    else:
+        y_data, baseline = baseline_2d_normal()
+
+    log_max = np.log(np.finfo(y_data.dtype).max)
+    # for exponential overflow, -(residual / k) > log_max
+    overflow_index = 10
+    overflow_value = k * log_max + 10  # add 10 for good measure
+    if one_d:
+        baseline[overflow_index] = y_data[overflow_index] + overflow_value
+    else:
+        baseline[overflow_index, overflow_index] = (
+            y_data[overflow_index, overflow_index] + overflow_value
+        )
+
+    # sanity check to ensure overflow actually should occur
+    with pytest.warns(RuntimeWarning):
+        expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape)
+    # weights in naive approach should still be finite since overflow only occurs in regions
+    # where the exponential value is not actually used
+    assert np.isfinite(expected_weights).all()
+
+    with np.errstate(over='raise'):
+        weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape)
+
+    assert np.isfinite(weights).all()
+
+    # the actual weight where overflow should have occurred should be 1 - p
+    if one_d:
+        
assert_allclose(weights[overflow_index], 1 - p, atol=1e-14)
+    else:
+        assert_allclose(weights[overflow_index, overflow_index], 1 - p, atol=1e-14)
+    # weights should still be the same as the naive calculation regardless of exponential overflow
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+def expected_derpsalsa(y, baseline, p, k, shape_y, partial_weights):
+    """
+    Weights for the derivative peak-screening asymmetric least squares algorithm (derpsalsa).
+
+    Parameters
+    ----------
+    y : numpy.ndarray, shape (N,)
+        The measured data.
+    baseline : numpy.ndarray, shape (N,)
+        The calculated baseline.
+    p : float
+        The penalizing weighting factor. Must be between 0 and 1. Values greater
+        than the baseline will be given `p` weight, and values less than the baseline
+        will be given `1 - p` weight.
+    k : float
+        A factor that controls the exponential decay of the weights for baseline
+        values greater than the data. Should be approximately the height at which
+        a value could be considered a peak.
+    shape_y : int or (int,) or (int, int)
+        The length of `y`, `N`. Precomputed to avoid repeated calculations.
+    partial_weights : numpy.ndarray, shape (N,)
+        The weights associated with the first and second derivatives of the data.
+
+    Returns
+    -------
+    weights : numpy.ndarray, shape (N,)
+        The calculated weights.
+
+    Notes
+    -----
+    The reference is not clear about how `p` and `1-p` are applied. An alternative
+    interpretation is that `partial_weights` are multiplied only where the residual is
+    greater than 0 and all other weights are simply `1-p`; however, based on Figure 1c
+    in the reference, the total weights are never greater than `partial_weights`, which
+    implies that the non-peak regions have a weight of `1-p` times `partial_weights`
+    rather than just `1-p`. Both interpretations give nearly identical results, so the
+    distinction matters little in practice.
+
+    References
+    ----------
+    Korepanov, V. Asymmetric least-squares baseline algorithm with peak screening for
+    automatic processing of the Raman spectra. Journal of Raman Spectroscopy. 2020,
+    51(10), 2061-2065.
+
+    """
+    residual = y - baseline
+    weights = partial_weights * np.where(residual > 0, p * np.exp(-((residual / k)**2) / 2), 1 - p)
+    return weights
+
+
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_derpsalsa_normal(k, p):
+    """Ensures derpsalsa weighting works as intended for a normal baseline."""
+    y_data, baseline = baseline_1d_normal()
+
+    diff_y_1 = np.gradient(y_data)
+    diff_y_2 = np.gradient(diff_y_1)
+    rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size)
+    rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size)
+
+    diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2)
+    diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2)
+    partial_weights = diff_1_weights * diff_2_weights
+
+    weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights)
+    expected_weights = expected_derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+    # ensure all weights are between 0 and 1
+    assert ((weights >= 0) & (weights <= 1)).all()
+
+
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_derpsalsa_all_above(k, p):
+    """Ensures derpsalsa weighting works as intended for a baseline completely above the data."""
+    y_data, baseline = baseline_1d_all_above()
+
+    diff_y_1 = np.gradient(y_data)
+    diff_y_2 = np.gradient(diff_y_1)
+    rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size)
+    rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size)
+
+    diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2)
+    diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2)
+    partial_weights = diff_1_weights * diff_2_weights
+
+    weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights)
+    expected_weights = np.full_like(y_data, partial_weights * (1 - p))
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
+
+
+@pytest.mark.parametrize('k', (0.5, 2))
+@pytest.mark.parametrize('p', (0.01, 0.99))
+def test_derpsalsa_all_below(k, p):
+    """Ensures derpsalsa weighting works as intended for a baseline completely below the data."""
+    y_data, baseline = baseline_1d_all_below()
+
+    diff_y_1 = np.gradient(y_data)
+    diff_y_2 = np.gradient(diff_y_1)
+    rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size)
+    rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size)
+
+    diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2)
+    diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2)
+    partial_weights = diff_1_weights * diff_2_weights
+
+    weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights)
+    expected_weights = partial_weights * p * np.exp(-0.5 * ((y_data - baseline) / k)**2)
+
+    assert isinstance(weights, np.ndarray)
+    assert weights.shape == y_data.shape
+    assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12)
diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py
index c6f2b6a..917d26c 100644
--- a/tests/test_whittaker.py
+++ b/tests/test_whittaker.py
@@ -313,6 +313,12 @@ def test_diff_orders(self, diff_order):
         lam = {1: 1e2, 3: 1e10}[diff_order]
         self.class_func(self.y, lam=lam, diff_order=diff_order)
 
+    @pytest.mark.parametrize('k', (0, -1))
+    def test_outside_k_fails(self, k):
+        """Ensures k values not greater than 0 raise an exception."""
+        
with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestDerpsalsa(WhittakerTester): """Class for testing derpsalsa baseline.""" @@ -330,3 +336,9 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_spline.py b/tests/two_d/test_spline.py index cd918b9..a359e19 100644 --- a/tests/two_d/test_spline.py +++ b/tests/two_d/test_spline.py @@ -275,3 +275,9 @@ def test_whittaker_comparison(self, lam, p, diff_order): compare_pspline_whittaker( self, 'psalsa', self.y, lam=lam, p=p, diff_order=diff_order, test_rtol=1e5 ) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_whittaker.py b/tests/two_d/test_whittaker.py index 2f8804e..1b685f6 100644 --- a/tests/two_d/test_whittaker.py +++ b/tests/two_d/test_whittaker.py @@ -202,3 +202,9 @@ def test_outside_p_fails(self, p): def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k)
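To close out, the psalsa weighting exercised by the tests above can be summarized with a short sketch. The helper name `psalsa_weighting` is hypothetical, and the body is an assumption about the internal `_psalsa` based on the masked-exponential pattern used for `_derpsalsa` earlier in this patch series, not a copy of the actual implementation:

    import numpy as np

    def psalsa_weighting(y, baseline, p=0.5, k=1.0):
        # psalsa weights: p * exp(-residual / k) where the data sits above the
        # baseline (residual > 0), and 1 - p everywhere else
        residual = y - baseline
        # start from the 1 - p weights and exponentiate only the positive
        # residuals; a naive np.where(residual > 0, p * np.exp(-residual / k), 1 - p)
        # evaluates np.exp over the full array and can overflow for large
        # negative residuals
        weights = np.full(y.shape, 1 - p, dtype=float)
        mask = residual > 0
        weights[mask] = p * np.exp(-residual[mask] / k)
        return weights

Because the exponential is never evaluated where the baseline exceeds the data, the weight at the overflow index in test_psalsa_overflow comes out as exactly 1 - p, matching the naive calculation without triggering a RuntimeWarning.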