Source code for neurokit2.signal.signal_tidypeaksonsets

# - * - coding: utf-8 - * -
import numpy as np
import scipy.signal

from .signal_formatpeaks import _signal_formatpeaks_sanitize


def signal_tidypeaksonsets(
    signal,
    peaks,
    onsets,
    method="Charlton2022",
    **kwargs,
):
    """**Tidy peaks and onsets**

    Tidy up PPG peaks and onsets to make them conform to the following rules:

    (i) No two points at the same time
    (ii) At least one local minimum between consecutive peaks
    (iii) At least one local maximum between consecutive onsets
    (iv) Alternates between onsets and peaks
    (v) Starts with onset, and ends with peak
    (vi) Same number of peaks and onsets

    Parameters
    ----------
    signal : Union[list, np.array, pd.Series]
        The signal (i.e., a time series) that contains the peaks and onsets, in the form of a
        vector of values.
    peaks : list or array or DataFrame or Series or dict
        The samples at which the peaks occur.
    onsets : list or array or DataFrame or Series or dict
        The samples at which the onsets occur.
    method : str
        ``"Charlton2022"`` uses the approach used in the ppg-beats toolbox, Zenodo,
        https://doi.org/10.5281/zenodo.6037646
        This approach was designed for PPG signals. The name is matched case-insensitively.
    **kwargs
        Other keyword arguments, forwarded unchanged to the selected method.

    Returns
    -------
    info : dict
        Dictionary produced by the selected method. For ``"Charlton2022"`` it holds the keys
        ``"method"``, ``"extra_peaks"`` and ``"extra_onsets"``.
    peaks_clean : array
        The corrected peak locations.
    onsets_clean : array
        The corrected onset locations.

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported methods.

    See Also
    --------
    ppg_findpeaks, ppg_peaks

    References
    ----------
    * Charlton, P. H. et al. (2022). Detecting beats in the photoplethysmogram: benchmarking
      open-source algorithms. Physiological Measurement, 46, 035002,
      doi:10.1088/1361-6579/adb89e

    """
    # Format input
    peaks = _signal_formatpeaks_sanitize(peaks)
    onsets = _signal_formatpeaks_sanitize(onsets)

    # Dispatch on the requested method (currently only Charlton 2022 is available)
    if method.lower() == "charlton2022":
        info, peaks_clean, onsets_clean = _signal_fixpeaks_charlton(
            signal, peaks, onsets, **kwargs
        )
    else:
        raise ValueError(
            "`method` not found. Must be one of the following: 'charlton2022'."
        )

    return info, peaks_clean, onsets_clean


# ============================================================================= # Methods # ============================================================================= # ============================================================================= # Charlton (2022) - main function # ============================================================================= def _signal_fixpeaks_charlton( signal, peaks, onsets, ): """Charlton 2022 method – enforces consistency between peaks and onsets. Tidy up peaks and onsets to make them conform to the following rules: (i) No two points at the same time (ii) At least one local minimum between consecutive peaks (iii) At least one local maximum between consecutive onsets (iv) Alternates between onsets and peaks (v) Starts with onset, and ends with peak (vi) Same number of peaks and onsets """ # (i) No two points at the same time peaks, onsets = _remove_repeated_peaks_and_onsets(peaks, onsets) # (ii) At least one local minimum between consecutive peaks peaks = _ensure_extremum_between(signal, peaks, other_extrema_type='pk') # (iii) At least one local maximum between consecutive onsets onsets = _ensure_extremum_between(signal, onsets, other_extrema_type='on') # (iv) Alternates between onsets and peaks # If there are two consecutive peaks, then insert an onset between them onsets, peaks = _insert_missing_extremum(signal, onsets, peaks, other_extrema_type='pk') # If there are two consecutive onsets, then insert a peak between them peaks, onsets = _insert_missing_extremum(signal, peaks, onsets, other_extrema_type='on') # (v) Starts with onset, and ends with peak peaks, onsets = _ensure_starts_with_onset_ends_with_peak(peaks, onsets) # (vi) same number of peaks and onsets peaks, onsets = _ensure_same_no_peaks_onsets(peaks, onsets) peaks_clean = peaks onsets_clean = onsets info = { "method": "charlton2022", "extra_peaks": [i for i in peaks if i not in peaks_clean], "extra_onsets": [i for i in onsets if i not in onsets_clean], } return 
info, peaks_clean, onsets_clean # ============================================================================= # Charlton (2022) - helper functions # ============================================================================= def _remove_repeated_peaks_and_onsets(peaks, onsets): # remove any repeated peaks (or onsets) peaks = np.unique(peaks) onsets = np.unique(onsets) # If there is a peak and onset at the same index, then remove them both repeated = np.intersect1d(peaks, onsets) peaks = np.setxor1d(peaks, repeated) onsets = np.setxor1d(onsets, repeated) return peaks, onsets def _ensure_extremum_between(signal, other_extrema, other_extrema_type='pk'): # If there are two peaks (or onsets) without a local minimum (or maximum) between them, # then remove the lower (or higher) one if other_extrema_type == 'pk': extrema = scipy.signal.argrelmin(signal)[0] else: extrema = scipy.signal.argrelmax(signal)[0] other_extrema = list(other_extrema) finished = False while not finished: els_to_remove = [] for i in range(len(other_extrema) - 1): rel_els = np.arange(other_extrema[i] + 1, other_extrema[i + 1]) if not np.intersect1d(rel_els, extrema).size: other_extrema_vals = signal[other_extrema[i:i+2]] if other_extrema_type == 'pk': el_to_remove = int(np.argmin(other_extrema_vals)) else: el_to_remove = int(np.argmax(other_extrema_vals)) els_to_remove.append(i + el_to_remove) if not els_to_remove: finished = True else: for i in reversed(els_to_remove): # remove in reverse to avoid shifting del other_extrema[i] return np.array(other_extrema) def _insert_missing_extremum(signal, extrema, other_extrema, other_extrema_type='pk'): """ If there are two consecutive extrema of one type (known as other_extrema), then insert an extremum of the second type (known as extrema) between them. 
""" other_extrema_log = np.concatenate([ np.ones(len(other_extrema), dtype=bool), np.zeros(len(extrema), dtype=bool) ]) els = np.concatenate([other_extrema, extrema]) order = np.argsort(els) els = els[order] other_extrema_log = other_extrema_log[order] bad_els = np.where( (np.diff(other_extrema_log) == 0) & (other_extrema_log[:-1]) )[0] # repeated other_extrema if len(bad_els) > 0: # if there is a repeated other extrema for bad_el_no in range(len(bad_els)): # cycle through each repeated other extrema curr_other_extrema = [els[bad_els[bad_el_no]], els[bad_els[bad_el_no] + 1]] start = curr_other_extrema[0] end = curr_other_extrema[1] # Create baseline to remove bw_to_remove = np.linspace(signal[start], signal[end], end - start + 1) # Detrend segment segment = signal[start:end+1] - bw_to_remove # Find extremum in detrended segment if other_extrema_type == 'pk': temp = np.argmin(segment) else: temp = np.argmax(segment) # check this hasn't just detected one of the other_extrema (which can happen with strong baseline wander) if temp == 0 or temp == (end - start): # then just remove the first peak other_extrema = other_extrema[other_extrema != start] else: curr_new_extrema = start + temp extrema = np.sort(np.append(extrema, curr_new_extrema)) return extrema, other_extrema def _ensure_starts_with_onset_ends_with_peak(peaks, onsets): """ Make sure that the first onset is before the first peak, and the last peak is after the last onset """ finished = False while not finished: if len(onsets) > 0 and len(peaks) > 0 and onsets[0] > peaks[0]: peaks = peaks[1:] else: finished = True finished = False while not finished: if len(peaks) > 0 and len(onsets) > 0 and peaks[-1] < onsets[-1]: onsets = onsets[:-1] else: finished = True return peaks, onsets def _ensure_same_no_peaks_onsets(peaks, onsets): """ NB: This doesn't quite ensure the same no of peaks and onsets, it only does it for a specific condition """ # if no peaks (or onsets) were detected, then don't output any indices for 
either if len(peaks) == 0: onsets = [] if len(onsets) == 0: peaks = [] return peaks, onsets