Source code for neurokit2.ecg.ecg_invert
import matplotlib.pyplot as plt
import numpy as np
from .ecg_clean import ecg_clean
[docs]
def ecg_invert(ecg_signal, sampling_rate=1000, force=False, show=False):
"""**ECG signal inversion**
Checks whether an ECG signal is inverted, and if so, corrects for this inversion.
To automatically detect the inversion, the ECG signal is cleaned, the mean is subtracted,
and with a rolling window of 2 seconds, the original value corresponding to the maximum
of the squared signal is taken. If the median of these values is negative, it is
assumed that the signal is inverted.
Parameters
----------
ecg_signal : Union[list, np.array, pd.Series]
The raw ECG channel.
sampling_rate : int
The sampling frequency of ``ecg_signal`` (in Hz, i.e., samples/second). Defaults to 1000.
force : bool
Whether to force inversion of the signal regardless of whether it is
detected as inverted. The default is False.
show : bool
Shows a plot of the original and inverted signal.
Returns
-------
array
Vector containing the corrected ECG signal.
bool
Whether the inversion was performed.
Examples
--------
.. ipython:: python
import neurokit2 as nk
ecg = -1 * nk.ecg_simulate(duration=10, sampling_rate=200, heart_rate=70)
# Invert if necessary
@savefig p_ecg_invert1.png scale=100%
ecg_fixed, is_inverted = nk.ecg_invert(ecg, sampling_rate=200, show=True)
@suppress
plt.close()
"""
# Invert in any case (needed to perform the check)
inverted_ecg = np.array(ecg_signal) * -1 + 2 * np.nanmean(ecg_signal)
if show is True:
fig, ax = plt.subplots(2, 1, sharex=True, sharey=True)
ax[0].plot(ecg_signal)
ax[0].set_title("Original ECG")
ax[1].plot(inverted_ecg)
ax[1].set_title("Inverted ECG")
if force:
was_inverted = True
else:
if _ecg_inverted(ecg_signal, sampling_rate=sampling_rate):
was_inverted = True
else:
inverted_ecg = ecg_signal
was_inverted = False
return inverted_ecg, was_inverted
def _ecg_inverted(ecg_signal, sampling_rate=1000, window_time=2.0):
"""Checks whether an ECG signal is inverted."""
ecg_cleaned = ecg_clean(ecg_signal, sampling_rate=sampling_rate)
# mean should already be close to zero after filtering but just in case, subtract
ecg_cleaned_meanzero = ecg_cleaned - np.nanmean(ecg_cleaned)
# take the median of the original value of the maximum of the squared signal
# over a window where we would expect at least one heartbeat
med_max_squared = np.nanmedian(
_roll_orig_max_squared(ecg_cleaned_meanzero, window=int(window_time * sampling_rate))
)
# if median is negative, assume inverted
return med_max_squared < 0
def _roll_orig_max_squared(x, window=2000):
"""With a rolling window, takes the original value corresponding to the maximum of the squared signal."""
x_rolled = np.lib.stride_tricks.sliding_window_view(x, window, axis=0)
# https://stackoverflow.com/questions/61703879/in-numpy-how-to-select-elements-based-on-the-maximum-of-their-absolute-values
shape = np.array(x_rolled.shape)
shape[-1] = -1
return np.take_along_axis(x_rolled, np.square(x_rolled).argmax(-1).reshape(shape), axis=-1)