Source code for neurokit2.bio.bio_analyze

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd

from ..ecg import ecg_analyze
from ..eda import eda_analyze
from ..emg import emg_analyze
from ..eog import eog_analyze
from ..hrv import hrv_rsa
from ..ppg import ppg_analyze
from ..rsp import rsp_analyze


def bio_analyze(data, sampling_rate=1000, method="auto", window_lengths="constant"):
    """**Automated analysis of physiological signals**

    Wrapper for other bio analyze functions of electrocardiography signals (ECG), respiration
    signals (RSP), electrodermal activity (EDA), electromyography signals (EMG),
    photoplethysmography signals (PPG) and electrooculography signals (EOG).

    Parameters
    ----------
    data : DataFrame
        The DataFrame containing all the processed signals, typically produced by
        :func:`.bio_process`, :func:`.ecg_process`, :func:`.rsp_process`,
        :func:`.eda_process`, :func:`.emg_process` or :func:`.eog_process`. Can also be an
        epochs object (a dictionary of DataFrames).
    sampling_rate : int
        The sampling frequency of the signals (in Hz, i.e., samples/second). Defaults to 1000.
    method : str
        Can be one of ``"event-related"`` for event-related analysis on epochs, or
        ``"interval-related"`` for analysis on longer periods of data. Defaults to ``auto``
        where the right method will be chosen based on the mean duration of the data
        (event-related for duration under 10s).
    window_lengths : dict
        If ``constant`` (default), will use the full epoch for all the signals. Can also be a
        dictionary with the epoch start and end times for different types of signals, e.g.,
        ``window_lengths = {"ECG": [0.5, 1.5], "EDA": [0.5, 3.5]}``

    Returns
    ----------
    DataFrame
        DataFrame of the analyzed bio features. See docstrings of :func:`.ecg_analyze()`,
        :func:`.rsp_analyze()`, :func:`.eda_analyze()`, :func:`.emg_analyze()` and
        :func:`.eog_analyze()` for more details. Also returns Respiratory Sinus Arrhythmia
        features produced by :func:`.hrv_rsa()` if interval-related analysis is carried out.
        The DataFrame is empty when ``data`` holds no recognised signal column (or is an empty
        dictionary).

    Raises
    ----------
    ValueError
        If ``data`` is neither a DataFrame nor a dictionary, or if RSA features are computed
        and ``method`` is not one of the supported values.

    See Also
    ----------
    .ecg_analyze, .rsp_analyze, .eda_analyze, .emg_analyze, .eog_analyze

    Examples
    ----------
    **Example 1**: Event-related analysis

    .. ipython:: python

      import neurokit2 as nk

      # Download data
      data = nk.data("bio_eventrelated_100hz")

      # Process the data
      df, info = nk.bio_process(ecg=data["ECG"], rsp=data["RSP"], eda=data["EDA"],
                                keep=data["Photosensor"], sampling_rate=100)

      # Build epochs around photosensor-marked events
      events = nk.events_find(data["Photosensor"], threshold_keep="below",
                              event_conditions=["Negative", "Neutral",
                                                "Neutral", "Negative"])
      epochs = nk.epochs_create(df, events, sampling_rate=100, epochs_start=-0.1,
                                epochs_end=1.9)

      # Analyze
      nk.bio_analyze(epochs, sampling_rate=100)

    **Example 2**: Interval-related analysis

    .. ipython:: python

      # Download data
      data = nk.data("bio_resting_5min_100hz")

      # Process the data
      df, info = nk.bio_process(ecg=data["ECG"], rsp=data["RSP"], ppg=data["PPG"],
                                sampling_rate=100)

      # Analyze
      nk.bio_analyze(df, sampling_rate=100)

    """
    features = pd.DataFrame()
    method = method.lower()

    # Sanitize input: collect the column names used to detect which signals are present
    if isinstance(data, pd.DataFrame):
        columns = list(data.columns)
    elif isinstance(data, dict):
        # The columns of the last epoch are the ones that count (same outcome as scanning
        # every epoch and overwriting). An empty dictionary simply has no columns, so no
        # analysis is run instead of failing on unbound names.
        epochs = list(data.values())
        columns = list(epochs[-1].columns) if epochs else []
    else:
        raise ValueError(
            "NeuroKit error: bio_analyze(): Wrong input, please make sure you enter a DataFrame or a dictionary. "
        )

    def _has_signal(signal):
        # A signal is considered present as soon as one column name contains its tag
        return any(signal in col for col in columns)

    def _append_features(current, analyzer, signal):
        # Run one signal-specific analyzer and append its output column-wise
        signal_data = data.copy()
        if window_lengths != "constant" and signal in window_lengths:  # only for epochs
            signal_data = _bio_analyze_slicewindow(signal_data, window_lengths, signal=signal)
        analyzed = analyzer(signal_data, sampling_rate=sampling_rate, method=method)
        return pd.concat([current, analyzed], axis=1, sort=False)

    # ECG
    if _has_signal("ECG"):
        features = _append_features(features, ecg_analyze, "ECG")

    # RSP
    if _has_signal("RSP"):
        features = _append_features(features, rsp_analyze, "RSP")

    # EDA
    if _has_signal("EDA"):
        features = _append_features(features, eda_analyze, "EDA")

    # EMG
    if _has_signal("EMG"):
        features = _append_features(features, emg_analyze, "EMG")

    # PPG
    if _has_signal("PPG"):
        features = _append_features(features, ppg_analyze, "PPG")

    # EOG
    if _has_signal("EOG"):
        features = _append_features(features, eog_analyze, "EOG")

    # RSA
    ecg_rate_col = [col for col in columns if "ECG_Rate" in col]
    rsp_phase_col = [col for col in columns if "RSP_Phase" in col]
    # NOTE(review): the threshold of 3 presumably stands for ECG_Rate, RSP_Phase and
    # RSP_Phase_Completion - confirm against the output of bio_process().
    if len(ecg_rate_col + rsp_phase_col) >= 3:

        # Resolve "auto" from the mean duration of the data
        if method == "auto":
            duration = _bio_analyze_findduration(data, sampling_rate=sampling_rate)
            method = "interval" if duration >= 10 else "event"

        # Event-related
        if method in ["event-related", "event", "epoch"]:
            rsa = _bio_analyze_rsa_event(data.copy())

        # Interval-related
        elif method in ["interval-related", "interval", "resting-state"]:
            rsa = _bio_analyze_rsa_interval(data.copy(), sampling_rate=sampling_rate)

        # Anything else is not a supported method
        else:
            raise ValueError("Wrong `method` argument.")

        features = pd.concat([features, rsa], axis=1, sort=False)

    # Remove duplicate columns of Label and Condition
    if "Label" in features.columns.values:
        features = features.loc[:, ~features.columns.duplicated()]

    return features
# ============================================================================= # Internals # ============================================================================= def _bio_analyze_slicewindow(data, window_lengths, signal="ECG"): if signal in window_lengths.keys(): start = window_lengths[signal][0] end = window_lengths[signal][1] epochs = {} for _, label in enumerate(data): # Slice window epoch = data[label].loc[(data[label].index > start) & (data[label].index < end)] epochs[label] = epoch return epochs def _bio_analyze_findduration(data, sampling_rate=1000): # If DataFrame if isinstance(data, pd.DataFrame): if "Label" in data.columns: labels = data["Label"].unique() durations = [len(data[data["Label"] == label]) / sampling_rate for label in labels] else: durations = [len(data) / sampling_rate] # If dictionary if isinstance(data, dict): durations = [len(data[i]) / sampling_rate for i in data] return np.nanmean(durations) def _bio_analyze_rsa_interval(data, sampling_rate=1000): # RSA features for interval-related analysis if isinstance(data, pd.DataFrame): rsa = hrv_rsa(data, sampling_rate=sampling_rate, continuous=False) rsa = pd.DataFrame.from_dict(rsa, orient="index").T elif isinstance(data, dict): for index in data: rsa[index] = {} # Initialize empty container data[index] = data[index].set_index("Index").drop(["Label"], axis=1) rsa[index] = hrv_rsa(data[index], sampling_rate=sampling_rate, continuous=False) rsa = pd.DataFrame.from_dict(rsa, orient="index") return rsa def _bio_analyze_rsa_event(data): # RSA features for event-related analysis rsa = {} if isinstance(data, dict): for i in data: rsa[i] = _bio_analyze_rsa_epoch(data[i]) rsa = pd.DataFrame.from_dict(rsa, orient="index") elif isinstance(data, pd.DataFrame): # Convert back to dict for label, df in data.groupby("Label"): rsa[label] = {} epoch = df.set_index("Time") rsa[label] = _bio_analyze_rsa_epoch(epoch, rsa[label]) rsa = pd.DataFrame.from_dict(rsa, orient="index") # Fix index sorting to 
combine later with features dataframe rsa.index = rsa.index.astype(int) rsa = rsa.sort_index().rename_axis(None) rsa.index = rsa.index.astype(str) return rsa def _bio_analyze_rsa_epoch(epoch): # RSA features for event-related analysis: epoching output = {} # To remove baseline if np.min(epoch.index.values) <= 0: baseline = epoch["RSA_P2T"][epoch.index <= 0].values signal = epoch["RSA_P2T"][epoch.index > 0].values output["RSA_P2T"] = np.mean(signal) - np.mean(baseline) baseline = epoch["RSA_Gates"][epoch.index <= 0].values signal = epoch["RSA_Gates"][epoch.index > 0].values output["RSA_Gates"] = np.nanmean(signal) - np.nanmean(baseline) else: signal = epoch["RSA_P2T"].values output["RSA_P2T"] = np.mean(signal) signal = epoch["RSA_Gates"].values output["RSA_Gates"] = np.nanmean(signal) return output