# Source code for neurokit2.rsp.rsp_intervalrelated
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from .rsp_rav import rsp_rav
from .rsp_rrv import rsp_rrv
# =============================================================================
# Internals
# =============================================================================
def _rsp_phase_mean_duration(data, phase, sampling_rate):
    """Return the mean duration of one respiratory phase.

    Parameters
    ----------
    data : pd.DataFrame
        Must contain the columns ``"RSP_Phase"`` and ``"RSP_Phase_Completion"``.
    phase : int
        Value of ``"RSP_Phase"`` selecting the rows of interest (the caller
        uses 1 for inspiration and 0 for expiration).
    sampling_rate : int or float
        Divisor applied to the index difference between the end and the start
        of each phase (yields seconds when the index counts samples and
        ``sampling_rate`` is in Hz).

    Returns
    -------
    float
        Mean of the per-phase durations, or NaN when no start/end pair could
        be formed.
    """
    rows = data[data["RSP_Phase"] == phase]
    # A completion of 0 marks the first sample of a phase, 1 marks the last.
    starts = rows.index[rows["RSP_Phase_Completion"] == 0]
    ends = rows.index[rows["RSP_Phase_Completion"] == 1]

    # An end marker located before the first start marker cannot be paired
    # with any visible start, so it is discarded.
    if len(starts) > 0 and len(ends) > 0 and starts[0] > ends[0]:
        ends = ends[1:]

    # Keep only as many markers as can be paired; surplus trailing starts or
    # ends are dropped.
    n_pairs = min(len(starts), len(ends))
    if n_pairs == 0:
        # No complete phase: report NaN rather than failing on `[0]`.
        return np.nan

    durations = np.array(ends[:n_pairs] - starts[:n_pairs]) / sampling_rate
    return np.mean(durations)


def _rsp_intervalrelated_features(data, sampling_rate, output=None):
    """Compute interval-related respiration features from a processed signal.

    Each group of features is only computed when the column(s) it relies on
    are present in ``data``.

    Parameters
    ----------
    data : pd.DataFrame
        Processed respiration data. Recognised columns are ``"RSP_Rate"``,
        ``"RSP_Amplitude"``, ``"RSP_RVT"``, ``"RSP_Symmetry_PeakTrough"``
        (read together with ``"RSP_Symmetry_RiseDecay"``) and ``"RSP_Phase"``
        (read together with ``"RSP_Phase_Completion"``).
    sampling_rate : int or float
        Sampling rate, forwarded to ``rsp_rrv`` and used to scale the phase
        durations.
    output : dict, optional
        Dictionary to which the features are added. It is updated in place
        and returned. When omitted, a new dictionary is created for each
        call.

    Returns
    -------
    dict
        ``output`` (or a fresh dictionary) holding the computed features.
    """
    # A fresh dict per call: a `{}` default would be shared between calls and
    # carry features over from one dataset to the next.
    if output is None:
        output = {}

    colnames = data.columns.values

    if "RSP_Rate" in colnames:
        output["RSP_Rate_Mean"] = np.nanmean(data["RSP_Rate"].values)
        rrv = rsp_rrv(data, sampling_rate=sampling_rate)
        output.update(rrv.to_dict(orient="records")[0])

    if "RSP_Amplitude" in colnames:
        rav = rsp_rav(data["RSP_Amplitude"].values, peaks=data)
        output.update(rav.to_dict(orient="records")[0])

    if "RSP_RVT" in colnames:
        output["RSP_RVT"] = np.nanmean(data["RSP_RVT"].values)

    if "RSP_Symmetry_PeakTrough" in colnames:
        output["RSP_Symmetry_PeakTrough"] = np.nanmean(
            data["RSP_Symmetry_PeakTrough"].values
        )
        output["RSP_Symmetry_RiseDecay"] = np.nanmean(
            data["RSP_Symmetry_RiseDecay"].values
        )

    if "RSP_Phase" in colnames:
        inspiration = _rsp_phase_mean_duration(data, 1, sampling_rate)
        expiration = _rsp_phase_mean_duration(data, 0, sampling_rate)
        output["RSP_Phase_Duration_Inspiration"] = inspiration
        output["RSP_Phase_Duration_Expiration"] = expiration
        output["RSP_Phase_Duration_Ratio"] = inspiration / expiration

    return output