diff --git a/pybaselines/_weighting.py b/pybaselines/_weighting.py index 8b663df..dee24df 100644 --- a/pybaselines/_weighting.py +++ b/pybaselines/_weighting.py @@ -448,7 +448,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights): # since it's faster than performing the square and exp on the full residual weights = np.full(shape_y, 1 - p, dtype=float) mask = residual > 0 - weights[mask] = p * np.exp(-((residual[mask] / k)**2) / 2) + weights[mask] = p * np.exp(-0.5 * ((residual[mask] / k)**2)) weights *= partial_weights return weights diff --git a/pybaselines/spline.py b/pybaselines/spline.py index dd1eccb..5b29f18 100644 --- a/pybaselines/spline.py +++ b/pybaselines/spline.py @@ -1080,7 +1080,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -1104,6 +1105,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=100, spline_deg ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve_pspline(y, weight_array) @@ -1185,7 +1188,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -1208,6 +1212,8 @@ def pspline_derpsalsa(self, data, lam=1e2, p=1e-2, k=None, num_knots=100, spline ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 @@ -2248,7 +2254,8 @@ def pspline_psalsa(data, lam=1e3, p=0.5, k=None, num_knots=100, spline_degree=3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -2336,7 +2343,8 @@ def pspline_derpsalsa(data, lam=1e2, p=1e-2, k=None, num_knots=100, spline_degre Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- diff --git a/pybaselines/two_d/spline.py b/pybaselines/two_d/spline.py index 9f92697..ed5c799 100644 --- a/pybaselines/two_d/spline.py +++ b/pybaselines/two_d/spline.py @@ -11,6 +11,7 @@ import numpy as np from .. import _weighting +from .._validation import _check_scalar_variable from ..utils import ParameterWarning, gaussian, relative_difference from ._algorithm_setup import _Algorithm2D from ._whittaker_utils import PenalizedSystem2D @@ -765,7 +766,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. See Also -------- @@ -789,6 +791,8 @@ def pspline_psalsa(self, data, lam=1e3, p=0.5, k=None, num_knots=25, spline_degr ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.pspline.solve(y, weight_array) diff --git a/pybaselines/two_d/whittaker.py b/pybaselines/two_d/whittaker.py index 540ed1c..b669db1 100644 --- a/pybaselines/two_d/whittaker.py +++ b/pybaselines/two_d/whittaker.py @@ -763,7 +763,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -789,6 +790,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e ) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') shape = self._shape if self.whittaker_system._using_svd else self._size tol_history = np.empty(max_iter + 1) diff --git a/pybaselines/whittaker.py b/pybaselines/whittaker.py index d1f11e2..720f6b5 100644 --- a/pybaselines/whittaker.py +++ b/pybaselines/whittaker.py @@ -658,7 +658,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -679,6 +680,8 @@ def psalsa(self, data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') tol_history = np.empty(max_iter + 1) for i in range(max_iter + 1): baseline = self.whittaker_system.solve( @@ -758,7 +761,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. References ---------- @@ -772,6 +776,8 @@ def derpsalsa(self, data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, to y, weight_array = self._setup_whittaker(data, lam, diff_order, weights) if k is None: k = np.std(y) / 10 + else: + k = _check_scalar_variable(k, variable_name='k') if smooth_half_window is None: smooth_half_window = self._size // 200 # could pad the data every iteration, but it is ~2-3 times slower and only affects @@ -1283,7 +1289,8 @@ def psalsa(data, lam=1e5, p=0.5, k=None, diff_order=2, max_iter=50, tol=1e-3, Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. Notes ----- @@ -1366,7 +1373,8 @@ def derpsalsa(data, lam=1e6, p=0.01, k=None, diff_order=2, max_iter=50, tol=1e-3 Raises ------ ValueError - Raised if `p` is not between 0 and 1. + Raised if `p` is not between 0 and 1. Also raised if `k` is not greater + than 0. References ---------- diff --git a/tests/test_spline.py b/tests/test_spline.py index 0c8d303..4530d17 100644 --- a/tests/test_spline.py +++ b/tests/test_spline.py @@ -470,6 +470,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.psalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineDerpsalsa(IterativeSplineTester): """Class for testing pspline_derpsalsa baseline.""" @@ -497,6 +503,12 @@ def test_whittaker_comparison(self, lam, p, diff_order): self, whittaker.derpsalsa, self.y, lam=lam, p=p, diff_order=diff_order ) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestPsplineMPLS(SplineTester, InputWeightsMixin): """Class for testing pspline_mpls baseline.""" diff --git a/tests/test_weighting.py b/tests/test_weighting.py index 50eb082..5e56d75 100644 --- a/tests/test_weighting.py +++ b/tests/test_weighting.py @@ -851,3 +851,261 @@ def test_aspls_overflow(one_d, assymetric_coef): # weights should still be the same as the nieve calculation regardless of exponential overflow assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) assert_allclose(residual, expected_residual, rtol=1e-12, atol=1e-12) + + +def expected_psalsa(y, baseline, p, k, shape_y): + """ + Weighting for the peaked signal's asymmetric least squares algorithm (psalsa). + + Does not perform error checking since this is just used for simple weighting cases. + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + p : float + The penalizing weighting factor. Must be between 0 and 1. Values greater + than the baseline will be given `p` weight, and values less than the baseline + will be given `1 - p` weight. + k : float + A factor that controls the exponential decay of the weights for baseline + values greater than the data. Should be approximately the height at which + a value could be considered a peak. + shape_y : int or (int,) or (int, int) + The length of `y`, `N`. Precomputed to avoid repeated calculations. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + References + ---------- + Oller-Moreno, S., et al. Adaptive Asymmetric Least Squares baseline estimation + for analytical instruments. 2014 IEEE 11th International Multi-Conference on + Systems, Signals, and Devices, 2014, 1-5. + + """ + residual = y - baseline + weights = np.where(residual > 0, p * np.exp(-residual / k), 1 - p) + return weights + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_normal(one_d, k, p): + """Ensures psalsa weighting works as intented for a normal baseline.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_all_above(one_d, k, p): + """Ensures psalsa weighting works as intented for a baseline with all points above the data.""" + if one_d: + y_data, baseline = baseline_1d_all_above() + else: + y_data, baseline = baseline_2d_all_above() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = np.full_like(y_data, 1 - p) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_all_below(one_d, k, p): + """Ensures psalsa weighting works as intented for a baseline with all points below the data.""" + if one_d: + y_data, baseline = baseline_1d_all_below() + else: + y_data, baseline = baseline_2d_all_below() + + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + expected_weights = p * np.exp(-(y_data - baseline) / k) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('one_d', (True, False)) +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_psalsa_overflow(one_d, k, p): + """Ensures exponential overflow does not occur from psalsa weighting.""" + if one_d: + y_data, baseline = baseline_1d_normal() + else: + y_data, baseline = baseline_2d_normal() + + log_max = np.log(np.finfo(y_data.dtype).max) + # for exponential overlow, -(residual / k) > log_max + overflow_index = 10 + overflow_value = k * log_max + 10 # add 10 for good measure + if one_d: + baseline[overflow_index] = y_data[overflow_index] + overflow_value + else: + baseline[overflow_index, overflow_index] = ( + y_data[overflow_index, overflow_index] + overflow_value + ) + + # sanity check to ensure overflow actually should occur + with pytest.warns(RuntimeWarning): + expected_weights = expected_psalsa(y_data, baseline, p, k, y_data.shape) + # weights in nieve approach should still be finite since overflow only occurs in regions + # where the exponential value is not actually used + assert np.isfinite(expected_weights).all() + + with np.errstate(over='raise'): + weights = _weighting._psalsa(y_data, baseline, p, k, y_data.shape) + + assert np.isfinite(weights).all() + + # the actual weight where overflow should have occurred should be 1 - p + if one_d: + assert_allclose(weights[overflow_index], 1 - p, atol=1e-14) + else: + assert_allclose(weights[overflow_index, overflow_index], 1 - p, atol=1e-14) + # weights should still be the same as the nieve calculation regardless of exponential overflow + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +def expected_derpsalsa(y, baseline, p, k, shape_y, partial_weights): + """ + Weights for derivative peak-screening asymmetric least squares algorithm (derpsalsa). + + Parameters + ---------- + y : numpy.ndarray, shape (N,) + The measured data. + baseline : numpy.ndarray, shape (N,) + The calculated baseline. + p : float + The penalizing weighting factor. Must be between 0 and 1. Values greater + than the baseline will be given `p` weight, and values less than the baseline + will be given `1 - p` weight. + k : float + A factor that controls the exponential decay of the weights for baseline + values greater than the data. Should be approximately the height at which + a value could be considered a peak. + shape_y : int or (int,) or (int, int) + The length of `y`, `N`. Precomputed to avoid repeated calculations. + partial_weights : numpy.ndarray, shape (N,) + The weights associated with the first and second derivatives of the data. + + Returns + ------- + weights : numpy.ndarray, shape (N,) + The calculated weights. + + Notes + ----- + The reference is not clear as to how `p` and `1-p` are applied. An alternative could + be that `partial_weights` are multiplied only where the residual is greater than + 0 and that all other weights are `1-p`, but based on Figure 1c in the reference, the + total weights are never greater than `partial_weights`, so that must mean the non-peak + regions have a weight of `1-p` times `partial_weights` rather than just `1-p`; + both weighting systems give near identical results, so it is not a big deal. + + References + ---------- + Korepanov, V. Asymmetric least-squares baseline algorithm with peak screening for + automatic processing of the Raman spectra. Journal of Raman Spectroscopy. 2020, + 51(10), 2061-2065. + + """ + residual = y - baseline + weights = partial_weights * np.where(residual > 0, p * np.exp(-((residual / k)**2) / 2), 1 - p) + return weights + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_normal(k, p): + """Ensures derpsalsa weighting works as intented for a normal baseline.""" + y_data, baseline = baseline_1d_normal() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = expected_derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + # ensure all weights are between 0 and 1 + assert ((weights >= 0) & (weights <= 1)).all() + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_all_above(k, p): + """Ensures derpsalsa weighting works as intented for a baseline completely above the data.""" + y_data, baseline = baseline_1d_all_above() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = np.full_like(y_data, partial_weights * (1 - p)) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) + + +@pytest.mark.parametrize('k', (0.5, 2)) +@pytest.mark.parametrize('p', (0.01, 0.99)) +def test_derpsalsa_all_below(k, p): + """Ensures derpsalsa weighting works as intented for a baseline completely below the data.""" + y_data, baseline = baseline_1d_all_below() + + diff_y_1 = np.gradient(y_data) + diff_y_2 = np.gradient(diff_y_1) + rms_diff_1 = np.sqrt(diff_y_1.dot(diff_y_1) / y_data.size) + rms_diff_2 = np.sqrt(diff_y_2.dot(diff_y_2) / y_data.size) + + diff_1_weights = np.exp(-((diff_y_1 / rms_diff_1)**2) / 2) + diff_2_weights = np.exp(-((diff_y_2 / rms_diff_2)**2) / 2) + partial_weights = diff_1_weights * diff_2_weights + + weights = _weighting._derpsalsa(y_data, baseline, p, k, y_data.shape, partial_weights) + expected_weights = partial_weights * p * np.exp(-0.5 * ((y_data - baseline) / k)**2) + + assert isinstance(weights, np.ndarray) + assert weights.shape == y_data.shape + assert_allclose(weights, expected_weights, rtol=1e-12, atol=1e-12) diff --git a/tests/test_whittaker.py b/tests/test_whittaker.py index c6f2b6a..917d26c 100644 --- a/tests/test_whittaker.py +++ b/tests/test_whittaker.py @@ -313,6 +313,12 @@ def test_diff_orders(self, diff_order): lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) + class TestDerpsalsa(WhittakerTester): """Class for testing derpsalsa baseline.""" @@ -330,3 +336,9 @@ def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" lam = {1: 1e2, 3: 1e10}[diff_order] self.class_func(self.y, lam=lam, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_spline.py b/tests/two_d/test_spline.py index cd918b9..a359e19 100644 --- a/tests/two_d/test_spline.py +++ b/tests/two_d/test_spline.py @@ -275,3 +275,9 @@ def test_whittaker_comparison(self, lam, p, diff_order): compare_pspline_whittaker( self, 'psalsa', self.y, lam=lam, p=p, diff_order=diff_order, test_rtol=1e5 ) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k) diff --git a/tests/two_d/test_whittaker.py b/tests/two_d/test_whittaker.py index 2f8804e..1b685f6 100644 --- a/tests/two_d/test_whittaker.py +++ b/tests/two_d/test_whittaker.py @@ -202,3 +202,9 @@ def test_outside_p_fails(self, p): def test_diff_orders(self, diff_order): """Ensure that other difference orders work.""" self.class_func(self.y, diff_order=diff_order) + + @pytest.mark.parametrize('k', (0, -1)) + def test_outside_k_fails(self, k): + """Ensures k values not greater than 0 raise an exception.""" + with pytest.raises(ValueError): + self.class_func(self.y, k=k)