diff --git a/docs/functions/complexity.rst b/docs/functions/complexity.rst index 755811a922..b856ff9b79 100644 --- a/docs/functions/complexity.rst +++ b/docs/functions/complexity.rst @@ -80,11 +80,11 @@ Fractal Dimension Entropy ^^^^^^^^^^^^^^^^^ *entropy_shannon()* -""""""""""""""""""" +""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_shannon *entropy_maximum()* -""""""""""""""""""" +""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_maximum *entropy_differential()* @@ -103,6 +103,18 @@ Entropy """"""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_renyi +*entropy_approximate()* +"""""""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_approximate + +*entropy_sample()* +"""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_sample + +*entropy_quadratic()* +"""""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_quadratic + *entropy_cumulativeresidual()* """"""""""""""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_cumulativeresidual @@ -155,14 +167,6 @@ Entropy """"""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_ofentropy -*entropy_approximate()* -"""""""""""""""""""""""" -.. autofunction:: neurokit2.complexity.entropy_approximate - -*entropy_sample()* -"""""""""""""""""""" -.. autofunction:: neurokit2.complexity.entropy_sample - *entropy_permutation()* """""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_permutation @@ -274,4 +278,4 @@ Joint/Multivariate .. .. automodule:: neurokit2.complexity .. :members: -.. :exclude-members: complexity, complexity_delay, complexity_dimension, complexity_tolerance, complexity_k, fractal_katz, fractal_petrosian, fractal_sevcik, fractal_nld, fractal_psdslope, fractal_higuchi, fractal_correlation, entropy_shannon, entropy_maximum, entropy_differential, entropy_tsallis, entropy_renyi, entropy_cumulativeresidual, entropy_svd, entropy_spectral, entropy_phase, entropy_grid, entropy_attention, entropy_increment, entropy_slope, entropy_symbolicdynamic, entropy_dispersion, entropy_ofentropy, entropy_approximate, entropy_sample, entropy_permutation, entropy_bubble, entropy_range, entropy_fuzzy, entropy_multiscale, entropy_hierarchical, fisher_information, complexity_hjorth, complexity_lempelziv, complexity_relativeroughness, fractal_hurst, complexity_lyapunov, complexity_rqa, fractal_mandelbrot, complexity_simulate, complexity_attractor, complexity_symbolize, complexity_coarsegraining, complexity_ordinalpatterns, recurrence_matrix, entropy_shannon_joint, entropy_rate \ No newline at end of file +.. 
:exclude-members: complexity, complexity_delay, complexity_dimension, complexity_tolerance, complexity_k, fractal_katz, fractal_petrosian, fractal_sevcik, fractal_nld, fractal_psdslope, fractal_higuchi, fractal_correlation, entropy_shannon, entropy_maximum, entropy_differential, entropy_tsallis, entropy_renyi, entropy_cumulativeresidual, entropy_svd, entropy_spectral, entropy_phase, entropy_grid, entropy_attention, entropy_increment, entropy_slope, entropy_symbolicdynamic, entropy_dispersion, entropy_ofentropy, entropy_approximate, entropy_sample, entropy_permutation, entropy_bubble, entropy_range, entropy_fuzzy, entropy_multiscale, entropy_hierarchical, fisher_information, complexity_hjorth, complexity_lempelziv, complexity_relativeroughness, fractal_hurst, complexity_lyapunov, complexity_rqa, fractal_mandelbrot, complexity_simulate, complexity_attractor, complexity_symbolize, complexity_coarsegraining, complexity_ordinalpatterns, recurrence_matrix, entropy_shannon_joint, entropy_rate, entropy_quadratic \ No newline at end of file diff --git a/neurokit2/__init__.py b/neurokit2/__init__.py index f2ccc19594..6e7a7e8790 100644 --- a/neurokit2/__init__.py +++ b/neurokit2/__init__.py @@ -32,7 +32,7 @@ from .video import * # Info -__version__ = "0.2.6" +__version__ = "0.2.7" # Maintainer info diff --git a/neurokit2/complexity/__init__.py b/neurokit2/complexity/__init__.py index 88e1e555aa..c0383bf52f 100644 --- a/neurokit2/complexity/__init__.py +++ b/neurokit2/complexity/__init__.py @@ -30,6 +30,7 @@ from .entropy_permutation import entropy_permutation from .entropy_phase import entropy_phase from .entropy_power import entropy_power +from .entropy_quadratic import entropy_quadratic from .entropy_range import entropy_range from .entropy_rate import entropy_rate from .entropy_renyi import entropy_renyi @@ -96,7 +97,9 @@ complexity_rcmse = functools.partial(entropy_multiscale, method="RCMSEn") complexity_fuzzymse = functools.partial(entropy_multiscale, fuzzy=True) complexity_fuzzycmse = functools.partial(entropy_multiscale, method="CMSEn", fuzzy=True) -complexity_fuzzyrcmse = functools.partial(entropy_multiscale, method="RCMSEn", fuzzy=True) +complexity_fuzzyrcmse = functools.partial( + entropy_multiscale, method="RCMSEn", fuzzy=True +) complexity_dfa = fractal_dfa @@ -175,6 +178,7 @@ "entropy_symbolicdynamic", "entropy_cumulativeresidual", "entropy_approximate", + "entropy_quadratic", "entropy_bubble", "entropy_coalition", "entropy_sample", diff --git a/neurokit2/complexity/complexity_decorrelation.py b/neurokit2/complexity/complexity_decorrelation.py index c5cf637b4e..5e7f29b2d6 100644 --- a/neurokit2/complexity/complexity_decorrelation.py +++ b/neurokit2/complexity/complexity_decorrelation.py @@ -1,10 +1,11 @@ +import matplotlib.pyplot as plt import numpy as np import pandas as pd from ..signal import signal_autocor -def complexity_decorrelation(signal): +def complexity_decorrelation(signal, show=False): """**Decorrelation Time (DT)** The decorrelation time (DT) is defined as the time (in samples) of the first zero crossing of @@ -17,6 +18,8 @@ def complexity_decorrelation(signal): ---------- signal : Union[list, np.array, pd.Series] The signal (i.e., a time series) in the form of a vector of values. + show : bool + If True, will return a plot of the autocorrelation. 
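The docstring above defines DT as the lag (in samples) of the first zero-crossing of the unbiased autocorrelation. A minimal standalone sketch of that idea in plain NumPy (the helper name `decorrelation_time` is hypothetical; NeuroKit itself relies on `signal_autocor(..., unbiased=True)` as shown further below):

    import numpy as np

    def decorrelation_time(signal):
        signal = np.asarray(signal, dtype=float) - np.mean(signal)
        n = len(signal)
        acov = np.correlate(signal, signal, mode="full")[n - 1:]  # lags 0..n-1
        acov = acov / (n - np.arange(n))                          # unbiased estimate
        autocor = acov / acov[0]                                  # 1 at lag 0
        zc = np.diff(np.sign(autocor)) != 0                       # sign changes
        return int(np.argmax(zc)) + 1 if zc.any() else -1

    x = np.sin(2 * np.pi * 5 * np.arange(0, 2, 1 / 100))  # 5 Hz sine sampled at 100 Hz
    print(decorrelation_time(x))  # about a quarter period, i.e. ~5 samples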
Returns ------- @@ -36,11 +39,15 @@ def complexity_decorrelation(signal): import neurokit2 as nk - # Simulate a signal with duration os 2s - signal = nk.signal_simulate(duration=2, frequency=[5, 9, 12]) + # Simulate a signal + signal = nk.signal_simulate(duration=5, sampling_rate=100, frequency=[5, 6], noise=0.5) # Compute DT - dt, _ = nk.complexity_decorrelation(signal) + @savefig p_complexity_decorrelation1.png scale=100% + dt, _ = nk.complexity_decorrelation(signal, show=True) + @suppress + plt.close() + dt References @@ -60,7 +67,7 @@ def complexity_decorrelation(signal): ) # Unbiased autocor (see https://github.com/mne-tools/mne-features/) - autocor, _ = signal_autocor(signal, method="unbiased") + autocor, _ = signal_autocor(signal, unbiased=True) # Get zero-crossings zc = np.diff(np.sign(autocor)) != 0 @@ -68,4 +75,20 @@ def complexity_decorrelation(signal): dt = np.argmax(zc) + 1 else: dt = -1 + + if show is True: + # Max length of autocorrelation to plot + max_len = int(dt * 4) + if max_len > len(autocor): + max_len = len(autocor) + + plt.plot(autocor[0:max_len]) + plt.xlabel("Lag") + plt.ylabel("Autocorrelation") + plt.xticks(np.arange(0, max_len, step=dt).astype(int)) + plt.axvline(dt, color="red", linestyle="--", label=f"DT = {dt}") + plt.axhline(0, color="black", linestyle="--") + plt.title("Decorrelation Time (DT)") + plt.legend() + return dt, {} diff --git a/neurokit2/complexity/complexity_lyapunov.py b/neurokit2/complexity/complexity_lyapunov.py index 2acd5bd123..8fa6a06d0f 100644 --- a/neurokit2/complexity/complexity_lyapunov.py +++ b/neurokit2/complexity/complexity_lyapunov.py @@ -5,8 +5,9 @@ import numpy as np import pandas as pd import sklearn.metrics.pairwise +import sklearn.neighbors -from ..misc import NeuroKitWarning +from ..misc import NeuroKitWarning, find_knee from ..signal.signal_psd import signal_psd from .utils_complexity_embedding import complexity_embedding @@ -16,9 +17,7 @@ def complexity_lyapunov( delay=1, dimension=2, method="rosenstein1993", - len_trajectory=20, - matrix_dim=4, - min_neighbors="default", + separation="auto", **kwargs, ): """**(Largest) Lyapunov Exponent (LLE)** @@ -26,7 +25,7 @@ def complexity_lyapunov( Lyapunov exponents (LE) describe the rate of exponential separation (convergence or divergence) of nearby trajectories of a dynamical system. It is a measure of sensitive dependence on initial conditions, i.e. how quickly two nearby states diverge. A system can have multiple LEs, - equal to thenumber of the dimensionality of the phase space, and the largest LE value, "LLE" is + equal to the number of the dimensionality of the phase space, and the largest LE value, "LLE" is often used to determine the overall predictability of the dynamical system. Different algorithms exist to estimate these indices: @@ -37,13 +36,17 @@ def complexity_lyapunov( neighbouring points are then tracked along their distance trajectories for a number of data points. The slope of the line using a least-squares fit of the mean log trajectory of the distances gives the final LLE. - * **Eckmann et al. (1996)** computes LEs by first reconstructing the time series using a + * **Makowski** is a custom modification of Rosenstein's algorithm, using KDTree for more + efficient nearest neighbors computation. Additionally, the LLE is computed as the slope up to + the changepoint of divergence rate (the point where it flattens out), making it more robust + to the length trajectory parameter. + * **Eckmann et al. 
(1986)** computes LEs by first reconstructing the time series using a delay-embedding method, and obtains the tangent that maps to the reconstructed dynamics using a least-squares fit, where the LEs are deduced from the tangent maps. .. warning:: - The **Eckman (1996)** method currently does not work. Please help us fixing it by double + The **Eckman (1986)** method currently does not work. Please help us fixing it by double checking the code, the paper and helping us figuring out what's wrong. Overall, we would like to improve this function to return for instance all the exponents (Lyapunov spectrum), implement newer and faster methods (e.g., Balcerzak, 2018, 2020), etc. If you're interested @@ -59,17 +62,17 @@ def complexity_lyapunov( dimension : int Embedding Dimension (*m*, sometimes referred to as *d* or *order*). See :func:`complexity_dimension` to estimate the optimal value for this parameter. If method - is ``"eckmann1996"``, larger values for dimension are recommended. + is ``"eckmann1986"``, larger values for dimension are recommended. method : str - The method that defines the algorithm for computing LE. Can be one of ``"rosenstein1993"`` - or ``"eckmann1996"``. + The method that defines the algorithm for computing LE. Can be one of ``"rosenstein1993"``, + ``"makowski"``, or ``"eckmann1986"``. len_trajectory : int Applies when method is ``"rosenstein1993"``. The number of data points in which neighboring trajectories are followed. matrix_dim : int - Applies when method is ``"eckmann1996"``. Corresponds to the number of LEs to return. + Applies when method is ``"eckmann1986"``. Corresponds to the number of LEs to return. min_neighbors : int, str - Applies when method is ``"eckmann1996"``. Minimum number of neighbors. If ``"default"``, + Applies when method is ``"eckmann1986"``. Minimum number of neighbors. If ``"default"``, ``min(2 * matrix_dim, matrix_dim + 4)`` is used. **kwargs : optional Other arguments to be passed to ``signal_psd()`` for calculating the minimum temporal @@ -79,7 +82,7 @@ def complexity_lyapunov( -------- lle : float An estimate of the largest Lyapunov exponent (LLE) if method is ``"rosenstein1993"``, and - an array of LEs if ``"eckmann1996"``. + an array of LEs if ``"eckmann1986"``. info : dict A dictionary containing additional information regarding the parameters used to compute LLE. @@ -90,13 +93,24 @@ def complexity_lyapunov( import neurokit2 as nk - signal = nk.signal_simulate(duration=3, sampling_rate=100, frequency=[5, 8], noise=0.5) + signal = nk.signal_simulate(duration=5, sampling_rate=100, frequency=[5, 8], noise=0.1) + + # Rosenstein's method + @savefig p_complexity_lyapunov1.png scale=100% + lle, info = nk.complexity_lyapunov(signal, method="rosenstein", show=True) + @suppress + plt.close() - lle, info = nk.complexity_lyapunov(signal, method="rosenstein1993", show=True) lle + # Makowski's change-point method + @savefig p_complexity_lyapunov2.png scale=100% + lle, info = nk.complexity_lyapunov(signal, method="makowski", show=True) + @suppress + plt.close() + # Eckman's method is broken. Please help us fix-it! 
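Both the Rosenstein and the new Makowski variants read the LLE off as the slope of a least-squares line fitted to the mean log-divergence curve; the Makowski variant simply restricts the fit to the region before the changepoint where the curve flattens out. A toy sketch with a synthetic divergence curve (illustrative numbers only, not real embedded data):

    import numpy as np

    # Synthetic mean log-divergence: grows linearly (exponential separation of
    # neighbouring trajectories) for 15 steps, then saturates
    k = np.arange(1, 31)
    divergence = np.where(k <= 15, 0.05 * k, 0.05 * 15)

    # The LLE estimate is the slope of the fit over the growing part only
    slope, intercept = np.polyfit(k[:15], divergence[:15], 1)
    print(round(slope, 3))  # 0.05 per sample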
- # lle, info = nk.complexity_lyapunov(signal, dimension=2, method="eckmann1996") + # lle, info = nk.complexity_lyapunov(signal, dimension=2, method="eckmann1986") References ---------- @@ -120,18 +134,22 @@ def complexity_lyapunov( # "We impose the additional constraint that nearest neighbors have a temporal separation # greater than the mean period of the time series: This allows us to consider each pair of - # neighbors as nearby initial conditions for different trajectories."" + # neighbors as nearby initial conditions for different trajectories." # "We estimated the mean period as the reciprocal of the mean frequency of the power spectrum, # although we expect any comparable estimate, e.g., using the median frequency of the magnitude # spectrum, to yield equivalent results." + if separation == "auto": + # Actual sampling rate does not matter + psd = signal_psd( + signal, sampling_rate=1000, method="fft", normalize=False, show=False + ) + mean_freq = np.sum(psd["Power"] * psd["Frequency"]) / np.sum(psd["Power"]) - # Actual sampling rate does not matter - psd = signal_psd(signal, sampling_rate=1000, method="fft", normalize=False, show=False) - mean_freq = np.sum(psd["Power"] * psd["Frequency"]) / np.sum(psd["Power"]) - - # 1 / mean_freq = seconds per cycle - separation = int(np.ceil(1 / mean_freq * 1000)) + # 1 / mean_freq = seconds per cycle + separation = int(np.ceil(1 / mean_freq * 1000)) + else: + assert isinstance(separation, int), "'separation' should be an integer." # Run algorithm # ---------------- @@ -139,15 +157,22 @@ def complexity_lyapunov( method = method.lower() if method in ["rosenstein", "rosenstein1993"]: le, parameters = _complexity_lyapunov_rosenstein( - signal, delay, dimension, separation, len_trajectory, **kwargs + signal, delay, dimension, separation, **kwargs + ) + elif method in ["makowski"]: + le, parameters = _complexity_lyapunov_makowski( + signal, delay, dimension, separation, **kwargs ) - elif method in ["eckmann", "eckmann1996"]: + elif method in ["eckmann", "eckmann1986", "eckmann1986"]: le, parameters = _complexity_lyapunov_eckmann( signal, dimension=dimension, separation=separation, - matrix_dim=matrix_dim, - min_neighbors=min_neighbors, + ) + else: + raise ValueError( + "NeuroKit error: complexity_lyapunov(): 'method' should be one of " + " 'rosenstein1993', 'makowski', 'eckmann1986'." ) # Store params @@ -167,10 +192,83 @@ def complexity_lyapunov( # ============================================================================= +def _complexity_lyapunov_makowski( + signal, + delay=1, + dimension=2, + separation=1, + max_length="auto", + show=False, +): + # Store parameters + info = { + "Dimension": dimension, + "Delay": delay, + } + + # Embedding + embedded = complexity_embedding(signal, delay=delay, dimension=dimension) + n = len(embedded) + + # Set the maxiimum trajectory length to 10 times the delay + if max_length == "auto": + max_length = int(delay * 10) + if max_length >= n / 2: + max_length = n // 2 + + # Create KDTree and query for nearest neighbors + tree = sklearn.neighbors.KDTree(embedded, metric="euclidean") + + # Query for nearest neighbors. To ensure we get a neighbor outside of the `separation`, + # k=1 is the point itself, k=2 is the nearest neighbor, and k=3 is the second nearest neighbor. + idx = tree.query(embedded, k=2 + separation, return_distance=False) + + # The neighbor outside the `separation` region will be the last one in the returned list. 
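As a standalone illustration of that query pattern (random stand-in data rather than an embedded signal), asking the KDTree for k = 2 + separation neighbours and keeping the last column pairs each point with the neighbour the algorithm treats as lying outside the temporal exclusion window:

    import numpy as np
    import sklearn.neighbors

    rng = np.random.default_rng(42)
    embedded = rng.normal(size=(100, 3))   # stand-in for a delay-embedded signal
    separation = 3

    tree = sklearn.neighbors.KDTree(embedded, metric="euclidean")
    # Column 0 is the point itself; the last of the 2 + separation columns is
    # the neighbour used for the divergence trajectories
    idx = tree.query(embedded, k=2 + separation, return_distance=False)[:, -1]
    print(idx[:5])  # paired-neighbour indices for the first five points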
+ idx = idx[:, -1] + + # Compute the average divergence for each trajectory length + trajectories = np.zeros(max_length) + for k in range(1, max_length + 1): + valid = np.where((np.arange(n - k) + k < n) & (idx[: n - k] + k < n))[0] + + if valid.size == 0: + trajectories[k - 1] = -np.inf + continue + + divergences = np.linalg.norm( + embedded[valid + k] - embedded[idx[valid] + k], + axis=1, + ) + divergences = divergences[divergences > 0] + if len(divergences) == 0: + trajectories[k - 1] = np.nan + else: + trajectories[k - 1] = np.mean(np.log(divergences)) + + # Change point + x_axis = range(1, len(trajectories) + 1) + knee = find_knee(y=trajectories, x=x_axis, show=False, verbose=False) + info["Divergence_Rate"] = trajectories + info["Changepoint"] = knee + + # Linear fit + slope, intercept = np.polyfit(x_axis[0:knee], trajectories[0:knee], 1) + if show is True: + plt.plot(np.arange(1, len(trajectories) + 1), trajectories) + plt.axvline(knee, color="red", label="Changepoint", linestyle="--") + plt.axline( + (0, intercept), slope=slope, color="orange", label="Least-squares Fit" + ) + plt.ylim(bottom=np.min(trajectories)) + plt.ylabel("Divergence Rate") + plt.title(f"Largest Lyapunov Exponent (slope of the line) = {slope:.3f}") + plt.legend() + return slope, info + + def _complexity_lyapunov_rosenstein( signal, delay=1, dimension=2, separation=1, len_trajectory=20, show=False, **kwargs ): - # 1. Check that sufficient data points are available # Minimum length required to find single orbit vector min_len = (dimension - 1) * delay + 1 @@ -200,7 +298,9 @@ def _complexity_lyapunov_rosenstein( # Find indices of nearest neighbours ntraj = m - len_trajectory + 1 - min_dist_indices = np.argmin(dists[:ntraj, :ntraj], axis=1) # exclude last few indices + min_dist_indices = np.argmin( + dists[:ntraj, :ntraj], axis=1 + ) # exclude last few indices min_dist_indices = min_dist_indices.astype(int) # Follow trajectories of neighbour pairs for len_trajectory data points @@ -217,16 +317,25 @@ def _complexity_lyapunov_rosenstein( divergence_rate = trajectories[np.isfinite(trajectories)] # LLE obtained by least-squares fit to average line - slope, intercept = np.polyfit(np.arange(1, len(divergence_rate) + 1), divergence_rate, 1) + slope, intercept = np.polyfit( + np.arange(1, len(divergence_rate) + 1), divergence_rate, 1 + ) + + # Store info + parameters = { + "Trajectory_Length": len_trajectory, + "Divergence_Rate": divergence_rate, + } if show is True: plt.plot(np.arange(1, len(divergence_rate) + 1), divergence_rate) - plt.axline((0, intercept), slope=slope, color="orange", label="Least-squares Fit") + plt.axline( + (0, intercept), slope=slope, color="orange", label="Least-squares Fit" + ) plt.ylabel("Divergence Rate") + plt.title(f"Largest Lyapunov Exponent (slope of the line) = {slope:.3f}") plt.legend() - parameters = {"Trajectory_Length": len_trajectory} - return slope, parameters @@ -279,7 +388,9 @@ def _complexity_lyapunov_eckmann( # get neighbors within the radius r = distances[i][neighbour_furthest] - neighbors = np.where(distances[i] <= r)[0] # should have length = min_neighbours + neighbors = np.where(distances[i] <= r)[ + 0 + ] # should have length = min_neighbours # Find matrix T_i (matrix_dim * matrix_dim) that sends points from neighbourhood of x(i) to x(i+1) vec_beta = signal[neighbors + matrix_dim * m] - signal[i + matrix_dim * m] @@ -289,7 +400,9 @@ def _complexity_lyapunov_eckmann( # form matrix T_i t_i = np.zeros((matrix_dim, matrix_dim)) t_i[:-1, 1:] = np.identity(matrix_dim - 1) - 
t_i[-1] = np.linalg.lstsq(matrix, vec_beta, rcond=-1)[0] # least squares solution + t_i[-1] = np.linalg.lstsq(matrix, vec_beta, rcond=-1)[ + 0 + ] # least squares solution # QR-decomposition of T * old_Q mat_Q, mat_R = np.linalg.qr(np.dot(t_i, old_Q)) diff --git a/neurokit2/complexity/entropy_approximate.py b/neurokit2/complexity/entropy_approximate.py index cf6af72848..900b73373f 100644 --- a/neurokit2/complexity/entropy_approximate.py +++ b/neurokit2/complexity/entropy_approximate.py @@ -3,10 +3,12 @@ import pandas as pd from .optim_complexity_tolerance import _entropy_apen, complexity_tolerance -from .utils import _get_count +from .utils_entropy import _get_count -def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected=False, **kwargs): +def entropy_approximate( + signal, delay=1, dimension=2, tolerance="sd", corrected=False, **kwargs +): """**Approximate entropy (ApEn) and its corrected version (cApEn)** Approximate entropy is a technique used to quantify the amount of regularity and the @@ -95,7 +97,7 @@ def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected= # Compute index if corrected is False: - # ApEn is implemented in 'optim_complexity_tolerance()' to avoid circular imports + # ApEn is implemented in 'utils_entropy.py' to avoid circular imports # as one of the method for optimizing tolerance relies on ApEn out, _ = _entropy_apen(signal, delay, dimension, info["Tolerance"], **kwargs) else: @@ -110,7 +112,6 @@ def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected= def _entropy_capen(signal, delay, dimension, tolerance, **kwargs): - __, count1, _ = _get_count( signal, delay=delay, diff --git a/neurokit2/complexity/entropy_multiscale.py b/neurokit2/complexity/entropy_multiscale.py index d469691670..419258181a 100644 --- a/neurokit2/complexity/entropy_multiscale.py +++ b/neurokit2/complexity/entropy_multiscale.py @@ -13,8 +13,8 @@ from .entropy_slope import entropy_slope from .entropy_symbolicdynamic import entropy_symbolicdynamic from .optim_complexity_tolerance import complexity_tolerance -from .utils import _phi, _phi_divide from .utils_complexity_coarsegraining import _get_scales, complexity_coarsegraining +from .utils_entropy import _phi, _phi_divide def entropy_multiscale( @@ -249,10 +249,12 @@ def entropy_multiscale( if "delay" in kwargs: kwargs.pop("delay") - # Parameters selection + # Default parameters algorithm = entropy_sample refined = False coarsegraining = "nonoverlapping" + + # Parameters adjustement for variants if method in ["MSEn", "SampEn"]: pass # The default arguments are good elif method in ["MSApEn", "ApEn", "MSPEn", "PEn", "MSWPEn", "WPEn"]: @@ -326,13 +328,9 @@ def entropy_multiscale( info["Value"] = np.array( [ _entropy_multiscale( - coarse=complexity_coarsegraining( - signal, - scale=scale, - method=coarsegraining, - show=False, - **kwargs, - ), + signal, + scale=scale, + coarsegraining=coarsegraining, algorithm=algorithm, dimension=dimension, tolerance=info["Tolerance"], @@ -378,13 +376,32 @@ def _entropy_multiscale_plot(mse, info): # ============================================================================= # Methods # ============================================================================= -def _entropy_multiscale(coarse, algorithm, dimension, tolerance, refined=False, **kwargs): +def _entropy_multiscale( + signal, + scale, + coarsegraining, + algorithm, + dimension, + tolerance, + refined=False, + **kwargs, +): """Wrapper function that works both on 1D and 2D 
coarse-grained (for composite)""" + + # Get coarse-grained signal + coarse = complexity_coarsegraining(signal, scale=scale, method=coarsegraining) + # For 1D coarse-graining if coarse.ndim == 1: + # Get delay + delay = 1 # If non-overlapping + if coarsegraining in ["rolling", "interpolate"]: + delay = scale + + # Compute entropy return algorithm( coarse, - delay=1, + delay=delay, dimension=dimension, tolerance=tolerance, **kwargs, diff --git a/neurokit2/complexity/entropy_quadratic.py b/neurokit2/complexity/entropy_quadratic.py new file mode 100644 index 0000000000..de859bcb85 --- /dev/null +++ b/neurokit2/complexity/entropy_quadratic.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +import numpy as np + +from .entropy_sample import entropy_sample + + +def entropy_quadratic(signal, delay=1, dimension=2, tolerance="sd", **kwargs): + """**Quadratic Sample Entropy (QSE)** + + Compute the quadratic sample entropy (QSE) of a signal. It is essentially a correction of + SampEn introduced by Lake (2005) defined as: + + .. math:: + + QSE = SampEn + ln(2 * tolerannce) + + QSE has been described as a more stable measure of entropy than SampEn (Gylling, 2017). + + Parameters + ---------- + signal : Union[list, np.array, pd.Series] + The signal (i.e., a time series) in the form of a vector of values. + delay : int + Time delay (often denoted *Tau* :math:`\\tau`, sometimes referred to as *lag*) in samples. + See :func:`complexity_delay` to estimate the optimal value for this parameter. + dimension : int + Embedding Dimension (*m*, sometimes referred to as *d* or *order*). See + :func:`complexity_dimension` to estimate the optimal value for this parameter. + tolerance : float + Tolerance (often denoted as *r*), distance to consider two data points as similar. If + ``"sd"`` (default), will be set to :math:`0.2 * SD_{signal}`. See + :func:`complexity_tolerance` to estimate the optimal value for this parameter. + **kwargs : optional + Other arguments. + + See Also + -------- + entropy_sample + + Returns + ---------- + qse : float + The uadratic sample entropy of the single time series. + info : dict + A dictionary containing additional information regarding the parameters used + to compute sample entropy. + + Examples + ---------- + .. ipython:: python + + import neurokit2 as nk + + signal = nk.signal_simulate(duration=2, frequency=5) + + qsa, parameters = nk.entropy_quadratic(signal, delay=1, dimension=2) + qsa + + References + ---------- + * Huselius Gylling, K. (2017). Quadratic sample entropy as a measure of burstiness: A study in + how well Rényi entropy rate and quadratic sample entropy can capture the presence of spikes in + time-series data. + * Lake, D. E. (2005). Renyi entropy measures of heart rate Gaussianity. IEEE Transactions on + Biomedical Engineering, 53(1), 21-27. 
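Numerically, the QSE correction is just a constant shift of SampEn by ln(2 * r). A quick illustration with made-up numbers (hypothetical SampEn value; default tolerance r = 0.2 * SD for a unit-variance signal):

    import numpy as np

    sampen = 1.20          # hypothetical sample entropy value
    tolerance = 0.2 * 1.0  # r = 0.2 * SD, here with SD = 1
    qse = sampen + np.log(2 * tolerance)
    print(round(qse, 3))   # 1.20 + ln(0.4) ≈ 0.284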
+ + """ + sampen, info = entropy_sample( + signal, + delay=delay, + dimension=dimension, + tolerance=tolerance, + **kwargs, + ) + return sampen + np.log(2 * info["Tolerance"]), info diff --git a/neurokit2/complexity/entropy_sample.py b/neurokit2/complexity/entropy_sample.py index 3aacca7052..debbf84367 100644 --- a/neurokit2/complexity/entropy_sample.py +++ b/neurokit2/complexity/entropy_sample.py @@ -3,7 +3,7 @@ import pandas as pd from .optim_complexity_tolerance import complexity_tolerance -from .utils import _phi, _phi_divide +from .utils_entropy import _phi, _phi_divide def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): @@ -35,7 +35,7 @@ def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): See Also -------- - entropy_shannon, entropy_approximate, entropy_fuzzy + entropy_shannon, entropy_approximate, entropy_fuzzy, entropy_quadratic Returns ---------- @@ -81,13 +81,13 @@ def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): } # Compute phi - phi = _phi( + info["phi"], _ = _phi( signal, delay=delay, dimension=dimension, tolerance=info["Tolerance"], approximate=False, **kwargs - )[0] + ) - return _phi_divide(phi), info + return _phi_divide(info["phi"]), info diff --git a/neurokit2/complexity/optim_complexity_tolerance.py b/neurokit2/complexity/optim_complexity_tolerance.py index aa10d4607d..af8bb06504 100644 --- a/neurokit2/complexity/optim_complexity_tolerance.py +++ b/neurokit2/complexity/optim_complexity_tolerance.py @@ -6,8 +6,8 @@ import sklearn.neighbors from ..stats import density -from .utils import _phi from .utils_complexity_embedding import complexity_embedding +from .utils_entropy import _entropy_apen def complexity_tolerance( @@ -224,7 +224,11 @@ def complexity_tolerance( ): if dimension is None: raise ValueError("'dimension' cannot be empty for the 'nolds' method.") - r = 0.11604738531196232 * np.std(signal, ddof=1) * (0.5627 * np.log(dimension) + 1.3334) + r = ( + 0.11604738531196232 + * np.std(signal, ddof=1) + * (0.5627 * np.log(dimension) + 1.3334) + ) info = {"Method": "Adjusted 20% SD"} elif method in ["chon", "chon2009"] and ( @@ -264,7 +268,9 @@ def complexity_tolerance( raise ValueError("'dimension' cannot be empty for the 'makowski' method.") n = len(signal) r = np.std(signal, ddof=1) * ( - 0.2811 * (dimension - 1) + 0.0049 * np.log(n) - 0.02 * ((dimension - 1) * np.log(n)) + 0.2811 * (dimension - 1) + + 0.0049 * np.log(n) + - 0.02 * ((dimension - 1) * np.log(n)) ) info = {"Method": "Makowski"} @@ -292,7 +298,9 @@ def complexity_tolerance( info.update({"Method": "bin"}) else: - raise ValueError("NeuroKit error: complexity_tolerance(): 'method' not recognized.") + raise ValueError( + "NeuroKit error: complexity_tolerance(): 'method' not recognized." + ) if show is True: _optimize_tolerance_plot(r, info, method=method, signal=signal) @@ -305,10 +313,11 @@ def complexity_tolerance( def _optimize_tolerance_recurrence(signal, r_range=None, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: - raise ValueError("If method='recurrence', both delay and dimension must be specified.") + raise ValueError( + "If method='recurrence', both delay and dimension must be specified." 
+ ) # Compute distance matrix emb = complexity_embedding(signal, delay=delay, dimension=dimension) @@ -332,10 +341,11 @@ def _optimize_tolerance_recurrence(signal, r_range=None, delay=None, dimension=N def _optimize_tolerance_maxapen(signal, r_range=None, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: - raise ValueError("If method='maxApEn', both delay and dimension must be specified.") + raise ValueError( + "If method='maxApEn', both delay and dimension must be specified." + ) if r_range is None: r_range = 40 @@ -358,20 +368,6 @@ def _optimize_tolerance_maxapen(signal, r_range=None, delay=None, dimension=None return r_range[np.argmax(apens)], {"Values": r_range, "Scores": np.array(apens)} -def _entropy_apen(signal, delay, dimension, tolerance, **kwargs): - - phi, info = _phi( - signal, - delay=delay, - dimension=dimension, - tolerance=tolerance, - approximate=True, - **kwargs, - ) - - return np.abs(np.subtract(phi[0], phi[1])), info - - def _optimize_tolerance_neighbours(signal, r_range=None, delay=None, dimension=None): if delay is None: delay = 1 @@ -400,7 +396,6 @@ def _optimize_tolerance_neighbours(signal, r_range=None, delay=None, dimension=N def _optimize_tolerance_bin(signal, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: raise ValueError("If method='bin', both delay and dimension must be specified.") @@ -424,7 +419,6 @@ def _optimize_tolerance_bin(signal, delay=None, dimension=None): # Plotting # ============================================================================= def _optimize_tolerance_plot(r, info, ax=None, method="maxApEn", signal=None): - if ax is None: fig, ax = plt.subplots() else: diff --git a/neurokit2/complexity/utils_complexity_coarsegraining.py b/neurokit2/complexity/utils_complexity_coarsegraining.py index 83362bc15c..8e585dbfbe 100644 --- a/neurokit2/complexity/utils_complexity_coarsegraining.py +++ b/neurokit2/complexity/utils_complexity_coarsegraining.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np -import scipy.ndimage.filters from ..signal import signal_interpolate +from .utils_complexity_embedding import complexity_embedding -def complexity_coarsegraining(signal, scale=2, method="nonoverlapping", show=False, **kwargs): +def complexity_coarsegraining( + signal, scale=2, method="nonoverlapping", show=False, **kwargs +): """**Coarse-graining of a signal** The goal of coarse-graining is to represent the signal at a different "scale". 
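For intuition, here is a standalone sketch of the two coarse-graining flavours most relevant to the changes in this file: non-overlapping averaging, and the rolling (moving-average) variant that the diff below re-implements through delay embedding:

    import numpy as np

    signal = np.arange(10, dtype=float)
    scale = 3

    # Non-overlapping: average consecutive, non-overlapping chunks of length `scale`
    n = (len(signal) // scale) * scale
    nonoverlapping = signal[:n].reshape(-1, scale).mean(axis=1)

    # Rolling: mean over every window of length `scale` (one value per start index),
    # equivalent to embedding with dimension=scale, delay=1 and averaging each row
    rolling = np.array([signal[i:i + scale].mean() for i in range(len(signal) - scale + 1)])

    print(nonoverlapping)  # [1. 4. 7.]
    print(rolling)         # [1. 2. 3. 4. 5. 6. 7. 8.]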
The @@ -181,31 +183,37 @@ def complexity_coarsegraining(signal, scale=2, method="nonoverlapping", show=Fal ) elif method == "rolling": - # Relying on scipy is a fast alternative to: - # pd.Series(signal).rolling(window=scale).mean().values[scale-1::] - # https://stackoverflow.com/questions/13728392/moving-average-or-running-mean - coarse = scipy.ndimage.filters.uniform_filter1d(signal, size=scale, mode="nearest") - coarse = coarse[scale - 1 : :] + # See https://github.com/neuropsychology/NeuroKit/pull/892 + coarse = complexity_embedding(signal, dimension=scale, delay=1).mean(axis=1) elif method == "timeshift": - coarse = np.transpose(np.reshape(signal[: scale * (n // scale)], (n // scale, scale))) + coarse = np.transpose( + np.reshape(signal[: scale * (n // scale)], (n // scale, scale)) + ) else: raise ValueError("Unknown `method`: {}".format(method)) if show is True: - _complexity_show(signal[0:n], coarse, method=method) + _complexity_coarsegraining_show(signal[0:n], coarse, method=method) return coarse # ============================================================================= # Utils # ============================================================================= -def _complexity_show(signal, coarse, method="nonoverlapping"): +def _complexity_coarsegraining_show(signal, coarse, method="nonoverlapping"): plt.plot(signal, linewidth=1.5) if method == "nonoverlapping": - plt.plot(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.75) - plt.scatter(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.5) + plt.plot( + np.linspace(0, len(signal), len(coarse)), + coarse, + color="red", + linewidth=0.75, + ) + plt.scatter( + np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.5 + ) elif method == "timeshift": for i in range(len(coarse)): plt.plot( @@ -215,7 +223,9 @@ def _complexity_show(signal, coarse, method="nonoverlapping"): linewidth=0.75, ) else: - plt.plot(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=1) + plt.plot( + np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=1 + ) plt.title(f'Coarse-graining using method "{method}"') diff --git a/neurokit2/complexity/utils.py b/neurokit2/complexity/utils_entropy.py similarity index 86% rename from neurokit2/complexity/utils.py rename to neurokit2/complexity/utils_entropy.py index 86ffb8741c..444dfd8aea 100644 --- a/neurokit2/complexity/utils.py +++ b/neurokit2/complexity/utils_entropy.py @@ -6,6 +6,23 @@ from .utils_complexity_embedding import complexity_embedding + +# ============================================================================= +# ApEn +# ============================================================================= +def _entropy_apen(signal, delay, dimension, tolerance, **kwargs): + phi, info = _phi( + signal, + delay=delay, + dimension=dimension, + tolerance=tolerance, + approximate=True, + **kwargs, + ) + + return np.abs(np.subtract(phi[0], phi[1])), info + + # ============================================================================= # Phi # ============================================================================= @@ -117,9 +134,8 @@ def _get_count( valid_metrics = sklearn.neighbors.KDTree.valid_metrics + ["range"] if distance not in valid_metrics: raise ValueError( - "The given metric (%s) is not valid." - "The valid metric names are: %s" - % (distance, valid_metrics) + f"The given metric ({distance}) is not valid." 
+ f" Valid metric names are: {valid_metrics}" ) if fuzzy is True: @@ -152,7 +168,10 @@ def distrange(x, y): # Count for each row count = np.array( - [np.sum(distrange(embedded, embedded[i]) < tolerance) for i in range(len(embedded))] + [ + np.sum(distrange(embedded, embedded[i]) < tolerance) + for i in range(len(embedded)) + ] ) else: # chebyshev and other sklearn methods @@ -160,5 +179,7 @@ def distrange(x, y): # has a `workers` argument to use multiple cores? Benchmark or opinion required! if kdtree is None: kdtree = sklearn.neighbors.KDTree(embedded, metric=distance) - count = kdtree.query_radius(embedded, tolerance, count_only=True).astype(np.float64) + count = kdtree.query_radius(embedded, tolerance, count_only=True).astype( + np.float64 + ) return embedded, count, kdtree diff --git a/neurokit2/data/read_xdf.py b/neurokit2/data/read_xdf.py index ec4b31d219..bb7ac9250e 100644 --- a/neurokit2/data/read_xdf.py +++ b/neurokit2/data/read_xdf.py @@ -82,13 +82,11 @@ def read_xdf(filename, upsample=2, fillmissing=None): # Rename GYRO channels and add ACCelerometer if stream["info"]["type"][0] == "GYRO": dat = dat.rename(columns={"X": "GYRO_X", "Y": "GYRO_Y", "Z": "GYRO_Z"}) - dat["ACC"] = np.sqrt( - dat["GYRO_X"] ** 2 + dat["GYRO_Y"] ** 2 + dat["GYRO_Z"] ** 2 - ) + dat["ACC"] = np.sqrt(dat["GYRO_X"] ** 2 + dat["GYRO_Y"] ** 2 + dat["GYRO_Z"] ** 2) # Muse - PPG data has three channels: ambient, infrared, red if stream["info"]["type"][0] == "PPG": - dat = dat.rename(columns={"PPG1": "LUX", "PPG2": "PPG", "PPG3": "RED"}) + dat = dat.rename(columns={"PPG1": "LUX", "PPG2": "PPG", "PPG3": "RED", "IR": "PPG"}) # Zeros suggest interruptions, better to replace with NaNs (I think?) dat["PPG"] = dat["PPG"].replace(0, value=np.nan) dat["LUX"] = dat["LUX"].replace(0, value=np.nan) @@ -101,12 +99,8 @@ def read_xdf(filename, upsample=2, fillmissing=None): # Store metadata info = { - "sampling_rates_original": [ - float(s["info"]["nominal_srate"][0]) for s in streams - ], - "sampling_rates_effective": [ - float(s["info"]["effective_srate"]) for s in streams - ], + "sampling_rates_original": [float(s["info"]["nominal_srate"][0]) for s in streams], + "sampling_rates_effective": [float(s["info"]["effective_srate"]) for s in streams], "datetime": header["info"]["datetime"][0], "data": dfs, } @@ -127,14 +121,8 @@ def read_xdf(filename, upsample=2, fillmissing=None): fillmissing = int(info["sampling_rate"] * fillmissing) # Create new index with evenly spaced timestamps - idx = pd.date_range( - df.index.min(), df.index.max(), freq=str(1000 / info["sampling_rate"]) + "ms" - ) + idx = pd.date_range(df.index.min(), df.index.max(), freq=str(1000 / info["sampling_rate"]) + "ms") # https://stackoverflow.com/questions/47148446/pandas-resample-interpolate-is-producing-nans - df = ( - df.reindex(df.index.union(idx)) - .interpolate(method="index", limit=fillmissing) - .reindex(idx) - ) + df = df.reindex(df.index.union(idx)).interpolate(method="index", limit=fillmissing).reindex(idx) return df, info diff --git a/neurokit2/ecg/ecg_clean.py b/neurokit2/ecg/ecg_clean.py index c1d2830cf6..7319d3ba5b 100644 --- a/neurokit2/ecg/ecg_clean.py +++ b/neurokit2/ecg/ecg_clean.py @@ -233,23 +233,19 @@ def _ecg_clean_pantompkins(ecg_signal, sampling_rate=1000): # ============================================================================= -# Elgendi et al. 
(2010) +# Hamilton (2002) # ============================================================================= -def _ecg_clean_elgendi(ecg_signal, sampling_rate=1000): - """From https://github.com/berndporr/py-ecg-detectors/ - - - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS - Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing - (BIOSIGNALS2010). 428-431. - +def _ecg_clean_hamilton(ecg_signal, sampling_rate=1000): + """Adapted from https://github.com/PIA- + Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69. """ - order = 2 + order = 1 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=8, - highcut=20, + highcut=16, method="butterworth_zi", order=order, ) @@ -258,19 +254,23 @@ def _ecg_clean_elgendi(ecg_signal, sampling_rate=1000): # ============================================================================= -# Hamilton (2002) +# Elgendi et al. (2010) # ============================================================================= -def _ecg_clean_hamilton(ecg_signal, sampling_rate=1000): - """Adapted from https://github.com/PIA- - Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69. +def _ecg_clean_elgendi(ecg_signal, sampling_rate=1000): + """From https://github.com/berndporr/py-ecg-detectors/ + + - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS + Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing + (BIOSIGNALS2010). 428-431. + """ - order = 1 + order = 2 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=8, - highcut=16, + highcut=20, method="butterworth_zi", order=order, ) diff --git a/neurokit2/ecg/ecg_findpeaks.py b/neurokit2/ecg/ecg_findpeaks.py index dde1358a4c..4b36ca281c 100644 --- a/neurokit2/ecg/ecg_findpeaks.py +++ b/neurokit2/ecg/ecg_findpeaks.py @@ -4,11 +4,18 @@ import scipy.signal import scipy.stats -from ..signal import (signal_findpeaks, signal_plot, signal_sanitize, - signal_smooth, signal_zerocrossings) +from ..signal import ( + signal_findpeaks, + signal_plot, + signal_sanitize, + signal_smooth, + signal_zerocrossings, +) -def ecg_findpeaks(ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs): +def ecg_findpeaks( + ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs +): """**Locate R-peaks** Low-level function used by :func:`ecg_peaks` to identify R-peaks in an ECG signal using a @@ -86,9 +93,11 @@ def _ecg_findpeaks_findmethod(method): return _ecg_findpeaks_christov elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]: return _ecg_findpeaks_engzee + elif method in ["manikandan", "manikandan2012"]: + return _ecg_findpeaks_manikandan elif method in ["elgendi", "elgendi2010"]: return _ecg_findpeaks_elgendi - elif method in ["kalidas2017", "swt", "kalidas", "kalidastamil", "kalidastamil2017"]: + elif method in ["kalidas2017", "swt", "kalidas"]: return _ecg_findpeaks_kalidas elif method in ["martinez2004", "martinez"]: return _ecg_findpeaks_WT @@ -99,7 +108,9 @@ def _ecg_findpeaks_findmethod(method): elif method in ["promac", "all"]: return _ecg_findpeaks_promac else: - raise ValueError(f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented.") + raise ValueError( + f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented." 
+ ) # ============================================================================= @@ -116,6 +127,7 @@ def _ecg_findpeaks_promac( "zong", "engzee", "elgendi", + "manikandan", "kalidas", "martinez", "rodrigues", @@ -149,7 +161,9 @@ def _ecg_findpeaks_promac( """ x = np.zeros(len(signal)) - promac_methods = [method.lower() for method in promac_methods] # remove capitalised letters + promac_methods = [ + method.lower() for method in promac_methods + ] # remove capitalised letters error_list = [] # Stores the failed methods for method in promac_methods: @@ -173,7 +187,9 @@ def _ecg_findpeaks_promac( peaks = signal_findpeaks(x, height_min=threshold)["Peaks"] if show is True: - signal_plot(pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True) + signal_plot( + pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True + ) [ plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks ] # pylint: disable=W0106 @@ -187,7 +203,9 @@ def _ecg_findpeaks_promac( # _ecg_findpeaks_promac_addmethod + _ecg_findpeaks_promac_convolve # Joining them makes parameters exposition more consistent -def _ecg_findpeaks_promac_addconvolve(signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs): +def _ecg_findpeaks_promac_addconvolve( + signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs +): peaks = fun(signal, sampling_rate=sampling_rate, **kwargs) mask = np.zeros(len(signal)) @@ -195,7 +213,9 @@ def _ecg_findpeaks_promac_addconvolve(signal, sampling_rate, x, fun, gaussian_sd # SD is defined as a typical QRS size, which for adults if 100ms sd = sampling_rate * gaussian_sd / 1000 - shape = scipy.stats.norm.pdf(np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd) + shape = scipy.stats.norm.pdf( + np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd + ) x += np.convolve(mask, shape, "same") @@ -252,7 +272,6 @@ def _ecg_findpeaks_neurokit( peaks = [0] for i in range(num_qrs): - beg = beg_qrs[i] end = end_qrs[i] len_qrs = end - beg @@ -307,35 +326,6 @@ def _ecg_findpeaks_pantompkins(signal, sampling_rate=1000, **kwargs): return mwa_peaks -# =========================================================================== -# Nabian et al. (2018) -# =========================================================================== -def _ecg_findpeaks_nabian2018(signal, sampling_rate=1000, **kwargs): - """R peak detection method by Nabian et al. (2018) inspired by the Pan-Tompkins algorithm. - - - Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., Ostadabbas, S. (2018). - An Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data. - IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11. 
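Earlier in this file, the ProMAC combination turns each detector's output into a smooth trace by convolving a spike mask of detected peaks with a Gaussian whose SD corresponds to a typical adult QRS width (100 ms). A standalone sketch of that single step (hypothetical peak locations):

    import numpy as np
    import scipy.stats

    sampling_rate = 1000
    peaks = np.array([500, 1500])  # hypothetical R-peak locations (in samples)

    mask = np.zeros(2000)
    mask[peaks] = 1
    # Gaussian kernel with SD = 100 ms, spanning +/- 4 SD, as in the code above
    sd = sampling_rate * 100 / 1000
    kernel = scipy.stats.norm.pdf(np.linspace(-4 * sd, 4 * sd, int(8 * sd)), loc=0, scale=sd)
    trace = np.convolve(mask, kernel, mode="same")
    print(int(trace.argmax()))  # lands near one of the hypothetical peaks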
- - """ - window_size = int(0.4 * sampling_rate) - - peaks = np.zeros(len(signal)) - - for i in range(1 + window_size, len(signal) - window_size): - ecg_window = signal[i - window_size : i + window_size] - rpeak = np.argmax(ecg_window) - - if i == (i - window_size - 1 + rpeak): - peaks[i] = 1 - - rpeaks = np.where(peaks == 1)[0] - - # min_distance = 200 - - return rpeaks - - # ============================================================================= # Hamilton (2002) # ============================================================================= @@ -370,7 +360,6 @@ def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs): peaks = [] for i in range(len(ma)): # pylint: disable=C0200,R1702 - if ( i > 0 and i < len(ma) - 1 and ma[i - 1] < ma[i] and ma[i + 1] < ma[i] ): # pylint: disable=R1716 @@ -420,7 +409,9 @@ def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs): # ============================================================================= # Slope Sum Function (SSF) - Zong et al. (2003) # ============================================================================= -def _ecg_findpeaks_ssf(signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs): +def _ecg_findpeaks_ssf( + signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs +): """From https://github.com/PIA- Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L448. @@ -489,7 +480,10 @@ def _ecg_findpeaks_zong(signal, sampling_rate=1000, cutoff=16, window=0.13, **kw for i, j in enumerate(np.arange(w, len(y))): s = y[j - w : j] tmp[i] = np.sum( - np.sqrt(np.power(1 / sampling_rate, 2) * np.ones(w - 1) + np.power(np.diff(s), 2)) + np.sqrt( + np.power(1 / sampling_rate, 2) * np.ones(w - 1) + + np.power(np.diff(s), 2) + ) ) # Pad with the first value clt = np.concatenate([[tmp[0]] * w, tmp]) @@ -544,7 +538,6 @@ def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs): Y = [] for i in range(1, len(MA2) - 1): - diff = abs(MA2[i + 1] - MA2[i - 1]) Y.append(diff) @@ -581,7 +574,6 @@ def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs): QRS = [] for i in range(len(MA3)): # pylint: disable=C0200 - # M if i < 5 * sampling_rate: M = 0.6 * np.max(MA3[: i + 1]) @@ -603,7 +595,6 @@ def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs): M = np.mean(MM) elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200: - M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)] elif QRS and i > QRS[-1] + ms1200: @@ -618,11 +609,9 @@ def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs): # R if QRS and i < QRS[-1] + int((2.0 / 3.0 * Rm)): - R = 0 elif QRS and i > QRS[-1] + int((2.0 / 3.0 * Rm)) and i < QRS[-1] + Rm: - dec = (M - np.mean(MM)) / 1.4 R = 0 + dec @@ -648,6 +637,75 @@ def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs): return QRS +# ============================================================================= +# Continuous Wavelet Transform (CWT) - Martinez et al. (2004) +# ============================================================================= +def _ecg_findpeaks_WT(signal, sampling_rate=1000, **kwargs): + # Try loading pywt + try: + import pywt + except ImportError as import_error: + raise ImportError( + "NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for" + " this method to run. Please install it first (`pip install PyWavelets`)." 
+ ) from import_error + # first derivative of the Gaissian signal + scales = np.array([1, 2, 4, 8, 16]) + cwtmatr, __ = pywt.cwt(signal, scales, "gaus1", sampling_period=1.0 / sampling_rate) + + # For wt of scale 2^4 + signal_4 = cwtmatr[4, :] + epsilon_4 = np.sqrt(np.mean(np.square(signal_4))) + peaks_4, _ = scipy.signal.find_peaks(np.abs(signal_4), height=epsilon_4) + + # For wt of scale 2^3 + signal_3 = cwtmatr[3, :] + epsilon_3 = np.sqrt(np.mean(np.square(signal_3))) + peaks_3, _ = scipy.signal.find_peaks(np.abs(signal_3), height=epsilon_3) + # Keep only peaks_3 that are nearest to peaks_4 + peaks_3_keep = np.zeros_like(peaks_4) + for i in range(len(peaks_4)): # pylint: disable=C0200 + peaks_distance = abs(peaks_4[i] - peaks_3) + peaks_3_keep[i] = peaks_3[np.argmin(peaks_distance)] + + # For wt of scale 2^2 + signal_2 = cwtmatr[2, :] + epsilon_2 = np.sqrt(np.mean(np.square(signal_2))) + peaks_2, _ = scipy.signal.find_peaks(np.abs(signal_2), height=epsilon_2) + # Keep only peaks_2 that are nearest to peaks_3 + peaks_2_keep = np.zeros_like(peaks_4) + for i in range(len(peaks_4)): + peaks_distance = abs(peaks_3_keep[i] - peaks_2) + peaks_2_keep[i] = peaks_2[np.argmin(peaks_distance)] + + # For wt of scale 2^1 + signal_1 = cwtmatr[1, :] + epsilon_1 = np.sqrt(np.mean(np.square(signal_1))) + peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1) + # Keep only peaks_1 that are nearest to peaks_2 + peaks_1_keep = np.zeros_like(peaks_4) + for i in range(len(peaks_4)): + peaks_distance = abs(peaks_2_keep[i] - peaks_1) + peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)] + + # Find R peaks + max_R_peak_dist = int(0.1 * sampling_rate) + rpeaks = [] + for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]): + correct_sign = ( + signal_1[index_cur] < 0 and signal_1[index_next] > 0 + ) # pylint: disable=R1716 + near = (index_next - index_cur) < max_R_peak_dist # limit 2 + if near and correct_sign: + rpeaks.append( + signal_zerocrossings(signal_1[index_cur : index_next + 1])[0] + + index_cur + ) + + rpeaks = np.array(rpeaks, dtype="int") + return rpeaks + + # ============================================================================= # Gamboa (2008) # ============================================================================= @@ -673,7 +731,9 @@ def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002, **kwargs): d2 = np.diff(norm_signal, 2) - b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2 # pylint: disable=E1130 + b = ( + np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2 + ) # pylint: disable=E1130 b = np.intersect1d(b, np.nonzero(-d2 > tol)[0]) # pylint: disable=E1130 rpeaks = [] @@ -693,6 +753,50 @@ def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002, **kwargs): return rpeaks +# ============================================================================= +# Elgendi et al. (2010) +# ============================================================================= +def _ecg_findpeaks_elgendi(signal, sampling_rate=1000, **kwargs): + """From https://github.com/berndporr/py-ecg-detectors/ + + - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS + Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing + (BIOSIGNALS2010). 428-431. 
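In the wavelet-based (Martinez et al.) detector added above, the candidate peaks at each scale are the maxima of the absolute CWT coefficients that exceed that scale's root-mean-square. A standalone sketch of this RMS-as-threshold step (synthetic coefficients rather than a real wavelet transform):

    import numpy as np
    import scipy.signal

    # Synthetic "coefficients": a sine whose amplitude slowly grows
    coeffs = np.sin(np.linspace(0, 20 * np.pi, 2000)) * np.linspace(0.2, 1.0, 2000)

    epsilon = np.sqrt(np.mean(np.square(coeffs)))          # RMS threshold
    peaks, _ = scipy.signal.find_peaks(np.abs(coeffs), height=epsilon)
    print(len(peaks))  # only the larger oscillations survive the threshold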
+ + """ + + window1 = int(0.12 * sampling_rate) + mwa_qrs = _ecg_findpeaks_MWA(abs(signal), window1) + + window2 = int(0.6 * sampling_rate) + mwa_beat = _ecg_findpeaks_MWA(abs(signal), window2) + + blocks = np.zeros(len(signal)) + block_height = np.max(signal) + + for i in range(len(mwa_qrs)): # pylint: disable=C0200 + blocks[i] = block_height if mwa_qrs[i] > mwa_beat[i] else 0 + QRS = [] + + for i in range(1, len(blocks)): + if blocks[i - 1] == 0 and blocks[i] == block_height: + start = i + + elif blocks[i - 1] == block_height and blocks[i] == 0: + end = i - 1 + + if end - start > int(0.08 * sampling_rate): + detection = np.argmax(signal[start : end + 1]) + start + if QRS: + if detection - QRS[-1] > int(0.3 * sampling_rate): + QRS.append(detection) + else: + QRS.append(detection) + + QRS = np.array(QRS, dtype="int") + return QRS + + # ============================================================================= # Engzee Modified (2012) # ============================================================================= @@ -739,7 +843,6 @@ def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs): newM5 = False for i in range(len(low_pass)): # pylint: disable=C0200 - # M if i < 5 * sampling_rate: M = 0.6 * np.max(low_pass[: i + 1]) @@ -748,7 +851,6 @@ def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs): MM.pop(0) elif QRS and i < QRS[-1] + ms200: - newM5 = 0.6 * np.max(low_pass[QRS[-1] : i]) if newM5 > 1.5 * MM[-1]: @@ -761,7 +863,6 @@ def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs): M = np.mean(MM) elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200: - M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)] elif QRS and i > QRS[-1] + ms1200: @@ -818,6 +919,103 @@ def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs): return r_peaks +# ============================================================================= +# Shannon energy R-peak detection - Manikandan and Soman (2012) +# ============================================================================= +def _ecg_findpeaks_manikandan(signal, sampling_rate=1000, **kwargs): + """From https://github.com/hongzuL/A-novel-method-for-detecting-R-peaks-in-electrocardiogram-signal/ + + A (hopefully) fixed version of https://github.com/nsunami/Shannon-Energy-R-Peak-Detection + """ + + # Preprocessing ------------------------------------------------------------ + # Forward and backward filtering using filtfilt. + def cheby1_bandpass_filter(data, lowcut, highcut, fs, order=5, rp=1): + nyq = 0.5 * fs + low = lowcut / nyq + high = highcut / nyq + b, a = scipy.signal.cheby1(order, rp=rp, Wn=[low, high], btype="bandpass") + y = scipy.signal.filtfilt(b, a, data) + return y + + # Running mean filter function + def running_mean(x, N): + cumsum = np.cumsum(np.insert(x, 0, 0)) + return (cumsum[N:] - cumsum[:-N]) / float(N) + + # Apply Chebyshev Type I Bandpass filter + # Low cut frequency = 6 Hz + # High cut frequency = 18 + filtered = cheby1_bandpass_filter( + signal, lowcut=6, highcut=18, fs=sampling_rate, order=4 + ) + + # Eq. 1: First-order differencing difference + dn = np.append(filtered[1:], 0) - filtered + # Eq. 2 + dtn = dn / (np.max(abs(dn))) + + # The absolute value, energy value, Shannon entropy value, and Shannon energy value + # # Eq. 3 + # an = np.abs(dtn) + # # Eq. 4 + # en = an**2 + # # Eq. 5 + # sen = -np.abs(dtn) * np.log10(np.abs(dtn)) + # Eq. 
6 + sn = -(dtn**2) * np.log10(dtn**2) + + # Apply rectangular window + # Length should be approximately the same as the duration of possible wider QRS complex + # Normal QRS duration is .12 sec, so we overshoot with 0.15 sec + window_len = int(0.15 * sampling_rate) + window_len = window_len - 1 if window_len % 2 == 0 else window_len # Make odd + window = scipy.signal.windows.boxcar(window_len) + + # The filtering operation is performed in both the forward and reverse directions + see = scipy.signal.convolve(sn, window, mode="same") + see = np.flip(see) + see = scipy.signal.convolve(see, window, "same") + see = np.flip(see) + + # Hilbert Transformation + ht = np.imag(scipy.signal.hilbert(see)) + + # Moving Average to remove low frequency drift + # 2.5 sec from Manikanda in 360 Hz (900 samples) + # 2.5 sec in 500 Hz == 1250 samples + ma_len = int(2.5 * sampling_rate) + ma_out = np.insert(running_mean(ht, ma_len), 0, [0] * (ma_len - 1)) + + # Get the difference between the Hilbert signal and the MA filtered signal + zn = ht - ma_out + + # R-Peak Detection --------------------------------------------------------- + # Look for points crossing zero + + # Find points crossing zero upwards (negative to positive) + idx = np.argwhere(np.diff(np.sign(zn)) > 0).flatten().tolist() + # Prepare a container for windows + idx_search = [] + id_maxes = np.empty(0, dtype=int) + search_window_half = int(np.ceil(window_len / 2)) + for i in idx: + lows = np.arange(i - search_window_half, i) + highs = np.arange(i + 1, i + search_window_half + 1) + if highs[-1] > len(signal): + highs = np.delete( + highs, np.arange(np.where(highs == len(signal))[0], len(highs) + 1) + ) + ekg_window = np.concatenate((lows, [i], highs)) + idx_search.append(ekg_window) + ekg_window_wave = signal[ekg_window] + id_maxes = np.append( + id_maxes, + ekg_window[np.where(ekg_window_wave == np.max(ekg_window_wave))[0]], + ) + return np.array(idx) + + # ============================================================================= # Stationary Wavelet Transform (SWT) - Kalidas and Tamil (2017) # ============================================================================= @@ -873,112 +1071,32 @@ def _ecg_findpeaks_kalidas(signal, sampling_rate=1000, **kwargs): return filt_peaks -# ============================================================================= -# Elgendi et al. (2010) -# ============================================================================= -def _ecg_findpeaks_elgendi(signal, sampling_rate=1000, **kwargs): - """From https://github.com/berndporr/py-ecg-detectors/ +# =========================================================================== +# Nabian et al. (2018) +# =========================================================================== +def _ecg_findpeaks_nabian2018(signal, sampling_rate=1000, **kwargs): + """R peak detection method by Nabian et al. (2018) inspired by the Pan-Tompkins algorithm. - - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS - Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing - (BIOSIGNALS2010). 428-431. + - Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., Ostadabbas, S. (2018). + An Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data. + IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11. 
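The Manikandan & Soman detector added above emphasises QRS complexes by taking the Shannon energy of the normalised first difference (Eq. 6 in the code comments). A standalone sketch of just that transform (toy signal; the real detector additionally bandpass-filters, smooths with a rectangular window and Hilbert-transforms the result):

    import numpy as np

    x = np.sin(np.linspace(0, 4 * np.pi, 200))  # toy signal
    dn = np.append(x[1:], 0) - x                # first difference (Eq. 1)
    dtn = dn / np.max(np.abs(dn))               # normalised difference (Eq. 2)
    with np.errstate(divide="ignore", invalid="ignore"):
        sn = -(dtn ** 2) * np.log10(dtn ** 2)   # Shannon energy (Eq. 6)
    sn = np.nan_to_num(sn)                      # guard against 0 * log(0)
    print(round(float(sn.max()), 3))            # bounded by ~0.16, the max of -u*log10(u)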
""" + window_size = int(0.4 * sampling_rate) - window1 = int(0.12 * sampling_rate) - mwa_qrs = _ecg_findpeaks_MWA(abs(signal), window1) - - window2 = int(0.6 * sampling_rate) - mwa_beat = _ecg_findpeaks_MWA(abs(signal), window2) - - blocks = np.zeros(len(signal)) - block_height = np.max(signal) - - for i in range(len(mwa_qrs)): # pylint: disable=C0200 - blocks[i] = block_height if mwa_qrs[i] > mwa_beat[i] else 0 - QRS = [] - - for i in range(1, len(blocks)): - if blocks[i - 1] == 0 and blocks[i] == block_height: - start = i - - elif blocks[i - 1] == block_height and blocks[i] == 0: - end = i - 1 - - if end - start > int(0.08 * sampling_rate): - detection = np.argmax(signal[start : end + 1]) + start - if QRS: - if detection - QRS[-1] > int(0.3 * sampling_rate): - QRS.append(detection) - else: - QRS.append(detection) - - QRS = np.array(QRS, dtype="int") - return QRS - - -# ============================================================================= -# Continuous Wavelet Transform (CWT) - Martinez et al. (2004) -# ============================================================================= -# -def _ecg_findpeaks_WT(signal, sampling_rate=1000, **kwargs): - # Try loading pywt - try: - import pywt - except ImportError as import_error: - raise ImportError( - "NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for" - " this method to run. Please install it first (`pip install PyWavelets`)." - ) from import_error - # first derivative of the Gaissian signal - scales = np.array([1, 2, 4, 8, 16]) - cwtmatr, __ = pywt.cwt(signal, scales, "gaus1", sampling_period=1.0 / sampling_rate) - - # For wt of scale 2^4 - signal_4 = cwtmatr[4, :] - epsilon_4 = np.sqrt(np.mean(np.square(signal_4))) - peaks_4, _ = scipy.signal.find_peaks(np.abs(signal_4), height=epsilon_4) + peaks = np.zeros(len(signal)) - # For wt of scale 2^3 - signal_3 = cwtmatr[3, :] - epsilon_3 = np.sqrt(np.mean(np.square(signal_3))) - peaks_3, _ = scipy.signal.find_peaks(np.abs(signal_3), height=epsilon_3) - # Keep only peaks_3 that are nearest to peaks_4 - peaks_3_keep = np.zeros_like(peaks_4) - for i in range(len(peaks_4)): # pylint: disable=C0200 - peaks_distance = abs(peaks_4[i] - peaks_3) - peaks_3_keep[i] = peaks_3[np.argmin(peaks_distance)] + for i in range(1 + window_size, len(signal) - window_size): + ecg_window = signal[i - window_size : i + window_size] + rpeak = np.argmax(ecg_window) - # For wt of scale 2^2 - signal_2 = cwtmatr[2, :] - epsilon_2 = np.sqrt(np.mean(np.square(signal_2))) - peaks_2, _ = scipy.signal.find_peaks(np.abs(signal_2), height=epsilon_2) - # Keep only peaks_2 that are nearest to peaks_3 - peaks_2_keep = np.zeros_like(peaks_4) - for i in range(len(peaks_4)): - peaks_distance = abs(peaks_3_keep[i] - peaks_2) - peaks_2_keep[i] = peaks_2[np.argmin(peaks_distance)] + if i == (i - window_size - 1 + rpeak): + peaks[i] = 1 - # For wt of scale 2^1 - signal_1 = cwtmatr[1, :] - epsilon_1 = np.sqrt(np.mean(np.square(signal_1))) - peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1) - # Keep only peaks_1 that are nearest to peaks_2 - peaks_1_keep = np.zeros_like(peaks_4) - for i in range(len(peaks_4)): - peaks_distance = abs(peaks_2_keep[i] - peaks_1) - peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)] + rpeaks = np.where(peaks == 1)[0] - # Find R peaks - max_R_peak_dist = int(0.1 * sampling_rate) - rpeaks = [] - for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]): - correct_sign = signal_1[index_cur] < 0 and signal_1[index_next] > 0 # pylint: disable=R1716 - 
near = (index_next - index_cur) < max_R_peak_dist # limit 2 - if near and correct_sign: - rpeaks.append(signal_zerocrossings(signal_1[index_cur : index_next + 1])[0] + index_cur) + # min_distance = 200 - rpeaks = np.array(rpeaks, dtype="int") return rpeaks @@ -1105,8 +1223,8 @@ def _ecg_findpeaks_vgraph(signal, sampling_rate=1000, lowcut=3, order=2, **kwarg elif N - deltaM <= L < L < N: w[L:] = 0.5 * (_w + w[L:]) else: - w[L: L + deltaM] = 0.5 * (_w[:deltaM] + w[L: L + deltaM]) - w[L + deltaM: R] = _w[deltaM:] + w[L : L + deltaM] = 0.5 * (_w[:deltaM] + w[L : L + deltaM]) + w[L + deltaM : R] = _w[deltaM:] # Update the boundary conditions L = L + (M - deltaM) @@ -1121,6 +1239,7 @@ def _ecg_findpeaks_vgraph(signal, sampling_rate=1000, lowcut=3, order=2, **kwarg rpeaks = np.array(rpeaks, dtype="int") return rpeaks + # ============================================================================= # Utilities # ============================================================================= @@ -1143,14 +1262,18 @@ def _ecg_findpeaks_MWA(signal, window_size, **kwargs): # return causal moving averages, i.e. each output element is the average # of window_size input elements ending at that position, we use the # `origin` argument to shift the filter computation accordingly. - mwa = scipy.ndimage.uniform_filter1d(signal, window_size, origin=(window_size - 1) // 2) + mwa = scipy.ndimage.uniform_filter1d( + signal, window_size, origin=(window_size - 1) // 2 + ) # Compute actual moving averages for the first `window_size - 1` elements, # which the uniform_filter1d function computes using padding. We want # those output elements to be averages of only the input elements until # that position. head_size = min(window_size - 1, len(signal)) - mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace(1, head_size, head_size) + mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace( + 1, head_size, head_size + ) return mwa @@ -1200,7 +1323,9 @@ def _ecg_findpeaks_peakdetect(detection, sampling_rate=1000, **kwargs): threshold_I2 = 0.5 * threshold_I1 missed_peaks = missed_peaks[detection[missed_peaks] > threshold_I2] if len(missed_peaks) > 0: - signal_peaks[-1] = missed_peaks[np.argmax(detection[missed_peaks])] + signal_peaks[-1] = missed_peaks[ + np.argmax(detection[missed_peaks]) + ] signal_peaks.append(peak) last_peak = peak diff --git a/neurokit2/ecg/ecg_peaks.py b/neurokit2/ecg/ecg_peaks.py index 2357391f84..ebaf59bf0d 100644 --- a/neurokit2/ecg/ecg_peaks.py +++ b/neurokit2/ecg/ecg_peaks.py @@ -34,6 +34,8 @@ def ecg_peaks( * **elgendi2010**: Algorithm by Elgendi et al. (2010). * **engzeemod2012**: Original algorithm by Engelse & Zeelenberg (1979) modified by Lourenço et al. (2012). + * **manikandan2012**: Algorithm by Manikandan & Soman (2012) based on the Shannon energy + envelope (SEE). * **kalidas2017**: Algorithm by Kalidas et al. (2017). * **nabian2018**: Algorithm by Nabian et al. (2018) based on the Pan-Tompkins algorithm. 
* **rodrigues2021**: Adaptation of the work by Sadhukhan & Mitra (2012) and Gutiérrez-Rivas et @@ -113,19 +115,16 @@ def ecg_peaks( cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="pantompkins1985") _, pantompkins1985 = nk.ecg_peaks(cleaned, sampling_rate=250, method="pantompkins1985") - # nabian2018 - _, nabian2018 = nk.ecg_peaks(ecg, sampling_rate=250, method="nabian2018") - # hamilton2002 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="hamilton2002") _, hamilton2002 = nk.ecg_peaks(cleaned, sampling_rate=250, method="hamilton2002") - # martinez2004 - _, martinez2004 = nk.ecg_peaks(ecg, sampling_rate=250, method="martinez2004") - # zong2003 _, zong2003 = nk.ecg_peaks(ecg, sampling_rate=250, method="zong2003") + # martinez2004 + _, martinez2004 = nk.ecg_peaks(ecg, sampling_rate=250, method="martinez2004") + # christov2004 _, christov2004 = nk.ecg_peaks(cleaned, sampling_rate=250, method="christov2004") @@ -141,10 +140,16 @@ def ecg_peaks( cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="engzeemod2012") _, engzeemod2012 = nk.ecg_peaks(cleaned, sampling_rate=250, method="engzeemod2012") + # Manikandan (2012) + _, manikandan2012 = nk.ecg_peaks(ecg, sampling_rate=250, method="manikandan2012") + # kalidas2017 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="kalidas2017") _, kalidas2017 = nk.ecg_peaks(cleaned, sampling_rate=250, method="kalidas2017") + # nabian2018 + _, nabian2018 = nk.ecg_peaks(ecg, sampling_rate=250, method="nabian2018") + # rodrigues2021 _, rodrigues2021 = nk.ecg_peaks(ecg, sampling_rate=250, method="rodrigues2021") @@ -212,6 +217,8 @@ def ecg_peaks( Signal Processing, 428-431. * Engelse, W. A., & Zeelenberg, C. (1979). A single scan algorithm for QRS-detection and feature extraction. Computers in cardiology, 6(1979), 37-42. + * Manikandan, M. S., & Soman, K. P. (2012). A novel method for detecting R-peaks in + electrocardiogram (ECG) signal. Biomedical Signal Processing and Control, 7(2), 118-128. * Lourenço, A., Silva, H., Leite, P., Lourenço, R., & Fred, A. L. (2012, February). Real Time Electrocardiogram Segmentation for Finger based ECG Biometrics. In Biosignals (pp. 49-54). diff --git a/neurokit2/ecg/ecg_plot.py b/neurokit2/ecg/ecg_plot.py index 7e498f0644..b7eaacfa9b 100644 --- a/neurokit2/ecg/ecg_plot.py +++ b/neurokit2/ecg/ecg_plot.py @@ -34,10 +34,10 @@ def ecg_plot(ecg_signals, info=None): .. 
code-block:: python - # To be run after ecg_plot() - fig = plt.gcf() - fig.set_size_inches(10, 12, forward=True) - fig.savefig("myfig.png") + # To be run after ecg_plot() + fig = plt.gcf() + fig.set_size_inches(10, 12, forward=True) + fig.savefig("myfig.png") Examples -------- diff --git a/neurokit2/eda/eda_intervalrelated.py b/neurokit2/eda/eda_intervalrelated.py index 8897d0fc6e..3e27101761 100644 --- a/neurokit2/eda/eda_intervalrelated.py +++ b/neurokit2/eda/eda_intervalrelated.py @@ -75,9 +75,7 @@ def eda_intervalrelated(data, sampling_rate=1000, **kwargs): # Add label info results[index]["Label"] = data[index]["Label"].iloc[0] - results[index] = _eda_intervalrelated( - data[index], results[index], sampling_rate=sampling_rate, **kwargs - ) + results[index] = _eda_intervalrelated(data[index], results[index], sampling_rate=sampling_rate, **kwargs) results = pd.DataFrame.from_dict(results, orient="index") @@ -89,9 +87,7 @@ def eda_intervalrelated(data, sampling_rate=1000, **kwargs): # ============================================================================= -def _eda_intervalrelated( - data, output={}, sampling_rate=1000, method_sympathetic="posada", **kwargs -): +def _eda_intervalrelated(data, output={}, sampling_rate=1000, method_sympathetic="posada", **kwargs): """Format input for dictionary.""" # Sanitize input colnames = data.columns.values @@ -114,7 +110,12 @@ def _eda_intervalrelated( ) output["SCR_Peaks_Amplitude_Mean"] = np.nan else: - output["SCR_Peaks_Amplitude_Mean"] = np.nanmean(data["SCR_Amplitude"].values) + peaks_idx = data["SCR_Peaks"] == 1 + # Mean amplitude is only computed over peaks. If no peaks, return NaN + if peaks_idx.sum() > 0: + output["SCR_Peaks_Amplitude_Mean"] = np.nanmean(data[peaks_idx]["SCR_Amplitude"].values) + else: + output["SCR_Peaks_Amplitude_Mean"] = np.nan # Get variability of tonic if "EDA_Tonic" in colnames: @@ -126,14 +127,18 @@ def _eda_intervalrelated( if "EDA_Clean" in colnames: output.update( eda_sympathetic( - data["EDA_Clean"], sampling_rate=sampling_rate, method=method_sympathetic + data["EDA_Clean"], + sampling_rate=sampling_rate, + method=method_sympathetic, ) ) elif "EDA_Raw" in colnames: # If not clean signal, use raw output.update( eda_sympathetic( - data["EDA_Raw"], sampling_rate=sampling_rate, method=method_sympathetic + data["EDA_Raw"], + sampling_rate=sampling_rate, + method=method_sympathetic, ) ) @@ -141,13 +146,9 @@ def _eda_intervalrelated( output.update({"EDA_Autocorrelation": np.nan}) # Default values if len(data) > sampling_rate * 30: # 30 seconds minimum (NOTE: somewhat arbitrary) if "EDA_Clean" in colnames: - output["EDA_Autocorrelation"] = eda_autocor( - data["EDA_Clean"], sampling_rate=sampling_rate, **kwargs - ) + output["EDA_Autocorrelation"] = eda_autocor(data["EDA_Clean"], sampling_rate=sampling_rate, **kwargs) elif "EDA_Raw" in colnames: # If not clean signal, use raw - output["EDA_Autocorrelation"] = eda_autocor( - data["EDA_Raw"], sampling_rate=sampling_rate, **kwargs - ) + output["EDA_Autocorrelation"] = eda_autocor(data["EDA_Raw"], sampling_rate=sampling_rate, **kwargs) return output diff --git a/neurokit2/eda/eda_plot.py b/neurokit2/eda/eda_plot.py index d61274e435..5ba0cb642b 100644 --- a/neurokit2/eda/eda_plot.py +++ b/neurokit2/eda/eda_plot.py @@ -62,6 +62,10 @@ def eda_plot(eda_signals, info=None, static=True): onsets = np.where(eda_signals["SCR_Onsets"] == 1)[0] half_recovery = np.where(eda_signals["SCR_Recovery"] == 1)[0] + # clean peaks that do not have onsets + if len(peaks) > len(onsets): + peaks 
= peaks[1:] + # Determine unit of x-axis. x_label = "Time (seconds)" x_axis = np.linspace(0, len(eda_signals) / info["sampling_rate"], len(eda_signals)) diff --git a/neurokit2/hrv/intervals_process.py b/neurokit2/hrv/intervals_process.py index c5131e953d..3db138dbd2 100644 --- a/neurokit2/hrv/intervals_process.py +++ b/neurokit2/hrv/intervals_process.py @@ -66,39 +66,42 @@ def intervals_process( # Download data data = nk.data("bio_resting_5min_100hz") + sampling_rate = 100 # Clean signal and find peaks ecg_cleaned = nk.ecg_clean(data["ECG"], sampling_rate=100) - peaks, info = nk.ecg_peaks(ecg_cleaned, sampling_rate=100, correct_artifacts=True) + _, info = nk.ecg_peaks(ecg_cleaned, sampling_rate=100, correct_artifacts=True) + peaks = info["ECG_R_Peaks"] # Convert peaks to intervals rri = np.diff(peaks) / sampling_rate * 1000 rri_time = np.array(peaks[1:]) / sampling_rate - # Compute HRV indices - @savefig p_intervals_process1.png scale=100% - plt.figure() - plt.plot(intervals_time, intervals, label="Original intervals") - intervals, intervals_time = intervals_process(rri, - intervals_time=rri_time, - interpolate=True, - interpolation_rate=100, - detrend="tarvainen2002") - plt.plot(intervals_time, intervals, label="Processed intervals") - plt.xlabel("Time (seconds)") - plt.ylabel("Interbeat intervals (milliseconds)") - @suppress - plt.close() + # # Compute HRV indices + # @savefig p_intervals_process1.png scale=100% + # plt.figure() + # plt.plot(intervals_time, intervals, label="Original intervals") + # intervals, intervals_time = nk.intervals_process(rri, + # intervals_time=rri_time, + # interpolate=True, + # interpolation_rate=100, + # detrend="tarvainen2002") + # plt.plot(intervals_time, intervals, label="Processed intervals") + # plt.xlabel("Time (seconds)") + # plt.ylabel("Interbeat intervals (milliseconds)") + # @suppress + # plt.close() """ # Sanitize input - intervals, intervals_time, _ = _intervals_sanitize(intervals, intervals_time=intervals_time) + intervals, intervals_time, _ = _intervals_sanitize( + intervals, intervals_time=intervals_time + ) if interpolate is False: interpolation_rate = None if interpolation_rate is not None: - # Rate should be at least 1 Hz (due to Nyquist & frequencies we are interested in) # We considered an interpolation rate 4 Hz by default to match Kubios # but in case of some applications with high heart rates we decided to make it 100 Hz @@ -127,5 +130,7 @@ def intervals_process( interpolation_rate = _intervals_time_to_sampling_rate(intervals_time) if detrend is not None: - intervals = signal_detrend(intervals, method=detrend, sampling_rate=interpolation_rate) + intervals = signal_detrend( + intervals, method=detrend, sampling_rate=interpolation_rate + ) return intervals, intervals_time, interpolation_rate diff --git a/neurokit2/signal/signal_autocor.py b/neurokit2/signal/signal_autocor.py index d2f8d72fb6..4b3dc813fa 100644 --- a/neurokit2/signal/signal_autocor.py +++ b/neurokit2/signal/signal_autocor.py @@ -4,7 +4,9 @@ from matplotlib import pyplot as plt -def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): +def signal_autocor( + signal, lag=None, demean=True, method="auto", unbiased=False, show=False +): """**Autocorrelation (ACF)** Compute the autocorrelation of a signal. @@ -57,6 +59,14 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): @suppress plt.close() + .. 
ipython:: python + + # Example 3: Using 'unbiased' Method + @savefig p_signal_autocor3.png scale=100% + r, info = nk.signal_autocor(signal, lag=2, method='fft', unbiased=True, show=True) + @suppress + plt.close() + """ n = len(signal) @@ -66,8 +76,14 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): # Run autocor method = method.lower() + if method == "unbiased": + unbiased = True + method = "fft" + if method in ["auto"]: - acov = scipy.signal.correlate(signal, signal, mode="full", method="auto")[n - 1 :] + acov = scipy.signal.correlate(signal, signal, mode="full", method="auto")[ + n - 1 : + ] elif method in ["cor", "correlation", "correlate"]: acov = np.correlate(signal, signal, mode="full") acov = acov[n - 1 :] # Min time lag is 0 @@ -76,16 +92,15 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): fft = np.fft.fft(a) acf = np.fft.ifft(np.conjugate(fft) * fft)[:n] acov = acf.real - elif method == "unbiased": - dnorm = np.r_[np.arange(1, n + 1), np.arange(n - 1, 0, -1)] - fft = np.fft.fft(signal, n=n) - acf = np.fft.ifft(np.conjugate(fft) * fft)[:n] - acf /= dnorm[n - 1 :] - acov = acf.real else: raise ValueError("Method must be 'auto', 'correlation' or 'fft'.") - # Normalize + # If unbiased, normalize by the number of overlapping elements + if unbiased is True: + normalization = np.arange(n, 0, -1) + acov /= normalization + + # Normalize (so that max correlation is 1) r = acov / np.max(acov) # Confidence interval diff --git a/tests/tests_complexity.py b/tests/tests_complexity.py index 43a583a105..da77352ddd 100644 --- a/tests/tests_complexity.py +++ b/tests/tests_complexity.py @@ -4,9 +4,9 @@ import nolds import numpy as np import pandas as pd -from pyentrp import entropy as pyentrp import sklearn.neighbors from packaging import version +from pyentrp import entropy as pyentrp # import EntropyHub import neurokit2 as nk @@ -21,13 +21,14 @@ # Some sanity checks # ============================================================================= def test_complexity_sanity(): - signal = np.cos(np.linspace(start=0, stop=30, num=1000)) mdfa_q = [-5, -3, -1, 1, 3, 5] # Entropy assert np.allclose( - nk.entropy_fuzzy(signal)[0], nk.entropy_sample(signal, fuzzy=True)[0], atol=0.000001 + nk.entropy_fuzzy(signal)[0], + nk.entropy_sample(signal, fuzzy=True)[0], + atol=0.000001, ) # Fractal @@ -39,9 +40,13 @@ def test_complexity_sanity(): # TODO: why this gives 70 or 71 depending on the machine???? 
# assert parameters["Fluctuations"].shape == (70, len(mdfa_q)) - assert np.allclose(nk.fractal_correlation(signal)[0], 0.7382138350901658, atol=0.000001) assert np.allclose( - nk.fractal_correlation(signal, radius="nolds")[0], nolds.corr_dim(signal, 2), atol=0.01 + nk.fractal_correlation(signal)[0], 0.7382138350901658, atol=0.000001 + ) + assert np.allclose( + nk.fractal_correlation(signal, radius="nolds")[0], + nolds.corr_dim(signal, 2), + atol=0.01, ) @@ -79,7 +84,6 @@ def test_complexity_sanity(): def test_complexity_vs_R(): - signal = pd.read_csv( "https://raw.githubusercontent.com/neuropsychology/NeuroKit/master/data/bio_eventrelated_100hz.csv" )["RSP"].values @@ -97,7 +101,9 @@ def test_complexity_vs_R(): sampen = nk.entropy_sample(signal[0:300], dimension=2, tolerance=r)[0] assert np.allclose( sampen, - nk.entropy_sample(signal[0:300], dimension=2, tolerance=r, distance="infinity")[0], + nk.entropy_sample(signal[0:300], dimension=2, tolerance=r, distance="infinity")[ + 0 + ], atol=0.001, ) assert np.allclose(sampen, 0.03784376, atol=0.001) @@ -111,7 +117,6 @@ def test_complexity_vs_R(): def test_complexity_vs_Python(): - signal = np.cos(np.linspace(start=0, stop=30, num=100)) tolerance = 0.2 * np.std(signal, ddof=1) @@ -133,9 +138,9 @@ def test_complexity_vs_Python(): entropy_app_entropy(signal, 2), ) - assert nk.entropy_approximate(signal, dimension=2, tolerance=tolerance)[0] != pyeeg_ap_entropy( - signal, 2, tolerance - ) + assert nk.entropy_approximate(signal, dimension=2, tolerance=tolerance)[ + 0 + ] != pyeeg_ap_entropy(signal, 2, tolerance) # Sample assert np.allclose( @@ -167,7 +172,9 @@ def test_complexity_vs_Python(): != pyentrp.sample_entropy(signal, 2, 0.2)[1] ) assert ( - nk.entropy_sample(signal, dimension=2, tolerance=0.2 * np.sqrt(np.var(signal)))[0] + nk.entropy_sample(signal, dimension=2, tolerance=0.2 * np.sqrt(np.var(signal)))[ + 0 + ] != MultiscaleEntropy_sample_entropy(signal, 2, 0.2)[0.2][2] ) @@ -254,11 +261,16 @@ def pyeeg_embed_seq(time_series, tau, embedding_dimension): else: typed_time_series = time_series - shape = (typed_time_series.size - tau * (embedding_dimension - 1), embedding_dimension) + shape = ( + typed_time_series.size - tau * (embedding_dimension - 1), + embedding_dimension, + ) strides = (typed_time_series.itemsize, tau * typed_time_series.itemsize) - return np.lib.stride_tricks.as_strided(typed_time_series, shape=shape, strides=strides) + return np.lib.stride_tricks.as_strided( + typed_time_series, shape=shape, strides=strides + ) def pyeeg_bin_power(X, Band, Fs): @@ -269,7 +281,11 @@ def pyeeg_bin_power(X, Band, Fs): Freq = float(Band[Freq_Index]) Next_Freq = float(Band[Freq_Index + 1]) Power[Freq_Index] = sum( - C[int(np.floor(Freq / Fs * len(X))) : int(np.floor(Next_Freq / Fs * len(X)))] + C[ + int(np.floor(Freq / Fs * len(X))) : int( + np.floor(Next_Freq / Fs * len(X)) + ) + ] ) Power_Ratio = Power / sum(Power) return Power, Power_Ratio @@ -341,16 +357,6 @@ def entropy_embed(x, order=3, delay=1): def entropy_app_samp_entropy(x, order, metric="chebyshev", approximate=True): - sklearn_version = version.parse(sklearn.__version__) - if sklearn_version >= version.parse("1.3.0"): - _all_metrics = sklearn.neighbors.KDTree.valid_metrics() - else: - _all_metrics = sklearn.neighbors.KDTree.valid_metrics - if metric not in _all_metrics: - raise ValueError( - "The given metric (%s) is not valid. 
The valid " # pylint: disable=consider-using-f-string - "metric names are: %s" % (metric, _all_metrics) - ) phi = np.zeros(2) r = 0.2 * np.std(x, axis=-1, ddof=1) @@ -516,7 +522,11 @@ def MultiscaleEntropy_check_type(x, num_type, name): tmp = [x] elif not isinstance(x, Iterable): raise ValueError( - name + " should be a " + num_type.__name__ + " or an iterator of " + num_type.__name__ + name + + " should be a " + + num_type.__name__ + + " or an iterator of " + + num_type.__name__ ) else: tmp = [] @@ -635,7 +645,12 @@ def MultiscaleEntropy_sample_entropy( def MultiscaleEntropy_mse( - x, scale_factor=list(range(1, 21)), m=[2], r=[0.15], return_type="dict", safe_mode=False + x, + scale_factor=list(range(1, 21)), + m=[2], + r=[0.15], + return_type="dict", + safe_mode=False, ): """[Multiscale Entropy]
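To make the additions above concrete, here is a brief, illustrative usage sketch covering two of the new options in this changeset: the ``manikandan2012`` R-peak detection method exposed through ``ecg_peaks()`` and the ``unbiased`` argument of ``signal_autocor()``. It follows the ``.. code-block:: python`` convention used in the docstrings above; the simulated signal, duration, and sampling rate are arbitrary example values and are not part of the diff itself.

.. code-block:: python

    import numpy as np
    import neurokit2 as nk

    # Simulate a short ECG trace (illustrative duration and sampling rate)
    ecg = nk.ecg_simulate(duration=10, sampling_rate=250)

    # R-peak detection with the Shannon energy envelope method added above
    _, info = nk.ecg_peaks(ecg, sampling_rate=250, method="manikandan2012")
    info["ECG_R_Peaks"][:5]

    # Autocorrelation normalized by the number of overlapping samples (new `unbiased` argument)
    signal = np.cos(np.linspace(start=0, stop=30, num=1000))
    r, info = nk.signal_autocor(signal, method="fft", unbiased=True)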