Source code for neurokit2.epochs.epochs_create

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd

from ..events.events_find import _events_find_label
from ..misc import listify


def epochs_create(
    data,
    events=None,
    sampling_rate=1000,
    epochs_start=0,
    epochs_end="from_events",
    event_labels=None,
    event_conditions=None,
    baseline_correction=False,
):
    """**Create Epochs**

    Create epochs of a signal or a dataframe.

    Parameters
    ----------
    data : DataFrame
        A DataFrame containing the different signal(s) as different columns. If a vector of
        values is passed, it will be transformed into a DataFrame with a single 'Signal' column.
    events : list or ndarray or dict
        Events onset location. If a dict is passed (e.g., from ``events_find()``), will select
        only the 'onset' list. If an integer is passed, will use this number to create an evenly
        spaced list of events. If None, will chunk the signal into successive blocks of the set
        duration.
    sampling_rate : int
        The sampling frequency of the signal (in Hz, i.e., samples/second).
    epochs_start : int, list
        Epochs start relative to events_onsets (in seconds). The start can be negative to start
        epochs before a given event (to have a baseline for instance). An integer can be
        specified to have the same start for all epochs. A list of equal length to the events
        can be specified to have a different start for each epoch.
    epochs_end : int, list
        Epochs end relative to events_onsets (in seconds). An integer can be specified to have
        the same end for all epochs. A list of equal length to the events can be specified to
        have a different end for each epoch. If ``"from_events"``, events must be a dict (from
        :func:`.events_find`). Duration from events will be used as ``epochs_end``.
    event_labels : list
        A list containing unique event identifiers. If ``None``, will use the event index number.
    event_conditions : list
        An optional list containing, for each event, for example the trial category, group or
        experimental conditions.
    baseline_correction : bool
        Defaults to False.

    Returns
    ----------
    dict
        A dict containing DataFrames for all epochs.

    See Also
    ----------
    events_find, events_plot, epochs_to_df, epochs_plot

    Examples
    ----------
    * **Example 1**: Find events

    .. ipython:: python

      import neurokit2 as nk

      # Get data
      data = nk.data("bio_eventrelated_100hz")

      # Find events
      events = nk.events_find(data["Photosensor"],
                              threshold_keep='below',
                              event_conditions=["Negative", "Neutral", "Neutral", "Negative"])

      @savefig p_epochs_create1.png scale=100%
      nk.events_plot(events, data)
      @suppress
      plt.close()

    * **Example 2**: Create epochs

    .. ipython:: python

      epochs = nk.epochs_create(data, events, sampling_rate=100, epochs_end=3)

      @savefig p_epochs_create2.png scale=100%
      nk.epochs_plot(epochs)
      @suppress
      plt.close()

    * **Example 3**: Baseline correction

    .. ipython:: python

      epochs = nk.epochs_create(data, events, sampling_rate=100, epochs_end=3,
                                baseline_correction=True)

      @savefig p_epochs_create3.png scale=100%
      nk.epochs_plot(epochs)
      @suppress
      plt.close()

    * **Example 4**: Arbitrary epoching

    .. ipython:: python

      # Chunk into n blocks of 1 second
      epochs = nk.epochs_create(data, sampling_rate=100, epochs_end=1)

    * **Example 5**: Epochs with list of starting points

    .. ipython:: python

      epochs = nk.epochs_create(data, events, sampling_rate=100,
                                epochs_start=[0, -1, -1, 0],
                                epochs_end=[1, 0, 0, 1])
      [len(epochs[i]) for i in epochs.keys()]

    """
    # Sanitize data input
    if isinstance(data, tuple):  # If a tuple of data and info is passed
        data = data[0]

    if isinstance(data, (list, np.ndarray, pd.Series)):
        data = pd.DataFrame({"Signal": list(data)})

    # Sanitize events input
    if events is None:
        max_duration = (np.max(epochs_end) - np.min(epochs_start)) * sampling_rate
        events = np.arange(0, len(data) - max_duration, max_duration)

    if isinstance(events, int):
        events = np.linspace(0, len(data), events + 2)[1:-1]
    if isinstance(events, dict) is False:
        events = _events_find_label(
            {"onset": events}, event_labels=event_labels, event_conditions=event_conditions
        )

    event_onsets = list(events["onset"])
    event_labels = list(events["label"])
    if "condition" in events.keys():
        event_conditions = list(events["condition"])

    # Create epochs
    if epochs_end == "from_events":
        if "duration" not in events.keys():
            events["duration"] = list(np.diff(events["onset"])) + [len(data) - 1]
        epochs_end = [i / sampling_rate for i in events["duration"]]

    parameters = listify(
        onset=event_onsets,
        label=event_labels,
        condition=event_conditions,
        start=epochs_start,
        end=epochs_end,
    )

    # Find the maximum number of samples in an epoch
    parameters["duration"] = list(np.array(parameters["end"]) - np.array(parameters["start"]))
    epoch_max_duration = int(max((i * sampling_rate for i in parameters["duration"])))

    # Extend data by the max samples in epochs * NaN (to prevent non-complete data)
    length_buffer = epoch_max_duration

    # First create a buffer of the same dtype as data and fill it with 0s
    buffer = pd.DataFrame(0, index=range(length_buffer), columns=data.columns).astype(dtype=data.dtypes)
    # Only then convert the non-integers to NaNs (because regular numpy ints cannot be NaN)
    buffer.select_dtypes(exclude="int64").replace({0.0: np.nan}, inplace=True)
    # Now we can combine the buffer with the data
    data = pd.concat([buffer, data, buffer], ignore_index=True, sort=False)

    # Adjust the onset of the events for the buffer
    parameters["onset"] = [i + length_buffer for i in parameters["onset"]]

    epochs = {}
    for i, label in enumerate(parameters["label"]):

        # Find indices
        start = parameters["onset"][i] + (parameters["start"][i] * sampling_rate)
        end = parameters["onset"][i] + (parameters["end"][i] * sampling_rate)

        # Slice dataframe
        epoch = data.iloc[int(start) : int(end)].copy()

        # Correct index
        epoch["Index"] = epoch.index.values - length_buffer
        epoch.index = np.linspace(
            start=parameters["start"][i], stop=parameters["end"][i], num=len(epoch), endpoint=True
        )

        if baseline_correction is True:
            baseline_end = 0 if epochs_start <= 0 else epochs_start
            epoch = epoch - epoch.loc[:baseline_end].mean()

        # Add additional information
        epoch["Label"] = parameters["label"][i]
        if parameters["condition"][i] is not None:
            epoch["Condition"] = parameters["condition"][i]

        # Store
        epochs[label] = epoch

    # Sanitize dtype of individual columns
    for i in epochs:
        for colname, column in epochs[i].select_dtypes(include=["object"]).items():
            # Check whether columns are indices or label/condition
            values = column.unique().tolist()
            zero_or_one = not (False in [x in [0, 1] for x in values])
            if zero_or_one:  # Force to int64
                epochs[i][colname] = epochs[i][colname].astype("int64")
            else:
                epochs[i][colname] = epochs[i][colname].astype("string")

    return epochs
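For readers following the slicing logic above, here is a minimal, self-contained sketch of the same idea, independent of NeuroKit2: pad the signal with a NaN buffer of one maximal epoch length, then slice a fixed window around each event onset and re-index it in seconds. The names (``signal``, ``onsets``, ``win_start``, ``win_end``) are illustrative assumptions, not part of the library's API.

.. code-block:: python

    import numpy as np
    import pandas as pd

    # Illustrative toy signal and event onsets (in samples)
    sampling_rate = 100  # Hz
    signal = pd.DataFrame({"Signal": np.sin(np.linspace(0, 20, 10 * sampling_rate))})
    onsets = [150, 420, 770]
    win_start, win_end = -0.5, 1.0  # epoch window relative to each onset, in seconds

    # Pad both ends with NaNs so epochs near the edges keep a constant length
    buffer_len = int((win_end - win_start) * sampling_rate)
    buffer = pd.DataFrame(np.nan, index=range(buffer_len), columns=signal.columns)
    padded = pd.concat([buffer, signal, buffer], ignore_index=True)

    epochs = {}
    for i, onset in enumerate(onsets):
        onset += buffer_len  # account for the left padding
        start = int(onset + win_start * sampling_rate)
        end = int(onset + win_end * sampling_rate)
        epoch = padded.iloc[start:end].copy()
        # Re-index in seconds relative to the event onset
        epoch.index = np.linspace(win_start, win_end, num=len(epoch), endpoint=True)
        epochs[i] = epoch

    print({k: len(v) for k, v in epochs.items()})  # every epoch has the same length

This mirrors the structure of ``epochs_create`` (buffering, onset adjustment, slicing, re-indexing) but omits the dtype handling, labels, conditions, and baseline correction that the full function performs.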