TEST: Added tests for all weighting schemes and improved error handling #30

Merged · 6 commits · Dec 19, 2024
178 changes: 160 additions & 18 deletions pybaselines/_weighting.py
@@ -1,10 +1,12 @@
# -*- coding: utf-8 -*-
"""Contains various weighting schemes used in pybaselines."""

import warnings

import numpy as np
from scipy.special import expit

from .utils import _MIN_FLOAT
from .utils import _MIN_FLOAT, ParameterWarning


def _asls(y, baseline, p):
@@ -22,7 +24,7 @@ def _asls(y, baseline, p):
p : float
The penalizing weighting factor. Must be between 0 and 1. Values greater
than the baseline will be given `p` weight, and values less than the baseline
will be given `p - 1` weight.
will be given `1 - p` weight.

Returns
-------
@@ -38,11 +40,83 @@ def _asls(y, baseline, p):
asymmetric least squares method, Analytical Methods, 2014, 6(12), 4402-4407.

"""
mask = y > baseline
weights = p * mask + (1 - p) * (~mask)
weights = np.where(y > baseline, p, 1 - p)
return weights
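
As context for the `np.where` rewrite above, a minimal standalone sketch of the asLS weighting (the data values here are invented for illustration, not from the PR):

import numpy as np

def asls_weights(y, baseline, p):
    # points above the baseline (assumed to be peaks) get weight p;
    # points at or below it get weight 1 - p
    return np.where(y > baseline, p, 1 - p)

y = np.array([0.0, 5.0, 0.2, -0.1])
baseline = np.zeros_like(y)
print(asls_weights(y, baseline, p=0.01))  # [0.99 0.01 0.01 0.99]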


def _airpls(y, baseline, iteration):
"""
The weighting for adaptive iteratively reweighted penalized least squares (airPLS).

Parameters
----------
y : numpy.ndarray, shape (N,)
The measured data.
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
iteration : int
The iteration number. Should be 1-based, such that the first iteration is 1
instead of 0.

Returns
-------
weights : numpy.ndarray, shape (N,)
The calculated weights.
residual_l1_norm : float
The L1 norm of the negative residuals, used to calculate the exit criteria
for the airPLS algorithm.
exit_early : bool
Designates if there is a potential error with the calculation such that no further
iterations should be performed.

References
----------
Zhang, Z.M., et al. Baseline correction using adaptive iteratively
reweighted penalized least squares. Analyst, 2010, 135(5), 1138-1146.

Notes
-----
Equation 9 in the original algorithm was misprinted according to the author
(https://github.com/zmzhang/airPLS/issues/8), so the correct weighting is used here.

The pybaselines weighting differs from the original airPLS algorithm by normalizing the
weights by their maximum value to ensure that weights are within [0, 1]; the original
algorithm is defined such that all weights are greater than or equal to 1 for negative
residuals, which would differ from other Whittaker-smoothing based algorithms and could
potentially cause issues if combining with optimizer functions.

"""
residual = y - baseline
neg_mask = residual < 0
neg_residual = residual[neg_mask]
if neg_residual.size < 2:
exit_early = True
warnings.warn(
('almost all baseline points are below the data, indicating that "tol"'
' is too low and/or "max_iter" is too high'), ParameterWarning,
stacklevel=2
)
return np.zeros_like(y), 0.0, exit_early
else:
exit_early = False

residual_l1_norm = abs(neg_residual.sum())
# clip from [0, log(max dtype)] since the positive residuals (negative values) do not matter
log_max = np.log(np.finfo(y.dtype).max)
inner = np.clip(
(-iteration / residual_l1_norm) * neg_residual,
a_min=0,
a_max=log_max - np.spacing(log_max)
)
new_weights = np.exp(inner)
# Not stated in the paper, but without dividing by the maximum weight, the
# calculated weights are all greater than or equal to 1
weights = np.zeros_like(y)
weights[neg_mask] = new_weights / new_weights.max()

return weights, residual_l1_norm, exit_early
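
A rough numeric check of the normalization note in the docstring above (arrays chosen arbitrarily): the raw exponential weights for negative residuals are always >= 1, so dividing by the maximum maps them into (0, 1].

import numpy as np

y = np.array([1.0, -2.0, 3.0, -4.0, 0.5])
baseline = np.zeros_like(y)
iteration = 2

residual = y - baseline
neg_residual = residual[residual < 0]                # [-2., -4.]
l1_norm = abs(neg_residual.sum())                    # 6.0
raw = np.exp((-iteration / l1_norm) * neg_residual)  # [~1.95, ~3.79], all >= 1
print(raw / raw.max())                               # [~0.51, 1.0], within (0, 1]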


def _safe_std(array, **kwargs):
"""
Calculates the standard deviation and protects against nan and 0.
@@ -96,6 +170,9 @@ def _arpls(y, baseline):
-------
weights : numpy.ndarray, shape (N,)
The calculated weights.
exit_early : bool
Designates if there is a potential error with the calculation such that no further
iterations should be performed.

References
----------
@@ -105,10 +182,20 @@
"""
residual = y - baseline
neg_residual = residual[residual < 0]
if neg_residual.size < 2:
exit_early = True
warnings.warn(
('almost all baseline points are below the data, indicating that "tol"'
' is too low and/or "max_iter" is too high'), ParameterWarning,
stacklevel=2
)
return np.zeros_like(y), exit_early
else:
exit_early = False
std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset
# add a negative sign since expit performs 1/(1+exp(-input))
weights = expit(-(2 / std) * (residual - (2 * std - np.mean(neg_residual))))
return weights
return weights, exit_early
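
To unpack the inline comment about the sign: `scipy.special.expit(x)` evaluates 1 / (1 + exp(-x)), so negating the argument reproduces the arPLS logistic weight 1 / (1 + exp((2/std) * (residual - (2*std - mean)))). A tiny check with made-up numbers:

import numpy as np
from scipy.special import expit

residual = np.array([-1.0, 0.0, 2.0])
std, mean_neg = 0.5, -1.0
shifted = residual - (2 * std - mean_neg)
# the two forms agree by the definition of expit
print(expit(-(2 / std) * shifted))
print(1 / (1 + np.exp((2 / std) * shifted)))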


def _drpls(y, baseline, iteration):
@@ -129,6 +216,9 @@ def _drpls(y, baseline, iteration):
-------
weights : numpy.ndarray, shape (N,)
The calculated weights.
exit_early : bool
Designates if there is a potential error with the calculation such that no further
iterations should be performed.

References
----------
@@ -138,10 +228,25 @@
"""
residual = y - baseline
neg_residual = residual[residual < 0]
if neg_residual.size < 2:
exit_early = True
warnings.warn(
('almost all baseline points are below the data, indicating that "tol"'
' is too low and/or "max_iter" is too high'), ParameterWarning,
stacklevel=2
)
return np.zeros_like(y), exit_early
else:
exit_early = False

std = _safe_std(neg_residual, ddof=1) # use dof=1 since only sampling a subset
inner = (np.exp(iteration) / std) * (residual - (2 * std - np.mean(neg_residual)))
# the exponential term is used to change the shape of the weighting from a logistic curve
# at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
# the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
# overflow for high iterations
inner = (np.exp(min(iteration, 100)) / std) * (residual - (2 * std - np.mean(neg_residual)))
weights = 0.5 * (1 - (inner / (1 + np.abs(inner))))
return weights
return weights, exit_early
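
A quick sanity check on the iteration cap introduced above (numbers chosen only for illustration): np.exp overflows to inf for large iteration counts, while the bounded term inner / (1 + |inner|) saturates long before exp(100) is reached, so the cap leaves the weights effectively unchanged.

import numpy as np

with np.errstate(over='ignore'):
    print(np.exp(800.0))          # inf -> overflow without the cap
print(np.exp(min(800, 100)))      # ~2.7e43, finite once capped

inner = np.array([1e3, 1e6])      # already-huge magnitudes saturate the weight
print(0.5 * (1 - inner / (1 + np.abs(inner))))  # ~[5e-4, 5e-7] -> effectively 0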


def _iarpls(y, baseline, iteration):
@@ -162,6 +267,9 @@ def _iarpls(y, baseline, iteration):
-------
weights : numpy.ndarray, shape (N,)
The calculated weights.
exit_early : bool
Designates if there is a potential error with the calculation such that no further
iterations should be performed.

References
----------
@@ -171,13 +279,29 @@

"""
residual = y - baseline
std = _safe_std(residual[residual < 0], ddof=1) # dof=1 since sampling a subset
inner = (np.exp(iteration) / std) * (residual - 2 * std)
neg_residual = residual[residual < 0]
if neg_residual.size < 2:
exit_early = True
warnings.warn(
('almost all baseline points are below the data, indicating that "tol"'
' is too low and/or "max_iter" is too high'), ParameterWarning,
stacklevel=2
)
return np.zeros_like(y), exit_early
else:
exit_early = False

std = _safe_std(neg_residual, ddof=1) # use dof=1 since only sampling a subset
# the exponential term is used to change the shape of the weighting from a logistic curve
# at low iterations to a step curve at higher iterations (figure 1 in the paper); setting
# the maximum iteration to 100 still achieves this purpose while avoiding unnecessary
# overflow for high iterations
inner = (np.exp(min(iteration, 100)) / std) * (residual - 2 * std)
weights = 0.5 * (1 - (inner / np.sqrt(1 + inner**2)))
return weights
return weights, exit_early
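
The IarPLS weight stays bounded no matter how large `inner` grows, since x / sqrt(1 + x**2) lies in (-1, 1); a throwaway numeric check:

import numpy as np

inner = np.array([-1e8, -1.0, 0.0, 1.0, 1e8])
weights = 0.5 * (1 - inner / np.sqrt(1 + inner**2))
print(weights)  # ~[1.0, 0.854, 0.5, 0.146, 0.0] -- always within [0, 1]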


def _aspls(y, baseline):
def _aspls(y, baseline, asymmetric_coef):
"""
Weighting for the adaptive smoothness penalized least squares smoothing (aspls).

@@ -187,17 +311,23 @@ def _aspls(y, baseline):
The measured data.
baseline : numpy.ndarray, shape (N,)
The calculated baseline.
asymmetric_coef : float
The asymmetric coefficient for the weighting. Higher values lead to a steeper
weighting curve (i.e. more step-like).

Returns
-------
weights : numpy.ndarray, shape (N,)
The calculated weights.
residual : numpy.ndarray, shape (N,)
The residual, ``y - baseline``.
exit_early : bool
Designates if there is a potential error with the calculation such that no further
iterations should be performed.

Notes
-----
The weighting uses an asymmetric coefficient (`k` in the asPLS paper) of 0.5 instead
The default asymmetric coefficient (`k` in the asPLS paper) is 0.5 instead
of the 2 listed in the asPLS paper. pybaselines uses the factor of 0.5 since it
matches the results in Table 2 and Figure 5 of the asPLS paper closer than the
factor of 2 and fits noisy data much better.
@@ -209,10 +339,22 @@ def _aspls(y, baseline):

"""
residual = y - baseline
std = _safe_std(residual[residual < 0], ddof=1) # use dof=1 since sampling a subset
neg_residual = residual[residual < 0]
if neg_residual.size < 2:
exit_early = True
warnings.warn(
('almost all baseline points are below the data, indicating that "tol"'
' is too low and/or "max_iter" is too high'), ParameterWarning,
stacklevel=2
)
return np.zeros_like(y), residual, exit_early
else:
exit_early = False
std = _safe_std(neg_residual, ddof=1) # use dof=1 since sampling subset

# add a negative sign since expit performs 1/(1+exp(-input))
weights = expit(-(0.5 / std) * (residual - std))
return weights, residual
weights = expit(-(asymmetric_coef / std) * (residual - std))
return weights, residual, exit_early
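
To see the effect of `asymmetric_coef` described in the docstring, the logistic curve sharpens toward a 0/1 step at residual == std as the coefficient grows (a minimal comparison with invented values, not part of the PR):

import numpy as np
from scipy.special import expit

residual = np.array([-0.5, 0.0, 0.5, 1.0])
std = 0.5
for coef in (0.5, 2.0, 10.0):
    # larger coefficients push the weights toward a hard step at residual == std
    print(coef, expit(-(coef / std) * (residual - std)))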


def _psalsa(y, baseline, p, k, shape_y):
Expand All @@ -228,7 +370,7 @@ def _psalsa(y, baseline, p, k, shape_y):
p : float
The penalizing weighting factor. Must be between 0 and 1. Values greater
than the baseline will be given `p` weight, and values less than the baseline
will be given `p - 1` weight.
will be given `1 - p` weight.
k : float
A factor that controls the exponential decay of the weights for baseline
values greater than the data. Should be approximately the height at which
@@ -270,7 +412,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights):
p : float
The penalizing weighting factor. Must be between 0 and 1. Values greater
than the baseline will be given `p` weight, and values less than the baseline
will be given `p - 1` weight.
will be given `1 - p` weight.
k : float
A factor that controls the exponential decay of the weights for baseline
values greater than the data. Should be approximately the height at which
@@ -306,7 +448,7 @@ def _derpsalsa(y, baseline, p, k, shape_y, partial_weights):
# since it's faster than performing the square and exp on the full residual
weights = np.full(shape_y, 1 - p, dtype=float)
mask = residual > 0
weights[mask] = p * np.exp(-((residual[mask] / k)**2) / 2)
weights[mask] = p * np.exp(-0.5 * ((residual[mask] / k)**2))
weights *= partial_weights
return weights
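
The two forms of the Gaussian decay in the final hunk are algebraically identical; the rewrite only makes the 1/2 factor explicit. A quick equivalence check (sample values invented):

import numpy as np

residual = np.array([0.1, 1.0, 3.0])
k, p = 2.0, 0.01
old = p * np.exp(-((residual / k)**2) / 2)
new = p * np.exp(-0.5 * ((residual / k)**2))
print(np.allclose(old, new))  # True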
