Source code for neurokit2.hrv.intervals_to_peaks
import numpy as np
from .intervals_utils import _intervals_sanitize, _intervals_successive
[docs]
def intervals_to_peaks(intervals, intervals_time=None, sampling_rate=1000):
    """**Convert intervals to peaks**

    Rebuild peak sample indices from a series of peak-to-peak intervals, for instance R-peaks
    from R-R intervals (RRI) or breath onsets from breath-to-breath intervals (BBI). Handy when
    only interval data is available and the original peak locations are not.

    Parameters
    ----------
    intervals : list or array
        Intervals between consecutive peaks (by default in milliseconds). If None, the function
        returns None.
    intervals_time : list or array, optional
        Timestamps (in seconds) matching each interval. If None (default), the cumulative sum of
        the intervals is used.
    sampling_rate : int, optional
        Sampling rate (Hz) of the continuous signal in which the peaks occur.

    Returns
    -------
    np.ndarray
        Integer peak indices, expressed in samples at ``sampling_rate``. When ``intervals_time``
        is not provided, the first peak falls at sample point 0. Returns None if ``intervals``
        is None.

    Examples
    ---------
    .. ipython:: python

      import neurokit2 as nk

      # Suppose we have a vector of RRi from data sampled at 1000 Hz
      ibi = [500, 400, 700, 500, 300, 800, 500]
      peaks = nk.intervals_to_peaks(ibi, sampling_rate=1000)

      # We can then use NeuroKit's functionalities to compute HRV indices
      @savefig p_intervals_to_peaks.png scale=100%
      hrv_indices = nk.hrv_time(peaks, sampling_rate=1000, show=True)
      @suppress
      plt.close()

      hrv_indices

    .. ipython:: python

      # We can also use the timestamps of the intervals
      rri = [400, 500, 700, 800, 900]
      rri_idx = [0.7, 1.2, 2.5, 3.3, 4.2]
      nk.intervals_to_peaks(rri, rri_idx, sampling_rate=1000)

    """
    # Nothing to convert
    if intervals is None:
        return None

    intervals, intervals_time, intervals_missing = _intervals_sanitize(
        intervals, intervals_time=intervals_time, remove_missing=True
    )

    # Locate the positions (other than the very first one) where an interval does not directly
    # follow the previous one; this can only happen when some data was missing.
    if not intervals_missing:
        break_positions = np.array([]).astype(int)
    else:
        later_positions = np.arange(1, len(intervals_time))
        follows_previous = _intervals_successive(intervals, intervals_time)
        break_positions = later_positions[np.invert(follows_previous)]

    # Each interval timestamp marks the peak that closes it. Every run of successive intervals
    # additionally needs the peak that opens it, located one interval (ms -> s) before the
    # run's first timestamp. Without missing data this yields N_intervals + 1 peaks.
    run_starts = np.concatenate((np.array([0]), break_positions))
    opening_times = intervals_time[run_starts] - intervals[run_starts] / 1000

    peaks_time = np.concatenate((intervals_time, opening_times))
    peaks_time.sort()

    # Seconds -> sample indices, rounded to the nearest integer sample
    peak_samples = peaks_time * sampling_rate
    return np.array([int(np.round(sample)) for sample in peak_samples])