Source code for neurokit2.ppg.ppg_clean

# -*- coding: utf-8 -*-
from warnings import warn

import numpy as np

from ..misc import NeuroKitWarning, as_vector
from ..signal import signal_fillmissing, signal_filter


[docs] def ppg_clean(ppg_signal, sampling_rate=1000, heart_rate=None, method="elgendi"): """**Clean a photoplethysmogram (PPG) signal** Prepare a raw PPG signal for systolic peak detection. Parameters ---------- ppg_signal : Union[list, np.array, pd.Series] The raw PPG channel. heart_rate : Union[int, float] The heart rate of the PPG signal. Applicable only if method is ``"nabian2018"`` to check that filter frequency is appropriate. sampling_rate : int The sampling frequency of the PPG (in Hz, i.e., samples/second). The default is 1000. method : str The processing pipeline to apply. Can be one of ``"elgendi"``, ``"nabian2018"``, or ``"none"``. The default is ``"elgendi"``. If ``"none"`` is passed, the raw signal will be returned without any cleaning. Returns ------- clean : array A vector containing the cleaned PPG. See Also -------- ppg_simulate, ppg_findpeaks Examples -------- .. ipython:: python import neurokit2 as nk import pandas as pd import matplotlib.pyplot as plt # Simulate and clean signal ppg = nk.ppg_simulate(heart_rate=75, duration=30) ppg_elgendi = nk.ppg_clean(ppg, method='elgendi') ppg_nabian = nk.ppg_clean(ppg, method='nabian2018', heart_rate=75) # Plot and compare methods signals = pd.DataFrame({'PPG_Raw' : ppg, 'PPG_Elgendi' : ppg_elgendi, 'PPG_Nabian' : ppg_nabian}) @savefig p_ppg_clean1.png scale=100% signals.plot() @suppress plt.close() References ---------- * Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., & Ostadabbas, S. (2018). An open-source feature extraction tool for the analysis of peripheral physiological data. IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11. """ ppg_signal = as_vector(ppg_signal) # Missing data n_missing = np.sum(np.isnan(ppg_signal)) if n_missing > 0: warn( "There are " + str(n_missing) + " missing data points in your signal." " Filling missing values using `signal_fillmissing`.", category=NeuroKitWarning, ) ppg_signal = signal_fillmissing(ppg_signal, method="both") method = str(method).lower() if method in ["elgendi"]: clean = _ppg_clean_elgendi(ppg_signal, sampling_rate) elif method in ["nabian2018"]: clean = _ppg_clean_nabian2018(ppg_signal, sampling_rate, heart_rate=heart_rate) elif method in ["none"]: clean = ppg_signal else: raise ValueError( "`method` not found. Must be one of 'elgendi', 'nabian2018', or 'none'." ) return clean
# ============================================================================= # Methods # ============================================================================= def _ppg_clean_elgendi(ppg_signal, sampling_rate): filtered = signal_filter( ppg_signal, sampling_rate=sampling_rate, lowcut=0.5, highcut=8, order=3, method="butterworth", ) return filtered def _ppg_clean_nabian2018(ppg_signal, sampling_rate, heart_rate=None): """Low-pass filter for continuous BP signal preprocessing, adapted from Nabian et al. (2018).""" # Determine low-pass filter value highcut = 40 # Convert heart rate to seconds, check if low-pass filter within appropriate range if heart_rate is not None: heart_rate = heart_rate / 60 if not highcut >= 10 * heart_rate and not highcut < 0.5 * sampling_rate: raise ValueError( "Highcut value should be at least 10 times heart rate and" " less than 0.5 times sampling rate." ) filtered = signal_filter( ppg_signal, sampling_rate=sampling_rate, lowcut=None, highcut=highcut, order=2, method="butterworth", ) return filtered