# -*- coding: utf-8 -*-
from warnings import warn
import matplotlib.pyplot as plt
import numpy as np
import scipy.signal
from ..misc import NeuroKitWarning
from ..signal import signal_interpolate
from ..stats import rescale
from .rsp_clean import rsp_clean
from .rsp_peaks import rsp_findpeaks
[docs]
def rsp_rvt(
rsp_signal,
sampling_rate=1000,
method="power2020",
boundaries=[2.0, 1 / 30],
iterations=10,
show=False,
silent=False,
**kwargs
):
"""**Respiratory Volume per Time (RVT)**
Computes Respiratory Volume per Time (RVT). RVT is the product of respiratory volume and
breathing rate. RVT can be used to identify the global fMRI confounds of breathing, which is
often considered noise.
Parameters
----------
rsp_signal : array
Array containing the respiratory rate, produced by :func:`.signal_rate`.
sampling_rate : int, optional
The sampling frequency of the signal (in Hz, i.e., samples/second).
method: str, optional
The rvt method to apply. Can be one of ``"power2020"`` (default), ``"harrison2021"`` or
``"birn2006"``.
boundaries : list, optional
Only applies if method is ``"harrison"``. Lower and upper limit of (humanly possible)
breath frequency in Hertz.
iterations : int, optional
Only applies if method is ``"harrison"``. Amount of phase refinement estimates
to remove high frequencies. Synthetic samples often take less than 3.
show : bool, optional
If ``True``, will return a simple plot of the RVT (with the re-scaled original RSP signal).
silent : bool, optional
If ``True``, warnings will not be printed.
**kwargs
Arguments to be passed to the underlying peak detection algorithm.
Returns
-------
array
Array containing the current RVT at every timestep.
See Also
--------
signal_rate, rsp_peaks, rsp_process, rsp_clean
Examples
--------
.. ipython:: python
import neurokit2 as nk
rsp = nk.rsp_simulate(duration=60, random_state=1)
@savefig p_rsp_rvt1.png scale=100%
nk.rsp_rvt(rsp, method="power2020", show=True)
@suppress
plt.close()
@savefig p_rsp_rvt2.png scale=100%
nk.rsp_rvt(rsp, method="harrison2021", show=True)
@suppress
plt.close()
@savefig p_rsp_rvt3.png scale=100%
nk.rsp_rvt(rsp, method="birn2006", show=True)
@suppress
plt.close()
References
----------
* Birn, R. M., Diamond, J. B., Smith, M. A., & Bandettini, P. A. (2006). Separating
respiratory-variation-related fluctuations from neuronal-activity-related fluctuations in
fMRI. Neuroimage, 31(4), 1536-1548.
* Power, J. D., Lynch, C. J., Dubin, M. J., Silver, B. M., Martin, A., & Jones, R. M. (2020).
Characteristics of respiratory measures in young adults scanned at rest, including systematic
changes and "missed" deep breaths. Neuroimage, 204, 116234.
* Harrison, S. J., Bianchi, S., Heinzle, J., Stephan, K. E., Iglesias, S., & Kasper, L. (2021).
A Hilbert-based method for processing respiratory timeseries. Neuroimage, 230, 117787.
"""
method = method.lower() # remove capitalised letters
if method in ["harrison", "harrison2021"]:
rvt = _rsp_rvt_harrison(
rsp_signal,
sampling_rate=sampling_rate,
silent=silent,
boundaries=boundaries,
iterations=iterations,
)
elif method in ["birn", "birn2006"]:
rvt = _rsp_rvt_birn(rsp_signal, sampling_rate=sampling_rate, silent=silent, **kwargs)
elif method in ["power", "power2020"]:
rvt = _rsp_rvt_power(rsp_signal, sampling_rate=sampling_rate, silent=silent, **kwargs)
else:
raise ValueError("NeuroKit error: rsp_rvt(): 'method' should be one of 'birn', 'power' or 'harrison'.")
if show:
_rsp_rvt_plot(rvt, rsp_signal, sampling_rate)
return rvt
def _rsp_rvt_birn(
rsp_signal,
sampling_rate=1000,
silent=False,
window_length=0.4,
peak_distance=0.8,
peak_prominence=0.5,
interpolation_method="linear",
):
zsmooth_signal = _smooth_rsp_data(
rsp_signal,
sampling_rate=sampling_rate,
window_length=window_length,
silent=silent,
)
info = rsp_findpeaks(
zsmooth_signal,
method="scipy",
peak_distance=peak_distance,
peak_prominence=peak_prominence,
)
peak_coords = info["RSP_Peaks"]
trough_coords = info["RSP_Troughs"]
# prepare for loop
seconds_delta = [np.nan]
mid_peak = [np.nan]
# loop over peaks
for ending_peak_index in range(1, len(peak_coords)):
starting_peak = peak_coords[ending_peak_index - 1]
ending_peak = peak_coords[ending_peak_index]
mid_peak.append(round((starting_peak + ending_peak) / 2))
seconds_delta.append((ending_peak - starting_peak) / sampling_rate)
# Interpolate
output_range = range(len(zsmooth_signal))
rvt_time = signal_interpolate(mid_peak, seconds_delta, output_range, method=interpolation_method)
rvt_peaks = signal_interpolate(
peak_coords,
zsmooth_signal[peak_coords],
output_range,
method=interpolation_method,
)
rvt_troughs = signal_interpolate(
trough_coords,
zsmooth_signal[trough_coords],
output_range,
method=interpolation_method,
)
# what is trigvec?
# trigvec = (TR * signal_rate):len(zsmoothresp)
rvt = (rvt_peaks - rvt_troughs) / rvt_time
rvt[np.isinf(rvt)] = np.nan
return rvt
def _rsp_rvt_power(
rsp_signal,
sampling_rate=1000,
silent=False,
window_length=0.4,
peak_distance=0.8,
peak_prominence=0.5,
interpolation_method="linear",
):
# preprocess signal
zsmooth_signal = _smooth_rsp_data(
rsp_signal,
sampling_rate=sampling_rate,
silent=silent,
window_length=window_length,
)
# find peaks and troughs
info = rsp_findpeaks(
zsmooth_signal,
method="scipy",
peak_distance=peak_distance,
peak_prominence=peak_prominence,
)
peak_coords = info["RSP_Peaks"]
trough_coords = info["RSP_Troughs"]
# initialize for loop
peak_heights = [np.nan] * len(peak_coords)
# go over loop
for peak_index in range(1, len(peak_coords)):
# find peak and trough
peak_loc = peak_coords[peak_index]
prev_peak_loc = peak_coords[peak_index - 1]
# find troughs between prev_peak_loc and peak_loc
trough_locs = trough_coords[(trough_coords > prev_peak_loc) & (trough_coords < peak_loc)]
# safety catch if there is no trough found
if len(trough_locs) == 0:
continue
trough_loc = max(trough_locs)
# calculate peak_height for peak at peak_index
peak_heights[peak_index] = (zsmooth_signal[peak_loc] - zsmooth_signal[trough_loc]) / (peak_loc - prev_peak_loc)
return signal_interpolate(peak_coords, peak_heights, range(len(rsp_signal)), method=interpolation_method)
def _smooth_rsp_data(signal, sampling_rate=1000, window_length=0.4, silent=False):
signal = rsp_clean(
signal,
sampling_rate=sampling_rate,
window_length=window_length,
method="hampel",
)
smooth_signal = scipy.signal.savgol_filter(
signal,
window_length=_make_uneven_filter_size(window_length * sampling_rate, silent),
polyorder=2,
)
zsmooth_signal = scipy.stats.zscore(smooth_signal)
return zsmooth_signal
def _rsp_rvt_harrison(
rsp_signal,
sampling_rate=1000,
boundaries=[2.0, 1 / 30],
iterations=10,
silent=False,
):
# low-pass filter at not too far above breathing-rate to remove high-frequency noise
n_pad = int(np.ceil(10 * sampling_rate))
d = scipy.signal.iirfilter(N=10, Wn=0.75, btype="lowpass", analog=False, output="sos", fs=sampling_rate)
fr_lp = scipy.signal.sosfiltfilt(d, np.pad(rsp_signal, n_pad, "symmetric"))
fr_lp = fr_lp[n_pad : (len(fr_lp) - n_pad)]
# derive Hilbert-transform
fr_filt = fr_lp
fr_mag = abs(scipy.signal.hilbert(fr_filt))
for _ in range(iterations):
# analytic signal to phase
fr_phase = np.unwrap(np.angle(scipy.signal.hilbert(fr_filt)))
# Remove any phase decreases that may occur
# Find places where the gradient changes sign
# maybe can be changed with signal.signal_zerocrossings
fr_phase_diff = np.diff(np.sign(np.gradient(fr_phase)))
decrease_inds = np.argwhere(fr_phase_diff < 0)
increase_inds = np.append(np.argwhere(fr_phase_diff > 0), [len(fr_phase) - 1])
for n_max in decrease_inds:
# Find value of `fr_phase` at max and min:
fr_max = fr_phase[n_max].squeeze()
n_min, fr_min = _rsp_rvt_find_min(increase_inds, fr_phase, n_max, silent)
if n_min is None:
# There is no finishing point to the interpolation at the very end
continue
# Find where `fr_phase` passes `fr_min` for the first time
n_start = np.argwhere(fr_phase > fr_min)
if len(n_start) == 0:
n_start = n_max
else:
n_start = n_start[0].squeeze()
# Find where `fr_phase` exceeds `fr_max` for the first time
n_end = np.argwhere(fr_phase < fr_max)
if len(n_end) == 0:
n_end = n_min
else:
n_end = n_end[-1].squeeze()
# Linearly interpolate from n_start to n_end
fr_phase[n_start:n_end] = np.linspace(fr_min, fr_max, num=n_end - n_start).squeeze()
# Filter out any high frequencies from phase-only signal
fr_filt = scipy.signal.sosfiltfilt(d, np.pad(np.cos(fr_phase), n_pad, "symmetric"))
fr_filt = fr_filt[n_pad : (len(fr_filt) - n_pad)]
# Keep phase only signal as reference
fr_filt = np.cos(fr_phase)
# Make RVT
# Low-pass filter to remove within_cycle changes
# Note factor of two is for compatability with the common definition of RV
# as the difference between max and min inhalation (i.e. twice the amplitude)
d = scipy.signal.iirfilter(N=10, Wn=0.2, btype="lowpass", analog=False, output="sos", fs=sampling_rate)
fr_rv = 2 * scipy.signal.sosfiltfilt(d, np.pad(fr_mag, n_pad, "symmetric"))
fr_rv = fr_rv[n_pad : (len(fr_rv) - n_pad)]
fr_rv[fr_rv < 0] = 0
# Breathing rate is instantaneous frequency
fr_if = sampling_rate * np.gradient(fr_phase) / (2 * np.pi)
fr_if = scipy.signal.sosfiltfilt(d, np.pad(fr_if, n_pad, "symmetric"))
fr_if = fr_if[n_pad : (len(fr_if) - n_pad)]
# remove in-human patterns, since both limits are in Hertz, the upper_limit is lower
fr_if = np.clip(fr_if, boundaries[1], boundaries[0])
# RVT = magnitude * breathing rate
rvt = np.multiply(fr_rv, fr_if)
# Downsampling is not needed as we assume always the same sampling rate and operate always in the same sampling rate
return rvt
def _rsp_rvt_find_min(increase_inds, fr_phase, smaller_index, silent):
bigger_n_max = np.argwhere(increase_inds > smaller_index)
if len(bigger_n_max) == 0:
if not silent:
warn(
"rsp_rvt(): There is no next increasing point as end point for the interpolation. "
"Interpolation is skipped for this case.",
category=NeuroKitWarning,
)
return None, None
bigger_n_max = bigger_n_max[0].squeeze()
n_min = increase_inds[bigger_n_max]
fr_min = fr_phase[n_min].squeeze()
# Sometime fr_min is the same as n_max and it caused problems
if fr_phase[smaller_index].squeeze() < fr_min:
if not silent:
warn(
"rsp_rvt(): The next bigger increasing index has a bigger value than the chosen decreasing index, "
"this might be due to very small/noisy breaths or saddle points. "
"Interpolation is skipped for this case.",
category=NeuroKitWarning,
)
return None, None
return n_min, fr_min
def _rsp_rvt_plot(rvt, rsp_signal, sampling_rate):
plt.figure()
plt.title("Respiratory Volume per Time (RVT)")
plt.xlabel("Time [s]")
plt.plot(
rescale(rsp_signal, to=[np.nanmin(rvt), np.nanmax(rvt)]),
label="RSP",
color="#CFD8DC",
)
plt.plot(rvt, label="RVT", color="#00BCD4")
plt.legend()
tickpositions = plt.gca().get_xticks()[1:-1]
plt.xticks(tickpositions, [tickposition / sampling_rate for tickposition in tickpositions])
def _make_uneven_filter_size(number, silent=False):
if number < 0:
if not silent:
warn(
"Received a negative filter size, progressed with filter size 1.",
category=NeuroKitWarning,
)
return 1
if number % 2 == 1:
return int(number)
if number > 0:
return int(number - 1)
return 1