# Source code for neurokit2.ppg.ppg_quality

# -*- coding: utf-8 -*-

import numpy as np

from ..epochs import epochs_to_df
from ..signal import signal_interpolate
from .ppg_peaks import ppg_peaks
from .ppg_segment import ppg_segment


def ppg_quality(
    ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000, method="templatematch", approach=None
):
    """**PPG Signal Quality Assessment**

    Assess the quality of the PPG Signal using various methods:

    * The ``"templatematch"`` method (loosely based on Orphanidou et al., 2015) computes a
      continuous index of quality of the PPG signal, by calculating the correlation coefficient
      between each individual pulse wave and an average (template) pulse wave shape. This index is
      therefore relative: 1 corresponds to pulse waves that are closest to the average pulse wave
      shape (i.e. correlate exactly with it) and 0 corresponds to there being no correlation with
      the average pulse wave shape. Note that 1 does not necessarily mean "good": use this index
      with care and plot it alongside your PPG signal to see if it makes sense.

    * The ``"disimilarity"`` method (loosely based on Sabeti et al., 2019) computes a continuous
      index of quality of the PPG signal, by calculating the level of dissimilarity between each
      individual pulse wave and an average (template) pulse wave shape (after they are
      normalised). A value of zero indicates no dissimilarity (i.e. equivalent pulse wave shapes),
      whereas values above or below indicate increasing dissimilarity. The original method used
      dynamic time-warping to align the pulse waves prior to calculating the level of
      dissimilarity, whereas this implementation does not currently include this step.

    Parameters
    ----------
    ppg_cleaned : Union[list, np.array, pd.Series]
        The cleaned PPG signal in the form of a vector of values.
    ppg_pw_peaks : tuple or list
        The list of PPG pulse wave peak samples returned by ``ppg_peaks()``. If None, peaks is
        computed from the signal input.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    method : str
        The method for computing PPG signal quality, can be ``"templatematch"`` (default) or
        ``"disimilarity"`` (the spelling ``"dissimilarity"`` is accepted as well). The name is
        matched case-insensitively.
    approach : str
        Currently unused by this function.

    Returns
    -------
    array
        Vector containing the quality index ranging from 0 to 1 for ``"templatematch"`` method
        (being a correlation coefficient, it can in principle be negative for pulse waves that
        anti-correlate with the template), or an unbounded value (where 0 indicates high quality)
        for ``"disimilarity"`` method.

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported method names.

    See Also
    --------
    ppg_segment

    References
    ----------
    * Orphanidou, C. et al. (2015). "Signal-quality indices for the electrocardiogram and
      photoplethysmogram: derivation and applications to wireless monitoring". IEEE Journal of
      Biomedical and Health Informatics, 19(3), 832-8.
    * Sabeti et al. (2019), Sec. 3.1.2.5. https://doi.org/10.1016/j.imu.2019.100222

    Examples
    --------
    * **Example 1:** 'templatematch' method

    .. ipython:: python

      import neurokit2 as nk

      ppg = nk.ppg_simulate(duration=30, sampling_rate=300, heart_rate=80)
      ppg_cleaned = nk.ppg_clean(ppg, sampling_rate=300)
      quality = nk.ppg_quality(ppg_cleaned, sampling_rate=300, method="templatematch")

      @savefig p_ppg_quality.png scale=100%
      nk.signal_plot([ppg_cleaned, quality], standardize=True)
      @suppress
      plt.close()

    """
    method = method.lower()  # remove capitalised letters

    # Run selected quality assessment method
    if method in ["templatematch"]:
        quality = _ppg_quality_templatematch(
            ppg_cleaned, ppg_pw_peaks=ppg_pw_peaks, sampling_rate=sampling_rate
        )
    elif method in ["disimilarity", "dissimilarity"]:
        quality = _ppg_quality_disimilarity(
            ppg_cleaned, ppg_pw_peaks=ppg_pw_peaks, sampling_rate=sampling_rate
        )
    else:
        # Fail loudly instead of hitting an UnboundLocalError on `quality` below.
        raise ValueError(
            "NeuroKit error: ppg_quality(): 'method' should be one of 'templatematch' or "
            "'disimilarity'."
        )

    return quality
# =============================================================================
# Calculate a template pulse wave
# =============================================================================
def _calc_template_pw(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000):
    """Segment the signal into individual pulse waves and average them into a template.

    Parameters
    ----------
    ppg_cleaned : Union[list, np.array, pd.Series]
        The cleaned PPG signal.
    ppg_pw_peaks : list or array, optional
        Pulse wave peak samples. If None, they are detected with ``ppg_peaks()``.
    sampling_rate : int
        The sampling frequency of the signal (in Hz).

    Returns
    -------
    templ_pw : pd.Series
        Column-wise mean of the retained pulse waves (one value per time point).
    pw_data : pd.DataFrame
        One row per retained pulse wave (rows containing any NaN are dropped), one column per
        time point.
    ppg_pw_peaks : list or array
        The peaks that were used (either the ones passed in or the detected ones).
    """
    # Sanitize inputs
    if ppg_pw_peaks is None:
        _, ppg_pw_peaks = ppg_peaks(ppg_cleaned, sampling_rate=sampling_rate)
        ppg_pw_peaks = ppg_pw_peaks["PPG_Peaks"]

    # Get heartbeats
    heartbeats = ppg_segment(ppg_cleaned, ppg_pw_peaks, sampling_rate)
    pw_data = epochs_to_df(heartbeats).pivot(index="Label", columns="Time", values="Signal")
    # Labels are cast to int so that sorting is numeric rather than lexicographic
    pw_data.index = pw_data.index.astype(int)
    pw_data = pw_data.sort_index()

    # Filter Nans: keep only the pulse waves (rows) without any missing sample
    missing = pw_data.T.isnull().sum().values
    nonmissing = np.where(missing == 0)[0]
    pw_data = pw_data.iloc[nonmissing, :]

    # Find template pulse wave
    templ_pw = pw_data.mean()

    return templ_pw, pw_data, ppg_pw_peaks


# =============================================================================
# Template-matching method
# =============================================================================
def _ppg_quality_templatematch(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000):
    """Quality index based on the correlation of each pulse wave with the template pulse wave.

    Returns a vector of the same length as ``ppg_cleaned`` obtained by interpolating the
    beat-by-beat correlation coefficients with ``signal_interpolate(..., method="previous")``.
    """
    # Obtain individual pulse waves and template pulse wave
    templ_pw, pw_data, ppg_pw_peaks = _calc_template_pw(
        ppg_cleaned, ppg_pw_peaks=ppg_pw_peaks, sampling_rate=sampling_rate
    )

    # Find individual correlation coefficients (CCs)
    # NOTE(review): rows of `pw_data` are addressed by position, but rows containing NaNs were
    # dropped in `_calc_template_pw`. If a dropped row is not the last one, positions presumably
    # no longer line up with `ppg_pw_peaks` (and more than one dropped row would make `.iloc`
    # run out of range) -- to be confirmed against `ppg_segment`.
    n_beats = len(ppg_pw_peaks) - 1
    cc = np.zeros(n_beats)
    for beat_no in range(n_beats):
        # Off-diagonal element of the 2x2 correlation matrix
        cc[beat_no] = np.corrcoef(pw_data.iloc[beat_no], templ_pw)[0, 1]

    # Interpolate beat-by-beat CCs
    quality = signal_interpolate(
        ppg_pw_peaks[0:-1], cc, x_new=np.arange(len(ppg_cleaned)), method="previous"
    )

    return quality


# =============================================================================
# Disimilarity measure method
# =============================================================================
def _norm_sum_one(pw):
    """Shift a pulse wave so that its minimum is 1, then scale it to sum to one.

    Returns a NumPy array (not a list), so that callers can apply boolean masks and
    element-wise arithmetic to the result.
    """
    pw = np.asarray(pw, dtype=float)

    # ensure all values are positive
    pw = pw - pw.min() + 1

    # normalise pulse wave to sum to one (total computed once, vectorised)
    return pw / pw.sum()


def _calc_dis(pw1, pw2):
    """Disimilarity between a pulse wave ``pw1`` and a template ``pw2``.

    Both inputs are normalised with ``_norm_sum_one`` and the measure is
    ``sum(pw2 * log(pw2 / pw1))`` over all samples, i.e. it has the form of a Kullback-Leibler
    divergence of the template relative to the pulse wave. It is 0 for identical shapes.
    """
    # following the methodology in https://doi.org/10.1016/j.imu.2019.100222 (Sec. 3.1.2.5)

    # normalise to sum to one (also converts the inputs to numpy arrays)
    pw1 = _norm_sum_one(pw1)
    pw2 = _norm_sum_one(pw2)

    # ignore any elements which are zero because log(0) is -inf
    # (this must be an element-wise mask: with plain lists it would degenerate to a single
    # boolean and select only one sample)
    rel_els = (pw1 != 0) & (pw2 != 0)

    # calculate disimilarity measure (using pw2 as the template)
    dis = np.sum(pw2[rel_els] * np.log(pw2[rel_els] / pw1[rel_els]))

    return dis


def _ppg_quality_disimilarity(ppg_cleaned, ppg_pw_peaks=None, sampling_rate=1000):
    """Quality index based on the disimilarity of each pulse wave to the template pulse wave.

    Returns a vector of the same length as ``ppg_cleaned`` obtained by interpolating the
    beat-by-beat disimilarity values with ``signal_interpolate(..., method="previous")``.
    """
    # Obtain individual pulse waves and template pulse wave
    templ_pw, pw_data, ppg_pw_peaks = _calc_template_pw(
        ppg_cleaned, ppg_pw_peaks=ppg_pw_peaks, sampling_rate=sampling_rate
    )

    # Find individual disimilarity measures
    # NOTE(review): same positional-indexing caveat as in `_ppg_quality_templatematch` -- rows
    # with NaNs were removed from `pw_data`, so positions may not match peaks; please confirm.
    n_beats = len(ppg_pw_peaks) - 1
    dis = np.zeros(n_beats)
    for beat_no in range(n_beats):
        dis[beat_no] = _calc_dis(pw_data.iloc[beat_no], templ_pw)

    # Interpolate beat-by-beat dis's
    quality = signal_interpolate(
        ppg_pw_peaks[0:-1], dis, x_new=np.arange(len(ppg_cleaned)), method="previous"
    )

    return quality