Source code for neurokit2.signal.signal_detrend

# -*- coding: utf-8 -*-
import numpy as np
import scipy.sparse

from ..stats import fit_loess, fit_polynomial
from .signal_decompose import signal_decompose


[docs] def signal_detrend( signal, method="polynomial", order=1, regularization=500, alpha=0.75, window=1.5, stepsize=0.02, components=[-1], sampling_rate=1000, ): """**Signal Detrending** Apply a baseline (order = 0), linear (order = 1), or polynomial (order > 1) detrending to the signal (i.e., removing a general trend). One can also use other methods, such as smoothness priors approach described by Tarvainen (2002) or LOESS regression, but these scale badly for long signals. Parameters ---------- signal : Union[list, np.array, pd.Series] The signal (i.e., a time series) in the form of a vector of values. method : str Can be one of ``"polynomial"`` (default; traditional detrending of a given order) or ``"tarvainen2002"`` to use the smoothness priors approach described by Tarvainen (2002) (mostly used in HRV analyses as a lowpass filter to remove complex trends), ``"loess"`` for LOESS smoothing trend removal or ``"locreg"`` for local linear regression (the *'runline'* algorithm from chronux). order : int Only used if ``method`` is ``"polynomial"``. The order of the polynomial. 0, 1 or > 1 for a baseline ('constant detrend', i.e., remove only the mean), linear (remove the linear trend) or polynomial detrending, respectively. Can also be ``"auto"``, in which case it will attempt to find the optimal order to minimize the RMSE. regularization : int Only used if ``method="tarvainen2002"``. The regularization parameter (default to 500). alpha : float Only used if ``method`` is "loess". The parameter which controls the degree of smoothing. window : float Only used if ``method`` is "locreg". The detrending ``window`` should correspond to the 1 divided by the desired low-frequency band to remove (``window = 1 / detrend_frequency``) For instance, to remove frequencies below ``0.67Hz`` the window should be ``1.5`` (``1 / 0.67 = 1.5``). stepsize : float Only used if ``method`` is ``"locreg"``. components : list Only used if ``method`` is ``"EMD"``. What Intrinsic Mode Functions (IMFs) from EMD to remove. By default, the last one. sampling_rate : int, optional Only used if ``method`` is "locreg". Sampling rate (Hz) of the signal. If not None, the ``stepsize`` and ``window`` arguments will be multiplied by the sampling rate. By default 1000. Returns ------- array Vector containing the detrended signal. See Also -------- signal_filter, fit_loess, signal_decompose Examples -------- .. ipython:: python import numpy as np import pandas as pd import neurokit2 as nk import matplotlib.pyplot as plt # Simulate signal with low and high frequency signal = nk.signal_simulate(frequency=[0.1, 2], amplitude=[2, 0.5], sampling_rate=100) signal = signal + (3 + np.linspace(0, 6, num=len(signal))) # Add baseline and linear trend # Apply detrending algorithms # --------------------------- # Method 1: Default Polynomial Detrending of a Given Order # Constant detrend (removes the mean) baseline = nk.signal_detrend(signal, order=0) # Linear Detrend (removes the linear trend) linear = nk.signal_detrend(signal, order=1) # Polynomial Detrend (removes the polynomial trend) quadratic = nk.signal_detrend(signal, order=2) # Quadratic detrend cubic = nk.signal_detrend(signal, order=3) # Cubic detrend poly10 = nk.signal_detrend(signal, order=10) # Linear detrend (10th order) # Method 2: Tarvainen's smoothness priors approach (Tarvainen et al., 2002) tarvainen = nk.signal_detrend(signal, method="tarvainen2002") # Method 3: LOESS smoothing trend removal loess = nk.signal_detrend(signal, method="loess") # Method 4: Local linear regression (100Hz) locreg = nk.signal_detrend(signal, method="locreg", window=1.5, stepsize=0.02, sampling_rate=100) # Method 5: EMD emd = nk.signal_detrend(signal, method="EMD", components=[-2, -1]) # Visualize different methods @savefig signal_detrend1.png scale=100% axes = pd.DataFrame({"Original signal": signal, "Baseline": baseline, "Linear": linear, "Quadratic": quadratic, "Cubic": cubic, "Polynomial (10th)": poly10, "Tarvainen": tarvainen, "LOESS": loess, "Local Regression": locreg, "EMD": emd}).plot(subplots=True) # Plot horizontal lines to better visualize the detrending for subplot in axes: subplot.axhline(y=0, color="k", linestyle="--") @suppress plt.close() References ---------- * Tarvainen, M. P., Ranta-Aho, P. O., & Karjalainen, P. A. (2002). An advanced detrending method with application to HRV analysis. IEEE Transactions on Biomedical Engineering, 49(2), 172-175 """ signal = np.array(signal) # Force vector method = method.lower() if method in ["tarvainen", "tarvainen2002"]: detrended = _signal_detrend_tarvainen2002(signal, regularization) elif method in ["poly", "polynomial"]: detrended = signal - fit_polynomial(signal, X=None, order=order)[0] elif method in ["loess", "lowess"]: detrended = signal - fit_loess(signal, alpha=alpha)[0] elif method in ["locdetrend", "runline", "locreg", "locregression"]: detrended = _signal_detrend_locreg( signal, window=window, stepsize=stepsize, sampling_rate=sampling_rate ) elif method in ["emd"]: detrended = _signal_detrend_emd(signal, components=components) else: raise ValueError( "NeuroKit error: signal_detrend(): 'method' should be one of 'polynomial', 'loess'" + "'locreg', 'EMD' or 'tarvainen2002'." ) return detrended
# ============================================================================= # Internals # ============================================================================= def _signal_detrend_tarvainen2002(signal, regularization=500): """Method by Tarvainen et al., 2002. - Tarvainen, M. P., Ranta-Aho, P. O., & Karjalainen, P. A. (2002). An advanced detrending method with application to HRV analysis. IEEE Transactions on Biomedical Engineering, 49(2), 172-175. """ N = len(signal) identity = np.eye(N) B = np.dot(np.ones((N - 2, 1)), np.array([[1, -2, 1]])) D_2 = scipy.sparse.dia_matrix((B.T, [0, 1, 2]), shape=(N - 2, N)) # pylint: disable=E1101 inv = np.linalg.inv(identity + regularization**2 * D_2.T @ D_2) z_stat = ((identity - inv)) @ signal trend = np.squeeze(np.asarray(signal - z_stat)) # detrend return signal - trend def _signal_detrend_locreg(signal, window=1.5, stepsize=0.02, sampling_rate=1000): """Local linear regression ('runline' algorithm from chronux). Based on https://github.com/sappelhoff/pyprep. - http://chronux.org/chronuxFiles/Documentation/chronux/spectral_analysis/continuous/locdetrend.html - https://github.com/sappelhoff/pyprep/blob/master/pyprep/removeTrend.py - https://github.com/VisLab/EEG-Clean-Tools/blob/master/PrepPipeline/utilities/localDetrend.m """ length = len(signal) # Sanitize input if sampling_rate is None: sampling_rate = 1 # Sanity checks window = int(window * sampling_rate) stepsize = int(stepsize * sampling_rate) if window > length: raise ValueError( "NeuroKit error: signal_detrend(): 'window' should be " "less than the number of samples. Try using 1.5 * sampling rate." ) if stepsize <= 1: raise ValueError( "NeuroKit error: signal_detrend(): 'stepsize' should be more than 1. Increase its value." ) y_line = np.zeros((length, 1)) norm = np.zeros((length, 1)) nwin = int(np.ceil((length - window) / stepsize)) yfit = np.zeros((nwin, window)) xwt = (np.arange(1, window + 1) - window / 2) / (window / 2) wt = np.power(1 - np.power(np.absolute(xwt), 3), 3) # pylint: disable=E1111 for j in range(0, nwin): tseg = signal[(stepsize * j) : (stepsize * j + window)] y1 = np.mean(tseg) y2 = np.mean(np.multiply(np.arange(1, window + 1), tseg)) * (2 / (window + 1)) a = np.multiply(np.subtract(y2, y1), 6 / (window - 1)) b = np.subtract(y1, a * (window + 1) / 2) # pylint: disable=E1111 yfit[j, :] = np.multiply(np.arange(1, window + 1), a) + b y_line[(j * stepsize) : (j * stepsize + window)] = y_line[ (j * stepsize) : (j * stepsize + window) ] + np.reshape(np.multiply(yfit[j, :], wt), (window, 1)) norm[(j * stepsize) : (j * stepsize + window)] = norm[ (j * stepsize) : (j * stepsize + window) ] + np.reshape(wt, (window, 1)) above_norm = np.where(norm[:, 0] > 0) y_line[above_norm] = y_line[above_norm] / norm[above_norm] indx = (nwin - 1) * stepsize + window - 1 npts = length - indx + 1 y_line[indx - 1 :] = np.reshape( (np.multiply(np.arange(window + 1, window + npts + 1), a) + b), (npts, 1) ) detrended = signal - y_line[:, 0] return detrended def _signal_detrend_emd(signal, components=-1): """The function calculates the Intrinsic Mode Functions (IMFs) of a given timeseries, subtracts the user-chosen IMFs for detrending, and returns the detrended timeseries.""" # Calculate the IMFs imfs = signal_decompose(signal, method="emd") return signal - np.sum(imfs[components, :], axis=0)