diff --git a/docs/functions/complexity.rst b/docs/functions/complexity.rst index 755811a922..6bb2f9588b 100644 --- a/docs/functions/complexity.rst +++ b/docs/functions/complexity.rst @@ -103,6 +103,18 @@ Entropy """"""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_renyi +*entropy_approximate()* +"""""""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_approximate + +*entropy_sample()* +"""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_sample + +*entropy_quadratic()* +"""""""""""""""""""" +.. autofunction:: neurokit2.complexity.entropy_quadratic + *entropy_cumulativeresidual()* """"""""""""""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_cumulativeresidual @@ -155,14 +167,6 @@ Entropy """"""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_ofentropy -*entropy_approximate()* -"""""""""""""""""""""""" -.. autofunction:: neurokit2.complexity.entropy_approximate - -*entropy_sample()* -"""""""""""""""""""" -.. autofunction:: neurokit2.complexity.entropy_sample - *entropy_permutation()* """""""""""""""""""""""" .. autofunction:: neurokit2.complexity.entropy_permutation @@ -274,4 +278,4 @@ Joint/Multivariate .. .. automodule:: neurokit2.complexity .. :members: -.. :exclude-members: complexity, complexity_delay, complexity_dimension, complexity_tolerance, complexity_k, fractal_katz, fractal_petrosian, fractal_sevcik, fractal_nld, fractal_psdslope, fractal_higuchi, fractal_correlation, entropy_shannon, entropy_maximum, entropy_differential, entropy_tsallis, entropy_renyi, entropy_cumulativeresidual, entropy_svd, entropy_spectral, entropy_phase, entropy_grid, entropy_attention, entropy_increment, entropy_slope, entropy_symbolicdynamic, entropy_dispersion, entropy_ofentropy, entropy_approximate, entropy_sample, entropy_permutation, entropy_bubble, entropy_range, entropy_fuzzy, entropy_multiscale, entropy_hierarchical, fisher_information, complexity_hjorth, complexity_lempelziv, complexity_relativeroughness, fractal_hurst, complexity_lyapunov, complexity_rqa, fractal_mandelbrot, complexity_simulate, complexity_attractor, complexity_symbolize, complexity_coarsegraining, complexity_ordinalpatterns, recurrence_matrix, entropy_shannon_joint, entropy_rate \ No newline at end of file +.. 
:exclude-members: complexity, complexity_delay, complexity_dimension, complexity_tolerance, complexity_k, fractal_katz, fractal_petrosian, fractal_sevcik, fractal_nld, fractal_psdslope, fractal_higuchi, fractal_correlation, entropy_shannon, entropy_maximum, entropy_differential, entropy_tsallis, entropy_renyi, entropy_cumulativeresidual, entropy_svd, entropy_spectral, entropy_phase, entropy_grid, entropy_attention, entropy_increment, entropy_slope, entropy_symbolicdynamic, entropy_dispersion, entropy_ofentropy, entropy_approximate, entropy_sample, entropy_permutation, entropy_bubble, entropy_range, entropy_fuzzy, entropy_multiscale, entropy_hierarchical, fisher_information, complexity_hjorth, complexity_lempelziv, complexity_relativeroughness, fractal_hurst, complexity_lyapunov, complexity_rqa, fractal_mandelbrot, complexity_simulate, complexity_attractor, complexity_symbolize, complexity_coarsegraining, complexity_ordinalpatterns, recurrence_matrix, entropy_shannon_joint, entropy_rate, entropy_quadratic \ No newline at end of file diff --git a/neurokit2/__init__.py b/neurokit2/__init__.py index f2ccc19594..6e7a7e8790 100644 --- a/neurokit2/__init__.py +++ b/neurokit2/__init__.py @@ -32,7 +32,7 @@ from .video import * # Info -__version__ = "0.2.6" +__version__ = "0.2.7" # Maintainer info diff --git a/neurokit2/complexity/__init__.py b/neurokit2/complexity/__init__.py index 88e1e555aa..c0383bf52f 100644 --- a/neurokit2/complexity/__init__.py +++ b/neurokit2/complexity/__init__.py @@ -30,6 +30,7 @@ from .entropy_permutation import entropy_permutation from .entropy_phase import entropy_phase from .entropy_power import entropy_power +from .entropy_quadratic import entropy_quadratic from .entropy_range import entropy_range from .entropy_rate import entropy_rate from .entropy_renyi import entropy_renyi @@ -96,7 +97,9 @@ complexity_rcmse = functools.partial(entropy_multiscale, method="RCMSEn") complexity_fuzzymse = functools.partial(entropy_multiscale, fuzzy=True) complexity_fuzzycmse = functools.partial(entropy_multiscale, method="CMSEn", fuzzy=True) -complexity_fuzzyrcmse = functools.partial(entropy_multiscale, method="RCMSEn", fuzzy=True) +complexity_fuzzyrcmse = functools.partial( + entropy_multiscale, method="RCMSEn", fuzzy=True +) complexity_dfa = fractal_dfa @@ -175,6 +178,7 @@ "entropy_symbolicdynamic", "entropy_cumulativeresidual", "entropy_approximate", + "entropy_quadratic", "entropy_bubble", "entropy_coalition", "entropy_sample", diff --git a/neurokit2/complexity/complexity_decorrelation.py b/neurokit2/complexity/complexity_decorrelation.py index c5cf637b4e..5e7f29b2d6 100644 --- a/neurokit2/complexity/complexity_decorrelation.py +++ b/neurokit2/complexity/complexity_decorrelation.py @@ -1,10 +1,11 @@ +import matplotlib.pyplot as plt import numpy as np import pandas as pd from ..signal import signal_autocor -def complexity_decorrelation(signal): +def complexity_decorrelation(signal, show=False): """**Decorrelation Time (DT)** The decorrelation time (DT) is defined as the time (in samples) of the first zero crossing of @@ -17,6 +18,8 @@ def complexity_decorrelation(signal): ---------- signal : Union[list, np.array, pd.Series] The signal (i.e., a time series) in the form of a vector of values. + show : bool + If True, will return a plot of the autocorrelation. 
Returns ------- @@ -36,11 +39,15 @@ def complexity_decorrelation(signal): import neurokit2 as nk - # Simulate a signal with duration os 2s - signal = nk.signal_simulate(duration=2, frequency=[5, 9, 12]) + # Simulate a signal + signal = nk.signal_simulate(duration=5, sampling_rate=100, frequency=[5, 6], noise=0.5) # Compute DT - dt, _ = nk.complexity_decorrelation(signal) + @savefig p_complexity_decorrelation1.png scale=100% + dt, _ = nk.complexity_decorrelation(signal, show=True) + @suppress + plt.close() + dt References @@ -60,7 +67,7 @@ def complexity_decorrelation(signal): ) # Unbiased autocor (see https://github.com/mne-tools/mne-features/) - autocor, _ = signal_autocor(signal, method="unbiased") + autocor, _ = signal_autocor(signal, unbiased=True) # Get zero-crossings zc = np.diff(np.sign(autocor)) != 0 @@ -68,4 +75,20 @@ def complexity_decorrelation(signal): dt = np.argmax(zc) + 1 else: dt = -1 + + if show is True: + # Max length of autocorrelation to plot + max_len = int(dt * 4) + if max_len > len(autocor): + max_len = len(autocor) + + plt.plot(autocor[0:max_len]) + plt.xlabel("Lag") + plt.ylabel("Autocorrelation") + plt.xticks(np.arange(0, max_len, step=dt).astype(int)) + plt.axvline(dt, color="red", linestyle="--", label=f"DT = {dt}") + plt.axhline(0, color="black", linestyle="--") + plt.title("Decorrelation Time (DT)") + plt.legend() + return dt, {} diff --git a/neurokit2/complexity/complexity_lyapunov.py b/neurokit2/complexity/complexity_lyapunov.py index 2acd5bd123..8fa6a06d0f 100644 --- a/neurokit2/complexity/complexity_lyapunov.py +++ b/neurokit2/complexity/complexity_lyapunov.py @@ -5,8 +5,9 @@ import numpy as np import pandas as pd import sklearn.metrics.pairwise +import sklearn.neighbors -from ..misc import NeuroKitWarning +from ..misc import NeuroKitWarning, find_knee from ..signal.signal_psd import signal_psd from .utils_complexity_embedding import complexity_embedding @@ -16,9 +17,7 @@ def complexity_lyapunov( delay=1, dimension=2, method="rosenstein1993", - len_trajectory=20, - matrix_dim=4, - min_neighbors="default", + separation="auto", **kwargs, ): """**(Largest) Lyapunov Exponent (LLE)** @@ -26,7 +25,7 @@ Lyapunov exponents (LE) describe the rate of exponential separation (convergence or divergence) of nearby trajectories of a dynamical system. It is a measure of sensitive dependence on initial conditions, i.e. how quickly two nearby states diverge. A system can have multiple LEs, - equal to thenumber of the dimensionality of the phase space, and the largest LE value, "LLE" is + equal in number to the dimensionality of the phase space, and the largest LE value, "LLE" is often used to determine the overall predictability of the dynamical system. Different algorithms exist to estimate these indices: @@ -37,13 +36,17 @@ neighbouring points are then tracked along their distance trajectories for a number of data points. The slope of the line using a least-squares fit of the mean log trajectory of the distances gives the final LLE. - * **Eckmann et al. (1996)** computes LEs by first reconstructing the time series using a + * **Makowski** is a custom modification of Rosenstein's algorithm, using KDTree for more + efficient nearest neighbors computation. Additionally, the LLE is computed as the slope up to + the changepoint of divergence rate (the point where it flattens out), making it more robust + to the trajectory length parameter. + * **Eckmann et al. 
(1986)** computes LEs by first reconstructing the time series using a delay-embedding method, and obtains the tangent that maps to the reconstructed dynamics using a least-squares fit, where the LEs are deduced from the tangent maps. .. warning:: - The **Eckman (1996)** method currently does not work. Please help us fixing it by double + The **Eckmann (1986)** method currently does not work. Please help us fix it by double checking the code, the paper and helping us figuring out what's wrong. Overall, we would like to improve this function to return for instance all the exponents (Lyapunov spectrum), implement newer and faster methods (e.g., Balcerzak, 2018, 2020), etc. If you're interested @@ -59,17 +62,17 @@ dimension : int Embedding Dimension (*m*, sometimes referred to as *d* or *order*). See :func:`complexity_dimension` to estimate the optimal value for this parameter. If method - is ``"eckmann1996"``, larger values for dimension are recommended. + is ``"eckmann1986"``, larger values for dimension are recommended. method : str - The method that defines the algorithm for computing LE. Can be one of ``"rosenstein1993"`` - or ``"eckmann1996"``. + The method that defines the algorithm for computing LE. Can be one of ``"rosenstein1993"``, + ``"makowski"``, or ``"eckmann1986"``. len_trajectory : int Applies when method is ``"rosenstein1993"``. The number of data points in which neighboring trajectories are followed. matrix_dim : int - Applies when method is ``"eckmann1996"``. Corresponds to the number of LEs to return. + Applies when method is ``"eckmann1986"``. Corresponds to the number of LEs to return. min_neighbors : int, str - Applies when method is ``"eckmann1996"``. Minimum number of neighbors. If ``"default"``, + Applies when method is ``"eckmann1986"``. Minimum number of neighbors. If ``"default"``, ``min(2 * matrix_dim, matrix_dim + 4)`` is used. **kwargs : optional Other arguments to be passed to ``signal_psd()`` for calculating the minimum temporal @@ -79,7 +82,7 @@ -------- lle : float An estimate of the largest Lyapunov exponent (LLE) if method is ``"rosenstein1993"``, and - an array of LEs if ``"eckmann1996"``. + an array of LEs if ``"eckmann1986"``. info : dict A dictionary containing additional information regarding the parameters used to compute LLE. @@ -90,13 +93,24 @@ import neurokit2 as nk - signal = nk.signal_simulate(duration=3, sampling_rate=100, frequency=[5, 8], noise=0.5) + signal = nk.signal_simulate(duration=5, sampling_rate=100, frequency=[5, 8], noise=0.1) + + # Rosenstein's method + @savefig p_complexity_lyapunov1.png scale=100% + lle, info = nk.complexity_lyapunov(signal, method="rosenstein", show=True) + @suppress + plt.close() - lle, info = nk.complexity_lyapunov(signal, method="rosenstein1993", show=True) lle + # Makowski's change-point method + @savefig p_complexity_lyapunov2.png scale=100% + lle, info = nk.complexity_lyapunov(signal, method="makowski", show=True) + @suppress + plt.close() + # Eckmann's method is broken. Please help us fix it! 
- # lle, info = nk.complexity_lyapunov(signal, dimension=2, method="eckmann1996") + # lle, info = nk.complexity_lyapunov(signal, dimension=2, method="eckmann1986") References ---------- @@ -120,18 +134,22 @@ # "We impose the additional constraint that nearest neighbors have a temporal separation # greater than the mean period of the time series: This allows us to consider each pair of - # neighbors as nearby initial conditions for different trajectories."" + # neighbors as nearby initial conditions for different trajectories." # "We estimated the mean period as the reciprocal of the mean frequency of the power spectrum, # although we expect any comparable estimate, e.g., using the median frequency of the magnitude # spectrum, to yield equivalent results." + if separation == "auto": + # Actual sampling rate does not matter + psd = signal_psd( + signal, sampling_rate=1000, method="fft", normalize=False, show=False + ) + mean_freq = np.sum(psd["Power"] * psd["Frequency"]) / np.sum(psd["Power"]) - # Actual sampling rate does not matter - psd = signal_psd(signal, sampling_rate=1000, method="fft", normalize=False, show=False) - mean_freq = np.sum(psd["Power"] * psd["Frequency"]) / np.sum(psd["Power"]) - - # 1 / mean_freq = seconds per cycle - separation = int(np.ceil(1 / mean_freq * 1000)) + # 1 / mean_freq = seconds per cycle + separation = int(np.ceil(1 / mean_freq * 1000)) + else: + assert isinstance(separation, int), "'separation' should be an integer." # Run algorithm # ---------------- @@ -139,15 +157,22 @@ method = method.lower() if method in ["rosenstein", "rosenstein1993"]: le, parameters = _complexity_lyapunov_rosenstein( - signal, delay, dimension, separation, len_trajectory, **kwargs + signal, delay, dimension, separation, **kwargs + ) + elif method in ["makowski"]: + le, parameters = _complexity_lyapunov_makowski( + signal, delay, dimension, separation, **kwargs ) - elif method in ["eckmann", "eckmann1996"]: + elif method in ["eckmann", "eckmann1986"]: le, parameters = _complexity_lyapunov_eckmann( signal, dimension=dimension, separation=separation, - matrix_dim=matrix_dim, - min_neighbors=min_neighbors, + ) + else: + raise ValueError( + "NeuroKit error: complexity_lyapunov(): 'method' should be one of " + " 'rosenstein1993', 'makowski', 'eckmann1986'." ) # Store params @@ -167,10 +192,83 @@ # ============================================================================= +def _complexity_lyapunov_makowski( + signal, + delay=1, + dimension=2, + separation=1, + max_length="auto", + show=False, +): + # Store parameters + info = { + "Dimension": dimension, + "Delay": delay, + } + + # Embedding + embedded = complexity_embedding(signal, delay=delay, dimension=dimension) + n = len(embedded) + + # Set the maximum trajectory length to 10 times the delay + if max_length == "auto": + max_length = int(delay * 10) + if max_length >= n / 2: + max_length = n // 2 + + # Create KDTree and query for nearest neighbors + tree = sklearn.neighbors.KDTree(embedded, metric="euclidean") + + # Query for nearest neighbors. To ensure we get a neighbor outside of the `separation`, + # k=1 is the point itself, k=2 is the nearest neighbor, and k=3 is the second nearest neighbor. + idx = tree.query(embedded, k=2 + separation, return_distance=False) + + # The neighbor outside the `separation` region will be the last one in the returned list. 
+ idx = idx[:, -1] + + # Compute the average divergence for each trajectory length + trajectories = np.zeros(max_length) + for k in range(1, max_length + 1): + valid = np.where((np.arange(n - k) + k < n) & (idx[: n - k] + k < n))[0] + + if valid.size == 0: + trajectories[k - 1] = -np.inf + continue + + divergences = np.linalg.norm( + embedded[valid + k] - embedded[idx[valid] + k], + axis=1, + ) + divergences = divergences[divergences > 0] + if len(divergences) == 0: + trajectories[k - 1] = np.nan + else: + trajectories[k - 1] = np.mean(np.log(divergences)) + + # Change point + x_axis = range(1, len(trajectories) + 1) + knee = find_knee(y=trajectories, x=x_axis, show=False, verbose=False) + info["Divergence_Rate"] = trajectories + info["Changepoint"] = knee + + # Linear fit + slope, intercept = np.polyfit(x_axis[0:knee], trajectories[0:knee], 1) + if show is True: + plt.plot(np.arange(1, len(trajectories) + 1), trajectories) + plt.axvline(knee, color="red", label="Changepoint", linestyle="--") + plt.axline( + (0, intercept), slope=slope, color="orange", label="Least-squares Fit" + ) + plt.ylim(bottom=np.min(trajectories)) + plt.ylabel("Divergence Rate") + plt.title(f"Largest Lyapunov Exponent (slope of the line) = {slope:.3f}") + plt.legend() + return slope, info + + def _complexity_lyapunov_rosenstein( signal, delay=1, dimension=2, separation=1, len_trajectory=20, show=False, **kwargs ): - # 1. Check that sufficient data points are available # Minimum length required to find single orbit vector min_len = (dimension - 1) * delay + 1 @@ -200,7 +298,9 @@ def _complexity_lyapunov_rosenstein( # Find indices of nearest neighbours ntraj = m - len_trajectory + 1 - min_dist_indices = np.argmin(dists[:ntraj, :ntraj], axis=1) # exclude last few indices + min_dist_indices = np.argmin( + dists[:ntraj, :ntraj], axis=1 + ) # exclude last few indices min_dist_indices = min_dist_indices.astype(int) # Follow trajectories of neighbour pairs for len_trajectory data points @@ -217,16 +317,25 @@ def _complexity_lyapunov_rosenstein( divergence_rate = trajectories[np.isfinite(trajectories)] # LLE obtained by least-squares fit to average line - slope, intercept = np.polyfit(np.arange(1, len(divergence_rate) + 1), divergence_rate, 1) + slope, intercept = np.polyfit( + np.arange(1, len(divergence_rate) + 1), divergence_rate, 1 + ) + + # Store info + parameters = { + "Trajectory_Length": len_trajectory, + "Divergence_Rate": divergence_rate, + } if show is True: plt.plot(np.arange(1, len(divergence_rate) + 1), divergence_rate) - plt.axline((0, intercept), slope=slope, color="orange", label="Least-squares Fit") + plt.axline( + (0, intercept), slope=slope, color="orange", label="Least-squares Fit" + ) plt.ylabel("Divergence Rate") + plt.title(f"Largest Lyapunov Exponent (slope of the line) = {slope:.3f}") plt.legend() - parameters = {"Trajectory_Length": len_trajectory} - return slope, parameters @@ -279,7 +388,9 @@ def _complexity_lyapunov_eckmann( # get neighbors within the radius r = distances[i][neighbour_furthest] - neighbors = np.where(distances[i] <= r)[0] # should have length = min_neighbours + neighbors = np.where(distances[i] <= r)[ + 0 + ] # should have length = min_neighbours # Find matrix T_i (matrix_dim * matrix_dim) that sends points from neighbourhood of x(i) to x(i+1) vec_beta = signal[neighbors + matrix_dim * m] - signal[i + matrix_dim * m] @@ -289,7 +400,9 @@ def _complexity_lyapunov_eckmann( # form matrix T_i t_i = np.zeros((matrix_dim, matrix_dim)) t_i[:-1, 1:] = np.identity(matrix_dim - 1) - 
t_i[-1] = np.linalg.lstsq(matrix, vec_beta, rcond=-1)[0] # least squares solution + t_i[-1] = np.linalg.lstsq(matrix, vec_beta, rcond=-1)[ + 0 + ] # least squares solution # QR-decomposition of T * old_Q mat_Q, mat_R = np.linalg.qr(np.dot(t_i, old_Q)) diff --git a/neurokit2/complexity/entropy_approximate.py b/neurokit2/complexity/entropy_approximate.py index cf6af72848..900b73373f 100644 --- a/neurokit2/complexity/entropy_approximate.py +++ b/neurokit2/complexity/entropy_approximate.py @@ -3,10 +3,12 @@ import pandas as pd from .optim_complexity_tolerance import _entropy_apen, complexity_tolerance -from .utils import _get_count +from .utils_entropy import _get_count -def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected=False, **kwargs): +def entropy_approximate( + signal, delay=1, dimension=2, tolerance="sd", corrected=False, **kwargs +): """**Approximate entropy (ApEn) and its corrected version (cApEn)** Approximate entropy is a technique used to quantify the amount of regularity and the @@ -95,7 +97,7 @@ def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected= # Compute index if corrected is False: - # ApEn is implemented in 'optim_complexity_tolerance()' to avoid circular imports + # ApEn is implemented in 'utils_entropy.py' to avoid circular imports # as one of the method for optimizing tolerance relies on ApEn out, _ = _entropy_apen(signal, delay, dimension, info["Tolerance"], **kwargs) else: @@ -110,7 +112,6 @@ def entropy_approximate(signal, delay=1, dimension=2, tolerance="sd", corrected= def _entropy_capen(signal, delay, dimension, tolerance, **kwargs): - __, count1, _ = _get_count( signal, delay=delay, diff --git a/neurokit2/complexity/entropy_multiscale.py b/neurokit2/complexity/entropy_multiscale.py index d469691670..419258181a 100644 --- a/neurokit2/complexity/entropy_multiscale.py +++ b/neurokit2/complexity/entropy_multiscale.py @@ -13,8 +13,8 @@ from .entropy_slope import entropy_slope from .entropy_symbolicdynamic import entropy_symbolicdynamic from .optim_complexity_tolerance import complexity_tolerance -from .utils import _phi, _phi_divide from .utils_complexity_coarsegraining import _get_scales, complexity_coarsegraining +from .utils_entropy import _phi, _phi_divide def entropy_multiscale( @@ -249,10 +249,12 @@ def entropy_multiscale( if "delay" in kwargs: kwargs.pop("delay") - # Parameters selection + # Default parameters algorithm = entropy_sample refined = False coarsegraining = "nonoverlapping" + + # Parameters adjustment for variants if method in ["MSEn", "SampEn"]: pass # The default arguments are good elif method in ["MSApEn", "ApEn", "MSPEn", "PEn", "MSWPEn", "WPEn"]: @@ -326,13 +328,9 @@ def entropy_multiscale( info["Value"] = np.array( [ _entropy_multiscale( - coarse=complexity_coarsegraining( - signal, - scale=scale, - method=coarsegraining, - show=False, - **kwargs, - ), + signal, + scale=scale, + coarsegraining=coarsegraining, algorithm=algorithm, dimension=dimension, tolerance=info["Tolerance"], @@ -378,13 +376,32 @@ def entropy_multiscale( info): # ============================================================================= # Methods # ============================================================================= -def _entropy_multiscale(coarse, algorithm, dimension, tolerance, refined=False, **kwargs): +def _entropy_multiscale( + signal, + scale, + coarsegraining, + algorithm, + dimension, + tolerance, + refined=False, + **kwargs, +): """Wrapper function that works both on 1D and 2D 
coarse-grained (for composite)""" + + # Get coarse-grained signal + coarse = complexity_coarsegraining(signal, scale=scale, method=coarsegraining) + # For 1D coarse-graining if coarse.ndim == 1: + # Get delay + delay = 1 # If non-overlapping + if coarsegraining in ["rolling", "interpolate"]: + delay = scale + + # Compute entropy return algorithm( coarse, - delay=1, + delay=delay, dimension=dimension, tolerance=tolerance, **kwargs, diff --git a/neurokit2/complexity/entropy_quadratic.py b/neurokit2/complexity/entropy_quadratic.py new file mode 100644 index 0000000000..de859bcb85 --- /dev/null +++ b/neurokit2/complexity/entropy_quadratic.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- +import numpy as np + +from .entropy_sample import entropy_sample + + +def entropy_quadratic(signal, delay=1, dimension=2, tolerance="sd", **kwargs): + """**Quadratic Sample Entropy (QSE)** + + Compute the quadratic sample entropy (QSE) of a signal. It is essentially a correction of + SampEn introduced by Lake (2005), defined as: + + .. math:: + + QSE = SampEn + ln(2 * tolerance) + + QSE has been described as a more stable measure of entropy than SampEn (Gylling, 2017). + + Parameters + ---------- + signal : Union[list, np.array, pd.Series] + The signal (i.e., a time series) in the form of a vector of values. + delay : int + Time delay (often denoted *Tau* :math:`\\tau`, sometimes referred to as *lag*) in samples. + See :func:`complexity_delay` to estimate the optimal value for this parameter. + dimension : int + Embedding Dimension (*m*, sometimes referred to as *d* or *order*). See + :func:`complexity_dimension` to estimate the optimal value for this parameter. + tolerance : float + Tolerance (often denoted as *r*), distance to consider two data points as similar. If + ``"sd"`` (default), will be set to :math:`0.2 * SD_{signal}`. See + :func:`complexity_tolerance` to estimate the optimal value for this parameter. + **kwargs : optional + Other arguments. + + See Also + -------- + entropy_sample + + Returns + ---------- + qse : float + The quadratic sample entropy of the single time series. + info : dict + A dictionary containing additional information regarding the parameters used + to compute sample entropy. + + Examples + ---------- + .. ipython:: python + + import neurokit2 as nk + + signal = nk.signal_simulate(duration=2, frequency=5) + + qse, parameters = nk.entropy_quadratic(signal, delay=1, dimension=2) + qse + + References + ---------- + * Huselius Gylling, K. (2017). Quadratic sample entropy as a measure of burstiness: A study in + how well Rényi entropy rate and quadratic sample entropy can capture the presence of spikes in + time-series data. + * Lake, D. E. (2005). Renyi entropy measures of heart rate Gaussianity. IEEE Transactions on + Biomedical Engineering, 53(1), 21-27. 
+ + """ + sampen, info = entropy_sample( + signal, + delay=delay, + dimension=dimension, + tolerance=tolerance, + **kwargs, + ) + return sampen + np.log(2 * info["Tolerance"]), info diff --git a/neurokit2/complexity/entropy_sample.py b/neurokit2/complexity/entropy_sample.py index 3aacca7052..debbf84367 100644 --- a/neurokit2/complexity/entropy_sample.py +++ b/neurokit2/complexity/entropy_sample.py @@ -3,7 +3,7 @@ import pandas as pd from .optim_complexity_tolerance import complexity_tolerance -from .utils import _phi, _phi_divide +from .utils_entropy import _phi, _phi_divide def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): @@ -35,7 +35,7 @@ def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): See Also -------- - entropy_shannon, entropy_approximate, entropy_fuzzy + entropy_shannon, entropy_approximate, entropy_fuzzy, entropy_quadratic Returns ---------- @@ -81,13 +81,13 @@ def entropy_sample(signal, delay=1, dimension=2, tolerance="sd", **kwargs): } # Compute phi - phi = _phi( + info["phi"], _ = _phi( signal, delay=delay, dimension=dimension, tolerance=info["Tolerance"], approximate=False, **kwargs - )[0] + ) - return _phi_divide(phi), info + return _phi_divide(info["phi"]), info diff --git a/neurokit2/complexity/optim_complexity_tolerance.py b/neurokit2/complexity/optim_complexity_tolerance.py index aa10d4607d..af8bb06504 100644 --- a/neurokit2/complexity/optim_complexity_tolerance.py +++ b/neurokit2/complexity/optim_complexity_tolerance.py @@ -6,8 +6,8 @@ import sklearn.neighbors from ..stats import density -from .utils import _phi from .utils_complexity_embedding import complexity_embedding +from .utils_entropy import _entropy_apen def complexity_tolerance( @@ -224,7 +224,11 @@ def complexity_tolerance( ): if dimension is None: raise ValueError("'dimension' cannot be empty for the 'nolds' method.") - r = 0.11604738531196232 * np.std(signal, ddof=1) * (0.5627 * np.log(dimension) + 1.3334) + r = ( + 0.11604738531196232 + * np.std(signal, ddof=1) + * (0.5627 * np.log(dimension) + 1.3334) + ) info = {"Method": "Adjusted 20% SD"} elif method in ["chon", "chon2009"] and ( @@ -264,7 +268,9 @@ def complexity_tolerance( raise ValueError("'dimension' cannot be empty for the 'makowski' method.") n = len(signal) r = np.std(signal, ddof=1) * ( - 0.2811 * (dimension - 1) + 0.0049 * np.log(n) - 0.02 * ((dimension - 1) * np.log(n)) + 0.2811 * (dimension - 1) + + 0.0049 * np.log(n) + - 0.02 * ((dimension - 1) * np.log(n)) ) info = {"Method": "Makowski"} @@ -292,7 +298,9 @@ def complexity_tolerance( info.update({"Method": "bin"}) else: - raise ValueError("NeuroKit error: complexity_tolerance(): 'method' not recognized.") + raise ValueError( + "NeuroKit error: complexity_tolerance(): 'method' not recognized." + ) if show is True: _optimize_tolerance_plot(r, info, method=method, signal=signal) @@ -305,10 +313,11 @@ def complexity_tolerance( def _optimize_tolerance_recurrence(signal, r_range=None, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: - raise ValueError("If method='recurrence', both delay and dimension must be specified.") + raise ValueError( + "If method='recurrence', both delay and dimension must be specified." 
+ ) # Compute distance matrix emb = complexity_embedding(signal, delay=delay, dimension=dimension) @@ -332,10 +341,11 @@ def _optimize_tolerance_recurrence(signal, r_range=None, delay=None, dimension=N def _optimize_tolerance_maxapen(signal, r_range=None, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: - raise ValueError("If method='maxApEn', both delay and dimension must be specified.") + raise ValueError( + "If method='maxApEn', both delay and dimension must be specified." + ) if r_range is None: r_range = 40 @@ -358,20 +368,6 @@ def _optimize_tolerance_maxapen(signal, r_range=None, delay=None, dimension=None return r_range[np.argmax(apens)], {"Values": r_range, "Scores": np.array(apens)} -def _entropy_apen(signal, delay, dimension, tolerance, **kwargs): - - phi, info = _phi( - signal, - delay=delay, - dimension=dimension, - tolerance=tolerance, - approximate=True, - **kwargs, - ) - - return np.abs(np.subtract(phi[0], phi[1])), info - - def _optimize_tolerance_neighbours(signal, r_range=None, delay=None, dimension=None): if delay is None: delay = 1 @@ -400,7 +396,6 @@ def _optimize_tolerance_neighbours(signal, r_range=None, delay=None, dimension=N def _optimize_tolerance_bin(signal, delay=None, dimension=None): - # Optimize missing parameters if delay is None or dimension is None: raise ValueError("If method='bin', both delay and dimension must be specified.") @@ -424,7 +419,6 @@ def _optimize_tolerance_bin(signal, delay=None, dimension=None): # Plotting # ============================================================================= def _optimize_tolerance_plot(r, info, ax=None, method="maxApEn", signal=None): - if ax is None: fig, ax = plt.subplots() else: diff --git a/neurokit2/complexity/utils_complexity_coarsegraining.py b/neurokit2/complexity/utils_complexity_coarsegraining.py index 83362bc15c..8e585dbfbe 100644 --- a/neurokit2/complexity/utils_complexity_coarsegraining.py +++ b/neurokit2/complexity/utils_complexity_coarsegraining.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- import matplotlib.pyplot as plt import numpy as np -import scipy.ndimage.filters from ..signal import signal_interpolate +from .utils_complexity_embedding import complexity_embedding -def complexity_coarsegraining(signal, scale=2, method="nonoverlapping", show=False, **kwargs): +def complexity_coarsegraining( + signal, scale=2, method="nonoverlapping", show=False, **kwargs +): """**Coarse-graining of a signal** The goal of coarse-graining is to represent the signal at a different "scale". 
The @@ -181,31 +183,37 @@ def complexity_coarsegraining(signal, scale=2, method="nonoverlapping", show=Fal ) elif method == "rolling": - # Relying on scipy is a fast alternative to: - # pd.Series(signal).rolling(window=scale).mean().values[scale-1::] - # https://stackoverflow.com/questions/13728392/moving-average-or-running-mean - coarse = scipy.ndimage.filters.uniform_filter1d(signal, size=scale, mode="nearest") - coarse = coarse[scale - 1 : :] + # See https://github.com/neuropsychology/NeuroKit/pull/892 + coarse = complexity_embedding(signal, dimension=scale, delay=1).mean(axis=1) elif method == "timeshift": - coarse = np.transpose(np.reshape(signal[: scale * (n // scale)], (n // scale, scale))) + coarse = np.transpose( + np.reshape(signal[: scale * (n // scale)], (n // scale, scale)) + ) else: raise ValueError("Unknown `method`: {}".format(method)) if show is True: - _complexity_show(signal[0:n], coarse, method=method) + _complexity_coarsegraining_show(signal[0:n], coarse, method=method) return coarse # ============================================================================= # Utils # ============================================================================= -def _complexity_show(signal, coarse, method="nonoverlapping"): +def _complexity_coarsegraining_show(signal, coarse, method="nonoverlapping"): plt.plot(signal, linewidth=1.5) if method == "nonoverlapping": - plt.plot(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.75) - plt.scatter(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.5) + plt.plot( + np.linspace(0, len(signal), len(coarse)), + coarse, + color="red", + linewidth=0.75, + ) + plt.scatter( + np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=0.5 + ) elif method == "timeshift": for i in range(len(coarse)): plt.plot( @@ -215,7 +223,9 @@ def _complexity_show(signal, coarse, method="nonoverlapping"): linewidth=0.75, ) else: - plt.plot(np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=1) + plt.plot( + np.linspace(0, len(signal), len(coarse)), coarse, color="red", linewidth=1 + ) plt.title(f'Coarse-graining using method "{method}"') diff --git a/neurokit2/complexity/utils.py b/neurokit2/complexity/utils_entropy.py similarity index 85% rename from neurokit2/complexity/utils.py rename to neurokit2/complexity/utils_entropy.py index f3801bfb09..dee79b3b2b 100644 --- a/neurokit2/complexity/utils.py +++ b/neurokit2/complexity/utils_entropy.py @@ -6,6 +6,23 @@ from .utils_complexity_embedding import complexity_embedding + +# ============================================================================= +# ApEn +# ============================================================================= +def _entropy_apen(signal, delay, dimension, tolerance, **kwargs): + phi, info = _phi( + signal, + delay=delay, + dimension=dimension, + tolerance=tolerance, + approximate=True, + **kwargs, + ) + + return np.abs(np.subtract(phi[0], phi[1])), info + + # ============================================================================= # Phi # ============================================================================= @@ -111,15 +128,14 @@ def _get_count( # ------------------- # Sanity checks sklearn_version = version.parse(sklearn.__version__) - if sklearn_version >= version.parse("1.3.0"): + if sklearn_version == version.parse("1.3.0"): valid_metrics = sklearn.neighbors.KDTree.valid_metrics() + ["range"] else: valid_metrics = sklearn.neighbors.KDTree.valid_metrics + ["range"] if distance not in 
valid_metrics: raise ValueError( - "The given metric (%s) is not valid." - "The valid metric names are: %s" - % (distance, valid_metrics) + f"The given metric ({distance}) is not valid." + f" Valid metric names are: {valid_metrics}" ) if fuzzy is True: @@ -152,7 +168,10 @@ def distrange(x, y): # Count for each row count = np.array( - [np.sum(distrange(embedded, embedded[i]) < tolerance) for i in range(len(embedded))] + [ + np.sum(distrange(embedded, embedded[i]) < tolerance) + for i in range(len(embedded)) + ] ) else: # chebyshev and other sklearn methods @@ -160,5 +179,7 @@ def distrange(x, y): # has a `workers` argument to use multiple cores? Benchmark or opinion required! if kdtree is None: kdtree = sklearn.neighbors.KDTree(embedded, metric=distance) - count = kdtree.query_radius(embedded, tolerance, count_only=True).astype(np.float64) + count = kdtree.query_radius(embedded, tolerance, count_only=True).astype( + np.float64 + ) return embedded, count, kdtree diff --git a/neurokit2/ecg/ecg_plot.py b/neurokit2/ecg/ecg_plot.py index 7e498f0644..b7eaacfa9b 100644 --- a/neurokit2/ecg/ecg_plot.py +++ b/neurokit2/ecg/ecg_plot.py @@ -34,10 +34,10 @@ def ecg_plot(ecg_signals, info=None): .. code-block:: python - # To be run after ecg_plot() - fig = plt.gcf() - fig.set_size_inches(10, 12, forward=True) - fig.savefig("myfig.png") + # To be run after ecg_plot() + fig = plt.gcf() + fig.set_size_inches(10, 12, forward=True) + fig.savefig("myfig.png") Examples -------- diff --git a/neurokit2/eda/eda_plot.py b/neurokit2/eda/eda_plot.py index d61274e435..5ba0cb642b 100644 --- a/neurokit2/eda/eda_plot.py +++ b/neurokit2/eda/eda_plot.py @@ -62,6 +62,10 @@ def eda_plot(eda_signals, info=None, static=True): onsets = np.where(eda_signals["SCR_Onsets"] == 1)[0] half_recovery = np.where(eda_signals["SCR_Recovery"] == 1)[0] + # clean peaks that do not have onsets + if len(peaks) > len(onsets): + peaks = peaks[1:] + # Determine unit of x-axis. x_label = "Time (seconds)" x_axis = np.linspace(0, len(eda_signals) / info["sampling_rate"], len(eda_signals)) diff --git a/neurokit2/signal/signal_autocor.py b/neurokit2/signal/signal_autocor.py index d2f8d72fb6..4b3dc813fa 100644 --- a/neurokit2/signal/signal_autocor.py +++ b/neurokit2/signal/signal_autocor.py @@ -4,7 +4,9 @@ from matplotlib import pyplot as plt -def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): +def signal_autocor( + signal, lag=None, demean=True, method="auto", unbiased=False, show=False +): """**Autocorrelation (ACF)** Compute the autocorrelation of a signal. @@ -57,6 +59,14 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): @suppress plt.close() + .. 
ipython:: python + + # Example 3: Using 'unbiased' Method + @savefig p_signal_autocor3.png scale=100% + r, info = nk.signal_autocor(signal, lag=2, method='fft', unbiased=True, show=True) + @suppress + plt.close() + """ n = len(signal) @@ -66,8 +76,14 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): # Run autocor method = method.lower() + if method == "unbiased": + unbiased = True + method = "fft" + if method in ["auto"]: - acov = scipy.signal.correlate(signal, signal, mode="full", method="auto")[n - 1 :] + acov = scipy.signal.correlate(signal, signal, mode="full", method="auto")[ + n - 1 : + ] elif method in ["cor", "correlation", "correlate"]: acov = np.correlate(signal, signal, mode="full") acov = acov[n - 1 :] # Min time lag is 0 @@ -76,16 +92,15 @@ def signal_autocor(signal, lag=None, demean=True, method="auto", show=False): fft = np.fft.fft(a) acf = np.fft.ifft(np.conjugate(fft) * fft)[:n] acov = acf.real - elif method == "unbiased": - dnorm = np.r_[np.arange(1, n + 1), np.arange(n - 1, 0, -1)] - fft = np.fft.fft(signal, n=n) - acf = np.fft.ifft(np.conjugate(fft) * fft)[:n] - acf /= dnorm[n - 1 :] - acov = acf.real else: raise ValueError("Method must be 'auto', 'correlation' or 'fft'.") - # Normalize + # If unbiased, normalize by the number of overlapping elements + if unbiased is True: + normalization = np.arange(n, 0, -1) + acov /= normalization + + # Normalize (so that max correlation is 1) r = acov / np.max(acov) # Confidence interval diff --git a/tests/tests_complexity.py b/tests/tests_complexity.py index 43a583a105..da77352ddd 100644 --- a/tests/tests_complexity.py +++ b/tests/tests_complexity.py @@ -4,9 +4,9 @@ import nolds import numpy as np import pandas as pd -from pyentrp import entropy as pyentrp import sklearn.neighbors from packaging import version +from pyentrp import entropy as pyentrp # import EntropyHub import neurokit2 as nk @@ -21,13 +21,14 @@ # Some sanity checks # ============================================================================= def test_complexity_sanity(): - signal = np.cos(np.linspace(start=0, stop=30, num=1000)) mdfa_q = [-5, -3, -1, 1, 3, 5] # Entropy assert np.allclose( - nk.entropy_fuzzy(signal)[0], nk.entropy_sample(signal, fuzzy=True)[0], atol=0.000001 + nk.entropy_fuzzy(signal)[0], + nk.entropy_sample(signal, fuzzy=True)[0], + atol=0.000001, ) # Fractal @@ -39,9 +40,13 @@ def test_complexity_sanity(): # TODO: why this gives 70 or 71 depending on the machine???? 
# assert parameters["Fluctuations"].shape == (70, len(mdfa_q)) - assert np.allclose(nk.fractal_correlation(signal)[0], 0.7382138350901658, atol=0.000001) assert np.allclose( - nk.fractal_correlation(signal, radius="nolds")[0], nolds.corr_dim(signal, 2), atol=0.01 + nk.fractal_correlation(signal)[0], 0.7382138350901658, atol=0.000001 + ) + assert np.allclose( + nk.fractal_correlation(signal, radius="nolds")[0], + nolds.corr_dim(signal, 2), + atol=0.01, ) @@ -79,7 +84,6 @@ def test_complexity_sanity(): def test_complexity_vs_R(): - signal = pd.read_csv( "https://raw.githubusercontent.com/neuropsychology/NeuroKit/master/data/bio_eventrelated_100hz.csv" )["RSP"].values @@ -97,7 +101,9 @@ def test_complexity_vs_R(): sampen = nk.entropy_sample(signal[0:300], dimension=2, tolerance=r)[0] assert np.allclose( sampen, - nk.entropy_sample(signal[0:300], dimension=2, tolerance=r, distance="infinity")[0], + nk.entropy_sample(signal[0:300], dimension=2, tolerance=r, distance="infinity")[ + 0 + ], atol=0.001, ) assert np.allclose(sampen, 0.03784376, atol=0.001) @@ -111,7 +117,6 @@ def test_complexity_vs_R(): def test_complexity_vs_Python(): - signal = np.cos(np.linspace(start=0, stop=30, num=100)) tolerance = 0.2 * np.std(signal, ddof=1) @@ -133,9 +138,9 @@ def test_complexity_vs_Python(): entropy_app_entropy(signal, 2), ) - assert nk.entropy_approximate(signal, dimension=2, tolerance=tolerance)[0] != pyeeg_ap_entropy( - signal, 2, tolerance - ) + assert nk.entropy_approximate(signal, dimension=2, tolerance=tolerance)[ + 0 + ] != pyeeg_ap_entropy(signal, 2, tolerance) # Sample assert np.allclose( @@ -167,7 +172,9 @@ def test_complexity_vs_Python(): != pyentrp.sample_entropy(signal, 2, 0.2)[1] ) assert ( - nk.entropy_sample(signal, dimension=2, tolerance=0.2 * np.sqrt(np.var(signal)))[0] + nk.entropy_sample(signal, dimension=2, tolerance=0.2 * np.sqrt(np.var(signal)))[ + 0 + ] != MultiscaleEntropy_sample_entropy(signal, 2, 0.2)[0.2][2] ) @@ -254,11 +261,16 @@ def pyeeg_embed_seq(time_series, tau, embedding_dimension): else: typed_time_series = time_series - shape = (typed_time_series.size - tau * (embedding_dimension - 1), embedding_dimension) + shape = ( + typed_time_series.size - tau * (embedding_dimension - 1), + embedding_dimension, + ) strides = (typed_time_series.itemsize, tau * typed_time_series.itemsize) - return np.lib.stride_tricks.as_strided(typed_time_series, shape=shape, strides=strides) + return np.lib.stride_tricks.as_strided( + typed_time_series, shape=shape, strides=strides + ) def pyeeg_bin_power(X, Band, Fs): @@ -269,7 +281,11 @@ def pyeeg_bin_power(X, Band, Fs): Freq = float(Band[Freq_Index]) Next_Freq = float(Band[Freq_Index + 1]) Power[Freq_Index] = sum( - C[int(np.floor(Freq / Fs * len(X))) : int(np.floor(Next_Freq / Fs * len(X)))] + C[ + int(np.floor(Freq / Fs * len(X))) : int( + np.floor(Next_Freq / Fs * len(X)) + ) + ] ) Power_Ratio = Power / sum(Power) return Power, Power_Ratio @@ -341,16 +357,6 @@ def entropy_embed(x, order=3, delay=1): def entropy_app_samp_entropy(x, order, metric="chebyshev", approximate=True): - sklearn_version = version.parse(sklearn.__version__) - if sklearn_version >= version.parse("1.3.0"): - _all_metrics = sklearn.neighbors.KDTree.valid_metrics() - else: - _all_metrics = sklearn.neighbors.KDTree.valid_metrics - if metric not in _all_metrics: - raise ValueError( - "The given metric (%s) is not valid. 
The valid " # pylint: disable=consider-using-f-string - "metric names are: %s" % (metric, _all_metrics) - ) phi = np.zeros(2) r = 0.2 * np.std(x, axis=-1, ddof=1) @@ -516,7 +522,11 @@ def MultiscaleEntropy_check_type(x, num_type, name): tmp = [x] elif not isinstance(x, Iterable): raise ValueError( - name + " should be a " + num_type.__name__ + " or an iterator of " + num_type.__name__ + name + + " should be a " + + num_type.__name__ + + " or an iterator of " + + num_type.__name__ ) else: tmp = [] @@ -635,7 +645,12 @@ def MultiscaleEntropy_sample_entropy( def MultiscaleEntropy_mse( - x, scale_factor=list(range(1, 21)), m=[2], r=[0.15], return_type="dict", safe_mode=False + x, + scale_factor=list(range(1, 21)), + m=[2], + r=[0.15], + return_type="dict", + safe_mode=False, ): """[Multiscale Entropy]