# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from ..misc import find_closest
from ..signal import signal_formatpeaks
from .eda_findpeaks import eda_findpeaks
from .eda_fixpeaks import eda_fixpeaks
def eda_peaks(eda_phasic, sampling_rate=1000, method="neurokit", amplitude_min=0.1):
"""**Find Skin Conductance Responses (SCR) in Electrodermal Activity (EDA)**
Identify Skin Conductance Responses (SCR) peaks in the phasic component of
Electrodermal Activity (EDA) with different possible methods, such as:
* `Gamboa, H. (2008) <http://www.lx.it.pt/~afred/pub/thesisHugoGamboa.pdf>`_
* `Kim et al. (2004) <http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.102.7385&rep=rep1&type=pdf>`_
Parameters
----------
eda_phasic : Union[list, np.array, pd.Series]
The phasic component of the EDA signal (from :func:`eda_phasic()`).
sampling_rate : int
The sampling frequency of the EDA signal (in Hz, i.e., samples/second).
method : str
The processing pipeline to apply. Can be one of ``"neurokit"`` (default),
``"gamboa2008"``, ``"kim2004"`` (the default in BioSPPy), ``"vanhalem2020"`` or ``"nabian2018"``.
amplitude_min : float
Only used if ``method`` is ``"neurokit"`` or ``"kim2004"``. Minimum threshold by which to
exclude SCRs (peaks), relative to the largest amplitude in the signal.
Returns
-------
signals : DataFrame
A DataFrame of the same length as the input signal in which occurrences of SCR peaks are
marked as "1" in a list of zeros. Accessible with the key ``"SCR_Peaks"``.
info : dict
A dictionary containing additional information, in this case the amplitude of the SCR and
the samples at which the SCR onsets and the SCR peaks occur, accessible with the keys
``"SCR_Amplitude"``, ``"SCR_Onsets"``, and ``"SCR_Peaks"``, respectively. It also contains
the signal's sampling rate.
See Also
--------
eda_simulate, eda_clean, eda_phasic, eda_process, eda_plot
Examples
---------
.. ipython:: python
import neurokit2 as nk
# Get phasic component
eda_signal = nk.eda_simulate(duration=30, scr_number=5, drift=0.1, noise=0, sampling_rate=100)
eda_cleaned = nk.eda_clean(eda_signal, sampling_rate=100)
eda = nk.eda_phasic(eda_cleaned, sampling_rate=100)
eda_phasic = eda["EDA_Phasic"].values
# Find peaks
_, kim2004 = nk.eda_peaks(eda_phasic, sampling_rate=100, method="kim2004")
_, neurokit = nk.eda_peaks(eda_phasic, sampling_rate=100, method="neurokit")
_, nabian2018 = nk.eda_peaks(eda_phasic, sampling_rate=100, method="nabian2018")
@savefig p_eda_peaks.png scale=100%
nk.events_plot([
nabian2018["SCR_Peaks"],
kim2004["SCR_Peaks"],
neurokit["SCR_Peaks"]
], eda_phasic)
@suppress
plt.close()
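The computed SCR features, such as the amplitude and rise time of each SCR, can then be
retrieved from the returned ``info`` dictionary (a minimal illustration based on the
output above):
.. ipython:: python
neurokit["SCR_Amplitude"]
neurokit["SCR_RiseTime"]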
References
----------
* Gamboa, H. (2008). Multi-modal behavioral biometrics based on HCI and electrophysiology.
PhD thesis, Universidade Técnica de Lisboa.
* Kim, K. H., Bang, S. W., & Kim, S. R. (2004). Emotion recognition system using short-term
monitoring of physiological signals. Medical and biological engineering and computing, 42(3),
419-427.
* van Halem, S., Van Roekel, E., Kroencke, L., Kuper, N., & Denissen, J. (2020).
Moments That Matter? On the Complexity of Using Triggers Based on Skin Conductance to Sample
Arousing Events Within an Experience Sampling Framework. European Journal of Personality.
* Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., & Ostadabbas, S. (2018). An
Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data. IEEE
journal of translational engineering in health and medicine, 6, 2800711.
"""
if isinstance(eda_phasic, (pd.DataFrame, pd.Series)):
try:
eda_phasic = eda_phasic["EDA_Phasic"]
except KeyError:
eda_phasic = eda_phasic.values
# Basic peak detection (method-specific)
info = eda_findpeaks(
eda_phasic, sampling_rate=sampling_rate, method=method, amplitude_min=amplitude_min
)
info = eda_fixpeaks(info)
# Get additional features (rise time, half recovery time, etc.)
info = _eda_peaks_getfeatures(info, eda_phasic, sampling_rate, recovery_percentage=0.5)
# Prepare output.
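# Mark the samples at which SCR peaks (and recovery points) occur as "1" in vectors of
# zeros with the same length as the input signal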
peak_signal = signal_formatpeaks(
info,
desired_length=len(eda_phasic),
peak_indices=info["SCR_Peaks"],
other_indices=info["SCR_Recovery"],
)
info["sampling_rate"] = sampling_rate # Add sampling rate in dict info
return peak_signal, info
# =============================================================================
# Utility
# =============================================================================
def _eda_peaks_getfeatures(info, eda_phasic, sampling_rate=1000, recovery_percentage=0.5):
# Sanity checks -----------------------------------------------------------
# Peaks (remove peaks before first onset)
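# (a peak without a preceding onset has no baseline from which to compute an amplitude)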
valid_peaks = np.logical_and(
info["SCR_Peaks"] > np.nanmin(info["SCR_Onsets"]), ~np.isnan(info["SCR_Onsets"])
) # pylint: disable=E1111
peaks = info["SCR_Peaks"][valid_peaks]
# Onsets (remove onsets occurring after the last peak)
valid_onsets = ~np.isnan(info["SCR_Onsets"])
valid_onsets[valid_onsets] = info["SCR_Onsets"][valid_onsets] < np.nanmax(info["SCR_Peaks"])
onsets = info["SCR_Onsets"][valid_onsets].astype(int)
if len(onsets) != len(peaks):
raise ValueError(
"NeuroKit error: eda_peaks(): Peaks and onsets don't ",
"match, so cannot get amplitude safely. Check why using `find_peaks()`.",
)
# Amplitude and Rise Time -------------------------------------------------
# Amplitudes
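# SCR amplitude = peak height (SCR_Height) minus the phasic value at the preceding onset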
amplitude = np.full(len(info["SCR_Height"]), np.nan)
amplitude[valid_peaks] = info["SCR_Height"][valid_peaks] - eda_phasic[onsets]
# Rise times (in seconds)
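# Rise time = delay between the SCR onset and its peak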
risetime = np.full(len(info["SCR_Peaks"]), np.nan)
risetime[valid_peaks] = (peaks - onsets) / sampling_rate
# Save info
info["SCR_Amplitude"] = amplitude
info["SCR_RiseTime"] = risetime
# Recovery time -----------------------------------------------------------
# (Half) Recovery times
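# A peak is considered recovered once the signal drops back to the onset level plus
# `recovery_percentage` (50% by default) of its amplitude; the recovery time is the
# delay (in seconds) between the peak and that point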
recovery = np.full(len(info["SCR_Peaks"]), np.nan)
recovery_time = np.full(len(info["SCR_Peaks"]), np.nan)
recovery_values = eda_phasic[onsets] + (amplitude[valid_peaks] * recovery_percentage)
for i, peak_index in enumerate(peaks):
# Get segment between peak and next peak
try:
segment = eda_phasic[peak_index : peaks[i + 1]]
except IndexError:
segment = eda_phasic[peak_index:]
# Adjust segment (cut when it reaches minimum to avoid picking out values on the rise of the next peak)
segment = segment[0 : np.argmin(segment)]
# Find recovery time
recovery_value = find_closest(
recovery_values[i], segment, direction="smaller", strictly=False
)
# Detect recovery points only if there are datapoints below recovery value
if np.min(segment) < recovery_value:
segment_index = np.where(segment == recovery_value)[0][0]
recovery[np.where(valid_peaks)[0][i]] = peak_index + segment_index
recovery_time[np.where(valid_peaks)[0][i]] = segment_index / sampling_rate
# Save output
info["SCR_Recovery"] = recovery
info["SCR_RecoveryTime"] = recovery_time
return info