# - * - coding: utf-8 - * -
import matplotlib.pyplot as plt
import numpy as np
from ..epochs import epochs_create, epochs_to_df
from ..signal.signal_rate import signal_rate
[docs]
def signal_cyclesegment(signal_cleaned, cycle_indices, ratio_pre=0.5, sampling_rate=1000, show=False, signal_name="signal", **kwargs):
"""**Segment a signal into individual cycles**
Segment a signal (e.g. ECG, PPG, respiratory) into individual cycles (e.g. heartbeats, pulse waves, breaths).
Parameters
----------
signal_cleaned : Union[list, np.array, pd.Series]
The cleaned signal channel, such as that returned by ``ppg_clean()`` or ``ecg_clean()``.
cycle_indices : dict
The samples indicating individual cycles (such as PPG peaks or ECG R-peaks), such as a dict
returned by ``ppg_peaks()``.
ratio_pre : float
The proportion of the cycle window which takes place before the cycle index (e.g. the proportion of the
interbeat interval which takes place before the PPG pulse peak or ECG R peak).
sampling_rate : int
The sampling frequency of ``signal_cleaned`` (in Hz, i.e., samples/second). Defaults to 1000.
show : bool
If ``True``, will return a plot of cycles. Defaults to ``False``. If "return", returns
the axis of the plot.
signal_name : str
The name of the signal (only used for plotting).
**kwargs
Other arguments to be passed.
Returns
-------
dict
A dict containing DataFrames for all segmented cycles.
average_cycle_rate : float
The average cycle rate (e.g. heart rate, respiratory rate) (in Hz, i.e., samples/second)
See Also
--------
ppg_clean, ecg_clean, ppg_peaks, ecg_peaks, ppg_quality, ecg_quality
Examples
--------
.. ipython:: python
import neurokit2 as nk
sampling_rate = 100
ppg = nk.ppg_simulate(duration=30, sampling_rate=sampling_rate, heart_rate=80)
ppg_cleaned = nk.ppg_clean(ppg, sampling_rate=sampling_rate)
signals, peaks = nk.ppg_peaks(ppg_cleaned, sampling_rate=sampling_rate)
peaks = peaks["PPG_Peaks"]
heartbeats = nk.signal_cyclesegment(ppg_cleaned, peaks, sampling_rate=sampling_rate)
"""
if len(signal_cleaned) < sampling_rate * 4:
raise ValueError("The data length is too small to be segmented.")
epochs_start, epochs_end, average_cycle_rate = _segment_window(
cycle_indices=cycle_indices,
sampling_rate=sampling_rate,
desired_length=len(signal_cleaned),
ratio_pre=ratio_pre,
)
cycles = epochs_create(
signal_cleaned,
cycle_indices,
sampling_rate=sampling_rate,
epochs_start=epochs_start,
epochs_end=epochs_end,
)
# pad last cycle with nan so that segments are equal length
last_cycle_key = str(np.max(np.array(list(cycles.keys()), dtype=int)))
outside_bounds = cycles[last_cycle_key]["Index"] >= len(signal_cleaned)
cycles[last_cycle_key].loc[outside_bounds, "Signal"] = np.nan
# Plot or return plot axis
if show is not False:
ax = _segment_plot(cycles, cyclerate=average_cycle_rate, signal_name=signal_name, **kwargs)
if show == "return":
return ax
return cycles, average_cycle_rate
# =============================================================================
# Internals
# =============================================================================
def _segment_plot(cycles, cyclerate=0, signal_name="signal", color="#F44336", ax=None):
df = epochs_to_df(cycles)
# Get main signal column name
col = "Signal"
# Average cycle shape
mean_cycle = df.groupby("Time")[[col]].mean()
df_pivoted = df.pivot(index="Time", columns="Label", values=col)
# Prepare plot
if ax is None:
_, ax = plt.subplots()
signal_name = signal_name.lower()
if signal_name in ["ecg", "ppg"]:
cycle_name = "beat"
rate_name = "heart rate"
rate_unit = "bpm"
elif signal_name == "rsp":
cycle_name = "breath"
rate_name = "respiratory rate"
rate_unit = "breaths per min"
elif signal_name == "signal":
cycle_name = "cycle"
rate_name = "cycle rate"
rate_unit = "per min"
ax.set_title(f"Individual {cycle_name}s (average {rate_name}: {cyclerate:0.1f} {rate_unit})")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel(signal_name)
# Add Vertical line at 0
ax.axvline(x=0, color="grey", linestyle="--")
# Plot average cycle
ax.plot(
mean_cycle.index,
mean_cycle,
color=color,
linewidth=7,
label=f"Average {cycle_name} shape",
zorder=1,
)
# Alpha of individual cycles decreases with more cycles
n_cycles = df_pivoted.shape[1]
if n_cycles <= 1:
alpha = 1.0
else:
alpha = 1 / np.log2(np.log2(1 + n_cycles))
# Plot all cycles
ax.plot(df_pivoted, color="grey", linewidth=alpha, alpha=alpha, zorder=2)
# Legend
ax.legend(loc="upper right")
return ax
def _segment_window(
cycle_indices=None,
sampling_rate=1000,
desired_length=(),
ratio_pre=0.5,
cycle_rate=None,
):
# Determine cycle rate
if cycle_rate is None:
if cycle_indices is not None:
cycle_rate = np.mean(
signal_rate(
cycle_indices, sampling_rate=sampling_rate, desired_length=desired_length
)
)
else:
raise ValueError("Either `cycle_rate` or `cycle_indices` must be provided.")
else:
cycle_rate = np.mean(cycle_rate) # In case it's an array
# Modulator
# Note: this is based on quick internal testing but could be improved
window_size = 60 / cycle_rate # Cycles per second
# Window
epochs_start = ratio_pre * window_size
epochs_end = (1 - ratio_pre) * window_size
return -epochs_start, epochs_end, cycle_rate