Source code for neurokit2.rsp.rsp_plot

# -*- coding: utf-8 -*-
from warnings import warn

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from ..misc import NeuroKitWarning


[docs] def rsp_plot(rsp_signals, info=None, static=True): """**Visualize respiration (RSP) data** Parameters ---------- rsp_signals : DataFrame DataFrame obtained from :func:`.rsp_process`. info : dict The information Dict returned by ``rsp_process()``. Defaults to ``None``. static : bool If True, a static plot will be generated with matplotlib. If False, an interactive plot will be generated with plotly. Defaults to True. See Also -------- rsp_process Returns ------- See :func:`.ecg_plot` for details on how to access the figure, modify the size and save it. Examples -------- .. ipython:: python import neurokit2 as nk # Simulate data rsp = nk.rsp_simulate(duration=90, respiratory_rate=15, sampling_rate=100) # Process signal rsp_signals, info = nk.rsp_process(rsp, sampling_rate=100) # Plot @savefig p_rsp_plot1.png scale=100% nk.rsp_plot(rsp_signals, info) @suppress plt.close() """ if info is None: warn( "'info' dict not provided. Some information might be missing." + " Sampling rate will be set to 1000 Hz.", category=NeuroKitWarning, ) info = { "sampling_rate": 1000, } # Mark peaks, troughs and phases. peaks = np.where(rsp_signals["RSP_Peaks"] == 1)[0] troughs = np.where(rsp_signals["RSP_Troughs"] == 1)[0] inhale = np.where(rsp_signals["RSP_Phase"] == 1)[0] exhale = np.where(rsp_signals["RSP_Phase"] == 0)[0] nrow = 2 # Determine mean rate. rate_mean = np.mean(rsp_signals["RSP_Rate"]) if "RSP_Amplitude" in list(rsp_signals.columns): nrow += 1 # Determine mean amplitude. amplitude_mean = np.mean(rsp_signals["RSP_Amplitude"]) if "RSP_RVT" in list(rsp_signals.columns): nrow += 1 # Determine mean RVT. rvt_mean = np.mean(rsp_signals["RSP_RVT"]) if "RSP_Symmetry_PeakTrough" in list(rsp_signals.columns): nrow += 1 # Get signals marking inspiration and expiration. exhale_signal, inhale_signal = _rsp_plot_phase(rsp_signals, troughs, peaks) # Determine unit of x-axis. x_label = "Time (seconds)" x_axis = np.linspace(0, len(rsp_signals) / info["sampling_rate"], len(rsp_signals)) if static: fig, ax = plt.subplots(nrows=nrow, ncols=1, sharex=True) last_ax = fig.get_axes()[-1] last_ax.set_xlabel(x_label) # Plot cleaned and raw respiration as well as peaks and troughs. ax[0].set_title("Raw and Cleaned Signal") fig.suptitle("Respiration (RSP)", fontweight="bold") ax[0].plot( x_axis, rsp_signals["RSP_Raw"], color="#B0BEC5", label="Raw", zorder=1 ) ax[0].plot( x_axis, rsp_signals["RSP_Clean"], color="#2196F3", label="Cleaned", zorder=2, linewidth=1.5, ) ax[0].scatter( x_axis[peaks], rsp_signals["RSP_Clean"][peaks], color="red", label="Exhalation Onsets", zorder=3, ) ax[0].scatter( x_axis[troughs], rsp_signals["RSP_Clean"][troughs], color="orange", label="Inhalation Onsets", zorder=4, ) # Shade region to mark inspiration and expiration. ax[0].fill_between( x_axis[exhale], exhale_signal[exhale], rsp_signals["RSP_Clean"][exhale], where=rsp_signals["RSP_Clean"][exhale] > exhale_signal[exhale], color="#CFD8DC", linestyle="None", label="exhalation", ) ax[0].fill_between( x_axis[inhale], inhale_signal[inhale], rsp_signals["RSP_Clean"][inhale], where=rsp_signals["RSP_Clean"][inhale] > inhale_signal[inhale], color="#ECEFF1", linestyle="None", label="inhalation", ) ax[0].legend(loc="upper right") # Plot rate and optionally amplitude. ax[1].set_title("Breathing Rate") ax[1].plot( x_axis, rsp_signals["RSP_Rate"], color="#4CAF50", label="Rate", linewidth=1.5, ) ax[1].axhline(y=rate_mean, label="Mean", linestyle="--", color="#4CAF50") ax[1].legend(loc="upper right") if "RSP_Amplitude" in list(rsp_signals.columns): ax[2].set_title("Breathing Amplitude") ax[2].plot( x_axis, rsp_signals["RSP_Amplitude"], color="#009688", label="Amplitude", linewidth=1.5, ) ax[2].axhline( y=amplitude_mean, label="Mean", linestyle="--", color="#009688" ) ax[2].legend(loc="upper right") if "RSP_RVT" in list(rsp_signals.columns): ax[3].set_title("Respiratory Volume per Time") ax[3].plot( x_axis, rsp_signals["RSP_RVT"], color="#00BCD4", label="RVT", linewidth=1.5, ) ax[3].axhline(y=rvt_mean, label="Mean", linestyle="--", color="#009688") ax[3].legend(loc="upper right") if "RSP_Symmetry_PeakTrough" in list(rsp_signals.columns): ax[4].set_title("Cycle Symmetry") ax[4].plot( x_axis, rsp_signals["RSP_Symmetry_PeakTrough"], color="green", label="Peak-Trough Symmetry", linewidth=1.5, ) ax[4].plot( x_axis, rsp_signals["RSP_Symmetry_RiseDecay"], color="purple", label="Rise-Decay Symmetry", linewidth=1.5, ) ax[4].legend(loc="upper right") else: # Generate interactive plot with plotly. try: import plotly.graph_objects as go from plotly.subplots import make_subplots except ImportError as e: raise ImportError( "NeuroKit error: rsp_plot(): the 'plotly'", " module is required when 'static' is False.", " Please install it first (`pip install plotly`).", ) from e subplot_titles = ["Raw and Cleaned Signal", "Breathing Rate"] if "RSP_Amplitude" in list(rsp_signals.columns): subplot_titles.append("Breathing Amplitude") if "RSP_RVT" in list(rsp_signals.columns): subplot_titles.append("Respiratory Volume per Time") if "RSP_Symmetry_PeakTrough" in list(rsp_signals.columns): subplot_titles.append("Cycle Symmetry") subplot_titles = tuple(subplot_titles) fig = make_subplots( rows=nrow, cols=1, shared_xaxes=True, subplot_titles=subplot_titles, ) # Plot cleaned and raw RSP fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Raw"], name="Raw", marker_color="#B0BEC5" ), row=1, col=1, ) fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Clean"], name="Cleaned", marker_color="#2196F3", ), row=1, col=1, ) # Plot peaks and troughs. fig.add_trace( go.Scatter( x=x_axis[peaks], y=rsp_signals["RSP_Clean"][peaks], name="Exhalation Onsets", marker_color="red", mode="markers", ), row=1, col=1, ) fig.add_trace( go.Scatter( x=x_axis[troughs], y=rsp_signals["RSP_Clean"][troughs], name="Inhalation Onsets", marker_color="orange", mode="markers", ), row=1, col=1, ) # TODO: Shade region to mark inspiration and expiration. # Plot rate and optionally amplitude. fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Rate"], name="Rate", marker_color="#4CAF50" ), row=2, col=1, ) fig.add_trace( go.Scatter( x=x_axis, y=[rate_mean] * len(x_axis), name="Mean Rate", marker_color="#4CAF50", line=dict(dash="dash"), ), row=2, col=1, ) if "RSP_Amplitude" in list(rsp_signals.columns): fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Amplitude"], name="Amplitude", marker_color="#009688", ), row=3, col=1, ) fig.add_trace( go.Scatter( x=x_axis, y=[amplitude_mean] * len(x_axis), name="Mean Amplitude", marker_color="#009688", line=dict(dash="dash"), ), row=3, col=1, ) if "RSP_RVT" in list(rsp_signals.columns): fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_RVT"], name="RVT", marker_color="#00BCD4", ), row=4, col=1, ) fig.add_trace( go.Scatter( x=x_axis, y=[rvt_mean] * len(x_axis), name="Mean RVT", marker_color="#00BCD4", line=dict(dash="dash"), ), row=4, col=1, ) if "RSP_Symmetry_PeakTrough" in list(rsp_signals.columns): fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Symmetry_PeakTrough"], name="Peak-Trough Symmetry", marker_color="green", ), row=5, col=1, ) fig.add_trace( go.Scatter( x=x_axis, y=rsp_signals["RSP_Symmetry_RiseDecay"], name="Rise-Decay Symmetry", marker_color="purple", ), row=5, col=1, ) fig.update_layout(title_text="Respiration (RSP)", height=1250, width=750) for i in range(1, nrow + 1): fig.update_xaxes(title_text=x_label, row=i, col=1) return fig
# ============================================================================= # Internals # ============================================================================= def _rsp_plot_phase(rsp_signals, troughs, peaks): exhale_signal = pd.Series(np.full(len(rsp_signals), np.nan)) exhale_signal[troughs] = rsp_signals["RSP_Clean"][troughs].values exhale_signal[peaks] = rsp_signals["RSP_Clean"][peaks].values exhale_signal = exhale_signal.bfill() inhale_signal = pd.Series(np.full(len(rsp_signals), np.nan)) inhale_signal[troughs] = rsp_signals["RSP_Clean"][troughs].values inhale_signal[peaks] = rsp_signals["RSP_Clean"][peaks].values inhale_signal = inhale_signal.ffill() return exhale_signal, inhale_signal