Source code for neurokit2.eog.eog_features

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd

from ..epochs import epochs_create
from ..signal import signal_zerocrossings


def eog_features(eog_cleaned, peaks, sampling_rate=1000):
    """**Extracts Features of EOG Eye Blinks**

    Extracts features of EOG eye blinks e.g., velocity measures, blink-amplitude-ratio (BAR),
    duration, and markers of onset and offset of each blink.

    The velocity measures are the positive amplitude velocity ratio (pAVR) and the negative
    amplitude velocity ratio (nAVR). The positive amplitude velocity ratio is the ratio of the
    maximum amplitude of the blink over the maximum velocity (rate of change) during the blink
    upStroke. Similarly, the negative amplitude velocity ratio is the ratio of the maximum
    amplitude of the blink over the maximum velocity found in the blink downStroke. These measures
    have units of centiseconds and are indicators of fatigue.

    The blink-amplitude ratio (BAR) is the average amplitude of the signal between the blink
    leftZero and rightZero zero crossings divided by the average amplitude of the positive fraction
    of the signal "outside" the blink. BAR values in the range [5, 20] usually capture blinks
    reasonably well. BAR is a measure of the signal-to-noise ratio (SNR) of the blink to the
    background in a candidate signal.

    Parameters
    ----------
    eog_cleaned : Union[list, np.array, pd.Series]
        The cleaned EOG channel, extracted from :func:`.eog_clean`.
    peaks : np.array
        Vector containing the samples at which EOG-peaks occur.
    sampling_rate : int
        The sampling frequency of :func:`.eog_signal` (in Hz, i.e., samples/second).
        Defaults to 1000.

    Returns
    -------
    info : dict
        A dictionary containing information of the features of the EOG blinks, accessible with keys
        ``"Blink_LeftZeros"`` (point when eye closes), ``"Blink_RightZeros"`` (point when eye
        opens), ``"Blink_pAVR"``, ``"Blink_nAVR"``, ``"Blink_BAR"``, and ``"Blink_Duration"``
        (duration of each blink in seconds). ``"Blink_pAVR"`` / ``"Blink_nAVR"`` hold ``np.nan``
        for a blink whose up- / downstroke contains fewer than two samples.

    See Also
    --------
    eog_clean, eog_findpeaks

    Examples
    --------
    .. ipython:: python

      import neurokit2 as nk

      # Get data
      eog_signal = nk.data('eog_100hz')
      eog_cleaned = nk.eog_clean(eog_signal, sampling_rate=100)
      peaks = nk.eog_findpeaks(eog_cleaned, sampling_rate=100)

      info = nk.eog_features(eog_cleaned, peaks, sampling_rate=100)

    References
    ----------
    * Kleifges, K., Bigdely-Shamlo, N., Kerick, S. E., & Robbins, K. A. (2017). BLINKER: automated
      extraction of ocular indices from EEG enabling large-scale analysis. Frontiers in
      neuroscience, 11, 12.

    """
    BARs, _, leftzeros, rightzeros, downstrokes, upstrokes = _eog_features_delineate(
        eog_cleaned, peaks, sampling_rate=sampling_rate
    )

    pAVR_list = []
    nAVR_list = []
    duration_list = []

    # One upstroke/downstroke pair is produced per delineated blink
    for upstroke, downstroke in zip(upstrokes, downstrokes):
        # Closing blink (pAVR)
        pAVR_list.append(_eog_features_avr(upstroke.Signal, sampling_rate))

        # Opening blink (nAVR)
        nAVR_list.append(_eog_features_avr(downstroke.Signal, sampling_rate))

        # Duration of the full blink (upstroke + downstroke), in seconds
        n_samples = len(upstroke.Signal) + len(downstroke.Signal)
        duration_list.append(n_samples / sampling_rate)

    # Return info dictionary
    info = {
        "Blink_LeftZeros": leftzeros,
        "Blink_RightZeros": rightzeros,
        "Blink_pAVR": pAVR_list,
        "Blink_nAVR": nAVR_list,
        "Blink_BAR": BARs,
        "Blink_Duration": duration_list,
    }
    return info


def _eog_features_avr(stroke, sampling_rate):
    """Return the velocity measure of one blink stroke, or ``np.nan`` if it is too short.

    The value is the largest sample-to-sample change of ``stroke`` divided by the time spanned by
    those changes (in seconds), in absolute value and multiplied by 100.

    NOTE(review): the public docstring describes the AVR as "maximum amplitude over maximum
    velocity", and the downstroke also uses the *maximum* difference; the formula is kept exactly
    as it was -- confirm against the BLINKER reference before changing it.
    """
    change = np.diff(stroke)
    if len(change) == 0:
        # Fewer than two samples: no velocity can be computed (and the duration would be zero)
        return np.nan
    duration = len(change) / sampling_rate
    return abs(change.max() / duration) * 100
# =============================================================================
# Internals
# =============================================================================
def _eog_features_delineate(eog_cleaned, candidates, sampling_rate=1000):
    """Locate the landmarks of each candidate blink in a one-second window around it.

    Parameters
    ----------
    eog_cleaned : Union[list, np.array, pd.Series]
        The cleaned EOG signal, passed on to ``epochs_create``.
    candidates : np.array
        Samples of the candidate blink peaks, used as epoch events.
    sampling_rate : int
        The sampling frequency of the signal (in Hz). Defaults to 1000.

    Returns
    -------
    BARs : list
        Blink-amplitude ratio of every candidate.
    peaks : list
        Peak sample (``"Index"`` value) of the candidates whose BAR lies in ``(3, 50)``.
    leftzeros, rightzeros : list
        ``"Index"`` value of the left / right boundary of every candidate.
    downstrokes, upstrokes : list
        Rows of the epoch between the peak and the right boundary / between the left boundary and
        the peak, for every candidate.

    """
    # Calculate blink landmarks
    # NOTE(review): assumes each epoch is a DataFrame indexed by time (s) with "Signal" and "Index"
    # (sample number) columns -- confirm against ``epochs_create``.
    epochs = epochs_create(
        eog_cleaned,
        events=candidates,
        sampling_rate=sampling_rate,
        epochs_start=-0.5,
        epochs_end=0.5,
    )

    peaks = []
    leftzeros = []
    rightzeros = []
    downstrokes = []
    upstrokes = []
    BARs = []

    for i in epochs:
        epoch = epochs[i]
        max_value = epoch.Signal.max()

        # Check if peak is at the end or start of epoch. The comparisons are element-wise because
        # the maximum can be reached at several time points (a chained ``a < t < b`` would raise).
        t = epoch.loc[epoch["Signal"] == max_value].index
        if np.all((t > 0.3) & (t < 0.51)):
            # Trim end of epoch
            epoch = epoch[-0.5:0.3]
            max_value = epoch.Signal.max()
        if np.all((t > -0.51) & (t < -0.3)):
            # Trim start of epoch
            epoch = epoch[-0.3:0.5]
            max_value = epoch.Signal.max()

        # Find position of peak; if several points achieve the max value, the first one is the blink
        max_frame = np.array(epoch["Index"].loc[epoch["Signal"] == max_value])
        max_frame = int(max_frame[0])

        # Left and right zero markers: nearest zero crossing strictly before / after the peak
        crossings = signal_zerocrossings(epoch.Signal)
        crossings_idx = np.sort(np.array(epoch["Index"].iloc[crossings]))
        crossings_before = crossings_idx[crossings_idx < max_frame]
        crossings_after = crossings_idx[crossings_idx > max_frame]

        if crossings_before.size > 0:  # crosses zero point
            leftzero = crossings_before[-1]
        else:
            # No crossing before the peak: use the (first) lowest point up to the peak instead
            sliced_before = epoch.loc[slice(epoch.Signal.idxmax()), :]
            lowest = sliced_before["Index"].loc[
                sliced_before["Signal"] == sliced_before["Signal"].min()
            ]
            leftzero = int(lowest.iloc[0])

        if crossings_after.size > 0:  # crosses zero point
            rightzero = crossings_after[0]
        else:
            # No crossing after the peak: use the (first) lowest point after the peak instead
            sliced_before = epoch.loc[slice(epoch.Signal.idxmax()), :]
            sliced_after = epoch.tail(epoch.shape[0] - sliced_before.shape[0])
            if sliced_after.empty:
                # Peak is the last sample of the epoch: the blink has no downstroke
                rightzero = max_frame
            else:
                lowest = sliced_after["Index"].loc[
                    sliced_after["Signal"] == sliced_after["Signal"].min()
                ]
                rightzero = int(lowest.iloc[0])

        # Upstroke and downstroke markers
        upstroke_idx = list(np.arange(leftzero, max_frame))
        upstroke = epoch.loc[epoch["Index"].isin(upstroke_idx)]

        downstroke_idx = list(np.arange(max_frame, rightzero))
        downstroke = epoch.loc[epoch["Index"].isin(downstroke_idx)]

        # Left base and right base: signal outside of the blink (last epoch sample excluded)
        leftbase_idx = list(np.arange(epoch["Index"].iloc[0], leftzero))
        leftbase_signal = epoch.loc[epoch["Index"].isin(leftbase_idx)]

        rightbase_idx = list(np.arange(rightzero, epoch["Index"].iloc[-1]))
        rightbase_signal = epoch.loc[epoch["Index"].isin(rightbase_idx)]

        # Rejecting candidate signals with low SNR (BAR = blink-amplitude-ratio)
        inside_blink_idx = list(np.arange(leftzero, rightzero))
        inside_blink = epoch.loc[epoch["Index"].isin(inside_blink_idx)]
        outside_blink = pd.concat([leftbase_signal, rightbase_signal], axis=0)

        BAR = inside_blink.Signal.mean() / outside_blink.Signal[outside_blink["Signal"] > 0].mean()

        # Features of all candidates
        BARs.append(BAR)
        leftzeros.append(leftzero)
        rightzeros.append(rightzero)
        downstrokes.append(downstroke)
        upstrokes.append(upstroke)

        # BAR values in the range [5, 20] usually capture blinks reasonably well
        if 3 < BAR < 50:
            peaks.append(max_frame)

    return BARs, peaks, leftzeros, rightzeros, downstrokes, upstrokes