Source code for neurokit2.rsp.rsp_clean

# -*- coding: utf-8 -*-
from warnings import warn

import numpy as np
import pandas as pd
import scipy.signal

from ..misc import NeuroKitWarning, as_vector
from ..signal import signal_detrend, signal_filter
from ..stats import mad


[docs] def rsp_clean(rsp_signal, sampling_rate=1000, method="khodadad2018", **kwargs): """**Preprocess a respiration (RSP) signal** Clean a respiration signal using different sets of parameters, such as: * **khodadad2018**: Second order 0.05-3 Hz bandpass Butterworth filter. Note that the implementation differs from the referenced paper (see issue #950). * **BioSPPy**: Second order 0.1-0.35 Hz bandpass Butterworth filter followed by a constant detrending). * **hampel**: Applies a median-based Hampel filter by replacing values which are 3 (can be changed via ``threshold``) :func:`.mad` away from the rolling median. Parameters ---------- rsp_signal : Union[list, np.array, pd.Series] The raw respiration channel (as measured, for instance, by a respiration belt). sampling_rate : int, optional The sampling frequency of :func:`.rsp_signal` (in Hz, i.e., samples/second). method : str, optional The processing pipeline to apply. Can be one of ``"khodadad2018"`` (default), ``"biosppy"`` or ``"hampel"``. **kwargs Other arguments to pass to the cleaning method. Returns ------- array Vector containing the cleaned respiratory signal. See Also -------- rsp_findpeaks, signal_rate, rsp_amplitude, rsp_process, rsp_plot Examples -------- .. ipython:: python import pandas as pd import neurokit2 as nk rsp = nk.rsp_simulate(duration=30, sampling_rate=50, noise=0.1) signals = pd.DataFrame({ "RSP_Raw": rsp, "RSP_Khodadad2018": nk.rsp_clean(rsp, sampling_rate=50, method="khodadad2018"), "RSP_BioSPPy": nk.rsp_clean(rsp, sampling_rate=50, method="biosppy"), "RSP_Hampel": nk.rsp_clean(rsp, sampling_rate=50, method="hampel", threshold=3) }) @savefig p_rsp_clean1.png scale=100% signals.plot() @suppress plt.close() References ---------- * Khodadad, D., Nordebo, S., Müller, B., Waldmann, A., Yerworth, R., Becher, T., ... & Bayford, R. (2018). Optimized breath detection algorithm in electrical impedance tomography. Physiological measurement, 39(9), 094001. * Power, J., Lynch, C., Dubin, M., Silver, B., Martin, A., Jones, R.,(2020) Characteristics of respiratory measures in young adults scanned at rest, including systematic changes and “missed” deep breaths. NeuroImage, Volume 204, 116234 """ rsp_signal = as_vector(rsp_signal) # Missing data n_missing = np.sum(np.isnan(rsp_signal)) if n_missing > 0: warn( f"There are {n_missing} missing data points in your signal." " Filling missing values by using the forward filling method.", category=NeuroKitWarning, ) rsp_signal = _rsp_clean_missing(rsp_signal) method = method.lower() # remove capitalised letters if method in ["khodadad", "khodadad2018"]: clean = _rsp_clean_khodadad2018(rsp_signal, sampling_rate) elif method == "biosppy": clean = _rsp_clean_biosppy(rsp_signal, sampling_rate) elif method in ["power", "power2020", "hampel"]: clean = _rsp_clean_hampel( rsp_signal, **kwargs, ) elif method is None or method == "none": clean = rsp_signal else: raise ValueError( "NeuroKit error: rsp_clean(): 'method' should be one of 'khodadad2018', 'biosppy' or 'hampel'." ) return clean
# ============================================================================= # Handle missing data # ============================================================================= def _rsp_clean_missing(rsp_signal): rsp_signal = pd.DataFrame.pad(pd.Series(rsp_signal)) return rsp_signal # ============================================================================= # Khodadad et al. (2018) # ============================================================================= def _rsp_clean_khodadad2018(rsp_signal, sampling_rate=1000): """The algorithm is based on (but not an exact implementation of) the "Zero-crossing algorithm with amplitude threshold" by `Khodadad et al. (2018) <https://iopscience.iop.org/article/10.1088/1361-6579/aad7e6/meta>`_. """ # Slow baseline drifts / fluctuations must be removed from the raw # breathing signal (i.e., the signal must be centered around zero) in order # to be able to reliable detect zero-crossings. # Remove baseline by applying a lowcut at .05Hz (preserves breathing rates # higher than 3 breath per minute) and high frequency noise by applying a # highcut at 3 Hz (preserves breathing rates slower than 180 breath per # minute). clean = signal_filter( rsp_signal, sampling_rate=sampling_rate, lowcut=0.05, highcut=3, order=2, method="butterworth", ) return clean # ============================================================================= # BioSPPy # ============================================================================= def _rsp_clean_biosppy(rsp_signal, sampling_rate=1000): """Uses the same defaults as `BioSPPy. <https://github.com/PIA-Group/BioSPPy/blob/master/biosppy/signals/resp.py>`_. """ # Parameters order = 2 frequency = [0.1, 0.35] # Normalize frequency to Nyquist Frequency (Fs/2). frequency = 2 * np.array(frequency) / sampling_rate # Filtering b, a = scipy.signal.butter(N=order, Wn=frequency, btype="bandpass", analog=False) filtered = scipy.signal.filtfilt(b, a, rsp_signal) # Baseline detrending clean = signal_detrend(filtered, order=0) return clean # ============================================================================= # Hampel filter # ============================================================================= def _rsp_clean_hampel(rsp_signal, sampling_rate=1000, window_length=0.1, threshold=3, **kwargs): """Explanation MatLabs' https://www.mathworks.com/help/dsp/ref/hampelfilter.html. From https://stackoverflow.com/a/51731332. Parameters ---------- rsp_signal : Union[list, np.array, pd.Series] The raw respiration channel (as measured, for instance, by a respiration belt). window_length : int, optional Window to be considered when cleaning, by default 0.1. In seconds. threshold : float, optional Threshold of deviations after which a point is considered an outlier, by default 3. """ # Get window length in samples window_length = int(window_length * sampling_rate) # Convert to Series to use its rolling methods rsp_signal = pd.Series(rsp_signal) rolling_median = rsp_signal.rolling(window=window_length, center=True).median() rolling_MAD = rsp_signal.rolling(window=window_length, center=True).apply(mad) threshold = threshold * rolling_MAD difference = np.abs(rsp_signal - rolling_median) # Find outliers outlier_idx = difference > threshold # Substitute outliers with rolling median rsp_signal[outlier_idx] = rolling_median[outlier_idx] return as_vector(rsp_signal)