Source code for neurokit2.ecg.ecg_peaks

import matplotlib.pyplot as plt
import numpy as np

from ..signal import signal_fixpeaks, signal_formatpeaks
from ..stats import rescale
from .ecg_findpeaks import ecg_findpeaks


[docs] def ecg_peaks(ecg_cleaned, sampling_rate=1000, method="neurokit", correct_artifacts=False, show=False, **kwargs): """**Find R-peaks in an ECG signal** Find R-peaks in an ECG signal using the specified method. You can pass an unfiltered ECG signals as input, but typically a filtered ECG (cleaned using ``ecg_clean()``) will result in better results. Different algorithms for peak-detection include: * **neurokit** (default): QRS complexes are detected based on the steepness of the absolute gradient of the ECG signal. Subsequently, R-peaks are detected as local maxima in the QRS complexes. The method is unpublished, but see: (i) https://github.com/neuropsychology/NeuroKit/issues/476 for discussion of this algorithm; and (ii) https://doi.org/10.21105/joss.02621 for the original validation of this algorithm. * **pantompkins1985**: Algorithm by Pan & Tompkins (1985). * **hamilton2002**: Algorithm by Hamilton (2002). * **zong2003**: Algorithm by Zong et al. (2003). * **martinez2004**: Algorithm by Martinez et al (2004). * **christov2004**: Algorithm by Christov (2004). * **gamboa2008**: Algorithm by Gamboa (2008). * **elgendi2010**: Algorithm by Elgendi et al. (2010). * **engzeemod2012**: Original algorithm by Engelse & Zeelenberg (1979) modified by Lourenço et al. (2012). * **manikandan2012**: Algorithm by Manikandan & Soman (2012) based on the Shannon energy envelope (SEE). * **kalidas2017**: Algorithm by Kalidas et al. (2017). * **nabian2018**: Algorithm by Nabian et al. (2018) based on the Pan-Tompkins algorithm. * **rodrigues2021**: Adaptation of the work by Sadhukhan & Mitra (2012) and Gutiérrez-Rivas et al. (2015) by Rodrigues et al. (2021). * **emrich2023**: FastNVG Algorithm by Emrich et al. (2023) based on the visibility graph detector of Koka et al. (2022). Provides fast and sample-accurate R-peak detection. The algorithm transforms the ecg into a graph representation and extracts exact R-peak positions using graph metrics. * **promac**: ProMAC combines the result of several R-peak detectors in a probabilistic way. For a given peak detector, the binary signal representing the peak locations is convolved with a Gaussian distribution, resulting in a probabilistic representation of each peak location. This procedure is repeated for all selected methods and the resulting signals are accumulated. Finally, a threshold is used to accept or reject the peak locations. See this discussion for more information on the origins of the method: https://github.com/neuropsychology/NeuroKit/issues/222 .. note:: Please help us improve the methods' documentation by adding a small description. Parameters ---------- ecg_cleaned : Union[list, np.array, pd.Series] The cleaned ECG channel as returned by ``ecg_clean()``. sampling_rate : int The sampling frequency of ``ecg_cleaned`` (in Hz, i.e., samples/second). Defaults to 1000. method : string The algorithm to be used for R-peak detection. correct_artifacts : bool Whether or not to identify and fix artifacts, using the method by Lipponen & Tarvainen (2019). show : bool If ``True``, will show a plot of the signal with peaks. Defaults to ``False``. **kwargs Additional keyword arguments, usually specific for each method. Returns ------- signals : DataFrame A DataFrame of same length as the input signal in which occurrences of R-peaks marked as ``1`` in a list of zeros with the same length as ``ecg_cleaned``. Accessible with the keys ``"ECG_R_Peaks"``. info : dict A dictionary containing additional information, in this case the samples at which R-peaks occur, accessible with the key ``"ECG_R_Peaks"``, as well as the signals' sampling rate, accessible with the key ``"sampling_rate"``. See Also -------- ecg_clean, ecg_findpeaks, .signal_fixpeaks Examples -------- * **Example 1**: Find R-peaks using the default method (``"neurokit"``). .. ipython:: python import neurokit2 as nk import numpy as np ecg = nk.ecg_simulate(duration=10, sampling_rate=250) ecg[600:950] = ecg[600:950] + np.random.normal(0, 0.6, 350) @savefig p_ecg_peaks1.png scale=100% signals, info = nk.ecg_peaks(ecg, sampling_rate=250, correct_artifacts=True, show=True) @suppress plt.close() * **Example 2**: Compare different methods .. ipython:: python # neurokit (default) cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="neurokit") _, neurokit = nk.ecg_peaks(cleaned, sampling_rate=250, method="neurokit") # pantompkins1985 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="pantompkins1985") _, pantompkins1985 = nk.ecg_peaks(cleaned, sampling_rate=250, method="pantompkins1985") # hamilton2002 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="hamilton2002") _, hamilton2002 = nk.ecg_peaks(cleaned, sampling_rate=250, method="hamilton2002") # zong2003 _, zong2003 = nk.ecg_peaks(ecg, sampling_rate=250, method="zong2003") # martinez2004 _, martinez2004 = nk.ecg_peaks(ecg, sampling_rate=250, method="martinez2004") # christov2004 _, christov2004 = nk.ecg_peaks(cleaned, sampling_rate=250, method="christov2004") # gamboa2008 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="gamboa2008") _, gamboa2008 = nk.ecg_peaks(cleaned, sampling_rate=250, method="gamboa2008") # elgendi2010 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="elgendi2010") _, elgendi2010 = nk.ecg_peaks(cleaned, sampling_rate=250, method="elgendi2010") # engzeemod2012 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="engzeemod2012") _, engzeemod2012 = nk.ecg_peaks(cleaned, sampling_rate=250, method="engzeemod2012") # Manikandan (2012) _, manikandan2012 = nk.ecg_peaks(ecg, sampling_rate=250, method="manikandan2012") # kalidas2017 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="kalidas2017") _, kalidas2017 = nk.ecg_peaks(cleaned, sampling_rate=250, method="kalidas2017") # nabian2018 _, nabian2018 = nk.ecg_peaks(ecg, sampling_rate=250, method="nabian2018") # rodrigues2021 _, rodrigues2021 = nk.ecg_peaks(ecg, sampling_rate=250, method="rodrigues2021") # emrich2023 cleaned = nk.ecg_clean(ecg, sampling_rate=250, method="emrich2023") _, emrich2023 = nk.ecg_peaks(cleaned, sampling_rate=250, method="emrich2023") # Collect all R-peak lists by iterating through the result dicts rpeaks = [ i["ECG_R_Peaks"] for i in [ neurokit, pantompkins1985, nabian2018, hamilton2002, martinez2004, christov2004, gamboa2008, elgendi2010, engzeemod2012, kalidas2017, rodrigues2021, emrich2023 ] ] # Visualize results @savefig p_ecg_peaks2.png scale=100% nk.events_plot(rpeaks, ecg) @suppress plt.close() * **Example 3**: Method-agreement procedure ('promac') .. ipython:: python ecg = nk.ecg_simulate(duration=10, sampling_rate=500) ecg = nk.signal_distort(ecg, sampling_rate=500, noise_amplitude=0.05, noise_frequency=[25, 50], artifacts_amplitude=0.05, artifacts_frequency=50) @savefig p_ecg_peaks3.png scale=100% info = nk.ecg_findpeaks(ecg, sampling_rate=250, method="promac", show=True) @suppress plt.close() References ---------- * Pan, J., & Tompkins, W. J. (1985). A real-time QRS detection algorithm. IEEE transactions on biomedical engineering, (3), 230-236. * Hamilton, P. (2002). Open source ECG analysis. In Computers in cardiology (pp. 101-104). IEEE. * Zong, W., Heldt, T., Moody, G. B., & Mark, R. G. (2003). An open-source algorithm to detect onset of arterial blood pressure pulses. In Computers in Cardiology, 2003 (pp. 259-262). IEEE. * Zong, W., Moody, G. B., & Jiang, D. (2003, September). A robust open-source algorithm to detect onset and duration of QRS complexes. In Computers in Cardiology, 2003 (pp. 737-740). IEEE. * Martinez, J. P., Almeida, R., Olmos, S., Rocha, A. P., & Laguna, P. (2004) A wavelet-based ECG delineator: evaluation on standard databases. IEEE Trans Biomed Eng, 51(4), 570–581. * Christov, I. I. (2004). Real time electrocardiogram QRS detection using combined adaptive threshold. Biomedical engineering online, 3(1), 1-9. * Gamboa, H. (2008). Multi-modal behavioral biometrics based on HCI and electrophysiology (Doctoral dissertation, Universidade Técnica de Lisboa). * Elgendi, M., Jonkman, M., & De Boer, F. (2010). Frequency Bands Effects on QRS Detection. Biosignals, Proceedings of the Third International Conference on Bio-inspired Systems and Signal Processing, 428-431. * Engelse, W. A., & Zeelenberg, C. (1979). A single scan algorithm for QRS-detection and feature extraction. Computers in cardiology, 6(1979), 37-42. * Manikandan, M. S., & Soman, K. P. (2012). A novel method for detecting R-peaks in electrocardiogram (ECG) signal. Biomedical Signal Processing and Control, 7(2), 118-128. * Lourenço, A., Silva, H., Leite, P., Lourenço, R., & Fred, A. L. (2012, February). Real Time Electrocardiogram Segmentation for Finger based ECG Biometrics. In Biosignals (pp. 49-54). * Kalidas, V., & Tamil, L. (2017, October). Real-time QRS detector using stationary wavelet transform for automated ECG analysis. In 2017 IEEE 17th International Conference on Bioinformatics and Bioengineering (BIBE) (pp. 457-461). IEEE. * Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., Ostadabbas, S. (2018). An Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data. IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11. * Sadhukhan, D., & Mitra, M. (2012). R-peak detection algorithm for ECG using double difference and RR interval processing. Procedia Technology, 4, 873-877. * Gutiérrez-Rivas, R., García, J. J., Marnane, W. P., & Hernández, A. (2015). Novel real-time low-complexity QRS complex detector based on adaptive thresholding. IEEE Sensors Journal, 15(10), 6036-6043. * Rodrigues, T., Samoutphonh, S., Silva, H., & Fred, A. (2021, January). A Low-Complexity R-peak Detection Algorithm with Adaptive Thresholding for Wearable Devices. In 2020 25th International Conference on Pattern Recognition (ICPR) (pp. 1-8). IEEE. * T. Koka and M. Muma, "Fast and Sample Accurate R-Peak Detection for Noisy ECG Using Visibility Graphs," 2022 44th Annual International Conference of the IEEE Engineering in Medicine & Biology Society (EMBC), 2022, pp. 121-126. * ``promac`` * **Unpublished.** It runs different methods and derives a probability index using convolution. See this discussion for more information on the method: https://github.com/neuropsychology/NeuroKit/issues/222 * Lipponen, J. A., & Tarvainen, M. P. (2019). A robust algorithm for heart rate variability time series artefact correction using novel beat classification. Journal of medical engineering & technology, 43(3), 173-181. * Emrich, J., Koka, T., Wirth, S., & Muma, M. (2023), Accelerated Sample-Accurate R-Peak Detectors Based on Visibility Graphs. 31st European Signal Processing Conference (EUSIPCO), 1090-1094, doi: 10.23919/EUSIPCO58844.2023.10290007 """ # Store info info = {"method_peaks": method.lower(), "method_fixpeaks": "None"} # First peak detection info.update(ecg_findpeaks(ecg_cleaned, sampling_rate=sampling_rate, method=info["method_peaks"], **kwargs)) # Peak correction if correct_artifacts: info["ECG_R_Peaks_Uncorrected"] = info["ECG_R_Peaks"].copy() fixpeaks, info["ECG_R_Peaks"] = signal_fixpeaks( info["ECG_R_Peaks"], sampling_rate=sampling_rate, method="Kubios" ) # Add prefix and merge fixpeaks = {"ECG_fixpeaks_" + str(key): val for key, val in fixpeaks.items()} info.update(fixpeaks) # Format output signals = signal_formatpeaks( dict(ECG_R_Peaks=info["ECG_R_Peaks"]), # Takes a dict as input desired_length=len(ecg_cleaned), peak_indices=info["ECG_R_Peaks"], ) info["sampling_rate"] = sampling_rate # Add sampling rate in dict info if show is True: _ecg_peaks_plot(ecg_cleaned, info, sampling_rate) return signals, info
# ============================================================================= # Internals # ============================================================================= def _ecg_peaks_plot( ecg_cleaned, info=None, sampling_rate=1000, raw=None, quality=None, phase=None, ax=None, ): x_axis = np.linspace(0, len(ecg_cleaned) / sampling_rate, len(ecg_cleaned)) # Prepare plot if ax is None: _, ax = plt.subplots() ax.set_xlabel("Time (seconds)") ax.set_title("ECG signal and peaks") # Quality Area ------------------------------------------------------------- if quality is not None: quality = rescale( quality, to=[ np.min([np.min(raw), np.min(ecg_cleaned)]), np.max([np.max(raw), np.max(ecg_cleaned)]), ], ) minimum_line = np.full(len(x_axis), quality.min()) # Plot quality area first ax.fill_between( x_axis, minimum_line, quality, alpha=0.12, zorder=0, interpolate=True, facecolor="#4CAF50", label="Signal quality", ) # Raw Signal --------------------------------------------------------------- if raw is not None: ax.plot(x_axis, raw, color="#B0BEC5", label="Raw signal", zorder=1) label_clean = "Cleaned signal" else: label_clean = "Signal" # Peaks ------------------------------------------------------------------- ax.scatter( x_axis[info["ECG_R_Peaks"]], ecg_cleaned[info["ECG_R_Peaks"]], color="#FFC107", label="R-peaks", zorder=2, ) # Artifacts --------------------------------------------------------------- _ecg_peaks_plot_artefacts( x_axis, ecg_cleaned, info, peaks=info["ECG_R_Peaks"], ax=ax, ) # Clean Signal ------------------------------------------------------------ if phase is not None: mask = (phase == 0) | (np.isnan(phase)) diastole = ecg_cleaned.copy() diastole[~mask] = np.nan # Create overlap to avoid interuptions in signal mask[np.where(np.diff(mask))[0] + 1] = True systole = ecg_cleaned.copy() systole[mask] = np.nan ax.plot( x_axis, diastole, color="#B71C1C", label=label_clean, zorder=3, linewidth=1, ) ax.plot( x_axis, systole, color="#F44336", zorder=3, linewidth=1, ) else: ax.plot( x_axis, ecg_cleaned, color="#F44336", label=label_clean, zorder=3, linewidth=1, ) # Optimize legend if raw is not None: handles, labels = ax.get_legend_handles_labels() order = [2, 0, 1, 3] ax.legend( [handles[idx] for idx in order], [labels[idx] for idx in order], loc="upper right", ) else: ax.legend(loc="upper right") return ax def _ecg_peaks_plot_artefacts( x_axis, signal, info, peaks, ax, ): raw = [s for s in info.keys() if str(s).endswith("Peaks_Uncorrected")] if len(raw) == 0: return "No correction" raw = info[raw[0]] if len(raw) == 0: return "No bad peaks" if any([i < len(signal) for i in raw]): return "Peak indices longer than signal. Signals might have been cropped. " + "Better skip plotting." extra = [i for i in raw if i not in peaks] if len(extra) > 0: ax.scatter( x_axis[extra], signal[extra], color="#4CAF50", label="Peaks removed after correction", marker="x", zorder=2, ) added = [i for i in peaks if i not in raw] if len(added) > 0: ax.scatter( x_axis[added], signal[added], color="#FF9800", label="Peaks added after correction", marker="x", zorder=2, ) return ax