import numpy as np
from ..misc import as_vector
[docs]
def signal_decompose(signal, method="emd", n_components=None, **kwargs):
    """**Decompose a signal**
    Signal decomposition into different sources using different methods, such as Empirical Mode
    Decomposition (EMD) or Singular spectrum analysis (SSA)-based signal separation method.
    The extracted components can then be recombined into meaningful sources using
    :func:`.signal_recompose`.
    Parameters
    -----------
    signal : Union[list, np.array, pd.Series]
        Vector of values.
    method : str
        The decomposition method. Can be one of ``"emd"`` or ``"ssa"``.
    n_components : int
        Number of components to extract. Only used for ``"ssa"`` method. If ``None``, will default
        to 50.
    **kwargs
        Other arguments passed to other functions.
    Returns
    -------
    Array
        Components of the decomposed signal.
    See Also
    --------
    signal_recompose
    Examples
    --------
    .. ipython:: python
      import neurokit2 as nk
      # Create complex signal
      signal = nk.signal_simulate(duration=10, frequency=1, noise=0.01)  # High freq
      signal += 3 * nk.signal_simulate(duration=10, frequency=3, noise=0.01)  # Higher freq
      signal += 3 * np.linspace(0, 2, len(signal))  # Add baseline and trend
      signal += 2 * nk.signal_simulate(duration=10, frequency=0.1, noise=0)
      @savefig p_signal_decompose1.png scale=100%
      nk.signal_plot(signal)
      @suppress
      plt.close()
    .. ipython:: python
      :okexcept:
      # Example 1: Using the EMD method
      components = nk.signal_decompose(signal, method="emd")
      # Visualize Decomposed Signal Components
      @savefig p_signal_decompose2.png scale=100%
      nk.signal_plot(components)
      @suppress
      plt.close()
    .. ipython:: python
      # Example 2: USing the SSA method
      components = nk.signal_decompose(signal, method="ssa", n_components=5)
      # Visualize Decomposed Signal Components
      @savefig p_signal_decompose3.png scale=100%
      nk.signal_plot(components)  # Visualize components
      @suppress
      plt.close()
    """
    # Apply method
    method = method.lower()
    if method in ["emd"]:
        components = _signal_decompose_emd(signal, **kwargs)
    elif method in ["ssa"]:
        components = _signal_decompose_ssa(signal, n_components=n_components)
    else:
        raise ValueError(
            "NeuroKit error: signal_decompose(): 'method' should be one of 'emd' or 'ssa'."
        )
    return components 
# =============================================================================
# Singular spectrum analysis (SSA)
# =============================================================================
def _signal_decompose_ssa(signal, n_components=None):
    """Singular spectrum analysis (SSA)-based signal separation method.
    SSA decomposes a time series into a set of summable components that are grouped together and
    interpreted as trend, periodicity and noise.
    References
    ----------
    - https://www.kaggle.com/jdarcy/introducing-ssa-for-time-series-decomposition
    """
    # sanitize input
    signal = as_vector(signal)
    # Parameters
    # The window length.
    if n_components is None:
        L = 50 if len(signal) >= 100 else int(len(signal) / 2)
    else:
        L = n_components
    # Length.
    N = len(signal)
    if not 2 <= L <= N / 2:
        raise ValueError("`n_components` must be in the interval [2, len(signal)/2].")
    # The number of columns in the trajectory matrix.
    K = N - L + 1
    # Embed the time series in a trajectory matrix by pulling the relevant subseries of F,
    # and stacking them as columns.
    X = np.array([signal[i : L + i] for i in range(0, K)]).T
    # Get n components
    d = np.linalg.matrix_rank(X)
    # Decompose the trajectory matrix
    u, sigma, vt = np.linalg.svd(X, full_matrices=False)
    # Initialize components matrix
    components = np.zeros((N, d))
    # Reconstruct the elementary matrices without storing them
    for i in range(d):
        X_elem = sigma[i] * np.outer(u[:, i], vt[i, :])
        X_rev = X_elem[::-1]
        components[:, i] = [
            X_rev.diagonal(j).mean() for j in range(-X_rev.shape[0] + 1, X_rev.shape[1])
        ]
    # Return the components
    return components.T
# =============================================================================
# ICA
# =============================================================================
# import sklearn.decomposition
# def _signal_decompose_scica(signal, n_components=3, **kwargs):
#    # sanitize input
#    signal = as_vector(signal)
#
#    # Single-channel ICA (SCICA)
#    if len(signal.shape) == 1:
#        signal = signal.reshape(-1, 1)
#
#    c = sklearn.decomposition.FastICA(n_components=n_components, **kwargs).fit_transform(signal)
# =============================================================================
# Empirical Mode Decomposition (EMD)
# =============================================================================
def _signal_decompose_emd(signal, ensemble=False, **kwargs):
    """References
    ------------
    - http://perso.ens-lyon.fr/patrick.flandrin/CSDATrendfiltering.pdf
    - https://github.com/laszukdawid/PyEMD
    - https://towardsdatascience.com/decomposing-signal-using-empirical-mode-decomposition-algorithm-explanation-for-dummy-93a93304c541 # noqa: E501
    """
    try:
        import PyEMD
    except ImportError as e:
        raise ImportError(
            "NeuroKit error: _signal_decompose_emd(): the 'PyEMD' module is required for this"
            " function to run. Please install it first (`pip install EMD-signal`).",
        ) from e
    if ensemble is False:
        emd = PyEMD.EMD(extrema_detection="parabol", **kwargs)
        imfs = emd.emd(signal, **kwargs)
    else:
        emd = PyEMD.EEMD(extrema_detection="parabol", **kwargs)
        imfs = emd.eemd(signal, **kwargs)
    #    _, residue = emd.get_imfs_and_residue()
    return imfs