Source code for neurokit2.hrv.hrv_rsa

# -*- coding: utf-8 -*-
from warnings import warn

import numpy as np
import pandas as pd
import scipy.linalg

from ..ecg.ecg_rsp import ecg_rsp
from ..misc import NeuroKitWarning
from ..rsp import rsp_process
from ..signal import (
    signal_filter,
    signal_interpolate,
    signal_rate,
    signal_resample,
    signal_timefrequency,
)
from ..signal.signal_formatpeaks import _signal_formatpeaks_sanitize
from .hrv_utils import _hrv_format_input, _hrv_get_rri
from .intervals_process import intervals_process


[docs] def hrv_rsa( ecg_signals, rsp_signals=None, rpeaks=None, sampling_rate=1000, continuous=False, window=None, window_number=None, ): """**Respiratory Sinus Arrhythmia (RSA)** Respiratory sinus arrhythmia (RSA), also referred to as 'cardiac coherence' or 'physiological coherence' (though these terms are often encompassing a wider meaning), is the naturally occurring variation in heart rate during the breathing cycle. Metrics to quantify it are often used as a measure of parasympathetic nervous system activity. Neurophysiology informs us that the functional output of the myelinated vagus originating from the nucleus ambiguous has a respiratory rhythm. Thus, there would a temporal relation between the respiratory rhythm being expressed in the firing of these efferent pathways and the functional effect on the heart rate rhythm manifested as RSA. Several methods exist to quantify RSA: * The **Peak-to-trough (P2T)** algorithm measures the statistical range in milliseconds of the heart period oscillation associated with synchronous respiration. Operationally, subtracting the shortest heart period during inspiration from the longest heart period during a breath cycle produces an estimate of RSA during each breath. The peak-to-trough method makes no statistical assumption or correction (e.g., adaptive filtering) regarding other sources of variance in the heart period time series that may confound, distort, or interact with the metric such as slower periodicities and baseline trend. Although it has been proposed that the P2T method "acts as a time-domain filter dynamically centered at the exact ongoing respiratory frequency" (Grossman, 1992), the method does not transform the time series in any way, as a filtering process would. Instead the method uses knowledge of the ongoing respiratory cycle to associate segments of the heart period time series with either inhalation or exhalation (Lewis, 2012). * The **Porges-Bohrer (PB)** algorithm assumes that heart period time series reflect the sum of several component time series. Each of these component time series may be mediated by different neural mechanisms and may have different statistical features. The Porges-Bohrer method applies an algorithm that selectively extracts RSA, even when the periodic process representing RSA is superimposed on a complex baseline that may include aperiodic and slow periodic processes. Since the method is designed to remove sources of variance in the heart period time series other than the variance within the frequency band of spontaneous breathing, the method is capable of accurately quantifying RSA when the signal to noise ratio is low. Parameters ---------- ecg_signals : DataFrame DataFrame obtained from :func:`.ecg_process`. Should contain columns ``ECG_Rate`` and ``ECG_R_Peaks``. Can also take a DataFrame comprising of both ECG and RSP signals, generated by :func:`.bio_process`. rsp_signals : DataFrame DataFrame obtained from :func:`.rsp_process`. Should contain columns ``RSP_Phase`` and ``RSP_PhaseCompletion``. No impact when a DataFrame comprising of both the ECG and RSP signals are passed as ``ecg_signals``. Defaults to ``None``. rpeaks : dict The samples at which the R-peaks of the ECG signal occur. Dict returned by :func:`.ecg_peaks`, :func:`.ecg_process`, or :func:`.bio_process`. Defaults to ``None``. sampling_rate : int The sampling frequency of signals (in Hz, i.e., samples/second). continuous : bool If ``False``, will return RSA properties computed from the data (one value per index). If ``True``, will return continuous estimations of RSA of the same length as the signal. See below for more details. window : int For calculating RSA second by second. Length of each segment in seconds. If ``None`` (default), window will be set at 32 seconds. window_number : int Between 2 and 8. For calculating RSA second by second. Number of windows to be calculated in Peak Matched Multiple Window. If ``None`` (default), window_number will be set at 8. Returns ---------- rsa : dict A dictionary containing the RSA features, which includes: * ``"RSA_P2T_Values"``: the estimate of RSA during each breath cycle, produced by subtracting the shortest heart period (or RR interval) from the longest heart period in ms. * ``"RSA_P2T_Mean"``: the mean peak-to-trough across all cycles in ms * ``"RSA_P2T_Mean_log"``: the logarithm of the mean of RSA estimates. * ``"RSA_P2T_SD"``: the standard deviation of all RSA estimates. * ``"RSA_P2T_NoRSA"``: the number of breath cycles from which RSA could not be calculated. * ``"RSA_PorgesBohrer"``: the Porges-Bohrer estimate of RSA, optimal when the signal to noise ratio is low, in ``ln(ms^2)``. Example ---------- .. ipython:: python import neurokit2 as nk # Download data data = nk.data("bio_eventrelated_100hz") # Process the data ecg_signals, info = nk.ecg_process(data["ECG"], sampling_rate=100) rsp_signals, _ = nk.rsp_process(data["RSP"], sampling_rate=100) # Get RSA features nk.hrv_rsa(ecg_signals, rsp_signals, info, sampling_rate=100, continuous=False) # Get RSA as a continuous signal rsa = nk.hrv_rsa(ecg_signals, rsp_signals, info, sampling_rate=100, continuous=True) rsa @savefig hrv_rsa1.png scale=100% nk.signal_plot([ecg_signals["ECG_Rate"], rsp_signals["RSP_Rate"], rsa], standardize=True) @suppress plt.close() References ------------ * Servant, D., Logier, R., Mouster, Y., & Goudemand, M. (2009). La variabilité de la fréquence cardiaque. Intérêts en psychiatrie. L'Encéphale, 35(5), 423-428. * Lewis, G. F., Furman, S. A., McCool, M. F., & Porges, S. W. (2012). Statistical strategies to quantify respiratory sinus arrhythmia: Are commonly used metrics equivalent?. Biological psychology, 89(2), 349-364. * Zohar, A. H., Cloninger, C. R., & McCraty, R. (2013). Personality and heart rate variability: exploring pathways from personality to cardiac coherence and health. Open Journal of Social Sciences, 1(06), 32. """ signals, ecg_period, rpeaks, sampling_rate = _hrv_rsa_formatinput( ecg_signals, rsp_signals, rpeaks, sampling_rate ) # Extract cycles rsp_cycles = _hrv_rsa_cycles(signals) rsp_onsets = rsp_cycles["RSP_Inspiration_Onsets"] rsp_peaks = np.argwhere(signals["RSP_Peaks"].values == 1)[:, 0] rsp_peaks = np.array(rsp_peaks)[rsp_peaks > rsp_onsets[0]] if len(rsp_peaks) - len(rsp_onsets) == 0: rsp_peaks = rsp_peaks[:-1] if len(rsp_peaks) - len(rsp_onsets) != -1: warn( "Couldn't find rsp cycles onsets and centers. Check your RSP signal." + " Returning empty dict.", category=NeuroKitWarning, ) return {} # Methods ------------------------ # Peak-to-Trough rsa_p2t = _hrv_rsa_p2t( rsp_onsets, rpeaks, sampling_rate, continuous=continuous, ecg_period=ecg_period, rsp_peaks=rsp_peaks, ) # Porges-Bohrer rsa_pb = _hrv_rsa_pb(ecg_period, sampling_rate, continuous=continuous) # RSAsecondbysecond if window is None: window = 32 # 32 seconds input_duration = rpeaks[-1] / sampling_rate if input_duration >= window: rsa_gates = _hrv_rsa_gates( ecg_signals, rpeaks, sampling_rate=sampling_rate, window=window, window_number=window_number, continuous=continuous, ) else: warn( f"The duration of recording is shorter than the duration of the window ({window}" " seconds). Returning RSA by Gates method as Nan. Consider using a longer recording.", category=NeuroKitWarning, ) if continuous is False: rsa_gates = np.nan else: rsa_gates = np.full(len(rsa_p2t), np.nan) if continuous is False: rsa = {} # Initialize empty dict rsa.update(rsa_p2t) rsa.update(rsa_pb) rsa.update(rsa_gates) else: rsa = pd.DataFrame({"RSA_P2T": rsa_p2t, "RSA_Gates": rsa_gates}) return rsa
# ============================================================================= # Methods (Domains) # ============================================================================= def _hrv_rsa_p2t( rsp_onsets, rpeaks, sampling_rate, continuous=False, ecg_period=None, rsp_peaks=None ): """Peak-to-trough algorithm (P2T)""" # Find all RSP cycles and the Rpeaks within cycles_rri_inh = [] cycles_rri_exh = [] # Add 750 ms offset to exhalation peak and end of the cycle in order to include next RRI # (see Grossman, 1990) rsp_offset = 0.75 * sampling_rate for idx in range(len(rsp_onsets) - 1): cycle_init = rsp_onsets[idx] rsp_peak_offset = rsp_peaks[idx] + rsp_offset rsp_peak = rsp_peaks[idx] cycle_end = rsp_onsets[idx + 1] + rsp_offset # Separately select RRI for inhalation and exhalation cycles_rri_inh.append( rpeaks[np.logical_and(rpeaks >= cycle_init, rpeaks < rsp_peak_offset)] ) cycles_rri_exh.append( rpeaks[np.logical_and(rpeaks >= rsp_peak, rpeaks < cycle_end)] ) # Iterate over all cycles rsa_values = np.full(len(cycles_rri_exh), np.nan) for i in range(len(cycles_rri_exh)): # Estimate of RSA during each breathing phase RRis_inh = np.diff(cycles_rri_inh[i]) / sampling_rate * 1000 RRis_exh = np.diff(cycles_rri_exh[i]) / sampling_rate * 1000 if np.logical_and( len(RRis_inh) > 0, len(RRis_exh) > 0 ): # you need at least one RRI rsa_value = np.max(RRis_exh) - np.min(RRis_inh) if rsa_value > 0: # Take into consideration only rsp cycles in which the max exh > than min inh rsa_values[i] = rsa_value else: # Negative effect should be factor into the mean using 0 (see Grossman 1990) rsa_values[i] = 0 if continuous is False: rsa = {"RSA_P2T_Mean": np.nanmean(rsa_values)} rsa["RSA_P2T_Mean_log"] = np.log(rsa["RSA_P2T_Mean"]) # pylint: disable=E1111 rsa["RSA_P2T_SD"] = np.nanstd(rsa_values, ddof=1) rsa["RSA_P2T_NoRSA"] = len( pd.Series(rsa_values).index[pd.Series(rsa_values).isnull()] ) else: rsa = signal_interpolate( x_values=rsp_peaks[~np.isnan(rsa_values)], y_values=rsa_values[~np.isnan(rsa_values)], x_new=np.arange(len(ecg_period)), ) return rsa def _hrv_rsa_pb(ecg_period, sampling_rate, continuous=False): """Porges-Bohrer method.""" if continuous is True: return None # Re-sample at 2 Hz resampled = signal_resample( ecg_period, sampling_rate=sampling_rate, desired_sampling_rate=2 ) # Fit 21-point cubic polynomial filter (zero mean, 3rd order) # with a low-pass cutoff frequency of 0.095Hz trend = signal_filter( resampled, sampling_rate=2, lowcut=0.095, highcut=None, method="savgol", order=3, window_size=21, ) zero_mean = resampled - trend # Remove variance outside bandwidth of spontaneous respiration zero_mean_filtered = signal_filter( zero_mean, sampling_rate=2, lowcut=0.12, highcut=0.40 ) # Divide into 30-second epochs time = np.arange(0, len(zero_mean_filtered)) / 2 time = pd.DataFrame({"Epoch Index": time // 30, "Signal": zero_mean_filtered}) time = time.set_index("Epoch Index") epochs = [time.loc[i] for i in range(int(np.max(time.index.values)) + 1)] variance = [] for epoch in epochs: variance.append(np.log(epoch.var(axis=0) / 1000)) # convert ms variance = [row for row in variance if not np.isnan(row).any()] return {"RSA_PorgesBohrer": pd.concat(variance).mean()} # def _hrv_rsa_synchrony(ecg_period, rsp_signal, sampling_rate=1000, method="correlation", continuous=False): # """Experimental method # """ # if rsp_signal is None: # return None # # filtered_period = signal_filter(ecg_period, sampling_rate=sampling_rate, # lowcut=0.12, highcut=0.4, order=6) # coupling = signal_synchrony(filtered_period, rsp_signal, method=method, window_size=sampling_rate*3) # coupling = signal_filter(coupling, sampling_rate=sampling_rate, highcut=0.4, order=6) # # if continuous is False: # rsa = {} # rsa["RSA_Synchrony_Mean"] = np.nanmean(coupling) # rsa["RSA_Synchrony_SD"] = np.nanstd(coupling, ddof=1) # return rsa # else: # return coupling # def _hrv_rsa_servant(ecg_period, sampling_rate=1000, continuous=False): # """Servant, D., Logier, R., Mouster, Y., & Goudemand, M. (2009). La variabilité de la fréquence # cardiaque. Intérêts en psychiatrie. L’Encéphale, 35(5), 423–428. doi:10.1016/j.encep.2008.06.016 # """ # # rpeaks, _ = nk.ecg_peaks(nk.ecg_simulate(duration=90)) # ecg_period = nk.ecg_rate(rpeaks) / 60 * 1000 # sampling_rate=1000 # # if len(ecg_period) / sampling_rate <= 60: # return None # # # signal = nk.signal_filter(ecg_period, sampling_rate=sampling_rate, # lowcut=0.1, highcut=1, order=6) # signal = nk.standardize(signal) # # nk.signal_plot([ecg_period, signal], standardize=True) # # troughs = nk.signal_findpeaks(-1 * signal)["Peaks"] # trough_signal = nk.signal_interpolate(x_values=troughs, # y_values=signal[troughs], # desired_length=len(signal)) # first_trough = troughs[0] # # # Initial parameters # n_windows = int(len(trough_signal[first_trough:]) / sampling_rate / 16) # How many windows of 16 s # onsets = (np.arange(n_windows) * 16 * sampling_rate) + first_trough # # areas_under_curve = np.zeros(len(onsets)) # for i, onset in enumerate(onsets): # areas_under_curve[i] = sklearn.metrics.auc(np.linspace(0, 16, 16*sampling_rate), # trough_signal[onset:onset+(16*sampling_rate)]) # max_auc = np.max(areas_under_curve) # # # Moving computation # onsets = np.arange(first_trough, len(signal)-16*sampling_rate, step=4*sampling_rate) # areas_under_curve = np.zeros(len(onsets)) # for i, onset in enumerate(onsets): # areas_under_curve[i] = sklearn.metrics.auc(np.linspace(0, 16, 16*sampling_rate), # trough_signal[onset:onset+(16*sampling_rate)]) # rsa = (max_auc - areas_under_curve) / max_auc + 1 # # # Not sure what to do next, sent an email to Servant. # pass # ============================================================================= # Second-by-second RSA # ============================================================================= def _hrv_rsa_gates( ecg_signals, rpeaks, sampling_rate=1000, window=None, window_number=None, continuous=False, ): # Boundaries of rsa freq min_frequency = 0.12 max_frequency = 0.40 # Retrieve IBI and interpolate it rri, rri_time, _ = _hrv_get_rri(rpeaks, sampling_rate=sampling_rate) # Re-sample at 4 Hz desired_sampling_rate = 4 rri, rri_time, sampling_rate = intervals_process( rri, intervals_time=rri_time, interpolate=True, interpolation_rate=desired_sampling_rate, ) # Sanitize parameters overlap = int((window - 1) * desired_sampling_rate) nperseg = window * desired_sampling_rate if window_number is None: window_number = 8 # Get multipeak window multipeak, weight = _get_multipeak_window(nperseg, window_number) for i in range(4): _, time, psd = signal_timefrequency( rri, sampling_rate=desired_sampling_rate, min_frequency=min_frequency, max_frequency=max_frequency, method="stft", window=window, window_type=multipeak[:, i], overlap=overlap, show=False, ) if i == 0: rsa = np.zeros_like(psd) rsa = psd * weight[i] + rsa # add weights meanRSA = np.log(2 * sum(rsa) / nperseg) # Sanitize output if continuous is False: rsa = {"RSA_Gates_Mean": np.nanmean(meanRSA)} rsa["RSA_Gates_Mean_log"] = np.log( rsa["RSA_Gates_Mean"] ) # pylint: disable=E1111 rsa["RSA_Gates_SD"] = np.nanstd(meanRSA, ddof=1) else: # For window=32, meanRSA is RSA from 16th second to xth second where x=recording # duration-16secs # Padding the missing first and list window/2 segments pad_length = window / 2 time_start = np.arange(0, pad_length) time_end = np.arange(time[-1], time[-1] + pad_length)[1:] time = np.concatenate((time_start, time, time_end)) rsa_start = np.full(len(time_start), meanRSA[0]) rsa_end = np.full(len(time_end), meanRSA[-1]) meanRSA = np.concatenate((rsa_start, meanRSA, rsa_end)) # Convert to samples time = np.multiply(time, sampling_rate) rsa = signal_interpolate( time.astype(int), meanRSA, x_new=len(ecg_signals), method="monotone_cubic" ) return rsa def _get_multipeak_window(nperseg, window_number=8): """Get Peak Matched Multiple Window References ---------- Hansson, M., & Salomonsson, G. (1997). A multiple window method for estimation of peaked spectra. IEEE Transactions on Signal Processing, 45(3), 778-781. """ K1 = 20 # Peak in dB K2 = 30 # Penalty value in dB B = (window_number + 2) / nperseg # Resolution in spectrum loge = np.log10(np.exp(1)) C = 2 * K1 / 10 / B / loge length = np.arange(1, nperseg).conj().transpose() r0 = 2 / C * (1 - np.exp(-C * B / 2)) r_num = 2 * C - np.exp(-C * B / 2) * ( 2 * C * np.cos(np.pi * B * length) - 4 * np.pi * length * np.sin(np.pi * B * length) ) r_den = C**2 + (2 * np.pi * length) ** 2 r = np.divide(r_num, r_den) rpeak = np.append(r0, r) # Covariance function peaked spectrum r = 2 * np.sin(np.pi * B * length) / (2 * np.pi * length) rbox = np.append(B, r) rpen = ( 10 ** (K2 / 10) * np.append(1, np.zeros((nperseg - 1, 1))) - (10 ** (K2 / 10) - 1) * rbox ) # Covariance function penalty function Ry = scipy.linalg.toeplitz(rpeak) Rx = scipy.linalg.toeplitz(rpen) RR = scipy.linalg.cholesky(Rx) C = scipy.linalg.inv(RR.conj().transpose()).dot(Ry).dot(scipy.linalg.inv(RR)) _, Q = scipy.linalg.schur(C) F = scipy.linalg.inv(RR).dot(Q) RD = F.conj().transpose().dot(Ry).dot(F) RD = np.diag(RD) RDN = np.sort(RD) h = np.argsort(RD) FN = np.zeros((nperseg, nperseg)) for i in range(len(RD)): FN[:, i] = F[:, h[i]] / np.sqrt(F[:, h[i]].conj().transpose().dot(F[:, h[i]])) RDN = RDN[len(RD) - 1 : 0 : -1] FN = FN[:, len(RD) - 1 : 0 : -1] weight = RDN[:window_number] / np.sum(RDN[:window_number]) multipeak = FN[:, 0:window_number] return multipeak, weight # ============================================================================= # Internals # ============================================================================= def _hrv_rsa_cycles(signals): """Extract respiratory cycles.""" inspiration_onsets = np.intersect1d( np.where(signals["RSP_Phase"] == 1)[0], np.where(signals["RSP_Phase_Completion"] == 0)[0], assume_unique=True, ) expiration_onsets = np.intersect1d( np.where(signals["RSP_Phase"] == 0)[0], np.where(signals["RSP_Phase_Completion"] == 0)[0], assume_unique=True, ) cycles_length = np.diff(inspiration_onsets) return { "RSP_Inspiration_Onsets": inspiration_onsets, "RSP_Expiration_Onsets": expiration_onsets, "RSP_Cycles_Length": cycles_length, } def _hrv_rsa_formatinput(ecg_signals, rsp_signals, rpeaks=None, sampling_rate=1000): rpeaks, sampling_rate = _hrv_format_input( rpeaks, sampling_rate=sampling_rate, output_format="peaks" ) # Sanity Checks if isinstance(ecg_signals, tuple): ecg_signals = ecg_signals[0] rpeaks = None if isinstance(ecg_signals, pd.DataFrame): if "ECG_Rate" in ecg_signals.columns: ecg_period = ecg_signals["ECG_Rate"].values else: if "ECG_R_Peaks" in ecg_signals.columns: ecg_period = signal_rate( np.where(ecg_signals["ECG_R_Peaks"].values == 1)[0], sampling_rate=sampling_rate, desired_length=len(ecg_signals), ) else: raise ValueError( "NeuroKit error: _hrv_rsa_formatinput():" "Wrong input, we couldn't extract" "heart rate signal." ) if rsp_signals is None: rsp_signals = ecg_signals.copy() elif isinstance(rsp_signals, tuple): rsp_signals = rsp_signals[0] if isinstance(rsp_signals, pd.DataFrame): rsp_cols = [col for col in rsp_signals.columns if "RSP_Phase" in col] if len(rsp_cols) != 2: edr = ecg_rsp(ecg_period, sampling_rate=sampling_rate) rsp_signals, _ = rsp_process(edr, sampling_rate) warn( "RSP signal not found. For this time, we will derive RSP" " signal from ECG using ecg_rsp(). But the results are" " definitely not reliable, so please provide a real RSP signal.", category=NeuroKitWarning, ) if rpeaks is None: try: rpeaks = _signal_formatpeaks_sanitize(ecg_signals) except NameError as e: raise ValueError( "NeuroKit error: _hrv_rsa_formatinput(): " "Wrong input, we couldn't extract rpeaks indices." ) from e else: rpeaks = _signal_formatpeaks_sanitize(rpeaks) nonduplicates = ecg_signals.columns[ [i not in rsp_signals.columns for i in ecg_signals.columns] ] signals = pd.concat([ecg_signals[nonduplicates], rsp_signals], axis=1) return signals, ecg_period, rpeaks, sampling_rate