Source code for neurokit2.signal.signal_changepoints
import numpy as np
from ..events import events_plot
from ..misc import as_vector
from .signal_plot import signal_plot
[docs]
def signal_changepoints(signal, change="meanvar", penalty=None, show=False):
"""**Change Point Detection**
Only the PELT method is implemented for now.
Parameters
-----------
signal : Union[list, np.array, pd.Series]
Vector of values.
change : str
Can be one of ``"meanvar"`` (default), ``"mean"`` or ``"var"``.
penalty : float
The algorithm penalty. Defaults to ``np.log(len(signal))``.
show : bool
Defaults to ``False``.
Returns
-------
Array
Values indicating the samples at which the changepoints occur.
Fig
Figure of plot of signal with markers of changepoints.
Examples
--------
.. ipython:: python
import neurokit2 as nk
signal = nk.emg_simulate(burst_number=3)
@savefig p_signal_changepoints1.png scale=100%
nk.signal_changepoints(signal, change="var", show=True)
@suppress
plt.close()
References
----------
* Killick, R., Fearnhead, P., & Eckley, I. A. (2012). Optimal detection of changepoints with a
linear computational cost. Journal of the American Statistical Association, 107(500), 1590-1598.
"""
signal = as_vector(signal)
changepoints = _signal_changepoints_pelt(signal, change=change, penalty=penalty)
if show is True:
if len(changepoints) > 0:
events_plot(changepoints, signal)
else:
signal_plot(signal)
return changepoints
def _signal_changepoints_pelt(signal, change="meanvar", penalty=None):
"""PELT algorithm to find change points in a signal.
Adapted from: https://github.com/ruipgil/changepy https://github.com/deepcharles/ruptures
https://github.com/STOR-i/Changepoints.jl https://github.com/rkillick/changepoint/
"""
# Initialize
length = len(signal)
if penalty is None:
penalty = np.log(length) # pylint: disable=E1111
if change.lower() == "var":
cost = _signal_changepoints_cost_var(signal)
elif change.lower() == "mean":
cost = _signal_changepoints_cost_mean(signal)
else:
cost = _signal_changepoints_cost_meanvar(signal)
# Run algorithm
F = np.zeros(length + 1)
R = np.array([0], dtype=int)
candidates = np.zeros(length + 1, dtype=int)
F[0] = -penalty # pylint: disable=E1130
for tstar in range(2, length + 1):
cpt_cands = R
seg_costs = np.array([cost(cpt_cands[i], tstar) for i in range(len(cpt_cands))])
F_cost = F[cpt_cands] + seg_costs
F[tstar] = np.nanmin(F_cost) + penalty
tau = np.nanargmin(F_cost)
candidates[tstar] = cpt_cands[tau]
ineq_prune = [val < F[tstar] for val in F_cost]
R = [cpt_cands[j] for j, val in enumerate(ineq_prune) if val]
R.append(tstar - 1)
R = np.array(R, dtype=int)
changepoints = np.sort(np.unique(candidates[candidates]))
changepoints = changepoints[changepoints > 0]
return changepoints
# =============================================================================
# Cost functions
# =============================================================================
def _signal_changepoints_cost_mean(signal):
"""Cost function for a normally distributed signal with a changing mean."""
i_variance_2 = 1 / (np.var(signal) ** 2)
cmm = [0.0]
cmm.extend(np.cumsum(signal))
cmm2 = [0.0]
cmm2.extend(np.cumsum(np.abs(signal)))
def cost(start, end):
cmm2_diff = cmm2[end] - cmm2[start]
cmm_diff = pow(cmm[end] - cmm[start], 2)
i_diff = end - start
diff = cmm2_diff - cmm_diff
return (diff / i_diff) * i_variance_2
return cost
def _signal_changepoints_cost_var(signal):
"""Cost function for a normally distributed signal with a changing variance."""
cumm = [0.0]
cumm.extend(np.cumsum(np.power(np.abs(signal - np.mean(signal)), 2)))
def cost(s, t):
dist = float(t - s)
diff = cumm[t] - cumm[s]
return dist * np.log(diff / dist)
return cost
def _signal_changepoints_cost_meanvar(signal):
"""Cost function for a normally distributed signal with a changing mean and variance."""
signal = np.hstack(([0.0], np.array(signal)))
cumm = np.cumsum(signal)
cumm_sq = np.cumsum([val ** 2 for val in signal])
def cost(s, t):
ts_i = 1.0 / (t - s)
mu = (cumm[t] - cumm[s]) * ts_i
sig = (cumm_sq[t] - cumm_sq[s]) * ts_i - mu ** 2
sig_i = 1.0 / sig
if sig <= 0:
return np.nan
else:
return (
(t - s) * np.log(sig)
+ (cumm_sq[t] - cumm_sq[s]) * sig_i
- 2 * (cumm[t] - cumm[s]) * mu * sig_i
+ ((t - s) * mu ** 2) * sig_i
)
return cost