# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
from ..misc import check_random_state, check_random_state_children
from ..signal import signal_distort, signal_interpolate
[docs]
def ppg_simulate(
duration=120,
sampling_rate=1000,
heart_rate=70,
frequency_modulation=0.2,
ibi_randomness=0.1,
drift=0,
motion_amplitude=0.1,
powerline_amplitude=0.01,
burst_number=0,
burst_amplitude=1,
random_state=None,
random_state_distort="spawn",
show=False,
):
"""**Simulate a photoplethysmogram (PPG) signal**
Phenomenological approximation of PPG. The PPG wave is described with four landmarks: wave
onset, location of the systolic peak, location of the dicrotic notch and location of the
diastolic peaks. These landmarks are defined as x and y coordinates (in a time series). These
coordinates are then interpolated at the desired sampling rate to obtain the PPG signal.
Parameters
----------
duration : int
Desired recording length in seconds. The default is 120.
sampling_rate : int
The desired sampling rate (in Hz, i.e., samples/second). The default is 1000.
heart_rate : int
Desired simulated heart rate (in beats per minute). The default is 70. Note that for the
ECGSYN method, random fluctuations are to be expected to mimic a real heart rate. These
fluctuations can cause some slight discrepancies between the requested heart rate and the
empirical heart rate, especially for shorter signals.
frequency_modulation : float
Float between 0 and 1. Determines how pronounced respiratory sinus arrythmia (RSA) is
(0 corresponds to absence of RSA). The default is 0.3.
ibi_randomness : float
Float between 0 and 1. Determines how much random noise there is in the duration of each
PPG wave (0 corresponds to absence of variation). The default is 0.1.
drift : float
Float between 0 and 1. Determines how pronounced the baseline drift (.05 Hz) is
(0 corresponds to absence of baseline drift). The default is 1.
motion_amplitude : float
Float between 0 and 1. Determines how pronounced the motion artifact (0.5 Hz) is
(0 corresponds to absence of motion artifact). The default is 0.1.
powerline_amplitude : float
Float between 0 and 1. Determines how pronounced the powerline artifact (50 Hz) is
(0 corresponds to absence of powerline artifact). Note that powerline_amplitude > 0 is only
possible if ``sampling_rate`` is >= 500. The default is 0.1.
burst_amplitude : float
Float between 0 and 1. Determines how pronounced high frequency burst artifacts are
(0 corresponds to absence of bursts). The default is 1.
burst_number : int
Determines how many high frequency burst artifacts occur. The default is 0.
show : bool
If ``True``, returns a plot of the landmarks and interpolated PPG. Useful for debugging.
random_state : None, int, numpy.random.RandomState or numpy.random.Generator
Seed for the random number generator. See for ``misc.check_random_state`` for further information.
random_state_distort : {'legacy', 'spawn'}, None, int, numpy.random.RandomState or numpy.random.Generator
Random state to be used to distort the signal. If ``"legacy"``, use the same random state used to
generate the signal (discouraged as it creates dependent random streams). If ``"spawn"``, spawn
independent children random number generators from the random_state argument. If any of the other types,
generate independent children random number generators from the random_state_distort provided (this
allows generating multiple version of the same signal distorted by different random noise realizations).
Returns
-------
ppg : array
A vector containing the PPG.
See Also
--------
ecg_simulate, rsp_simulate, eda_simulate, emg_simulate
Examples
--------
.. ipython:: python
import neurokit2 as nk
ppg = nk.ppg_simulate(duration=40, sampling_rate=500, heart_rate=75, random_state=42)
"""
# Seed the random generator for reproducible results
rng = check_random_state(random_state)
random_state_distort = check_random_state_children(random_state, random_state_distort, n_children=4)
# At the requested sampling rate, how long is a period at the requested
# heart-rate and how often does that period fit into the requested
# duration?
period = 60 / heart_rate # in seconds
n_period = int(np.floor(duration / period))
periods = np.ones(n_period) * period
# Seconds at which waves begin.
x_onset = np.cumsum(periods)
x_onset -= x_onset[0] # make sure seconds start at zero
# Add respiratory sinus arrythmia (frequency modulation).
periods, x_onset = _frequency_modulation(
periods,
x_onset,
modulation_frequency=0.05,
modulation_strength=frequency_modulation,
)
# Randomly modulate duration of waves by subracting a random value between
# 0 and ibi_randomness% of the wave duration (see function definition).
x_onset = _random_x_offset(x_onset, ibi_randomness, rng)
# Corresponding signal amplitudes.
y_onset = rng.normal(0, 0.1, n_period)
# Seconds at which the systolic peaks occur within the waves.
x_sys = x_onset + rng.normal(0.175, 0.01, n_period) * periods
# Corresponding signal amplitudes.
y_sys = y_onset + rng.normal(1.5, 0.15, n_period)
# Seconds at which the dicrotic notches occur within the waves.
x_notch = x_onset + rng.normal(0.4, 0.001, n_period) * periods
# Corresponding signal amplitudes (percentage of systolic peak height).
y_notch = y_sys * rng.normal(0.49, 0.01, n_period)
# Seconds at which the diastolic peaks occur within the waves.
x_dia = x_onset + rng.normal(0.45, 0.001, n_period) * periods
# Corresponding signal amplitudes (percentage of systolic peak height).
y_dia = y_sys * rng.normal(0.51, 0.01, n_period)
x_all = np.concatenate((x_onset, x_sys, x_notch, x_dia))
x_all.sort(kind="mergesort")
x_all = np.ceil(x_all * sampling_rate).astype(int) # convert seconds to samples
y_all = np.zeros(n_period * 4)
y_all[0::4] = y_onset
y_all[1::4] = y_sys
y_all[2::4] = y_notch
y_all[3::4] = y_dia
if show:
__, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
ax0.scatter(x_all, y_all, c="r")
# Interpolate a continuous signal between the landmarks (i.e., Cartesian
# coordinates).
samples = np.arange(int(np.ceil(duration * sampling_rate)))
ppg = signal_interpolate(x_values=x_all, y_values=y_all, x_new=samples, method="akima")
# Remove NAN (values outside interpolation range, i.e., after last sample).
ppg[np.isnan(ppg)] = np.nanmean(ppg)
if show:
ax0.plot(ppg)
# Add baseline drift.
if drift > 0:
drift_freq = 0.05
if drift_freq < (1 / duration) * 2:
drift_freq = (1 / duration) * 2
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
noise_amplitude=drift,
noise_frequency=drift_freq,
random_state=random_state_distort[0],
silent=True,
)
# Add motion artifacts.
if motion_amplitude > 0:
motion_freq = 0.5
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
noise_amplitude=motion_amplitude,
noise_frequency=motion_freq,
random_state=random_state_distort[1],
silent=True,
)
# Add high frequency bursts.
if burst_amplitude > 0:
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
artifacts_amplitude=burst_amplitude,
artifacts_frequency=100,
artifacts_number=burst_number,
random_state=random_state_distort[2],
silent=True,
)
# Add powerline noise.
if powerline_amplitude > 0:
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
powerline_amplitude=powerline_amplitude,
powerline_frequency=50,
random_state=random_state_distort[3],
silent=True,
)
if show:
ax1.plot(ppg)
return ppg
def _frequency_modulation(periods, seconds, modulation_frequency, modulation_strength):
"""modulator_frequency determines the frequency at which respiratory sinus arrhythmia occurs (in Hz).
modulator_strength must be between 0 and 1.
"""
modulation_mean = 1
# Enforce minimum inter-beat-interval of 300 milliseconds.
if (modulation_mean - modulation_strength) * periods[
0
] < 0.3: # elements in periods all have the same value at this point
print(
"Skipping frequency modulation, since the modulation_strength"
f" {modulation_strength} leads to physiologically implausible"
f" wave durations of {((modulation_mean - modulation_strength) * periods[0]) * 1000}"
f" milliseconds."
)
return periods, seconds
# Apply a very conservative Nyquist criterion.
nyquist = (1 / periods[0]) * 0.1
if modulation_frequency > nyquist:
print(f"Please choose a modulation frequency lower than {nyquist}.")
# Generate a sine with mean 1 and amplitude 0.5 * modulation_strength, that is,
# ranging from 1 - 0.5 * modulation_strength to 1 + 0.5 * modulation_strength.
# For example, at a heart rate of 100 and modulation_strenght=1, the heart rate will
# fluctuate between 150 and 50. At the default modulatiom_strenght=.2, it will
# fluctuate between 110 and 90.
modulator = (
0.5 * modulation_strength * np.sin(2 * np.pi * modulation_frequency * seconds)
+ modulation_mean
)
periods_modulated = periods * modulator
seconds_modulated = np.cumsum(periods_modulated)
seconds_modulated -= seconds_modulated[0] # make sure seconds start at zero
return periods_modulated, seconds_modulated
def _random_x_offset(x, offset_weight, rng):
"""From each wave onset xi subtract offset_weight * (xi - xi-1) where xi-1 is
the wave onset preceding xi. offset_weight must be between 0 and 1.
"""
# Sanitize offset to min 0 and max .99
offset_weight = min(offset_weight, 0.99)
offset_weight = max(offset_weight, 0)
x_diff = np.diff(x)
# Enforce minimum inter-beat-interval of 300 milliseconds.
min_x_diff = min(x_diff)
if (min_x_diff - (min_x_diff * offset_weight)) < 0.3:
print(
"Skipping random IBI modulation, since the offset_weight"
f" {offset_weight} leads to physiologically implausible wave"
f" durations of {(min_x_diff - (min_x_diff * offset_weight)) * 1000}"
f" milliseconds."
)
return x
max_offsets = offset_weight * x_diff
offsets = [rng.uniform(0, i) for i in max_offsets]
x_offset = x.copy()
x_offset[1:] -= offsets
return x_offset
def _amplitude_modulation():
# TODO
pass