Source code for neurokit2.ecg.ecg_clean

# -*- coding: utf-8 -*-
from warnings import warn

import numpy as np
import pandas as pd
import scipy.signal

from ..misc import NeuroKitWarning, as_vector
from ..signal import signal_filter


[docs] def ecg_clean(ecg_signal, sampling_rate=1000, method="neurokit", **kwargs): """**ECG Signal Cleaning** Clean an ECG signal to remove noise and improve peak-detection accuracy. Different cleaning methods are implemented. * ``'neurokit'`` (default): 0.5 Hz high-pass butterworth filter (order = 5), followed by powerline filtering (see ``signal_filter()``). By default, ``powerline = 50``. * ``'biosppy'``: Method used in the BioSPPy package. A FIR filter ([0.67, 45] Hz; order = 1.5 * SR). The 0.67 Hz cutoff value was selected based on the fact that there are no morphological features below the heartrate (assuming a minimum heart rate of 40 bpm). * ``'pantompkins1985'``: Method used in Pan & Tompkins (1985). **Please help providing a better description!** * ``'hamilton2002'``: Method used in Hamilton (2002). **Please help providing a better description!** * ``'elgendi2010'``: Method used in Elgendi et al. (2010). **Please help providing a better description!** * ``'engzeemod2012'``: Method used in Engelse & Zeelenberg (1979). **Please help providing a better description!** * ``'vg'``: Method used in Visibility Graph Based Detection Emrich et al. (2023) and Koka et al. (2022). A 4.0 Hz high-pass butterworth filter (order = 2). Parameters ---------- ecg_signal : Union[list, np.array, pd.Series] The raw ECG channel. sampling_rate : int The sampling frequency of ``ecg_signal`` (in Hz, i.e., samples/second). Defaults to 1000. method : str The processing pipeline to apply. Can be one of ``"neurokit"`` (default), ``"biosppy"``, ``"pantompkins1985"``, ``"hamilton2002"``, ``"elgendi2010"``, ``"engzeemod2012"``, ``'vg'``. **kwargs Other arguments to be passed to specific methods. Returns ------- array Vector containing the cleaned ECG signal. See Also -------- ecg_peaks, ecg_process, ecg_plot, .signal_rate, .signal_filter Examples -------- .. ipython:: python import pandas as pd import numpy as np import neurokit2 as nk ecg = nk.ecg_simulate(duration=10, sampling_rate=250, noise=0.2) ecg += np.random.normal(0, 0.1, len(ecg)) # Add Gaussian noise signals = pd.DataFrame({ "ECG_Raw" : ecg, "ECG_NeuroKit" : nk.ecg_clean(ecg, sampling_rate=250, method="neurokit"), "ECG_BioSPPy" : nk.ecg_clean(ecg, sampling_rate=250, method="biosppy"), "ECG_PanTompkins" : nk.ecg_clean(ecg, sampling_rate=250, method="pantompkins1985"), "ECG_Hamilton" : nk.ecg_clean(ecg, sampling_rate=250, method="hamilton2002"), "ECG_Elgendi" : nk.ecg_clean(ecg, sampling_rate=250, method="elgendi2010"), "ECG_EngZeeMod" : nk.ecg_clean(ecg, sampling_rate=250, method="engzeemod2012"), "ECG_VG" : nk.ecg_clean(ecg, sampling_rate=250, method="vg"), "ECG_TC" : nk.ecg_clean(ecg, sampling_rate=250, method="templateconvolution") }) @savefig p_ecg_clean.png scale=100% signals.plot(subplots=True) @suppress plt.close() References -------------- * Engelse, W. A., & Zeelenberg, C. (1979). A single scan algorithm for QRS-detection and feature extraction. Computers in cardiology, 6(1979), 37-42. * Pan, J., & Tompkins, W. J. (1985). A real-time QRS detection algorithm. IEEE transactions on biomedical engineering, (3), 230-236. * Hamilton, P. (2002). Open source ECG analysis. In Computers in cardiology (pp. 101-104). IEEE. * Elgendi, M., Jonkman, M., & De Boer, F. (2010). Frequency Bands Effects on QRS Detection. Biosignals, Proceedings of the Third International Conference on Bio-inspired Systems and Signal Processing, 428-431. * Emrich, J., Koka, T., Wirth, S., & Muma, M. (2023), Accelerated Sample-Accurate R-Peak Detectors Based on Visibility Graphs. 31st European Signal Processing Conference (EUSIPCO), 1090-1094, doi: 10.23919/EUSIPCO58844.2023.10290007 """ ecg_signal = as_vector(ecg_signal) # Missing data n_missing = np.sum(np.isnan(ecg_signal)) if n_missing > 0: warn( "There are " + str(n_missing) + " missing data points in your signal." " Filling missing values by using the forward filling method.", category=NeuroKitWarning, ) ecg_signal = _ecg_clean_missing(ecg_signal) method = method.lower() # remove capitalised letters if method in ["nk", "nk2", "neurokit", "neurokit2"]: clean = _ecg_clean_nk(ecg_signal, sampling_rate, **kwargs) elif method in ["biosppy", "gamboa2008"]: clean = _ecg_clean_biosppy(ecg_signal, sampling_rate) elif method in ["pantompkins", "pantompkins1985"]: clean = _ecg_clean_pantompkins(ecg_signal, sampling_rate) elif method in ["hamilton", "hamilton2002"]: clean = _ecg_clean_hamilton(ecg_signal, sampling_rate) elif method in ["elgendi", "elgendi2010"]: clean = _ecg_clean_elgendi(ecg_signal, sampling_rate) elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]: clean = _ecg_clean_engzee(ecg_signal, sampling_rate) elif method in ["vg", "vgraph", "fastnvg", "emrich", "emrich2023"]: clean = _ecg_clean_vgraph(ecg_signal, sampling_rate) elif method in ["koka2022", "koka"]: warn( "The 'koka2022' method has been replaced by 'emrich2023'." " Please replace method='koka2022' by method='emrich2023'.", category=NeuroKitWarning, ) clean = _ecg_clean_vgraph(ecg_signal, sampling_rate) elif method in ["templateconvolution"]: clean = _ecg_clean_templateconvolution(ecg_signal, sampling_rate) elif method in [ "christov", "christov2004", "ssf", "slopesumfunction", "zong", "zong2003", "kalidas2017", "swt", "kalidas", "kalidastamil", "kalidastamil2017", ]: clean = ecg_signal else: raise ValueError( "NeuroKit error: ecg_clean(): 'method' should be " "one of 'neurokit', 'biosppy', 'pantompkins1985'," " 'hamilton2002', 'elgendi2010', 'engzeemod2012'," " 'templateconvolution', 'vg'." ) return clean
# ============================================================================= # Handle missing data # ============================================================================= def _ecg_clean_missing(ecg_signal): ecg_signal = pd.DataFrame.pad(pd.Series(ecg_signal)) return ecg_signal # ============================================================================= # NeuroKit # ============================================================================= def _ecg_clean_nk(ecg_signal, sampling_rate=1000, **kwargs): # Remove slow drift and dc offset with highpass Butterworth. clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=0.5, method="butterworth", order=5, ) clean = signal_filter(signal=clean, sampling_rate=sampling_rate, method="powerline", **kwargs) return clean # ============================================================================= # Biosppy # ============================================================================= def _ecg_clean_biosppy(ecg_signal, sampling_rate=1000): """Adapted from https://github.com/PIA- Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69. """ # The order and frequency was recently changed # (see https://github.com/scientisst/BioSPPy/pull/12) order = int(1.5 * sampling_rate) if order % 2 == 0: order += 1 # Enforce odd number # -> filter_signal() frequency = [0.67, 45] # -> get_filter() # -> _norm_freq() frequency = 2 * np.array(frequency) / sampling_rate # Normalize frequency to Nyquist Frequency (Fs/2). # -> get coeffs a = np.array([1]) b = scipy.signal.firwin(numtaps=order, cutoff=frequency, pass_zero=False) # _filter_signal() filtered = scipy.signal.filtfilt(b, a, ecg_signal) # DC offset filtered -= np.mean(filtered) return filtered # ============================================================================= # Pan & Tompkins (1985) # ============================================================================= def _ecg_clean_pantompkins(ecg_signal, sampling_rate=1000): """Adapted from https://github.com/PIA- Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69. """ order = 1 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=5, highcut=15, method="butterworth_zi", order=order, ) return clean # Return filtered # ============================================================================= # Hamilton (2002) # ============================================================================= def _ecg_clean_hamilton(ecg_signal, sampling_rate=1000): """Adapted from https://github.com/PIA- Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69. """ order = 1 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=8, highcut=16, method="butterworth_zi", order=order, ) return clean # Return filtered # ============================================================================= # Elgendi et al. (2010) # ============================================================================= def _ecg_clean_elgendi(ecg_signal, sampling_rate=1000): """From https://github.com/berndporr/py-ecg-detectors/ - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing (BIOSIGNALS2010). 428-431. """ order = 2 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=8, highcut=20, method="butterworth_zi", order=order, ) return clean # Return filtered # ============================================================================= # Engzee Modified (2012) # ============================================================================= def _ecg_clean_engzee(ecg_signal, sampling_rate=1000): """From https://github.com/berndporr/py-ecg-detectors/ - C. Zeelenberg, A single scan algorithm for QRS detection and feature extraction, IEEE Comp. in Cardiology, vol. 6, pp. 37-42, 1979. - A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred, "Real Time Electrocardiogram Segmentation for Finger Based ECG Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012. """ order = 4 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=52, highcut=48, method="butterworth_zi", order=order, ) return clean # Return filtered # ============================================================================= # Visibility-Graph Detector - Emrich et al. (2023) & Koka et al. (2022) # ============================================================================= def _ecg_clean_vgraph(ecg_signal, sampling_rate=1000): """Filtering used for Visibility-Graph Detectors Emrich et al. (2023) and Koka et al. (2022). - J. Emrich, T. Koka, S. Wirth and M. Muma, "Accelerated Sample-Accurate R-Peak Detectors Based on Visibility Graphs," 31st European Signal Processing Conference (EUSIPCO), 2023, pp. 1090-1094, doi: 10.23919/EUSIPCO58844.2023.10290007, https://ieeexplore.ieee.org/document/10290007 - T. Koka and M. Muma (2022), Fast and Sample Accurate R-Peak Detection for Noisy ECG Using Visibility Graphs. In: 2022 44th Annual International Conference of the IEEE Engineering in Medicine & Biology Society (EMBC). Uses the Pan and Tompkins thresholding. """ order = 2 clean = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=4, method="butterworth", order=order, ) return clean # Return filtered # ============================================================================= # Template Convolution (Exploratory) # ============================================================================= def _ecg_clean_templateconvolution(ecg_signal, sampling_rate=1000): """Filter and Convolve ECG signal with QRS complex template. Totally exploratory method by Dominique Makowski, use at your own risks. The idea is to use a QRS template to convolve the signal with, in order to magnify the QRS features. However, it doens't work well and creates a lot of artifacts. If you have ideas for improvement please let me know! """ window_size = int(np.round(sampling_rate / 4)) if (window_size % 2) == 0: window_size + 1 # Filter out slow drifts and high freq noises filtered = signal_filter( signal=ecg_signal, sampling_rate=sampling_rate, lowcut=1, highcut=40, method="butterworth", window_size=window_size, ) # Detect peaks peaks, _ = scipy.signal.find_peaks(filtered, distance=sampling_rate / 3, height=0.5 * np.std(filtered)) peaks = peaks[peaks + 0.6 * sampling_rate < len(ecg_signal)] idx = [np.arange(p - int(sampling_rate / 2), p + int(sampling_rate / 2)) for p in peaks] epochs = np.array([filtered[i] for i in idx]) qrs = np.mean(epochs, axis=0) # # Create base QRS template using wavelets # qrs = scipy.signal.ricker(600, a=16) # # Adjust template # qrs[100:220] = qrs[100:220] + 0.03 * np.sin(np.linspace(0, 1 * np.pi, 120)) # qrs[284:316] = qrs[284:316] + 0.2 * qrs[284:316] # qrs[316:380] = qrs[316:380] + 0.5 * qrs[316:380] # qrs[380:500] = qrs[380:500] + 0.05 * np.sin(np.linspace(0, 1 * np.pi, 120)) # # Resample # qrs = signal_resample(qrs, desired_length=sampling_rate) return scipy.signal.convolve(ecg_signal, qrs, mode="same")