Source code for neurokit2.ppg.ppg_simulate

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np

from ..misc import check_random_state, check_random_state_children
from ..signal import signal_distort, signal_interpolate


[docs] def ppg_simulate( duration=120, sampling_rate=1000, heart_rate=70, frequency_modulation=0.2, ibi_randomness=0.1, drift=0, motion_amplitude=0.1, powerline_amplitude=0.01, burst_number=0, burst_amplitude=1, random_state=None, random_state_distort="spawn", show=False, ): """**Simulate a photoplethysmogram (PPG) signal** Phenomenological approximation of PPG. The PPG wave is described with four landmarks: wave onset, location of the systolic peak, location of the dicrotic notch and location of the diastolic peaks. These landmarks are defined as x and y coordinates (in a time series). These coordinates are then interpolated at the desired sampling rate to obtain the PPG signal. Parameters ---------- duration : int Desired recording length in seconds. The default is 120. sampling_rate : int The desired sampling rate (in Hz, i.e., samples/second). The default is 1000. heart_rate : int Desired simulated heart rate (in beats per minute). The default is 70. Note that for the ECGSYN method, random fluctuations are to be expected to mimic a real heart rate. These fluctuations can cause some slight discrepancies between the requested heart rate and the empirical heart rate, especially for shorter signals. frequency_modulation : float Float between 0 and 1. Determines how pronounced respiratory sinus arrythmia (RSA) is (0 corresponds to absence of RSA). The default is 0.3. ibi_randomness : float Float between 0 and 1. Determines how much random noise there is in the duration of each PPG wave (0 corresponds to absence of variation). The default is 0.1. drift : float Float between 0 and 1. Determines how pronounced the baseline drift (.05 Hz) is (0 corresponds to absence of baseline drift). The default is 1. motion_amplitude : float Float between 0 and 1. Determines how pronounced the motion artifact (0.5 Hz) is (0 corresponds to absence of motion artifact). The default is 0.1. powerline_amplitude : float Float between 0 and 1. Determines how pronounced the powerline artifact (50 Hz) is (0 corresponds to absence of powerline artifact). Note that powerline_amplitude > 0 is only possible if ``sampling_rate`` is >= 500. The default is 0.1. burst_amplitude : float Float between 0 and 1. Determines how pronounced high frequency burst artifacts are (0 corresponds to absence of bursts). The default is 1. burst_number : int Determines how many high frequency burst artifacts occur. The default is 0. show : bool If ``True``, returns a plot of the landmarks and interpolated PPG. Useful for debugging. random_state : None, int, numpy.random.RandomState or numpy.random.Generator Seed for the random number generator. See for ``misc.check_random_state`` for further information. random_state_distort : {'legacy', 'spawn'}, None, int, numpy.random.RandomState or numpy.random.Generator Random state to be used to distort the signal. If ``"legacy"``, use the same random state used to generate the signal (discouraged as it creates dependent random streams). If ``"spawn"``, spawn independent children random number generators from the random_state argument. If any of the other types, generate independent children random number generators from the random_state_distort provided (this allows generating multiple version of the same signal distorted by different random noise realizations). Returns ------- ppg : array A vector containing the PPG. See Also -------- ecg_simulate, rsp_simulate, eda_simulate, emg_simulate Examples -------- .. ipython:: python import neurokit2 as nk ppg = nk.ppg_simulate(duration=40, sampling_rate=500, heart_rate=75, random_state=42) """ # Seed the random generator for reproducible results rng = check_random_state(random_state) random_state_distort = check_random_state_children(random_state, random_state_distort, n_children=4) # At the requested sampling rate, how long is a period at the requested # heart-rate and how often does that period fit into the requested # duration? period = 60 / heart_rate # in seconds n_period = int(np.floor(duration / period)) periods = np.ones(n_period) * period # Seconds at which waves begin. x_onset = np.cumsum(periods) x_onset -= x_onset[0] # make sure seconds start at zero # Add respiratory sinus arrythmia (frequency modulation). periods, x_onset = _frequency_modulation( periods, x_onset, modulation_frequency=0.05, modulation_strength=frequency_modulation, ) # Randomly modulate duration of waves by subracting a random value between # 0 and ibi_randomness% of the wave duration (see function definition). x_onset = _random_x_offset(x_onset, ibi_randomness, rng) # Corresponding signal amplitudes. y_onset = rng.normal(0, 0.1, n_period) # Seconds at which the systolic peaks occur within the waves. x_sys = x_onset + rng.normal(0.175, 0.01, n_period) * periods # Corresponding signal amplitudes. y_sys = y_onset + rng.normal(1.5, 0.15, n_period) # Seconds at which the dicrotic notches occur within the waves. x_notch = x_onset + rng.normal(0.4, 0.001, n_period) * periods # Corresponding signal amplitudes (percentage of systolic peak height). y_notch = y_sys * rng.normal(0.49, 0.01, n_period) # Seconds at which the diastolic peaks occur within the waves. x_dia = x_onset + rng.normal(0.45, 0.001, n_period) * periods # Corresponding signal amplitudes (percentage of systolic peak height). y_dia = y_sys * rng.normal(0.51, 0.01, n_period) x_all = np.concatenate((x_onset, x_sys, x_notch, x_dia)) x_all.sort(kind="mergesort") x_all = np.ceil(x_all * sampling_rate).astype(int) # convert seconds to samples y_all = np.zeros(n_period * 4) y_all[0::4] = y_onset y_all[1::4] = y_sys y_all[2::4] = y_notch y_all[3::4] = y_dia if show: __, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True) ax0.scatter(x_all, y_all, c="r") # Interpolate a continuous signal between the landmarks (i.e., Cartesian # coordinates). samples = np.arange(int(np.ceil(duration * sampling_rate))) ppg = signal_interpolate(x_values=x_all, y_values=y_all, x_new=samples, method="akima") # Remove NAN (values outside interpolation range, i.e., after last sample). ppg[np.isnan(ppg)] = np.nanmean(ppg) if show: ax0.plot(ppg) # Add baseline drift. if drift > 0: drift_freq = 0.05 if drift_freq < (1 / duration) * 2: drift_freq = (1 / duration) * 2 ppg = signal_distort( ppg, sampling_rate=sampling_rate, noise_amplitude=drift, noise_frequency=drift_freq, random_state=random_state_distort[0], silent=True, ) # Add motion artifacts. if motion_amplitude > 0: motion_freq = 0.5 ppg = signal_distort( ppg, sampling_rate=sampling_rate, noise_amplitude=motion_amplitude, noise_frequency=motion_freq, random_state=random_state_distort[1], silent=True, ) # Add high frequency bursts. if burst_amplitude > 0: ppg = signal_distort( ppg, sampling_rate=sampling_rate, artifacts_amplitude=burst_amplitude, artifacts_frequency=100, artifacts_number=burst_number, random_state=random_state_distort[2], silent=True, ) # Add powerline noise. if powerline_amplitude > 0: ppg = signal_distort( ppg, sampling_rate=sampling_rate, powerline_amplitude=powerline_amplitude, powerline_frequency=50, random_state=random_state_distort[3], silent=True, ) if show: ax1.plot(ppg) return ppg
def _frequency_modulation(periods, seconds, modulation_frequency, modulation_strength): """modulator_frequency determines the frequency at which respiratory sinus arrhythmia occurs (in Hz). modulator_strength must be between 0 and 1. """ modulation_mean = 1 # Enforce minimum inter-beat-interval of 300 milliseconds. if (modulation_mean - modulation_strength) * periods[ 0 ] < 0.3: # elements in periods all have the same value at this point print( "Skipping frequency modulation, since the modulation_strength" f" {modulation_strength} leads to physiologically implausible" f" wave durations of {((modulation_mean - modulation_strength) * periods[0]) * 1000}" f" milliseconds." ) return periods, seconds # Apply a very conservative Nyquist criterion. nyquist = (1 / periods[0]) * 0.1 if modulation_frequency > nyquist: print(f"Please choose a modulation frequency lower than {nyquist}.") # Generate a sine with mean 1 and amplitude 0.5 * modulation_strength, that is, # ranging from 1 - 0.5 * modulation_strength to 1 + 0.5 * modulation_strength. # For example, at a heart rate of 100 and modulation_strenght=1, the heart rate will # fluctuate between 150 and 50. At the default modulatiom_strenght=.2, it will # fluctuate between 110 and 90. modulator = ( 0.5 * modulation_strength * np.sin(2 * np.pi * modulation_frequency * seconds) + modulation_mean ) periods_modulated = periods * modulator seconds_modulated = np.cumsum(periods_modulated) seconds_modulated -= seconds_modulated[0] # make sure seconds start at zero return periods_modulated, seconds_modulated def _random_x_offset(x, offset_weight, rng): """From each wave onset xi subtract offset_weight * (xi - xi-1) where xi-1 is the wave onset preceding xi. offset_weight must be between 0 and 1. """ # Sanitize offset to min 0 and max .99 offset_weight = min(offset_weight, 0.99) offset_weight = max(offset_weight, 0) x_diff = np.diff(x) # Enforce minimum inter-beat-interval of 300 milliseconds. min_x_diff = min(x_diff) if (min_x_diff - (min_x_diff * offset_weight)) < 0.3: print( "Skipping random IBI modulation, since the offset_weight" f" {offset_weight} leads to physiologically implausible wave" f" durations of {(min_x_diff - (min_x_diff * offset_weight)) * 1000}" f" milliseconds." ) return x max_offsets = offset_weight * x_diff offsets = [rng.uniform(0, i) for i in max_offsets] x_offset = x.copy() x_offset[1:] -= offsets return x_offset def _amplitude_modulation(): # TODO pass