import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.signal
import scipy.stats
from warnings import warn
from bisect import insort
from collections import deque
from ..signal import (
signal_findpeaks,
signal_plot,
signal_sanitize,
signal_smooth,
signal_zerocrossings,
)
from ..misc import NeuroKitWarning
[docs]
def ecg_findpeaks(
ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs
):
"""**Locate R-peaks**
Low-level function used by :func:`ecg_peaks` to identify R-peaks in an ECG signal using a
different set of algorithms. Use the main function and see its documentation for details.
Parameters
----------
ecg_cleaned : Union[list, np.array, pd.Series]
See :func:`ecg_peaks()`.
sampling_rate : int
See :func:`ecg_peaks()`.
method : string
See :func:`ecg_peaks()`.
show : bool
If ``True``, will return a plot to visualizing the thresholds used in the algorithm.
Useful for debugging.
**kwargs
Additional keyword arguments, usually specific for each ``method``.
Returns
-------
info : dict
A dictionary containing additional information, in this case the
samples at which R-peaks occur, accessible with the key ``"ECG_R_Peaks"``.
See Also
--------
ecg_peaks, .signal_fixpeaks
"""
# Try retrieving right column
if isinstance(ecg_cleaned, pd.DataFrame):
try:
ecg_cleaned = ecg_cleaned["ECG_Clean"]
except (NameError, KeyError):
try:
ecg_cleaned = ecg_cleaned["ECG_Raw"]
except (NameError, KeyError):
ecg_cleaned = ecg_cleaned["ECG"]
# Sanitize input
ecg_cleaned = signal_sanitize(ecg_cleaned)
method = method.lower() # remove capitalised letters
# Run peak detection algorithm
try:
func = _ecg_findpeaks_findmethod(method)
rpeaks = func(ecg_cleaned, sampling_rate=sampling_rate, show=show, **kwargs)
except ValueError as error:
raise error
# Prepare output.
info = {"ECG_R_Peaks": rpeaks}
return info
# Returns the peak detector function by name
def _ecg_findpeaks_findmethod(method):
if method in ["nk", "nk2", "neurokit", "neurokit2"]:
return _ecg_findpeaks_neurokit
elif method in ["pantompkins", "pantompkins1985"]:
return _ecg_findpeaks_pantompkins
elif method in ["nabian", "nabian2018"]:
return _ecg_findpeaks_nabian2018
elif method in ["gamboa2008", "gamboa"]:
return _ecg_findpeaks_gamboa
elif method in ["ssf", "slopesumfunction"]:
return _ecg_findpeaks_ssf
elif method in ["zong", "zong2003", "wqrs"]:
return _ecg_findpeaks_zong
elif method in ["hamilton", "hamilton2002"]:
return _ecg_findpeaks_hamilton
elif method in ["christov", "christov2004"]:
return _ecg_findpeaks_christov
elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]:
return _ecg_findpeaks_engzee
elif method in ["manikandan", "manikandan2012"]:
return _ecg_findpeaks_manikandan
elif method in ["elgendi", "elgendi2010"]:
return _ecg_findpeaks_elgendi
elif method in ["kalidas2017", "swt", "kalidas"]:
return _ecg_findpeaks_kalidas
elif method in ["martinez2004", "martinez"]:
return _ecg_findpeaks_WT
elif method in ["rodrigues2020", "rodrigues2021", "rodrigues", "asi"]:
return _ecg_findpeaks_rodrigues
elif method in ["vg", "vgraph", "fastnvg", "emrich", "emrich2023"]:
return _ecg_findpeaks_visibilitygraph
elif method in ["koka2022", "koka"]:
warn(
"The 'koka2022' method has been replaced by 'emrich2023'."
" Please replace method='koka2022' by method='emrich2023'.",
category=NeuroKitWarning,
)
return _ecg_findpeaks_visibilitygraph
elif method in ["promac", "all"]:
return _ecg_findpeaks_promac
else:
raise ValueError(
f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented."
)
# =============================================================================
# Probabilistic Methods-Agreement via Convolution (ProMAC)
# =============================================================================
def _ecg_findpeaks_promac(
signal,
sampling_rate=1000,
show=False,
promac_methods=[
"neurokit",
"gamboa",
"ssf",
"zong",
"engzee",
"elgendi",
"manikandan",
"kalidas",
"martinez",
"rodrigues",
],
threshold=0.33,
gaussian_sd=100,
**kwargs,
):
"""Probabilistic Methods-Agreement via Convolution (ProMAC).
Parameters
----------
signal : Union[list, np.array, pd.Series]
The (cleaned) ECG channel, e.g. as returned by `ecg_clean()`.
sampling_rate : int
The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
Defaults to 1000.
show : bool
If True, will return a plot to visualizing the thresholds used in the algorithm.
Useful for debugging.
promac_methods : list of string
The algorithms to be used for R-peak detection. See the list of acceptable algorithms for
the 'ecg_peaks' function.
threshold : float
The tolerance for peak acceptance. This value is a percentage of the signal's maximum
value. Only peaks found above this tolerance will be finally considered as actual peaks.
gaussian_sd : int
The standard deviation of the Gaussian distribution used to represent the peak location
probability. This value should be in millisencods and is usually taken as the size of
QRS complexes.
"""
x = np.zeros(len(signal))
promac_methods = [
method.lower() for method in promac_methods
] # remove capitalised letters
error_list = [] # Stores the failed methods
for method in promac_methods:
try:
func = _ecg_findpeaks_findmethod(method)
x = _ecg_findpeaks_promac_addconvolve(
signal, sampling_rate, x, func, gaussian_sd=gaussian_sd, **kwargs
)
except ValueError:
error_list.append(f"Method '{method}' is not valid.")
except Exception as error:
error_list.append(f"{method} error: {error}")
# Rescale
x = x / np.max(x)
convoluted = x.copy()
# Remove below threshold
x[x < threshold] = 0
# Find peaks
peaks = signal_findpeaks(x, height_min=threshold)["Peaks"]
if show is True:
signal_plot(
pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True
)
[
plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks
] # pylint: disable=W0106
# I am not sure if mandatory print the best option
if error_list: # empty?
print(error_list)
return peaks
# _ecg_findpeaks_promac_addmethod + _ecg_findpeaks_promac_convolve
# Joining them makes parameters exposition more consistent
def _ecg_findpeaks_promac_addconvolve(
signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs
):
peaks = fun(signal, sampling_rate=sampling_rate, **kwargs)
mask = np.zeros(len(signal))
mask[peaks] = 1
# SD is defined as a typical QRS size, which for adults if 100ms
sd = sampling_rate * gaussian_sd / 1000
shape = scipy.stats.norm.pdf(
np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd
)
x += np.convolve(mask, shape, "same")
return x
# =============================================================================
# NeuroKit
# =============================================================================
def _ecg_findpeaks_neurokit(
signal,
sampling_rate=1000,
smoothwindow=0.1,
avgwindow=0.75,
gradthreshweight=1.5,
minlenweight=0.4,
mindelay=0.3,
show=False,
):
"""All tune-able parameters are specified as keyword arguments.
The `signal` must be the highpass-filtered raw ECG with a lowcut of .5 Hz.
"""
if show is True:
__, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True)
# Compute the ECG's gradient as well as the gradient threshold. Run with
# show=True in order to get an idea of the threshold.
grad = np.gradient(signal)
absgrad = np.abs(grad)
smooth_kernel = int(np.rint(smoothwindow * sampling_rate))
avg_kernel = int(np.rint(avgwindow * sampling_rate))
smoothgrad = signal_smooth(absgrad, kernel="boxcar", size=smooth_kernel)
avggrad = signal_smooth(smoothgrad, kernel="boxcar", size=avg_kernel)
gradthreshold = gradthreshweight * avggrad
mindelay = int(np.rint(sampling_rate * mindelay))
if show is True:
ax1.plot(signal)
ax2.plot(smoothgrad)
ax2.plot(gradthreshold)
# Identify start and end of QRS complexes.
qrs = smoothgrad > gradthreshold
beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
if len(beg_qrs) == 0:
return np.array([])
# Throw out QRS-ends that precede first QRS-start.
end_qrs = end_qrs[end_qrs > beg_qrs[0]]
# Identify R-peaks within QRS (ignore QRS that are too short).
num_qrs = min(beg_qrs.size, end_qrs.size)
min_len = np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * minlenweight
peaks = [0]
for i in range(num_qrs):
beg = beg_qrs[i]
end = end_qrs[i]
len_qrs = end - beg
if len_qrs < min_len:
continue
if show is True:
ax2.axvspan(beg, end, facecolor="m", alpha=0.5)
# Find local maxima and their prominence within QRS.
data = signal[beg:end]
locmax, props = scipy.signal.find_peaks(data, prominence=(None, None))
if locmax.size > 0:
# Identify most prominent local maximum.
peak = beg + locmax[np.argmax(props["prominences"])]
# Enforce minimum delay between peaks.
if peak - peaks[-1] > mindelay:
peaks.append(peak)
peaks.pop(0)
if show is True:
ax1.scatter(peaks, signal[peaks], c="r")
peaks = np.asarray(peaks).astype(int) # Convert to int
return peaks
# =============================================================================
# Pan & Tompkins (1985)
# =============================================================================
def _ecg_findpeaks_pantompkins(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Pan, J., & Tompkins, W. J. (1985). A real-time QRS detection algorithm. IEEE transactions
on biomedical engineering, (3), 230-236.
"""
diff = np.diff(signal)
squared = diff * diff
N = int(0.12 * sampling_rate)
mwa = _ecg_findpeaks_MWA(squared, N)
mwa[: int(0.2 * sampling_rate)] = 0
mwa_peaks = _ecg_findpeaks_peakdetect(mwa, sampling_rate)
mwa_peaks = np.array(mwa_peaks, dtype="int")
return mwa_peaks
# =============================================================================
# Hamilton (2002)
# =============================================================================
def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
"""
diff = np.abs(np.diff(signal))
b = np.ones(int(0.08 * sampling_rate))
b = b / int(0.08 * sampling_rate)
a = [1]
ma = scipy.signal.lfilter(b, a, diff)
ma[0 : len(b) * 2] = 0
n_pks = deque([], maxlen=8)
n_pks_ave = 0.0
s_pks = deque([], maxlen=8)
s_pks_ave = 0.0
QRS = [0]
RR = deque([], maxlen=8)
RR_ave = 0.0
th = 0.0
i = 0
idx = []
peaks = []
for i in range(1, len(ma) - 1): # pylint: disable=C0200,R1702
if ma[i - 1] < ma[i] and ma[i + 1] < ma[i]: # pylint: disable=R1716
peak = i
peaks.append(peak)
if ma[peak] > th and (peak - QRS[-1]) > 0.3 * sampling_rate:
QRS.append(peak)
idx.append(peak)
s_pks.append(ma[peak])
s_pks_ave = np.mean(s_pks)
if RR_ave != 0.0 and QRS[-1] - QRS[-2] > 1.5 * RR_ave:
missed_peaks = peaks[idx[-2] + 1 : idx[-1]]
missed_peak_added = False
for missed_peak in missed_peaks:
if (
missed_peak - peaks[idx[-2]] > int(0.36 * sampling_rate)
and ma[missed_peak] > 0.5 * th
):
insort(QRS, missed_peak)
missed_peak_added = True
break
if missed_peak_added:
continue
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
RR_ave = int(np.mean(RR))
else:
n_pks.append(ma[peak])
n_pks_ave = np.mean(n_pks)
th = n_pks_ave + 0.45 * (s_pks_ave - n_pks_ave)
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Slope Sum Function (SSF) - Zong et al. (2003)
# =============================================================================
def _ecg_findpeaks_ssf(
signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs
):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L448.
"""
# TODO: Doesn't really seems to work
# convert to samples
winB = int(before * sampling_rate)
winA = int(after * sampling_rate)
Rset = set()
length = len(signal)
# diff
dx = np.diff(signal)
dx[dx >= 0] = 0
dx = dx**2
# detection
(idx,) = np.nonzero(dx > threshold)
idx0 = np.hstack(([0], idx))
didx = np.diff(idx0)
# search
sidx = idx[didx > 1]
for item in sidx:
a = item - winB
if a < 0:
a = 0
b = item + winA
if b > length:
continue
r = np.argmax(signal[a:b]) + a
Rset.add(r)
# output
rpeaks = list(Rset)
rpeaks.sort()
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Zong (2003) - WQRS
# =============================================================================
def _ecg_findpeaks_zong(signal, sampling_rate=1000, cutoff=16, window=0.13, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Zong, W., Moody, G. B., & Jiang, D. (2003, September). A robust open-source algorithm to
detect onset and duration of QRS complexes. In Computers in Cardiology, 2003 (pp. 737-740).
IEEE.
"""
# 1. Filter signal
# TODO: Should remove this step? It's technically part of cleaning,
# Not sure it is integral to the peak-detection per se. Opinions are welcome.
order = 2
# Cutoff normalized by nyquist frequency
b, a = scipy.signal.butter(order, cutoff / (0.5 * sampling_rate))
y = scipy.signal.lfilter(b, a, signal)
# Curve length transformation
w = int(np.ceil(window * sampling_rate))
tmp = np.zeros(len(y) - w)
for i, j in enumerate(np.arange(w, len(y))):
s = y[j - w : j]
tmp[i] = np.sum(
np.sqrt(
np.power(1 / sampling_rate, 2) * np.ones(w - 1)
+ np.power(np.diff(s), 2)
)
)
# Pad with the first value
clt = np.concatenate([[tmp[0]] * w, tmp])
# Find adaptive threshold
window_size = 10 * sampling_rate
# Apply fast moving window average with 1D convolution
ret = np.pad(clt, (window_size - 1, 0), "constant", constant_values=(0, 0))
ret = np.convolve(ret, np.ones(window_size), "valid")
# Check that ret is at least as large as the window
if len(ret) < window_size:
warn(
f"The signal must be at least {window_size} samples long for peak detection with the Zong method. ",
category=NeuroKitWarning,
)
return np.array([])
for i in range(1, window_size):
ret[i - 1] = ret[i - 1] / i
ret[window_size - 1 :] = ret[window_size - 1 :] / window_size
# Find peaks
peaks = []
for i in range(len(clt)):
z = sampling_rate * 0.35
if (len(peaks) == 0 or i > peaks[-1] + z) and clt[i] > ret[i]:
peaks.append(i)
return np.array(peaks)
# =============================================================================
# Christov (2004)
# =============================================================================
def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Ivaylo I. Christov, Real time electrocardiogram QRS detection using combined adaptive
threshold, BioMedical Engineering OnLine 2004, vol. 3:28, 2004.
"""
total_taps = 0
b = np.ones(int(0.02 * sampling_rate))
b = b / int(0.02 * sampling_rate)
total_taps += len(b)
a = [1]
MA1 = scipy.signal.lfilter(b, a, signal)
b = np.ones(int(0.028 * sampling_rate))
b = b / int(0.028 * sampling_rate)
total_taps += len(b)
a = [1]
MA2 = scipy.signal.lfilter(b, a, MA1)
Y = []
for i in range(1, len(MA2) - 1):
diff = abs(MA2[i + 1] - MA2[i - 1])
Y.append(diff)
b = np.ones(int(0.040 * sampling_rate))
b = b / int(0.040 * sampling_rate)
total_taps += len(b)
a = [1]
MA3 = scipy.signal.lfilter(b, a, Y)
MA3[0:total_taps] = 0
ms50 = int(0.05 * sampling_rate)
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms350 = int(0.35 * sampling_rate)
M = 0
newM5 = 0
M_list = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
F = 0
F_list = []
R = 0
RR = []
Rm = 0
R_list = []
MFR = 0
MFR_list = []
QRS = []
for i in range(len(MA3)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(MA3[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(MA3[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif QRS and i == QRS[-1] + ms200:
if newM5 == 0:
newM5 = MM[-1]
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
# F
if i > ms350:
F_section = MA3[i - ms350 : i]
max_latest = np.max(F_section[-ms50:])
max_earliest = np.max(F_section[:ms50])
F += (max_latest - max_earliest) / 150.0
# R
if QRS and i < QRS[-1] + int((2.0 / 3.0 * Rm)):
R = 0
elif QRS and i > QRS[-1] + int((2.0 / 3.0 * Rm)) and i < QRS[-1] + Rm:
dec = (M - np.mean(MM)) / 1.4
R = 0 + dec
MFR = M + F + R
M_list.append(M)
F_list.append(F)
R_list.append(R)
MFR_list.append(MFR)
if not QRS and MA3[i] > MFR:
QRS.append(i)
elif QRS and i > QRS[-1] + ms200 and MA3[i] > MFR:
QRS.append(i)
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
if len(RR) > 5:
RR.pop(0)
Rm = int(np.mean(RR))
if len(QRS) == 0:
return np.array([])
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Continuous Wavelet Transform (CWT) - Martinez et al. (2004)
# =============================================================================
def _ecg_findpeaks_WT(signal, sampling_rate=1000, **kwargs):
# Try loading pywt
try:
import pywt
except ImportError as import_error:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
) from import_error
# first derivative of the Gaissian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(signal, scales, "gaus1", sampling_period=1.0 / sampling_rate)
# For wt of scale 2^4
signal_4 = cwtmatr[4, :]
epsilon_4 = np.sqrt(np.mean(np.square(signal_4)))
peaks_4, _ = scipy.signal.find_peaks(np.abs(signal_4), height=epsilon_4)
# For wt of scale 2^3
signal_3 = cwtmatr[3, :]
epsilon_3 = np.sqrt(np.mean(np.square(signal_3)))
peaks_3, _ = scipy.signal.find_peaks(np.abs(signal_3), height=epsilon_3)
# Keep only peaks_3 that are nearest to peaks_4
peaks_3_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)): # pylint: disable=C0200
peaks_distance = abs(peaks_4[i] - peaks_3)
peaks_3_keep[i] = peaks_3[np.argmin(peaks_distance)]
# For wt of scale 2^2
signal_2 = cwtmatr[2, :]
epsilon_2 = np.sqrt(np.mean(np.square(signal_2)))
peaks_2, _ = scipy.signal.find_peaks(np.abs(signal_2), height=epsilon_2)
# Keep only peaks_2 that are nearest to peaks_3
peaks_2_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_3_keep[i] - peaks_2)
peaks_2_keep[i] = peaks_2[np.argmin(peaks_distance)]
# For wt of scale 2^1
signal_1 = cwtmatr[1, :]
epsilon_1 = np.sqrt(np.mean(np.square(signal_1)))
peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1)
# Keep only peaks_1 that are nearest to peaks_2
peaks_1_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_2_keep[i] - peaks_1)
peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)]
# Find R peaks
max_R_peak_dist = int(0.1 * sampling_rate)
rpeaks = []
for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]):
correct_sign = (
signal_1[index_cur] < 0 and signal_1[index_next] > 0
) # pylint: disable=R1716
near = (index_next - index_cur) < max_R_peak_dist # limit 2
if near and correct_sign:
rpeaks.append(
signal_zerocrossings(signal_1[index_cur : index_next + 1])[0]
+ index_cur
)
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Gamboa (2008)
# =============================================================================
def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002, **kwargs):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L834.
- Gamboa, H. (2008). Multi-modal behavioral biometrics based on HCI and electrophysiology
(Doctoral dissertation, Universidade Técnica de Lisboa).
"""
hist, edges = np.histogram(signal, 100, density=True)
TH = 0.01
F = np.cumsum(hist)
v0 = edges[np.nonzero(F > TH)[0][0]]
v1 = edges[np.nonzero(F < (1 - TH))[0][-1]]
nrm = max([abs(v0), abs(v1)])
norm_signal = signal / float(nrm)
d2 = np.diff(norm_signal, 2)
b = (
np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2
) # pylint: disable=E1130
b = np.intersect1d(b, np.nonzero(-d2 > tol)[0]) # pylint: disable=E1130
rpeaks = []
if len(b) >= 3:
b = b.astype("float")
previous = b[0]
# convert to samples
v_100ms = int(0.1 * sampling_rate)
v_300ms = int(0.3 * sampling_rate)
for i in b[1:]:
if i - previous > v_300ms:
previous = i
rpeaks.append(np.argmax(signal[int(i) : int(i + v_100ms)]) + i)
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Elgendi et al. (2010)
# =============================================================================
def _ecg_findpeaks_elgendi(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS
Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing
(BIOSIGNALS2010). 428-431.
"""
window1 = int(0.12 * sampling_rate)
mwa_qrs = _ecg_findpeaks_MWA(abs(signal), window1)
window2 = int(0.6 * sampling_rate)
mwa_beat = _ecg_findpeaks_MWA(abs(signal), window2)
blocks = mwa_qrs > mwa_beat
QRS = []
qrs_duration_threshold = int(0.08 * sampling_rate)
rr_distance_threshold = int(0.3 * sampling_rate)
for i, (prev, cur) in enumerate(zip(blocks, blocks[1:])):
if prev < cur:
start = i
elif prev > cur:
end = i - 1
if end - start > qrs_duration_threshold:
detection = np.argmax(signal[start : end + 1]) + start
if QRS:
if detection - QRS[-1] > rr_distance_threshold:
QRS.append(detection)
else:
QRS.append(detection)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Engzee Modified (2012)
# =============================================================================
def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- C. Zeelenberg, A single scan algorithm for QRS detection and feature extraction, IEEE Comp.
in Cardiology, vol. 6, pp. 37-42, 1979
- A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred, "Real Time Electrocardiogram
Segmentation for Finger Based ECG Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012.
"""
engzee_fake_delay = 0
diff = np.zeros(len(signal))
for i in range(4, len(diff)):
diff[i] = signal[i] - signal[i - 4]
ci = [1, 4, 6, 4, 1]
low_pass = scipy.signal.lfilter(ci, 1, diff)
low_pass[: int(0.2 * sampling_rate)] = 0
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms160 = int(0.16 * sampling_rate)
neg_threshold = int(0.01 * sampling_rate)
M = 0
M_list = []
neg_m = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
QRS = []
r_peaks = []
counter = 0
thi_list = []
thi = False
thf_list = []
thf = False
newM5 = False
for i in range(len(low_pass)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(low_pass[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(low_pass[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif newM5 and QRS and i == QRS[-1] + ms200:
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
M_list.append(M)
neg_m.append(-M)
if not QRS and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
elif QRS and i > QRS[-1] + ms200 and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
if thi and i < thi_list[-1] + ms160:
if low_pass[i] < -M and low_pass[i - 1] > -M:
# thf_list.append(i)
thf = True
if thf and low_pass[i] < -M:
thf_list.append(i)
counter += 1
elif low_pass[i] > -M and thf:
counter = 0
thi = False
thf = False
elif thi and i > thi_list[-1] + ms160:
counter = 0
thi = False
thf = False
if counter > neg_threshold:
unfiltered_section = signal[thi_list[-1] - int(0.01 * sampling_rate) : i]
r_peaks.append(
engzee_fake_delay
+ np.argmax(unfiltered_section)
+ thi_list[-1]
- int(0.01 * sampling_rate)
)
counter = 0
thi = False
thf = False
if len(r_peaks) == 0:
return np.array([])
r_peaks.pop(
0
) # removing the 1st detection as it 1st needs the QRS complex amplitude for the threshold
r_peaks = np.array(r_peaks, dtype="int")
return r_peaks
# =============================================================================
# Shannon energy R-peak detection - Manikandan and Soman (2012)
# =============================================================================
def _ecg_findpeaks_manikandan(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/hongzuL/A-novel-method-for-detecting-R-peaks-in-electrocardiogram-signal/
A (hopefully) fixed version of https://github.com/nsunami/Shannon-Energy-R-Peak-Detection
"""
# Preprocessing ------------------------------------------------------------
# Forward and backward filtering using filtfilt.
def cheby1_bandpass_filter(data, lowcut, highcut, fs, order=5, rp=1):
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = scipy.signal.cheby1(order, rp=rp, Wn=[low, high], btype="bandpass")
y = scipy.signal.filtfilt(b, a, data)
return y
# Running mean filter function
def running_mean(x, N):
cumsum = np.cumsum(np.insert(x, 0, 0))
return (cumsum[N:] - cumsum[:-N]) / float(N)
# Apply Chebyshev Type I Bandpass filter
# Low cut frequency = 6 Hz
# High cut frequency = 18
filtered = cheby1_bandpass_filter(
signal, lowcut=6, highcut=18, fs=sampling_rate, order=4
)
# Eq. 1: First-order differencing difference
dn = np.append(filtered[1:], 0) - filtered
# If the signal is flat then return an empty array rather than error out
# with a divide by zero error.
if np.max(abs(dn)) == 0:
return np.array([])
# Eq. 2
dtn = dn / (np.max(abs(dn)))
# The absolute value, energy value, Shannon entropy value, and Shannon energy value
# # Eq. 3
# an = np.abs(dtn)
# # Eq. 4
# en = an**2
# # Eq. 5
# sen = -np.abs(dtn) * np.log10(np.abs(dtn))
# Eq. 6
sn = -(dtn**2) * np.log10(dtn**2)
# Apply rectangular window
# Length should be approximately the same as the duration of possible wider QRS complex
# Normal QRS duration is .12 sec, so we overshoot with 0.15 sec
window_len = int(0.15 * sampling_rate)
window_len = window_len - 1 if window_len % 2 == 0 else window_len # Make odd
window = scipy.signal.windows.boxcar(window_len)
# The filtering operation is performed in both the forward and reverse directions
see = scipy.signal.convolve(sn, window, mode="same")
see = np.flip(see)
see = scipy.signal.convolve(see, window, "same")
see = np.flip(see)
# Hilbert Transformation
ht = np.imag(scipy.signal.hilbert(see))
# Moving Average to remove low frequency drift
# 2.5 sec from Manikanda in 360 Hz (900 samples)
# 2.5 sec in 500 Hz == 1250 samples
ma_len = int(2.5 * sampling_rate)
ma_out = np.insert(running_mean(ht, ma_len), 0, [0] * (ma_len - 1))
# Get the difference between the Hilbert signal and the MA filtered signal
zn = ht - ma_out
# R-Peak Detection ---------------------------------------------------------
# Look for points crossing zero
# Find points crossing zero upwards (negative to positive)
idx = np.argwhere(np.diff(np.sign(zn)) > 0).flatten().tolist()
# Prepare a container for windows
idx_search = []
id_maxes = np.empty(0, dtype=int)
search_window_half = int(np.ceil(window_len / 2))
for i in idx:
lows = np.arange(i - search_window_half, i)
highs = np.arange(i + 1, i + search_window_half + 1)
if highs[-1] > len(signal):
highs = np.delete(
highs, np.arange(np.where(highs == len(signal))[0], len(highs) + 1)
)
ekg_window = np.concatenate((lows, [i], highs))
idx_search.append(ekg_window)
ekg_window_wave = signal[ekg_window]
id_maxes = np.append(
id_maxes,
ekg_window[np.where(ekg_window_wave == np.max(ekg_window_wave))[0]],
)
return np.array(idx)
# =============================================================================
# Stationary Wavelet Transform (SWT) - Kalidas and Tamil (2017)
# =============================================================================
def _ecg_findpeaks_kalidas(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Vignesh Kalidas and Lakshman Tamil (2017). Real-time QRS detector using Stationary Wavelet Transform
for Automated ECG Analysis. In: 2017 IEEE 17th International Conference on Bioinformatics and
Bioengineering (BIBE). Uses the Pan and Tompkins thresolding.
"""
# Try loading pywt
try:
import pywt
except ImportError as import_error:
raise ImportError(
"NeuroKit error: ecg_findpeaks(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
) from import_error
signal_length = len(signal)
swt_level = 3
padding = -1
for i in range(1000):
if (len(signal) + i) % 2**swt_level == 0:
padding = i
break
if padding > 0:
signal = np.pad(signal, (0, padding), "edge")
elif padding == -1:
print("Padding greater than 1000 required\n")
swt_ecg = pywt.swt(signal, "db3", level=swt_level)
swt_ecg = np.array(swt_ecg)
swt_ecg = swt_ecg[0, 1, :]
squared = swt_ecg * swt_ecg
f1 = 0.01 / (0.5 * sampling_rate)
f2 = 10 / (0.5 * sampling_rate)
sos = scipy.signal.butter(3, [f1, f2], btype="bandpass", output="sos")
filtered_squared = scipy.signal.sosfilt(sos, squared)
# Drop padding to avoid detecting peaks inside it (#456)
filtered_squared = filtered_squared[:signal_length]
filt_peaks = _ecg_findpeaks_peakdetect(filtered_squared, sampling_rate)
filt_peaks = np.array(filt_peaks, dtype="int")
return filt_peaks
# ===========================================================================
# Nabian et al. (2018)
# ===========================================================================
def _ecg_findpeaks_nabian2018(signal, sampling_rate=1000, **kwargs):
"""R peak detection method by Nabian et al. (2018) inspired by the Pan-Tompkins algorithm.
- Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., Ostadabbas, S. (2018).
An Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data.
IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11.
"""
window_size = int(0.4 * sampling_rate)
peaks = np.zeros(len(signal))
for i in range(1 + window_size, len(signal) - window_size):
ecg_window = signal[i - window_size : i + window_size]
rpeak = np.argmax(ecg_window)
if i == (i - window_size - 1 + rpeak):
peaks[i] = 1
rpeaks = np.where(peaks == 1)[0]
# min_distance = 200
return rpeaks
# =============================================================================
# ASI (FSM based 2020)
# =============================================================================
def _ecg_findpeaks_rodrigues(signal, sampling_rate=1000, **kwargs):
"""Segmenter by Tiago Rodrigues, inspired by on Gutierrez-Rivas (2015) and Sadhukhan (2012).
References
----------
- Gutiérrez-Rivas, R., García, J. J., Marnane, W. P., & Hernández, A. (2015). Novel real-time
low-complexity QRS complex detector based on adaptive thresholding. IEEE Sensors Journal,
15(10), 6036-6043.
- Sadhukhan, D., & Mitra, M. (2012). R-peak detection algorithm for ECG using double difference
and RR interval processing. Procedia Technology, 4, 873-877.
"""
N = int(np.clip(np.round(3 * sampling_rate / 128), 2, None))
Nd = N - 1
Pth = (0.7 * sampling_rate) / 128 + 2.7
# Pth = 3, optimal for fs = 250 Hz
Rmin = 0.26
rpeaks = []
i = 1
Ramptotal = 0
# Double derivative squared
diff_ecg = [signal[i] - signal[i - Nd] for i in range(Nd, len(signal))]
ddiff_ecg = [diff_ecg[i] - diff_ecg[i - 1] for i in range(1, len(diff_ecg))]
squar = np.square(ddiff_ecg)
# Integrate moving window
b = np.array(np.ones(N))
a = [1]
processed_ecg = scipy.signal.lfilter(b, a, squar)
tf = len(processed_ecg)
rpeakpos = 0
# R-peak finder FSM
while i < tf: # ignore last sample of recording
# State 1: looking for maximum
tf1 = np.round(i + Rmin * sampling_rate)
Rpeakamp = 0
while i < tf1 and i < tf:
# Rpeak amplitude and position
if processed_ecg[i] > Rpeakamp:
Rpeakamp = processed_ecg[i]
rpeakpos = i + 1
i += 1
Ramptotal = (19 / 20) * Ramptotal + (1 / 20) * Rpeakamp
rpeaks.append(rpeakpos)
# State 2: waiting state
d = tf1 - rpeakpos
tf2 = i + np.round(0.2 * 2 - d)
while i <= tf2:
i += 1
# State 3: decreasing threshold
Thr = Ramptotal
while i < tf and processed_ecg[i] < Thr:
Thr *= np.exp(-Pth / sampling_rate)
i += 1
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Fast Visibility Graph Detector - by Emrich et al. (2023)
# =============================================================================
def _ecg_findpeaks_visibilitygraph(
signal,
sampling_rate=1000,
window_seconds=2,
window_overlap=0.5,
accelerated=True,
**kwargs,
):
"""Accelerated R-Peak Detector Using Visibility Graphs by Emrich et al. (2023). Implements the FastNVG algorithm,
allowing fast and sample precise R-peak detection.
References
----------
- J. Emrich, T. Koka, S. Wirth and M. Muma, "Accelerated Sample-Accurate R-Peak
Detectors Based on Visibility Graphs," 31st European Signal Processing
Conference (EUSIPCO), 2023, pp. 1090-1094, doi: 10.23919/EUSIPCO58844.2023.10290007,
https://ieeexplore.ieee.org/document/10290007
- T. Koka and M. Muma (2022), Fast and Sample Accurate R-Peak Detection for Noisy ECG Using
Visibility Graphs. In: 2022 44th Annual International Conference of the IEEE Engineering
in Medicine & Biology Society (EMBC). Uses the Pan and Tompkins thresholding.
- https://github.com/JonasEmrich/vg-beat-detectors
"""
# Try loading ts2vg
try:
import ts2vg
except ImportError as import_error:
raise ImportError(
"NeuroKit error: ecg_findpeaks(): the 'ts2vg' module is required for"
" this method to run. Please install it first (`pip install ts2vg`)."
) from import_error
# Initialize variables
N = len(signal) # Signal length
M = int(window_seconds * sampling_rate) # Length of window segment
L = 0 # Left segment boundary
R = M # Right segment boundary
dM = int(np.ceil(window_overlap * M)) # Size of segment overlap
n_segments = int(np.ceil(((N - R) / (M - dM) + 1))) # Number of segments
weights = np.zeros(N) # Empty array to store the weights
BETA = 0.55 # Target number of nonzero elements in the resulting weight vector
# if the input signal is flat, return an empty array, otherwise the visiblity graph will fail
if np.max(signal) == np.min(signal):
return np.array([])
# If input length is smaller than window, compute only one segment of this length
if N < M:
M, R = N, N
n_segments = 1
# Do computation in small segments
for jj in range(n_segments):
segment = signal[L:R]
# Select indicies and values to construct the visibility graph
if accelerated:
# Compute the threshold for the segment
threshold = np.quantile(segment, 0.5)
# Find local maxima
indices_maxima = scipy.signal.find_peaks(segment)[0]
segment_maxima = segment[indices_maxima]
# Select local maxima above the threshold
greater = np.argwhere(segment_maxima > threshold).flatten()
indices = indices_maxima[greater]
segment = segment_maxima[greater]
else:
indices = np.arange(len(segment))
# Compute the adjacency matrix to the directed visibility graph
A = (
ts2vg.NaturalVG(directed="top_to_bottom")
.build(segment, indices)
.adjacency_matrix()
)
# Compute the ECG weights using k-Hop-paths
size = len(indices)
w = np.ones(size) / size
while np.count_nonzero(w) / size >= BETA:
_w = np.matmul(A, w)
_w /= np.linalg.norm(_w)
if np.any(np.isnan(_w)):
break
w = _w
# Update weight vector by merging overlaping segments
if L == 0:
weights[L + indices] = w
elif N - dM + 1 <= L and L + 1 <= N:
weights[L + indices] = 0.5 * (weights[L + indices] + w)
else:
weights[L + indices[indices <= dM]] = 0.5 * (
w[indices <= dM] + weights[L + indices[indices <= dM]]
)
weights[L + indices[indices > dM]] = w[indices > dM]
if R - L < M:
break
# Update segment boundaries
L += M - dM
if R + (M - dM) <= N:
R += M - dM
else:
R = N
# Weigh signal with obtained weights and use thresholding algorithm for peak localization
weighted_signal = signal * weights
rpeaks = _ecg_findpeaks_visgraphthreshold(weighted_signal, sampling_rate)
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Utilities
# =============================================================================
def _ecg_findpeaks_MWA(signal, window_size, **kwargs):
"""Based on https://github.com/berndporr/py-ecg-detectors/
Optimized for vectorized computation.
"""
window_size = int(window_size)
# Scipy's uniform_filter1d is a fast and accurate way of computing
# moving averages. By default it computes the averages of `window_size`
# elements centered around each element in the input array, including
# `(window_size - 1) // 2` elements after the current element (when
# `window_size` is even, the extra element is taken from before). To
# return causal moving averages, i.e. each output element is the average
# of window_size input elements ending at that position, we use the
# `origin` argument to shift the filter computation accordingly.
mwa = scipy.ndimage.uniform_filter1d(
signal, window_size, origin=(window_size - 1) // 2
)
# Compute actual moving averages for the first `window_size - 1` elements,
# which the uniform_filter1d function computes using padding. We want
# those output elements to be averages of only the input elements until
# that position.
head_size = min(window_size - 1, len(signal))
mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace(
1, head_size, head_size
)
return mwa
def _ecg_findpeaks_peakdetect(detection, sampling_rate=1000, **kwargs):
"""Based on https://github.com/berndporr/py-ecg-detectors/
Optimized for vectorized computation.
"""
min_peak_distance = int(0.3 * sampling_rate)
min_missed_distance = int(0.25 * sampling_rate)
signal_peaks = []
SPKI = 0.0
NPKI = 0.0
last_peak = 0
last_index = -1
# NOTE: Using plateau_size=(1,1) here avoids detecting flat peaks and
# maintains original py-ecg-detectors behaviour. Flat peaks are typically
# found in measurement artifacts where the signal saturates at maximum
# recording amplitude. Such cases should not be detected as peaks. If we
# do encounter recordings where even normal R peaks are flat, then changing
# this to something like plateau_size=(1, sampling_rate // 10) might make
# sense. See also https://github.com/neuropsychology/NeuroKit/pull/450.
peaks, _ = scipy.signal.find_peaks(detection, plateau_size=(1, 1))
for index, peak in enumerate(peaks):
peak_value = detection[peak]
threshold_I1 = NPKI + 0.25 * (SPKI - NPKI)
if peak_value > threshold_I1 and peak > last_peak + min_peak_distance:
signal_peaks.append(peak)
# RR_missed threshold is based on the previous eight R-R intervals
if len(signal_peaks) > 9:
RR_ave = (signal_peaks[-2] - signal_peaks[-10]) // 8
RR_missed = int(1.66 * RR_ave)
if peak - last_peak > RR_missed:
missed_peaks = peaks[last_index + 1 : index]
missed_peaks = missed_peaks[
(missed_peaks > last_peak + min_missed_distance)
& (missed_peaks < peak - min_missed_distance)
]
threshold_I2 = 0.5 * threshold_I1
missed_peaks = missed_peaks[detection[missed_peaks] > threshold_I2]
if len(missed_peaks) > 0:
signal_peaks[-1] = missed_peaks[
np.argmax(detection[missed_peaks])
]
signal_peaks.append(peak)
last_peak = peak
last_index = index
SPKI = 0.125 * peak_value + 0.875 * SPKI
else:
NPKI = 0.125 * peak_value + 0.875 * NPKI
return signal_peaks
def _ecg_findpeaks_visgraphthreshold(weight, sampling_frequency=1000, **kwargs):
"""Based on the python implementation [3] of the thresholding proposed by Pan and Tompkins in [4]. Modifications
were made with respect to the application of R-peak detection using visibility graphs and the k-Hop paths metric.
References
----------
[3] https://github.com/berndporr/py-ecg-detectors
[4] J. Pan and W. J. Tompkins, “A Real-Time QRS Detection Algorithm”, IEEE Transactions on Biomedical
Engineering, vol. BME-32, Mar. 1985. pp. 230-236.
"""
# initialise variables
N = len(weight)
min_distance = int(0.25 * sampling_frequency)
signal_peaks = [-min_distance]
noise_peaks = []
# Learning Phase, 2sec
spki = (
np.max(weight[0 : 2 * sampling_frequency]) * 0.25
) # running estimate of signal level
npki = (
np.mean(weight[0 : 2 * sampling_frequency]) * 0.5
) # running estimate of noise level
threshold_I1 = spki
# iterate over the whole array / series
for i in range(N):
# skip first and last elements
if 0 < i < N - 1:
# detect peak candidates based on a rising + falling slope
if weight[i - 1] <= weight[i] >= weight[i + 1]:
# peak candidates should be greater than signal threshold
if weight[i] > threshold_I1:
# distance to last peak is greater than minimum detection distance
if (i - signal_peaks[-1]) > 0.3 * sampling_frequency:
signal_peaks.append(i)
spki = 0.125 * weight[signal_peaks[-1]] + 0.875 * spki
# candidate is close to last detected peak -> check if current candidate is better choice
elif 0.3 * sampling_frequency >= (i - signal_peaks[-1]):
# compare slope of last peak with current candidate
if (
weight[i] > weight[signal_peaks[-1]]
): # test greater slope -> qrs
spki = (
spki - 0.125 * weight[signal_peaks[-1]]
) / 0.875 # reset threshold
signal_peaks[-1] = i
spki = 0.125 * weight[signal_peaks[-1]] + 0.875 * spki
else:
noise_peaks.append(i)
npki = 0.125 * weight[noise_peaks[-1]] + 0.875 * npki
else:
# not a peak -> label as noise and update noise level
npki = 0.125 * weight[i] + 0.875 * npki
# back search for missed peaks
if len(signal_peaks) > 8:
RR = np.diff(signal_peaks[-9:])
RR_ave = int(np.mean(RR))
RR_missed = int(1.66 * RR_ave)
# if time difference of the last two signal peaks found is too large
if signal_peaks[-1] - signal_peaks[-2] > RR_missed:
threshold_I2 = 0.5 * threshold_I1
# get range of candidates and apply noise threshold
missed_section_peaks = range(
signal_peaks[-2] + min_distance,
signal_peaks[-1] - min_distance,
)
missed_section_peaks = [
p
for p in missed_section_peaks
if weight[p] > threshold_I2
]
# add the largest sample in missed interval to peaks
if len(missed_section_peaks) > 0:
missed_peak = missed_section_peaks[
np.argmax(weight[missed_section_peaks])
]
signal_peaks.append(signal_peaks[-1])
signal_peaks[-2] = missed_peak
else:
# not a peak -> label as noise and update noise level
npki = 0.125 * weight[i] + 0.875 * npki
threshold_I1 = npki + 0.25 * (spki - npki)
# remove first dummy elements
if signal_peaks[0] == -min_distance:
signal_peaks.pop(0)
return np.array(signal_peaks)