Source code for neurokit2.rsp.rsp_intervalrelated
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from .rsp_rav import rsp_rav
from .rsp_rrv import rsp_rrv
# =============================================================================
# Internals
# =============================================================================
def _rsp_intervalrelated_features(data, sampling_rate, output={}):
# Sanitize input
colnames = data.columns.values
if "RSP_Rate" in colnames:
output["RSP_Rate_Mean"] = np.nanmean(data["RSP_Rate"].values)
rrv = rsp_rrv(data, sampling_rate=sampling_rate)
output.update(rrv.to_dict(orient="records")[0])
if "RSP_Amplitude" in colnames:
rav = rsp_rav(data["RSP_Amplitude"].values, peaks=data)
output.update(rav.to_dict(orient="records")[0])
if "RSP_RVT" in colnames:
output["RSP_RVT"] = np.nanmean(data["RSP_RVT"].values)
if "RSP_Symmetry_PeakTrough" in colnames:
output["RSP_Symmetry_PeakTrough"] = np.nanmean(
data["RSP_Symmetry_PeakTrough"].values
)
output["RSP_Symmetry_RiseDecay"] = np.nanmean(
data["RSP_Symmetry_RiseDecay"].values
)
if "RSP_Phase" in colnames:
# Extract inspiration durations
insp_phases = data[data["RSP_Phase"] == 1]
insp_start = insp_phases.index[insp_phases["RSP_Phase_Completion"] == 0]
insp_end = insp_phases.index[insp_phases["RSP_Phase_Completion"] == 1]
# Check that start of phase is before end of phase
if insp_start[0] > insp_end[0]:
insp_end = insp_end[1:]
# Check for unequal lengths
diff = abs(len(insp_start) - len(insp_end))
if len(insp_start) > len(insp_end):
insp_start = insp_start[
: len(insp_start) - diff
] # remove extra start points
elif len(insp_end) > len(insp_start):
insp_end = insp_end[: len(insp_end) - diff] # remove extra end points
insp_times = np.array(insp_end - insp_start) / sampling_rate
# Extract expiration durations
exp_phases = data[data["RSP_Phase"] == 0]
exp_start = exp_phases.index[exp_phases["RSP_Phase_Completion"] == 0]
exp_end = exp_phases.index[exp_phases["RSP_Phase_Completion"] == 1]
# Check that start of phase is before end of phase
if exp_start[0] > exp_end[0]:
exp_end = exp_end[1:]
# Check for unequal lengths
diff = abs(len(exp_start) - len(exp_end))
if len(exp_start) > len(exp_end):
exp_start = exp_start[: len(exp_start) - diff] # remove extra start points
elif len(exp_end) > len(exp_start):
exp_end = exp_end[: len(exp_end) - diff] # remove extra end points
exp_times = np.array(exp_end - exp_start) / sampling_rate
output["RSP_Phase_Duration_Inspiration"] = np.mean(insp_times)
output["RSP_Phase_Duration_Expiration"] = np.mean(exp_times)
output["RSP_Phase_Duration_Ratio"] = (
output["RSP_Phase_Duration_Inspiration"]
/ output["RSP_Phase_Duration_Expiration"]
)
return output