# -*- coding: utf-8 -*-
from warnings import warn
import matplotlib.gridspec
import matplotlib.patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from ..misc import NeuroKitWarning
from ..stats import standardize
from .signal_formatpeaks import _signal_formatpeaks_sanitize
from .signal_period import signal_period
def signal_fixpeaks(
peaks,
sampling_rate=1000,
iterative=True,
show=False,
interval_min=None,
interval_max=None,
relative_interval_min=None,
relative_interval_max=None,
robust=False,
method="Kubios",
**kwargs,
):
"""**Correct Erroneous Peak Placements**
Identify and correct erroneous peak placements based on outliers in peak-to-peak differences
(period).
Parameters
----------
peaks : list or array or DataFrame or Series or dict
The samples at which the peaks occur. If an array is passed in, it is assumed that it was
obtained with :func:`.signal_findpeaks`. If a DataFrame is passed in, it is assumed to be
obtained with :func:`.ecg_findpeaks` or :func:`.ppg_findpeaks` and to be of the same length
as the input signal.
sampling_rate : int
The sampling frequency of the signal that contains the peaks (in Hz, i.e., samples/second).
iterative : bool
Whether or not to apply the artifact correction repeatedly (results in superior artifact
correction).
show : bool
Whether or not to visualize artifacts and artifact thresholds.
interval_min : float
Only when ``method = "neurokit"``. The minimum interval between the peaks (in seconds).
interval_max : float
Only when ``method = "neurokit"``. The maximum interval between the peaks (in seconds).
    relative_interval_min : float
        Only when ``method = "neurokit"``. The minimum interval between the peaks, relative to the
        sample (expressed in standard deviations from the mean).
    relative_interval_max : float
        Only when ``method = "neurokit"``. The maximum interval between the peaks, relative to the
        sample (expressed in standard deviations from the mean).
robust : bool
Only when ``method = "neurokit"``. Use a robust method of standardization (see
:func:`.standardize`) for the relative thresholds.
method : str
Either ``"Kubios"`` or ``"neurokit"``. ``"Kubios"`` uses the artifact detection and
correction described in Lipponen, J. A., & Tarvainen, M. P. (2019). Note that ``"Kubios"``
is only meant for peaks in ECG or PPG. ``"neurokit"`` can be used with peaks in ECG, PPG,
or respiratory data.
**kwargs
        Other keyword arguments.

    Returns
    -------
    info : dict
        A dictionary containing additional information. With ``method="Kubios"``, it contains the
        indices of artifacts, accessible with the keys ``"ectopic"``, ``"missed"``, ``"extra"``,
        and ``"longshort"``.
    peaks_clean : array
        The corrected peak locations.

    See Also
    --------
    signal_findpeaks, ecg_findpeaks, ecg_peaks, ppg_findpeaks, ppg_peaks

    Examples
    --------
    .. ipython:: python

      import neurokit2 as nk
      import numpy as np
      import matplotlib.pyplot as plt

      # Simulate ECG data and add a noisy period
      ecg = nk.ecg_simulate(duration=240, sampling_rate=250, noise=2, random_state=42)
      ecg[20000:30000] += np.random.uniform(size=10000)
      ecg[40000:43000] = 0
# Identify and Correct Peaks using "Kubios" Method
rpeaks_uncorrected = nk.ecg_findpeaks(ecg, method="pantompkins", sampling_rate=250)
@savefig p_signal_fixpeaks1.png scale=100%
info, rpeaks_corrected = nk.signal_fixpeaks(
rpeaks_uncorrected, sampling_rate=250, iterative=True, method="Kubios", show=True
)
@suppress
plt.close()

    .. ipython:: python

# Visualize Artifact Correction
rate_corrected = nk.signal_rate(rpeaks_corrected, desired_length=len(ecg))
rate_uncorrected = nk.signal_rate(rpeaks_uncorrected, desired_length=len(ecg))
@savefig p_signal_fixpeaks2.png scale=100%
nk.signal_plot(
[rate_uncorrected, rate_corrected],
labels=["Heart Rate Uncorrected", "Heart Rate Corrected"]
)
@suppress
plt.close()

    .. ipython:: python

      # Simulate an abnormal signal with missing peaks and artifacts
signal = nk.signal_simulate(duration=20, sampling_rate=1000, frequency=1)
peaks_true = nk.signal_findpeaks(signal)["Peaks"]
peaks = np.delete(peaks_true, [5, 15]) # create gaps
peaks = np.sort(np.append(peaks, [1350, 11350, 18350])) # add artifacts
# Identify and Correct Peaks using 'NeuroKit' Method
info, peaks_corrected = nk.signal_fixpeaks(
peaks=peaks, interval_min=0.5, interval_max=1.5, method="neurokit"
)
# Plot and shift original peaks to the right to see the difference.
@savefig p_signal_fixpeaks3.png scale=100%
nk.events_plot([peaks + 50, peaks_corrected], signal)
@suppress
plt.close()

    References
    ----------
    * Lipponen, J. A., & Tarvainen, M. P. (2019). A robust algorithm for heart rate variability
      time series artefact correction using novel beat classification. Journal of Medical
      Engineering & Technology, 43(3), 173-181. https://doi.org/10.1080/03091902.2019.1640306
"""
# Format input
peaks = _signal_formatpeaks_sanitize(peaks)
# If method Kubios
if method.lower() == "kubios":
info, peaks_clean = _signal_fixpeaks_kubios(
peaks, sampling_rate=sampling_rate, iterative=iterative, show=show, **kwargs
)
else:
# Else method is NeuroKit
info, peaks_clean = _signal_fixpeaks_neurokit(
peaks,
sampling_rate=sampling_rate,
interval_min=interval_min,
interval_max=interval_max,
relative_interval_min=relative_interval_min,
relative_interval_max=relative_interval_max,
robust=robust,
)
return info, peaks_clean
# =============================================================================
# Methods
# =============================================================================
def _signal_fixpeaks_neurokit(
peaks,
sampling_rate=1000,
interval_min=None,
interval_max=None,
relative_interval_min=None,
relative_interval_max=None,
robust=False,
):
"""NeuroKit method."""
peaks_clean = _remove_small(
peaks, sampling_rate, interval_min, relative_interval_min, robust
)
peaks_clean = _interpolate_big(
peaks_clean,
sampling_rate,
interval_max,
relative_interval_max,
robust,
)
valid_peaks = peaks_clean[peaks_clean >= 0]
n_invalid_idcs = len(peaks_clean) - len(valid_peaks)
if n_invalid_idcs > 0:
warn(
f" Negative peak indices detected in output. "
f" Removing {n_invalid_idcs} invalid peaks. ",
category=NeuroKitWarning,
)
peaks_clean = valid_peaks
info = {
"method": "neurokit",
"extra": [i for i in peaks if i not in peaks_clean],
"missed": [i for i in peaks_clean if i not in peaks],
}
return info, peaks_clean
def _signal_fixpeaks_kubios(
peaks, sampling_rate=1000, iterative=True, show=False, **kwargs
):
"""kubios method."""
# Get corrected peaks and normal-to-normal intervals.
artifacts, subspaces = _find_artifacts(peaks, sampling_rate=sampling_rate, **kwargs)
peaks_clean = _correct_artifacts(artifacts, peaks)
if iterative:
# Iteratively apply the artifact correction until the number
# of artifacts stops decreasing.
n_artifacts_current = sum([len(i) for i in artifacts.values()])
while True:
new_artifacts, new_subspaces = _find_artifacts(
peaks_clean, sampling_rate=sampling_rate, **kwargs
)
n_artifacts_previous = n_artifacts_current
n_artifacts_current = sum([len(i) for i in new_artifacts.values()])
if n_artifacts_current >= n_artifacts_previous:
break
artifacts = new_artifacts
subspaces = new_subspaces
peaks_clean = _correct_artifacts(artifacts, peaks_clean)
artifacts["method"] = "kubios"
artifacts.update(subspaces)
if show:
_plot_artifacts_lipponen2019(artifacts)
return artifacts, peaks_clean
# =============================================================================
# Kubios: Lipponen & Tarvainen (2019).
# =============================================================================
def _find_artifacts(
peaks,
c1=0.13,
c2=0.17,
alpha=5.2,
window_width=91,
medfilt_order=11,
sampling_rate=1000,
):
    # Compute the period series (make sure it has the same number of elements
    # as peaks); peaks are in samples, convert to seconds.
rr = np.ediff1d(peaks, to_begin=0) / sampling_rate
# For subsequent analysis it is important that the first element has
# a value in a realistic range (e.g., for median filtering).
rr[0] = np.mean(rr[1:])
# Artifact identification #################################################
###########################################################################
# Compute dRRs: time series of differences of consecutive periods (dRRs).
drrs = np.ediff1d(rr, to_begin=0)
drrs[0] = np.mean(drrs[1:])
# Normalize by threshold.
th1 = _compute_threshold(drrs, alpha, window_width)
# ignore division by 0 warning
old_setting = np.seterr(divide="ignore", invalid="ignore")
drrs /= th1
    # Restore the previous error settings.
np.seterr(**old_setting)
    # Cast dRRs to subspace s12.
    # Pad drrs with two elements on each side (reflected).
    padding = 2
drrs_pad = np.pad(drrs, padding, "reflect")
s12 = np.zeros(drrs.size)
for d in np.arange(padding, padding + drrs.size):
if drrs_pad[d] > 0:
s12[d - padding] = np.max([drrs_pad[d - 1], drrs_pad[d + 1]])
elif drrs_pad[d] < 0:
s12[d - padding] = np.min([drrs_pad[d - 1], drrs_pad[d + 1]])
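    # Illustrative example: if drrs_pad[d - 1:d + 2] is (0.2, 1.8, -0.4), then
    # drrs_pad[d] = 1.8 > 0 and s12 takes max(0.2, -0.4) = 0.2.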
# Cast dRRs to subspace s22.
s22 = np.zeros(drrs.size)
for d in np.arange(padding, padding + drrs.size):
if drrs_pad[d] >= 0:
s22[d - padding] = np.min([drrs_pad[d + 1], drrs_pad[d + 2]])
elif drrs_pad[d] < 0:
s22[d - padding] = np.max([drrs_pad[d + 1], drrs_pad[d + 2]])
# Compute mRRs: time series of deviation of RRs from median.
df = pd.DataFrame({"signal": rr})
medrr = df.rolling(medfilt_order, center=True, min_periods=1).median().signal.values
mrrs = rr - medrr
mrrs[mrrs < 0] = mrrs[mrrs < 0] * 2
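    # Doubling negative deviations weights short beats more heavily: e.g., an
    # RR interval 0.05 s below the local median yields mrrs = -0.1 before the
    # normalization below (illustrative values).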
# Normalize by threshold.
th2 = _compute_threshold(mrrs, alpha, window_width)
mrrs /= th2
# Artifact classification #################################################
###########################################################################
# Artifact classes.
extra_idcs = []
missed_idcs = []
ectopic_idcs = []
longshort_idcs = []
i = 0
while i < rr.size - 2: # The flow control is implemented based on Figure 1
if np.abs(drrs[i]) <= 1: # Figure 1
i += 1
continue
eq1 = np.logical_and(
drrs[i] > 1, s12[i] < (-c1 * drrs[i] - c2)
) # pylint: disable=E1111
eq2 = np.logical_and(
drrs[i] < -1, s12[i] > (-c1 * drrs[i] + c2)
) # pylint: disable=E1111
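        # Illustrative example: with the default c1 = 0.13 and c2 = 0.17, a
        # normalized difference drrs[i] = 2 is classified as ectopic if
        # s12[i] < -0.13 * 2 - 0.17 = -0.43.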
if np.any([eq1, eq2]):
# If any of the two equations is true.
ectopic_idcs.append(i)
i += 1
continue
# If none of the two equations is true.
if ~np.any([np.abs(drrs[i]) > 1, np.abs(mrrs[i]) > 3]): # Figure 1
i += 1
continue
longshort_candidates = [i]
# Check if the following beat also needs to be evaluated.
if np.abs(drrs[i + 1]) < np.abs(drrs[i + 2]):
longshort_candidates.append(i + 1)
for j in longshort_candidates:
# Long beat.
eq3 = np.logical_and(drrs[j] > 1, s22[j] < -1) # pylint: disable=E1111
# Long or short.
eq4 = np.abs(mrrs[j]) > 3 # Figure 1
# Short beat.
eq5 = np.logical_and(drrs[j] < -1, s22[j] > 1) # pylint: disable=E1111
if ~np.any([eq3, eq4, eq5]):
# If none of the three equations is true: normal beat.
i += 1
continue
# If any of the three equations is true: check for missing or extra
# peaks.
# Missing.
eq6 = np.abs(rr[j] / 2 - medrr[j]) < th2[j] # Figure 1
# Extra.
eq7 = np.abs(rr[j] + rr[j + 1] - medrr[j]) < th2[j] # Figure 1
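            # Intuition: a missed beat roughly doubles the local RR, so
            # rr[j] / 2 falls back near medrr[j] (eq6); an extra detection
            # splits one beat, so rr[j] + rr[j + 1] lands near medrr[j] (eq7).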
# Check if extra.
if np.all([eq5, eq7]):
extra_idcs.append(j)
i += 1
continue
# Check if missing.
if np.all([eq3, eq6]):
missed_idcs.append(j)
i += 1
continue
# If neither classified as extra or missing, classify as "long or
# short".
longshort_idcs.append(j)
i += 1
# Prepare output
artifacts = {
"ectopic": ectopic_idcs,
"missed": missed_idcs,
"extra": extra_idcs,
"longshort": longshort_idcs,
}
subspaces = {
"rr": rr,
"drrs": drrs,
"mrrs": mrrs,
"s12": s12,
"s22": s22,
"c1": c1,
"c2": c2,
}
return artifacts, subspaces
def _compute_threshold(signal, alpha, window_width):
df = pd.DataFrame({"signal": np.abs(signal)})
q1 = (
df.rolling(window_width, center=True, min_periods=1)
.quantile(0.25)
.signal.values
)
q3 = (
df.rolling(window_width, center=True, min_periods=1)
.quantile(0.75)
.signal.values
)
th = alpha * ((q3 - q1) / 2)
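    # Illustrative example: with the default alpha = 5.2 and a window where
    # q3 - q1 = 0.05, the local threshold is 5.2 * 0.025 = 0.13.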
return th
def _correct_artifacts(artifacts, peaks):
# Artifact correction
#####################
# The integrity of indices must be maintained if peaks are inserted or
# deleted: for each deleted beat, decrease indices following that beat in
# all other index lists by 1. Likewise, for each added beat, increment the
# indices following that beat in all other lists by 1.
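    # Illustrative example: deleting an extra peak at index 2 shifts a missed
    # peak recorded at index 5 down to index 4 in the remaining index lists.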
extra_idcs = artifacts["extra"]
missed_idcs = artifacts["missed"]
ectopic_idcs = artifacts["ectopic"]
longshort_idcs = artifacts["longshort"]
# Delete extra peaks.
if extra_idcs:
peaks = _correct_extra(extra_idcs, peaks)
# Update remaining indices.
missed_idcs = _update_indices(extra_idcs, missed_idcs, -1)
ectopic_idcs = _update_indices(extra_idcs, ectopic_idcs, -1)
longshort_idcs = _update_indices(extra_idcs, longshort_idcs, -1)
# Add missing peaks.
if missed_idcs:
peaks = _correct_missed(missed_idcs, peaks)
# Update remaining indices.
ectopic_idcs = _update_indices(missed_idcs, ectopic_idcs, 1)
longshort_idcs = _update_indices(missed_idcs, longshort_idcs, 1)
if ectopic_idcs:
peaks = _correct_misaligned(ectopic_idcs, peaks)
if longshort_idcs:
peaks = _correct_misaligned(longshort_idcs, peaks)
return peaks
def _correct_extra(extra_idcs, peaks):
corrected_peaks = peaks.copy()
corrected_peaks = np.delete(corrected_peaks, extra_idcs)
return corrected_peaks
def _correct_missed(missed_idcs, peaks):
corrected_peaks = peaks.copy()
missed_idcs = np.array(missed_idcs)
# Calculate the position(s) of new beat(s). Make sure to not generate
# negative indices. prev_peaks and next_peaks must have the same
# number of elements.
valid_idcs = np.logical_and(
missed_idcs > 1, missed_idcs < len(corrected_peaks)
) # pylint: disable=E1111
missed_idcs = missed_idcs[valid_idcs]
prev_peaks = corrected_peaks[[i - 1 for i in missed_idcs]]
next_peaks = corrected_peaks[missed_idcs]
added_peaks = prev_peaks + (next_peaks - prev_peaks) / 2
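    # Illustrative example: with prev_peaks = 800 and next_peaks = 1200
    # (samples), the new peak is inserted at 800 + 400 / 2 = 1000.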
# Add the new peaks before the missed indices (see numpy docs).
corrected_peaks = np.insert(corrected_peaks, missed_idcs, added_peaks)
return corrected_peaks
def _correct_misaligned(misaligned_idcs, peaks):
corrected_peaks = peaks.copy()
misaligned_idcs = np.array(misaligned_idcs)
# Make sure to not generate negative indices, or indices that exceed
# the total number of peaks. prev_peaks and next_peaks must have the
# same number of elements.
valid_idcs = np.logical_and(
misaligned_idcs > 1,
misaligned_idcs < len(corrected_peaks) - 1, # pylint: disable=E1111
)
misaligned_idcs = misaligned_idcs[valid_idcs]
prev_peaks = corrected_peaks[[i - 1 for i in misaligned_idcs]]
next_peaks = corrected_peaks[[i + 1 for i in misaligned_idcs]]
half_ibi = (next_peaks - prev_peaks) / 2
peaks_interp = prev_peaks + half_ibi
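    # Illustrative example: neighbors at 1000 and 2000 samples relocate a
    # misaligned peak to 1000 + 500 = 1500, the midpoint of its neighbors.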
# Shift the R-peaks from the old to the new position.
corrected_peaks = np.delete(corrected_peaks, misaligned_idcs)
corrected_peaks = np.concatenate((corrected_peaks, peaks_interp)).astype(int)
corrected_peaks.sort(kind="mergesort")
return corrected_peaks
def _update_indices(source_idcs, update_idcs, update):
"""For every element s in source_idcs, change every element u in update_idcs according to update, if u is larger
than s."""
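    # Illustrative example: _update_indices([2], [1, 3, 5], -1) returns
    # [1, 2, 4]; only the indices greater than 2 are shifted down.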
if not update_idcs:
return update_idcs
for s in source_idcs:
update_idcs = [u + update if u > s else u for u in update_idcs]
return list(np.unique(update_idcs))
def _plot_artifacts_lipponen2019(info):
    # Convenience function to extract relevant items from the info dict.
def _get_which_endswith(info, string):
return [s for key, s in info.items() if key.endswith(string)][0]
# Extract parameters
longshort_idcs = _get_which_endswith(info, "longshort")
ectopic_idcs = _get_which_endswith(info, "ectopic")
extra_idcs = _get_which_endswith(info, "extra")
missed_idcs = _get_which_endswith(info, "missed")
# Extract subspace info
rr = _get_which_endswith(info, "rr")
drrs = _get_which_endswith(info, "drrs")
mrrs = _get_which_endswith(info, "mrrs")
s12 = _get_which_endswith(info, "s12")
s22 = _get_which_endswith(info, "s22")
c1 = _get_which_endswith(info, "c1")
c2 = _get_which_endswith(info, "c2")
# Visualize artifact type indices.
# Set grids
gs = matplotlib.gridspec.GridSpec(ncols=2, nrows=6)
fig = plt.figure(constrained_layout=False, figsize=(17, 12))
fig.suptitle("Peak Correction", fontweight="bold")
ax0 = fig.add_subplot(gs[0:2, 0])
ax1 = fig.add_subplot(gs[2:4, 0])
ax2 = fig.add_subplot(gs[4:6, 0])
    ax3 = fig.add_subplot(gs[0:3, 1])
ax4 = fig.add_subplot(gs[3:6, 1])
ax0.set_title("Artifact types")
ax0.plot(rr, label="heart period")
ax0.scatter(
longshort_idcs,
rr[longshort_idcs],
marker="x",
c="m",
s=100,
zorder=3,
label="long/short",
)
ax0.scatter(
ectopic_idcs,
rr[ectopic_idcs],
marker="x",
c="g",
s=100,
zorder=3,
label="ectopic",
)
ax0.scatter(
extra_idcs,
rr[extra_idcs],
marker="x",
c="y",
s=100,
zorder=3,
label="false positive",
)
ax0.scatter(
missed_idcs,
rr[missed_idcs],
marker="x",
c="r",
s=100,
zorder=3,
label="false negative",
)
ax0.legend(loc="upper right")
# Visualize first threshold.
ax1.set_title("Consecutive-difference criterion")
ax1.plot(np.abs(drrs), label="normalized difference consecutive heart periods")
ax1.axhline(1, c="r", label="artifact threshold")
ax1.legend(loc="upper right")
ax1.set_ylim(0, 5)
# Visualize second threshold.
ax2.set_title("Difference-from-median criterion")
ax2.plot(np.abs(mrrs), label="difference from median over 11 periods")
ax2.axhline(3, c="r", label="artifact threshold")
ax2.legend(loc="upper right")
ax2.set_ylim(0, 5)
# Visualize subspaces.
ax4.set_title("Subspace 1")
ax4.set_xlabel("S11")
ax4.set_ylabel("S12")
ax4.scatter(drrs, s12, marker="x", label="heart periods")
ax4.set_ylim(-5, 5)
ax4.set_xlim(-10, 10)
verts0 = [(-10, 5), (-10, -c1 * -10 + c2), (-1, -c1 * -1 + c2), (-1, 5)]
poly0 = matplotlib.patches.Polygon(
verts0, alpha=0.3, facecolor="r", edgecolor=None, label="ectopic periods"
)
ax4.add_patch(poly0)
verts1 = [(1, -c1 * 1 - c2), (1, -5), (10, -5), (10, -c1 * 10 - c2)]
poly1 = matplotlib.patches.Polygon(verts1, alpha=0.3, facecolor="r", edgecolor=None)
ax4.add_patch(poly1)
ax4.legend(loc="upper right")
ax3.set_title("Subspace 2")
ax3.set_xlabel("S21")
ax3.set_ylabel("S22")
ax3.scatter(drrs, s22, marker="x", label="heart periods")
ax3.set_xlim(-10, 10)
ax3.set_ylim(-10, 10)
verts2 = [(-10, 10), (-10, 1), (-1, 1), (-1, 10)]
poly2 = matplotlib.patches.Polygon(
verts2, alpha=0.3, facecolor="r", edgecolor=None, label="short periods"
)
ax3.add_patch(poly2)
verts3 = [(1, -1), (1, -10), (10, -10), (10, -1)]
poly3 = matplotlib.patches.Polygon(
verts3, alpha=0.3, facecolor="y", edgecolor=None, label="long periods"
)
ax3.add_patch(poly3)
ax3.legend(loc="upper right")
plt.tight_layout()
# =============================================================================
# NeuroKit
# =============================================================================
def _remove_small(
peaks,
sampling_rate=1000,
interval_min=None,
relative_interval_min=None,
robust=False,
):
if interval_min is None and relative_interval_min is None:
return peaks
if interval_min is not None:
interval = signal_period(
peaks, sampling_rate=sampling_rate, desired_length=None
)
peaks = peaks[interval > interval_min]
if relative_interval_min is not None:
interval = signal_period(
peaks, sampling_rate=sampling_rate, desired_length=None
)
peaks = peaks[standardize(interval, robust=robust) > relative_interval_min]
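        # Illustrative example: with relative_interval_min = -3, a peak whose
        # interval lies more than 3 standard deviations below the mean interval
        # is discarded as spuriously close to its predecessor.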
return peaks
def _interpolate_big(
peaks,
sampling_rate=1000,
interval_max=None,
relative_interval_max=None,
robust=False,
):
if interval_max is None and relative_interval_max is None:
return peaks
else:
interval = signal_period(
peaks, sampling_rate=sampling_rate, desired_length=None
)
if relative_interval_max is not None:
outliers = standardize(interval, robust=robust) > relative_interval_max
else:
outliers = interval > interval_max
outliers_loc = np.where(outliers)[0]
# interval returned by signal_period at index 0 is the mean of the intervals
# so it does not actually correspond to whether the first peak is an outlier
outliers_loc = outliers_loc[outliers_loc != 0]
if np.sum(outliers) == 0:
return peaks
peaks_to_correct = peaks.copy().astype(float)
interval_without_outliers = interval[np.invert(outliers)]
mean_interval = np.nanmean(interval_without_outliers)
    # Go through the outliers starting with the highest indices so that the
    # indices of the other outliers are not moved when unknown intervals are
    # inserted.
for loc in np.flip(outliers_loc):
# compute number of NaNs to insert based on the mean interval
n_nan = round(interval[loc] / mean_interval)
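        # Illustrative example: an outlier interval of 3.1 s with a mean
        # interval of 1.0 s gives n_nan = 3, so the peak is replaced by three
        # NaNs that the interpolation below fills with evenly spaced peaks.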
# Delete peak corresponding to large interval and replace by N NaNs
peaks_to_correct[loc] = np.nan
peaks_to_correct = np.insert(peaks_to_correct, loc, [np.nan] * (n_nan - 1))
# Interpolate values
interpolated_peaks = (
pd.Series(peaks_to_correct).interpolate(limit_area="inside").values
)
# If there are missing values remaining, remove
peaks = interpolated_peaks[np.invert(np.isnan(interpolated_peaks))].astype(
peaks.dtype
)
return peaks