Source code for neurokit2.ecg.ecg_simulate

# -*- coding: utf-8 -*-
import math

import numpy as np
import pandas as pd
import scipy

from ..misc import check_random_state, check_random_state_children
from ..signal import signal_distort, signal_resample


[docs] def ecg_simulate( duration=10, length=None, sampling_rate=1000, noise=0.01, heart_rate=70, heart_rate_std=1, method="ecgsyn", random_state=None, random_state_distort="spawn", **kwargs, ): """**Simulate an ECG/EKG signal** Generate an artificial (synthetic) ECG signal of a given duration and sampling rate using either the ECGSYN dynamical model (McSharry et al., 2003) or a simpler model based on Daubechies wavelets to roughly approximate cardiac cycles. Parameters ---------- duration : int Desired recording length in seconds. sampling_rate : int The desired sampling rate (in Hz, i.e., samples/second). length : int The desired length of the signal (in samples). noise : float Noise level (amplitude of the laplace noise). heart_rate : int Desired simulated heart rate (in beats per minute). The default is 70. Note that for the ``"ECGSYN"`` method, random fluctuations are to be expected to mimick a real heart rate. These fluctuations can cause some slight discrepancies between the requested heart rate and the empirical heart rate, especially for shorter signals. heart_rate_std : int Desired heart rate standard deviation (beats per minute). method : str The model used to generate the signal. Can be ``"simple"`` for a simulation based on Daubechies wavelets that roughly approximates a single cardiac cycle. If ``"ecgsyn"`` (default), will use the model desbribed `McSharry et al. (2003) <https://physionet.org/content/ecgsyn/>`_. If ``"multileads"``, will return a DataFrame containing 12-leads (see `12-leads ECG simulation <https://neuropsychology.github.io/NeuroKit/examples/ecg_generate_12leads/ecg_generate_12leads.html>`_). random_state : None, int, numpy.random.RandomState or numpy.random.Generator Seed for the random number generator. See for ``misc.check_random_state`` for further information. random_state_distort : {'legacy', 'spawn'}, None, int, numpy.random.RandomState or numpy.random.Generator Random state to be used to distort the signal. If ``"legacy"``, use the same random state used to generate the signal (discouraged as it creates dependent random streams). If ``"spawn"``, spawn independent children random number generators from the random_state argument. If any of the other types, generate independent children random number generators from the random_state_distort provided (this allows generating multiple version of the same signal distorted by different random noise realizations). **kwargs Other keywords parameters for ECGSYN algorithm, such as ``"lfhfratio"``, ``"ti"``, ``"ai"``, ``"bi"``. Returns ------- array Vector containing the ECG signal. Examples ---------- * **Example 1:** Simulate single lead ECG .. ipython:: python import neurokit2 as nk ecg1 = nk.ecg_simulate(duration=10, method="simple") ecg2 = nk.ecg_simulate(duration=10, method="ecgsyn") # Visualize result @savefig p_ecg_simulate1.png scale=100% nk.signal_plot([ecg1, ecg2], labels=["simple", "ecgsyn"], subplots=True) @suppress plt.close() * **Example 2:** Simulate 12-leads ECG .. ipython:: python ecg12 = nk.ecg_simulate(duration=10, method="multileads") # Visualize result @savefig p_ecg_simulate2.png scale=100% nk.signal_plot(ecg12, subplots=True) @suppress plt.close() See Also -------- .rsp_simulate, .eda_simulate, .ppg_simulate, .emg_simulate References ----------- * McSharry, P. E., Clifford, G. D., Tarassenko, L., & Smith, L. A. (2003). A dynamical model for generating synthetic electrocardiogram signals. IEEE transactions on biomedical engineering, 50 (3), 289-294. """ # Seed the random generator for reproducible results rng = check_random_state(random_state) # Generate number of samples automatically if length is unspecified if length is None: length = duration * sampling_rate if duration is None: duration = length / sampling_rate # Run appropriate method if method.lower() in ["simple", "daubechies"]: signals = _ecg_simulate_daubechies( duration=duration, length=length, sampling_rate=sampling_rate, heart_rate=heart_rate ) else: approx_number_beats = int(np.round(duration * (heart_rate / 60))) if method.lower() in ["multi", "multilead", "multileads", "multichannel"]: # Gamma, a (12,5) matrix to modify the five waves' amplitudes of 12 leads (P, Q, R, S, T) gamma = np.array( [ [1, 0.1, 1, 1.2, 1], [2, 0.2, 0.2, 0.2, 3], [1, -0.1, -0.8, -1.1, 2.5], [-1, -0.05, -0.8, -0.5, -1.2], [0.05, 0.05, 1, 1, 1], [1, -0.05, -0.1, -0.1, 3], [-0.5, 0.05, 0.2, 0.5, 1], [0.05, 0.05, 1.3, 2.5, 2], [1, 0.05, 1, 2, 1], [1.2, 0.05, 1, 2, 2], [1.5, 0.1, 0.8, 1, 2], [1.8, 0.05, 0.5, 0.1, 2], ] ) signals, results = _ecg_simulate_ecgsyn( sfecg=sampling_rate, N=approx_number_beats, hrmean=heart_rate, hrstd=heart_rate_std, sfint=sampling_rate, gamma=gamma, rng=rng, **kwargs, ) else: signals, results = _ecg_simulate_ecgsyn( sfecg=sampling_rate, N=approx_number_beats, hrmean=heart_rate, hrstd=heart_rate_std, sfint=sampling_rate, gamma=np.ones((1, 5)), rng=rng, **kwargs, ) # Cut to match expected length for i in range(len(signals)): signals[i] = signals[i][0:length] # Add random noise if noise > 0: # Seed for random noise random_state_distort = check_random_state_children(random_state, random_state_distort, n_children=len(signals)) # Call signal_distort on each signal for i in range(len(signals)): signals[i] = signal_distort( signals[i], sampling_rate=sampling_rate, noise_amplitude=noise, noise_frequency=[5, 10, 100], noise_shape="laplace", random_state=random_state_distort[i], silent=True, ) # Format if len(signals) == 1: ecg = signals[0] else: ecg = pd.DataFrame( np.array(signals).T, columns=["I", "II", "III", "aVR", "aVL", "aVF", "V1", "V2", "V3", "V4", "V5", "V6"], ) return ecg
# ============================================================================= # Daubechies # ============================================================================= def _ecg_simulate_daubechies(duration=10, length=None, sampling_rate=1000, heart_rate=70): """Generate an artificial (synthetic) ECG signal of a given duration and sampling rate. It uses a 'Daubechies' wavelet that roughly approximates a single cardiac cycle. This function is based on `this script <https://github.com/diarmaidocualain/ecg_simulation>`_. """ # The "Daubechies" wavelet is a rough approximation to a real, single, cardiac cycle cardiac = scipy.signal.daub(10) # Add the gap after the pqrst when the heart is resting. cardiac = np.concatenate([cardiac, np.zeros(10)]) # Caculate the number of beats in capture time period num_heart_beats = int(duration * heart_rate / 60) # Concatenate together the number of heart beats needed ecg = np.tile(cardiac, num_heart_beats) # Change amplitude ecg = ecg * 10 # Resample ecg = signal_resample( ecg, sampling_rate=int(len(ecg) / 10), desired_length=length, desired_sampling_rate=sampling_rate, ) # Return the signal in a list to match # with the potential multichanel output of ecgsyn return [ecg] # ============================================================================= # ECGSYN # ============================================================================= def _ecg_simulate_ecgsyn( sfecg=256, N=256, hrmean=60, hrstd=1, lfhfratio=0.5, sfint=512, ti=(-70, -15, 0, 15, 100), ai=(1.2, -5, 30, -7.5, 0.75), bi=(0.25, 0.1, 0.1, 0.1, 0.4), gamma=np.ones((1, 5)), rng=None, **kwargs, ): """ This function is a python translation of the matlab script by `McSharry & Clifford (2013) <https://physionet.org/content/ecgsyn>`_. Parameters ---------- sfecg: ECG sampling frequency [256 Hertz] N: approximate number of heart beats [256] Anoise: Additive uniformly distributed measurement noise [0 mV] hrmean: Mean heart rate [60 beats per minute] hrstd: Standard deviation of heart rate [1 beat per minute] lfhfratio: LF/HF ratio [0.5] sfint: Internal sampling frequency [256 Hertz] ti angles of extrema (in degrees). Order of extrema is (P Q R S T). ai z-position of extrema. bi Gaussian width of peaks. gamma This determines the different leads. Returns ------- array Vector containing simulated ecg signal. # Examples # -------- # >>> import matplotlib.pyplot as plt # >>> import neurokit2 as nk # >>> # >>> s = _ecg_simulate_ecgsynth() # >>> x = np.linspace(0, len(s)-1, len(s)) # >>> num_points = 4000 # >>> # >>> num_points = min(num_points, len(s)) # >>> plt.plot(x[:num_points], s[:num_points]) #doctest: +SKIP # >>> plt.show() #doctest: +SKIP """ if not isinstance(ti, np.ndarray): ti = np.array(ti) if not isinstance(ai, np.ndarray): ai = np.array(ai) if not isinstance(bi, np.ndarray): bi = np.array(bi) ti = ti * np.pi / 180 # Adjust extrema parameters for mean heart rate hrfact = np.sqrt(hrmean / 60) hrfact2 = np.sqrt(hrfact) bi = hrfact * bi ti = np.array([hrfact2, hrfact, 1, hrfact, hrfact2]) * ti # Check that sfint is an integer multiple of sfecg q = np.round(sfint / sfecg) qd = sfint / sfecg if q != qd: raise ValueError( "Internal sampling frequency (sfint) must be an integer multiple of the ECG sampling frequency" " (sfecg). Your current choices are: sfecg = " + str(sfecg) + " and sfint = " + str(sfint) + "." ) # Define frequency parameters for rr process # flo and fhi correspond to the Mayer waves and respiratory rate respectively flo = 0.1 fhi = 0.25 flostd = 0.01 fhistd = 0.01 # Calculate time scales for rr and total output sfrr = 1 trr = 1 / sfrr rrmean = 60 / hrmean n = 2 ** (np.ceil(np.log2(N * rrmean / trr))) rr0 = _ecg_simulate_rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n, rng) # Upsample rr time series from 1 Hz to sfint Hz rr = signal_resample(rr0, sampling_rate=1, desired_sampling_rate=sfint) # Make the rrn time series dt = 1 / sfint rrn = np.zeros(len(rr)) tecg = 0 i = 0 while i < len(rr): tecg += rr[i] ip = int(np.round(tecg / dt)) rrn[i:ip] = rr[i] i = ip Nt = ip # Integrate system using fourth order Runge-Kutta x0 = np.array([1, 0, 0.04]) # tspan is a tuple of (min, max) which defines the lower and upper bound of t in ODE # t_eval is the list of desired t points for ODE # in Matlab, ode45 can accepts both tspan and t_eval in one argument Tspan = [0, (Nt - 1) * dt] t_eval = np.linspace(0, (Nt - 1) * dt, Nt) # Initialize results containers results = [] signals = [] # Multichannel modification (#625): # -------------------------------------------------- # Loop over the twelve leads modifying ai in the loop to generate each lead's data # Because these are all starting at the same position, it may make sense to grab a random # segment within the series to simulate random phase and to forget the initial conditions for lead in range(len(gamma)): # as passing extra arguments to derivative function is not supported yet in solve_ivp # lambda function is used to serve the purpose result = scipy.integrate.solve_ivp( lambda t, x: _ecg_simulate_derivsecgsyn(t, x, rrn, ti, sfint, gamma[lead] * ai, bi), Tspan, x0, t_eval=t_eval, ) results.append(result) # store results X0 = result.y # get signal # downsample to required sfecg X = X0[:, np.arange(0, X0.shape[1], q).astype(int)] # Scale signal to lie between -0.4 and 1.2 mV z = X[2, :].copy() zmin = np.min(z) zmax = np.max(z) zrange = zmax - zmin z = (z - zmin) * 1.6 / zrange - 0.4 signals.append(z) return signals, results def _ecg_simulate_derivsecgsyn(t, x, rr, ti, sfint, ai, bi): ta = math.atan2(x[1], x[0]) r0 = 1 a0 = 1.0 - np.sqrt(x[0] ** 2 + x[1] ** 2) / r0 ip = np.floor(t * sfint).astype(int) w0 = 2 * np.pi / rr[min(ip, len(rr) - 1)] # w0 = 2*np.pi/rr[ip[ip <= np.max(rr)]] fresp = 0.25 zbase = 0.005 * np.sin(2 * np.pi * fresp * t) dx1dt = a0 * x[0] - w0 * x[1] dx2dt = a0 * x[1] + w0 * x[0] # matlab rem and numpy rem are different # dti = np.remainder(ta - ti, 2*np.pi) dti = (ta - ti) - np.round((ta - ti) / 2 / np.pi) * 2 * np.pi dx3dt = -np.sum(ai * dti * np.exp(-0.5 * (dti / bi) ** 2)) - 1 * (x[2] - zbase) dxdt = np.array([dx1dt, dx2dt, dx3dt]) return dxdt def _ecg_simulate_rrprocess( flo=0.1, fhi=0.25, flostd=0.01, fhistd=0.01, lfhfratio=0.5, hrmean=60, hrstd=1, sfrr=1, n=256, rng=None, ): w1 = 2 * np.pi * flo w2 = 2 * np.pi * fhi c1 = 2 * np.pi * flostd c2 = 2 * np.pi * fhistd sig2 = 1 sig1 = lfhfratio rrmean = 60 / hrmean rrstd = 60 * hrstd / (hrmean * hrmean) df = sfrr / n w = np.arange(n) * 2 * np.pi * df dw1 = w - w1 dw2 = w - w2 Hw1 = sig1 * np.exp(-0.5 * (dw1 / c1) ** 2) / np.sqrt(2 * np.pi * c1 ** 2) Hw2 = sig2 * np.exp(-0.5 * (dw2 / c2) ** 2) / np.sqrt(2 * np.pi * c2 ** 2) Hw = Hw1 + Hw2 Hw0 = np.concatenate((Hw[0 : int(n / 2)], Hw[int(n / 2) - 1 :: -1])) Sw = (sfrr / 2) * np.sqrt(Hw0) ph0 = 2 * np.pi * rng.uniform(size=int(n / 2 - 1)) ph = np.concatenate([[0], ph0, [0], -np.flipud(ph0)]) SwC = Sw * np.exp(1j * ph) x = (1 / n) * np.real(np.fft.ifft(SwC)) xstd = np.std(x) ratio = rrstd / xstd return rrmean + x * ratio # Return RR