Skip to content

Commit

Permalink
Merge pull request #973 from danibene/fix/ecg_findpeaks_refactor
Browse files Browse the repository at this point in the history
[Fix] refactor ecg_findpeaks
  • Loading branch information
danibene authored Mar 24, 2024
2 parents 24939c7 + 6e2b150 commit 8e27aef
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 42 deletions.
158 changes: 123 additions & 35 deletions neurokit2/ecg/ecg_findpeaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@
from bisect import insort
from collections import deque

from ..signal import signal_findpeaks, signal_plot, signal_sanitize, signal_smooth, signal_zerocrossings
from ..signal import (
signal_findpeaks,
signal_plot,
signal_sanitize,
signal_smooth,
signal_zerocrossings,
)
from ..misc import NeuroKitWarning

def ecg_findpeaks(ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs):

def ecg_findpeaks(
ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs
):
"""**Locate R-peaks**
Low-level function used by :func:`ecg_peaks` to identify R-peaks in an ECG signal using a
Expand Down Expand Up @@ -110,7 +119,9 @@ def _ecg_findpeaks_findmethod(method):
elif method in ["promac", "all"]:
return _ecg_findpeaks_promac
else:
raise ValueError(f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented.")
raise ValueError(
f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented."
)


# =============================================================================
Expand Down Expand Up @@ -161,13 +172,17 @@ def _ecg_findpeaks_promac(
"""
x = np.zeros(len(signal))
promac_methods = [method.lower() for method in promac_methods] # remove capitalised letters
promac_methods = [
method.lower() for method in promac_methods
] # remove capitalised letters
error_list = [] # Stores the failed methods

for method in promac_methods:
try:
func = _ecg_findpeaks_findmethod(method)
x = _ecg_findpeaks_promac_addconvolve(signal, sampling_rate, x, func, gaussian_sd=gaussian_sd, **kwargs)
x = _ecg_findpeaks_promac_addconvolve(
signal, sampling_rate, x, func, gaussian_sd=gaussian_sd, **kwargs
)
except ValueError:
error_list.append(f"Method '{method}' is not valid.")
except Exception as error:
Expand All @@ -183,8 +198,12 @@ def _ecg_findpeaks_promac(
peaks = signal_findpeaks(x, height_min=threshold)["Peaks"]

if show is True:
signal_plot(pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True)
[plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks] # pylint: disable=W0106
signal_plot(
pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True
)
[
plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks
] # pylint: disable=W0106

    # It is not clear whether always printing these errors is the best option
if error_list: # empty?
Expand All @@ -195,15 +214,19 @@ def _ecg_findpeaks_promac(

# _ecg_findpeaks_promac_addmethod + _ecg_findpeaks_promac_convolve
# Joining them makes parameters exposition more consistent
def _ecg_findpeaks_promac_addconvolve(signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs):
def _ecg_findpeaks_promac_addconvolve(
signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs
):
peaks = fun(signal, sampling_rate=sampling_rate, **kwargs)

mask = np.zeros(len(signal))
mask[peaks] = 1

    # SD is defined as a typical QRS size, which for adults is 100 ms
sd = sampling_rate * gaussian_sd / 1000
shape = scipy.stats.norm.pdf(np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd)
shape = scipy.stats.norm.pdf(
np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd
)

x += np.convolve(mask, shape, "same")

Expand Down Expand Up @@ -323,7 +346,7 @@ def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs):
- Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
"""
diff = abs(np.diff(signal))
diff = np.abs(np.diff(signal))

b = np.ones(int(0.08 * sampling_rate))
b = b / int(0.08 * sampling_rate)
Expand Down Expand Up @@ -359,10 +382,17 @@ def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs):

if RR_ave != 0.0 and QRS[-1] - QRS[-2] > 1.5 * RR_ave:
missed_peaks = peaks[idx[-2] + 1 : idx[-1]]
missed_peak_added = False
for missed_peak in missed_peaks:
if missed_peak - peaks[idx[-2]] > int(0.36 * sampling_rate) and ma[missed_peak] > 0.5 * th:
if (
missed_peak - peaks[idx[-2]] > int(0.36 * sampling_rate)
and ma[missed_peak] > 0.5 * th
):
insort(QRS, missed_peak)
missed_peak_added = True
break
if missed_peak_added:
continue

if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
Expand All @@ -383,7 +413,9 @@ def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs):
# =============================================================================
# Slope Sum Function (SSF) - Zong et al. (2003)
# =============================================================================
def _ecg_findpeaks_ssf(signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs):
def _ecg_findpeaks_ssf(
signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs
):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L448.
Expand Down Expand Up @@ -451,7 +483,12 @@ def _ecg_findpeaks_zong(signal, sampling_rate=1000, cutoff=16, window=0.13, **kw
tmp = np.zeros(len(y) - w)
for i, j in enumerate(np.arange(w, len(y))):
s = y[j - w : j]
tmp[i] = np.sum(np.sqrt(np.power(1 / sampling_rate, 2) * np.ones(w - 1) + np.power(np.diff(s), 2)))
tmp[i] = np.sum(
np.sqrt(
np.power(1 / sampling_rate, 2) * np.ones(w - 1)
+ np.power(np.diff(s), 2)
)
)
# Pad with the first value
clt = np.concatenate([[tmp[0]] * w, tmp])

Expand Down Expand Up @@ -659,10 +696,15 @@ def _ecg_findpeaks_WT(signal, sampling_rate=1000, **kwargs):
max_R_peak_dist = int(0.1 * sampling_rate)
rpeaks = []
for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]):
correct_sign = signal_1[index_cur] < 0 and signal_1[index_next] > 0 # pylint: disable=R1716
correct_sign = (
signal_1[index_cur] < 0 and signal_1[index_next] > 0
) # pylint: disable=R1716
near = (index_next - index_cur) < max_R_peak_dist # limit 2
if near and correct_sign:
rpeaks.append(signal_zerocrossings(signal_1[index_cur : index_next + 1])[0] + index_cur)
rpeaks.append(
signal_zerocrossings(signal_1[index_cur : index_next + 1])[0]
+ index_cur
)

rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
Expand Down Expand Up @@ -693,7 +735,9 @@ def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002, **kwargs):

d2 = np.diff(norm_signal, 2)

b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2 # pylint: disable=E1130
b = (
np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2
) # pylint: disable=E1130
b = np.intersect1d(b, np.nonzero(-d2 > tol)[0]) # pylint: disable=E1130

rpeaks = []
Expand Down Expand Up @@ -734,7 +778,7 @@ def _ecg_findpeaks_elgendi(signal, sampling_rate=1000, **kwargs):
blocks = mwa_qrs > mwa_beat

QRS = []

qrs_duration_threshold = int(0.08 * sampling_rate)
rr_distance_threshold = int(0.3 * sampling_rate)

Expand Down Expand Up @@ -862,12 +906,19 @@ def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs):

if counter > neg_threshold:
unfiltered_section = signal[thi_list[-1] - int(0.01 * sampling_rate) : i]
r_peaks.append(engzee_fake_delay + np.argmax(unfiltered_section) + thi_list[-1] - int(0.01 * sampling_rate))
r_peaks.append(
engzee_fake_delay
+ np.argmax(unfiltered_section)
+ thi_list[-1]
- int(0.01 * sampling_rate)
)
counter = 0
thi = False
thf = False

r_peaks.pop(0) # removing the 1st detection as it 1st needs the QRS complex amplitude for the threshold
r_peaks.pop(
0
) # removing the 1st detection as it 1st needs the QRS complex amplitude for the threshold
r_peaks = np.array(r_peaks, dtype="int")
return r_peaks

Expand Down Expand Up @@ -899,7 +950,9 @@ def running_mean(x, N):
# Apply Chebyshev Type I Bandpass filter
# Low cut frequency = 6 Hz
# High cut frequency = 18
filtered = cheby1_bandpass_filter(signal, lowcut=6, highcut=18, fs=sampling_rate, order=4)
filtered = cheby1_bandpass_filter(
signal, lowcut=6, highcut=18, fs=sampling_rate, order=4
)

# Eq. 1: First-order differencing difference
dn = np.append(filtered[1:], 0) - filtered
Expand Down Expand Up @@ -954,7 +1007,9 @@ def running_mean(x, N):
lows = np.arange(i - search_window_half, i)
highs = np.arange(i + 1, i + search_window_half + 1)
if highs[-1] > len(signal):
highs = np.delete(highs, np.arange(np.where(highs == len(signal))[0], len(highs) + 1))
highs = np.delete(
highs, np.arange(np.where(highs == len(signal))[0], len(highs) + 1)
)
ekg_window = np.concatenate((lows, [i], highs))
idx_search.append(ekg_window)
ekg_window_wave = signal[ekg_window]
Expand Down Expand Up @@ -1124,7 +1179,12 @@ def _ecg_findpeaks_rodrigues(signal, sampling_rate=1000, **kwargs):
# Fast Visibility Graph Detector - by Emrich et al. (2023)
# =============================================================================
def _ecg_findpeaks_visibilitygraph(
signal, sampling_rate=1000, window_seconds=2, window_overlap=0.5, accelerated=True, **kwargs
signal,
sampling_rate=1000,
window_seconds=2,
window_overlap=0.5,
accelerated=True,
**kwargs,
):
"""Accelerated R-Peak Detector Using Visibility Graphs by Emrich et al. (2023). Implements the FastNVG algorithm,
allowing fast and sample precise R-peak detection.
Expand Down Expand Up @@ -1187,7 +1247,11 @@ def _ecg_findpeaks_visibilitygraph(
indices = np.arange(len(segment))

# Compute the adjacency matrix to the directed visibility graph
A = ts2vg.NaturalVG(directed="top_to_bottom").build(segment, indices).adjacency_matrix()
A = (
ts2vg.NaturalVG(directed="top_to_bottom")
.build(segment, indices)
.adjacency_matrix()
)

# Compute the ECG weights using k-Hop-paths
size = len(indices)
Expand All @@ -1205,7 +1269,9 @@ def _ecg_findpeaks_visibilitygraph(
elif N - dM + 1 <= L and L + 1 <= N:
weights[L + indices] = 0.5 * (weights[L + indices] + w)
else:
weights[L + indices[indices <= dM]] = 0.5 * (w[indices <= dM] + weights[L + indices[indices <= dM]])
weights[L + indices[indices <= dM]] = 0.5 * (
w[indices <= dM] + weights[L + indices[indices <= dM]]
)
weights[L + indices[indices > dM]] = w[indices > dM]

if R - L < M:
Expand Down Expand Up @@ -1247,14 +1313,18 @@ def _ecg_findpeaks_MWA(signal, window_size, **kwargs):
# return causal moving averages, i.e. each output element is the average
# of window_size input elements ending at that position, we use the
# `origin` argument to shift the filter computation accordingly.
mwa = scipy.ndimage.uniform_filter1d(signal, window_size, origin=(window_size - 1) // 2)
mwa = scipy.ndimage.uniform_filter1d(
signal, window_size, origin=(window_size - 1) // 2
)

# Compute actual moving averages for the first `window_size - 1` elements,
# which the uniform_filter1d function computes using padding. We want
# those output elements to be averages of only the input elements until
# that position.
head_size = min(window_size - 1, len(signal))
mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace(1, head_size, head_size)
mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace(
1, head_size, head_size
)

return mwa

Expand Down Expand Up @@ -1298,12 +1368,15 @@ def _ecg_findpeaks_peakdetect(detection, sampling_rate=1000, **kwargs):
if peak - last_peak > RR_missed:
missed_peaks = peaks[last_index + 1 : index]
missed_peaks = missed_peaks[
(missed_peaks > last_peak + min_missed_distance) & (missed_peaks < peak - min_missed_distance)
(missed_peaks > last_peak + min_missed_distance)
& (missed_peaks < peak - min_missed_distance)
]
threshold_I2 = 0.5 * threshold_I1
missed_peaks = missed_peaks[detection[missed_peaks] > threshold_I2]
if len(missed_peaks) > 0:
signal_peaks[-1] = missed_peaks[np.argmax(detection[missed_peaks])]
signal_peaks[-1] = missed_peaks[
np.argmax(detection[missed_peaks])
]
signal_peaks.append(peak)

last_peak = peak
Expand Down Expand Up @@ -1334,8 +1407,12 @@ def _ecg_findpeaks_visgraphthreshold(weight, sampling_frequency=1000, **kwargs):
noise_peaks = []

# Learning Phase, 2sec
spki = np.max(weight[0 : 2 * sampling_frequency]) * 0.25 # running estimate of signal level
npki = np.mean(weight[0 : 2 * sampling_frequency]) * 0.5 # running estimate of noise level
spki = (
np.max(weight[0 : 2 * sampling_frequency]) * 0.25
) # running estimate of signal level
npki = (
np.mean(weight[0 : 2 * sampling_frequency]) * 0.5
) # running estimate of noise level
threshold_I1 = spki

# iterate over the whole array / series
Expand All @@ -1353,8 +1430,12 @@ def _ecg_findpeaks_visgraphthreshold(weight, sampling_frequency=1000, **kwargs):
# candidate is close to last detected peak -> check if current candidate is better choice
elif 0.3 * sampling_frequency >= (i - signal_peaks[-1]):
# compare slope of last peak with current candidate
if weight[i] > weight[signal_peaks[-1]]: # test greater slope -> qrs
spki = (spki - 0.125 * weight[signal_peaks[-1]]) / 0.875 # reset threshold
if (
weight[i] > weight[signal_peaks[-1]]
): # test greater slope -> qrs
spki = (
spki - 0.125 * weight[signal_peaks[-1]]
) / 0.875 # reset threshold
signal_peaks[-1] = i
spki = 0.125 * weight[signal_peaks[-1]] + 0.875 * spki
else:
Expand All @@ -1375,13 +1456,20 @@ def _ecg_findpeaks_visgraphthreshold(weight, sampling_frequency=1000, **kwargs):
threshold_I2 = 0.5 * threshold_I1
# get range of candidates and apply noise threshold
missed_section_peaks = range(
signal_peaks[-2] + min_distance, signal_peaks[-1] - min_distance
signal_peaks[-2] + min_distance,
signal_peaks[-1] - min_distance,
)
missed_section_peaks = [p for p in missed_section_peaks if weight[p] > threshold_I2]
missed_section_peaks = [
p
for p in missed_section_peaks
if weight[p] > threshold_I2
]

# add the largest sample in missed interval to peaks
if len(missed_section_peaks) > 0:
missed_peak = missed_section_peaks[np.argmax(weight[missed_section_peaks])]
missed_peak = missed_section_peaks[
np.argmax(weight[missed_section_peaks])
]
signal_peaks.append(signal_peaks[-1])
signal_peaks[-2] = missed_peak

Expand Down
Loading

0 comments on commit 8e27aef

Please sign in to comment.