Source code for neurokit2.ppg.ppg_findpeaks

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal

from ..signal import signal_smooth


def ppg_findpeaks(
    ppg_cleaned, sampling_rate=1000, method="elgendi", show=False, **kwargs
):
    """**Find systolic peaks in a photoplethysmogram (PPG) signal**

    Low-level function used by :func:`ppg_peaks` to identify peaks in a PPG signal using one of
    several algorithms. Use the main function and see its documentation for details.

    Parameters
    ----------
    ppg_cleaned : Union[list, np.array, pd.Series]
        The cleaned PPG channel as returned by :func:`.ppg_clean`.
    sampling_rate : int
        The sampling frequency of the PPG (in Hz, i.e., samples/second). The default is 1000.
    method : str
        The processing pipeline to apply. Can be one of ``"elgendi"``, ``"bishop"`` or
        ``"charlton"``. The default is ``"elgendi"``.
    show : bool
        If ``True``, returns a plot of the thresholds used during peak detection. Useful for
        debugging. The default is ``False``.

    Returns
    -------
    info : dict
        A dictionary containing additional information, in this case the samples at which
        systolic peaks occur, accessible with the key ``"PPG_Peaks"``. With the ``"charlton"``
        method, the detected pulse onsets are also returned under the key ``"PPG_Onsets"``.

    See Also
    --------
    ppg_simulate, ppg_clean

    Examples
    --------
    .. ipython:: python

      import neurokit2 as nk
      import matplotlib.pyplot as plt

      ppg = nk.ppg_simulate(heart_rate=75, duration=20, sampling_rate=50)
      ppg_clean = nk.ppg_clean(ppg, sampling_rate=50)

      @savefig p_ppg_findpeaks1.png scale=100%
      peaks = nk.ppg_findpeaks(ppg_clean, sampling_rate=50, show=True)
      @suppress
      plt.close()

      # Method by Bishop et al. (2018)
      @savefig p_ppg_findpeaks2.png scale=100%
      peaks = nk.ppg_findpeaks(ppg, method="bishop", show=True)
      @suppress
      plt.close()

    References
    ----------
    * Elgendi, M., Norton, I., Brearley, M., Abbott, D., & Schuurmans, D. (2013). Systolic peak
      detection in acceleration photoplethysmograms measured from emergency responders in
      tropical conditions. PloS one, 8(10), e76585.
    * Bishop, S. M., & Ercole, A. (2018). Multi-scale peak and trough detection optimised for
      periodic and quasi-periodic neuroscience data. In Intracranial Pressure & Neuromonitoring
      XVI (pp. 189-195). Springer International Publishing.
    * Charlton, P. H. et al. (2024). MSPTDfast: An Efficient Photoplethysmography Beat Detection
      Algorithm. Proc CinC.

    """
    method = method.lower()
    if method in ["elgendi"]:
        peaks = _ppg_findpeaks_elgendi(ppg_cleaned, sampling_rate, show=show, **kwargs)
    elif method in ["msptd", "bishop2018", "bishop"]:
        peaks, _ = _ppg_findpeaks_bishop(ppg_cleaned, show=show, **kwargs)
    elif method in ["msptdfast", "msptdfastv1", "charlton2024", "charlton"]:
        peaks, onsets = _ppg_findpeaks_charlton(ppg_cleaned, sampling_rate, show=show, **kwargs)
    else:
        raise ValueError(
            "`method` not found. Must be one of the following: 'elgendi', 'bishop', 'charlton'."
        )

    # Prepare output.
    info = {"PPG_Peaks": peaks}
    if "onsets" in locals():
        info["PPG_Onsets"] = onsets

    return info
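
# ---------------------------------------------------------------------------------------------
# Illustrative usage sketch (editor's addition, not part of the original module). It assumes
# the public NeuroKit2 API (`nk.ppg_simulate`, `nk.ppg_clean`) to generate a clean test signal,
# and shows that every method returns a dict with "PPG_Peaks", while only the "charlton"
# method additionally returns "PPG_Onsets". The helper name `_example_compare_methods` is
# hypothetical.
def _example_compare_methods(sampling_rate=100):
    import neurokit2 as nk  # required here only for simulation and cleaning

    ppg = nk.ppg_simulate(heart_rate=75, duration=30, sampling_rate=sampling_rate)
    ppg_clean = nk.ppg_clean(ppg, sampling_rate=sampling_rate)

    results = {}
    for method in ["elgendi", "bishop", "charlton"]:
        info = ppg_findpeaks(ppg_clean, sampling_rate=sampling_rate, method=method)
        results[method] = {
            "n_peaks": len(info["PPG_Peaks"]),
            "has_onsets": "PPG_Onsets" in info,  # True only for the "charlton" method
        }
    return results
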

def _ppg_findpeaks_elgendi(
    signal,
    sampling_rate=1000,
    peakwindow=0.111,
    beatwindow=0.667,
    beatoffset=0.02,
    mindelay=0.3,
    show=False,
):
    """Implementation of Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic
    Peak Detection in Acceleration Photoplethysmograms Measured from Emergency Responders in
    Tropical Conditions. PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585.

    All tune-able parameters are specified as keyword arguments. `signal` must be the bandpass-
    filtered raw PPG with a lowcut of .5 Hz and a highcut of 8 Hz.

    """
    if show:
        _, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
        ax0.plot(signal, label="filtered")

    # Ignore the samples with negative amplitudes and square the samples with
    # values larger than zero.
    signal_abs = signal.copy()
    signal_abs[signal_abs < 0] = 0
    sqrd = signal_abs**2

    # Compute the thresholds for peak detection. Call with show=True in order
    # to visualize thresholds.
    ma_peak_kernel = int(np.rint(peakwindow * sampling_rate))
    ma_peak = signal_smooth(sqrd, kernel="boxcar", size=ma_peak_kernel)

    ma_beat_kernel = int(np.rint(beatwindow * sampling_rate))
    ma_beat = signal_smooth(sqrd, kernel="boxcar", size=ma_beat_kernel)

    thr1 = ma_beat + beatoffset * np.mean(sqrd)  # threshold 1

    if show:
        ax1.plot(sqrd, label="squared")
        ax1.plot(thr1, label="threshold")
        ax1.legend(loc="upper right")

    # Identify start and end of PPG waves.
    waves = ma_peak > thr1
    beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0]
    end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0]
    # Throw out wave-ends that precede first wave-start.
    end_waves = end_waves[end_waves > beg_waves[0]]

    # Identify systolic peaks within waves (ignore waves that are too short).
    num_waves = min(beg_waves.size, end_waves.size)
    min_len = int(np.rint(peakwindow * sampling_rate))  # this is threshold 2 in the paper
    min_delay = int(np.rint(mindelay * sampling_rate))
    peaks = [0]

    for i in range(num_waves):

        beg = beg_waves[i]
        end = end_waves[i]
        len_wave = end - beg

        if len_wave < min_len:
            continue

        # Visualize wave span.
        if show:
            ax1.axvspan(beg, end, facecolor="m", alpha=0.5)

        # Find local maxima and their prominence within wave span.
        data = signal[beg:end]
        locmax, props = scipy.signal.find_peaks(data, prominence=(None, None))

        if locmax.size > 0:
            # Identify most prominent local maximum.
            peak = beg + locmax[np.argmax(props["prominences"])]
            # Enforce minimum delay between peaks.
            if peak - peaks[-1] > min_delay:
                peaks.append(peak)

    peaks.pop(0)

    if show:
        ax0.scatter(peaks, signal_abs[peaks], c="r")
        ax0.legend(loc="upper right")
        ax0.set_title("PPG Peaks (Method by Elgendi et al., 2013)")

    peaks = np.asarray(peaks).astype(int)
    return peaks


def _ppg_findpeaks_bishop(
    signal,
    show=False,
):
    """Implementation of Bishop SM, Ercole A (2018) Multi-scale peak and trough detection optimised
    for periodic and quasi-periodic neuroscience data. doi:10.1007/978-3-319-65798-1_39.

    Currently designed for short signals of relatively low sampling frequencies (e.g. 6 seconds at
    100 Hz). Also, the function currently only returns peaks, but it does identify pulse onsets
    too.

    """
    # TODO: create ppg_peaks() that also returns onsets and stuff

    # Setup
    N = len(signal)
    L = int(np.ceil(N / 2) - 1)

    # Step 1: calculate local maxima and local minima scalograms

    # - detrend: this removes the best-fit straight line
    x = scipy.signal.detrend(signal, type="linear")

    # - initialise LMS matrices
    m_max = np.full((L, N), False)
    m_min = np.full((L, N), False)

    # - populate LMS matrices
    for k in range(1, L):  # scalogram scales
        for i in range(k + 2, N - k + 1):
            if x[i - 1] > x[i - k - 1] and x[i - 1] > x[i + k - 1]:
                m_max[k - 1, i - 1] = True
            if x[i - 1] < x[i - k - 1] and x[i - 1] < x[i + k - 1]:
                m_min[k - 1, i - 1] = True

    # Step 2: find the scale with the most local maxima (or local minima)
    # - row-wise summation (i.e. sum each row)
    gamma_max = np.sum(m_max, axis=1)  # the "axis=1" option makes it row-wise
    gamma_min = np.sum(m_min, axis=1)
    # - find scale with the most local maxima (or local minima)
    lambda_max = np.argmax(gamma_max)
    lambda_min = np.argmax(gamma_min)

    # Step 3: Use lambda to remove all elements of m for which k>lambda
    m_max = m_max[: (lambda_max + 1), :]
    m_min = m_min[: (lambda_min + 1), :]

    # Step 4: Find peaks (and onsets)
    # - column-wise summation
    m_max_sum = np.sum(m_max == False, axis=0)
    m_min_sum = np.sum(m_min == False, axis=0)
    peaks = np.where(m_max_sum == 0)[0].astype(int)
    onsets = np.where(m_min_sum == 0)[0].astype(int)

    if show:
        _, ax0 = plt.subplots(nrows=1, ncols=1, sharex=True)
        ax0.plot(signal, label="signal")
        ax0.scatter(peaks, signal[peaks], c="r")
        ax0.scatter(onsets, signal[onsets], c="b")
        ax0.set_title("PPG Peaks (Method by Bishop et al., 2018)")

    return peaks, onsets


def _ppg_findpeaks_charlton(
    signal,
    sampling_rate=1000,
    show=False,
):
    """Implementation of Charlton et al. (2024) MSPTDfast: An Efficient Photoplethysmography Beat
    Detection Algorithm. 2024 Computing in Cardiology (CinC), Karlsruhe, Germany,
    doi:10.1101/2024.07.18.24310627.

    """
    # Inner functions

    def find_m_max(x, N, max_scale, m_max):
        """Find local maxima scalogram for peaks."""
        for k in range(1, max_scale + 1):  # scalogram scales
            for i in range(k + 2, N - k + 2):
                if x[i - 2] > x[i - k - 2] and x[i - 2] > x[i + k - 2]:
                    m_max[k - 1, i - 2] = True

        return m_max

    def find_m_min(x, N, max_scale, m_min):
        """Find local minima scalogram for onsets."""
        for k in range(1, max_scale + 1):  # scalogram scales
            for i in range(k + 2, N - k + 2):
                if x[i - 2] < x[i - k - 2] and x[i - 2] < x[i + k - 2]:
                    m_min[k - 1, i - 2] = True

        return m_min

    def find_lms_using_msptd_approach(max_scale, x, options):
        """Find local maxima (or minima) scalogram(s) using the MSPTD approach."""

        # Setup
        N = len(x)

        # Find local maxima scalogram (if required)
        if options["find_pks"]:
            m_max = np.full((max_scale, N), False)  # matrix for maxima
            m_max = find_m_max(x, N, max_scale, m_max)
        else:
            m_max = None

        # Find local minima scalogram (if required)
        if options["find_trs"]:
            m_min = np.full((max_scale, N), False)  # matrix for minima
            m_min = find_m_min(x, N, max_scale, m_min)
        else:
            m_min = None

        return m_max, m_min

    def downsample(win_sig, ds_factor):
        """Downsample the signal by picking out every nth sample, where n is specified by
        ds_factor."""
        return win_sig[::ds_factor]

    def detect_peaks_and_onsets_using_msptd(signal, fs, options):
        """Detect peaks and onsets in a PPG signal using a modified MSPTD approach (where the
        modifications are those specified in Charlton et al. 2024)."""

        # Setup
        N = len(signal)
        L = int(np.ceil(N / 2) - 1)

        # Step 0: Don't calculate scales outside the range of plausible HRs
        plaus_hr_hz = np.array(options["plaus_hr_bpm"]) / 60  # in Hz
        init_scales = np.arange(1, L + 1)
        durn_signal = len(signal) / fs
        init_scales_fs = (L / init_scales) / durn_signal
        if options["use_reduced_lms_scales"]:
            init_scales_inc_log = init_scales_fs >= plaus_hr_hz[0]
        else:
            init_scales_inc_log = np.ones_like(init_scales_fs, dtype=bool)

        # Identify the largest scale that is still compatible with the plausible HR range
        max_scale_index = np.where(init_scales_inc_log)[0]
        if max_scale_index.size > 0:
            max_scale = max_scale_index[-1] + 1  # Add 1 to convert from 0-based to 1-based index
        else:
            max_scale = None  # Or handle the case where no scales are valid

        # Step 1: calculate local maxima and local minima scalograms

        # - detrend
        x = scipy.signal.detrend(signal, type="linear")

        # - populate LMS matrices
        [m_max, m_min] = find_lms_using_msptd_approach(max_scale, x, options)

        # Step 2: find the scale with the most local maxima (or local minima)
        # - row-wise summation (i.e. sum each row)
        if options["find_pks"]:
            gamma_max = np.sum(m_max, axis=1)  # the "axis=1" option makes it row-wise
        if options["find_trs"]:
            gamma_min = np.sum(m_min, axis=1)
        # - find scale with the most local maxima (or local minima)
        if options["find_pks"]:
            lambda_max = np.argmax(gamma_max)
        if options["find_trs"]:
            lambda_min = np.argmax(gamma_min)

        # Step 3: Use lambda to remove all elements of m for which k>lambda
        first_scale_to_include = np.argmax(init_scales_inc_log)
        if options["find_pks"]:
            m_max = m_max[first_scale_to_include : lambda_max + 1, :]
        if options["find_trs"]:
            m_min = m_min[first_scale_to_include : lambda_min + 1, :]

        # Step 4: Find peaks (and onsets)
        # - column-wise summation
        if options["find_pks"]:
            m_max_sum = np.sum(m_max == False, axis=0)
            peaks = np.where(m_max_sum == 0)[0].astype(int)
        else:
            peaks = []
        if options["find_trs"]:
            m_min_sum = np.sum(m_min == False, axis=0)
            onsets = np.where(m_min_sum == 0)[0].astype(int)
        else:
            onsets = []

        return peaks, onsets

    # ~~~ Main function ~~~

    # Specify settings
    # - version: optimal selection (CinC 2024)
    options = {
        "find_trs": True,  # whether or not to find onsets
        "find_pks": True,  # whether or not to find peaks
        "do_ds": True,  # whether or not to do downsampling
        "ds_freq": 20,  # the target downsampling frequency
        "use_reduced_lms_scales": True,  # whether or not to reduce the number of scales (default 30 bpm)
        "win_len": 8,  # duration of individual windows for analysis
        "win_overlap": 0.2,  # proportion of window overlap
        "plaus_hr_bpm": [30, 200],  # range of plausible HRs (only the lower bound is used)
    }

    # Split into overlapping windows
    no_samps_in_win = options["win_len"] * sampling_rate
    if len(signal) <= no_samps_in_win:
        # Use a single window covering the whole signal (kept as lists so that the windowing
        # loop below also works for short signals)
        win_starts = [0]
        win_ends = [len(signal)]
    else:
        win_offset = round(no_samps_in_win * (1 - options["win_overlap"]))
        win_starts = list(range(0, len(signal) - no_samps_in_win + 1, win_offset))
        win_ends = [start + 1 + no_samps_in_win for start in win_starts]
        if win_ends[-1] < len(signal):
            win_starts.append(len(signal) - 1 - no_samps_in_win)
            win_ends.append(len(signal))
        # this ensures that the windows include the entire signal duration

    # Set up downsampling if the sampling frequency is particularly high
    if options["do_ds"]:
        min_fs = options["ds_freq"]
        if sampling_rate > min_fs:
            ds_factor = int(np.floor(sampling_rate / min_fs))
            ds_fs = sampling_rate / np.floor(sampling_rate / min_fs)
        else:
            options["do_ds"] = False

    # Detect peaks and onsets in each window
    peaks = []
    onsets = []

    # Cycle through each window
    for win_no in range(len(win_starts)):
        # Extract this window's data
        win_sig = signal[win_starts[win_no] : win_ends[win_no]]

        # Downsample signal
        if options["do_ds"]:
            rel_sig = downsample(win_sig, ds_factor)
            rel_fs = ds_fs
        else:
            rel_sig = win_sig
            rel_fs = sampling_rate

        # Detect peaks and onsets
        p, t = detect_peaks_and_onsets_using_msptd(rel_sig, rel_fs, options)

        # Resample peaks
        if options["do_ds"]:
            p = [peak * ds_factor for peak in p]
            t = [onset * ds_factor for onset in t]

        # Correct peak indices by finding the highest point within tolerance either side of
        # detected peaks
        tol_durn = 0.05
        if rel_fs < 10:
            tol_durn = 0.2
        elif rel_fs < 20:
            tol_durn = 0.1
        tol = int(np.ceil(rel_fs * tol_durn))
        for pk_no in range(len(p)):
            segment = win_sig[(p[pk_no] - tol) : (p[pk_no] + tol + 1)]
            temp = np.argmax(segment)
            p[pk_no] = p[pk_no] - tol + temp

        # Correct onset indices by finding the lowest point within tolerance either side of
        # detected onsets
        for onset_no in range(len(t)):
            segment = win_sig[(t[onset_no] - tol) : (t[onset_no] + tol + 1)]
            temp = np.argmin(segment)
            t[onset_no] = t[onset_no] - tol + temp

        # Store peaks and onsets
        win_peaks = [peak + win_starts[win_no] for peak in p]
        peaks.extend(win_peaks)
        win_onsets = [onset + win_starts[win_no] for onset in t]
        onsets.extend(win_onsets)

    # Tidy up detected peaks and onsets (by ordering them and only retaining unique ones)
    peaks = sorted(set(peaks))
    onsets = sorted(set(onsets))

    # Plot results (optional)
    if show:
        _, ax0 = plt.subplots(nrows=1, ncols=1, sharex=True)
        ax0.plot(signal, label="signal")
        ax0.scatter(peaks, signal[peaks], c="r")
        ax0.scatter(onsets, signal[onsets], c="b")
        ax0.set_title("PPG Peaks and Onsets (Method by Charlton et al., 2024)")

    return peaks, onsets
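
# ---------------------------------------------------------------------------------------------
# Editor's sketch (not part of the original module): the multi-scale local maxima scalogram
# idea shared by the Bishop (2018) and Charlton (2024) methods, demonstrated on a toy sine
# wave using only this module's functions. A sample is retained as a peak only if it is a
# local maximum at every retained scale. The helper name `_example_msptd_on_sine` is
# hypothetical.
def _example_msptd_on_sine(fs=50, freq=1.25, duration=6):
    t = np.arange(0, duration, 1 / fs)
    toy = np.sin(2 * np.pi * freq * t)  # quasi-periodic stand-in for a short PPG excerpt

    # Roughly one peak and one trough per cycle should be recovered (about 7-8 of each here).
    peaks, onsets = _ppg_findpeaks_bishop(toy, show=False)
    return peaks, onsets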