Source code for neurokit2.stats.cluster_quality

# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import scipy.spatial
import sklearn.cluster
import sklearn.metrics
import sklearn.mixture
import sklearn.model_selection

from ..misc import check_random_state


[docs] def cluster_quality(data, clustering, clusters=None, info=None, n_random=10, random_state=None, **kwargs): """**Assess Clustering Quality** Compute quality of the clustering using several metrics. Parameters ---------- data : np.ndarray A matrix array of data (e.g., channels, sample points of M/EEG data) clustering : DataFrame Information about the distance of samples from their respective clusters, generated from :func:`.cluster`. clusters : np.ndarray Coordinates of cluster centers, which has a shape of n_clusters x n_features, generated from :func:`.cluster`. info : dict Information about the number of clusters, the function and model used for clustering, generated from :func:`.cluster`. n_random : int The number of random initializations to cluster random data for calculating the GAP statistic. random_state : None, int, numpy.random.RandomState or numpy.random.Generator Seed for the random number generator. See for ``misc.check_random_state`` for further information. **kwargs Other argument to be passed on, for instance ``GFP`` as ``'sd'`` in microstates. Returns ------- individual : DataFrame Indices of cluster quality scores for each sample point. general : DataFrame Indices of cluster quality scores for all clusters. Examples ---------- .. ipython:: python import neurokit2 as nk # Load the iris dataset data = nk.data("iris").drop("Species", axis=1) # Cluster clustering, clusters, info = nk.cluster(data, method="kmeans", n_clusters=3) # Compute indices of clustering quality individual, general = nk.cluster_quality(data, clustering, clusters, info) general References ---------- * Tibshirani, R., Walther, G., & Hastie, T. (2001). Estimating the number of clusters in a data set via the gap statistic. Journal of the Royal Statistical Society: Series B (Statistical Methodology), 63(2), 411-423. * Mohajer, M., Englmeier, K. H., & Schmid, V. J. (2011). A comparison of Gap statistic definitions with and without logarithm function. arXiv preprint arXiv:1103.4767. """ # Seed the random generator for reproducible results rng = check_random_state(random_state) # Sanity checks if isinstance(clustering, tuple): clustering, clusters, info = clustering if isinstance(data, pd.DataFrame): data = data.values if isinstance(clustering, pd.DataFrame): clustering = clustering["Cluster"].values individual, general = _cluster_quality_sklearn(data, clustering, clusters) # Individual distance from centroid distance = _cluster_quality_distance(data, clusters) distance = {"Clustering_Distance_" + str(i): distance[:, i] for i in range(distance.shape[1])} individual.update(distance) individual = pd.DataFrame(individual) # Variance explained general["Score_VarianceExplained"] = _cluster_quality_variance(data, clusters, clustering) general["Score_GEV"], _ = _cluster_quality_gev(data, clusters, clustering, **kwargs) general["Score_CrossValidation"] = _cluster_quality_crossvalidation(data, clusters, clustering) # Dispersion general["Dispersion"] = _cluster_quality_dispersion(data, clustering, **kwargs) # Gap statistic general.update(_cluster_quality_gap(data, clusters, clustering, info, n_random=n_random, rng=rng)) # Mixture models if "sklearn_model" in info: if isinstance(info["sklearn_model"], sklearn.mixture.GaussianMixture): general["Score_AIC"] = info["sklearn_model"].aic(data) general["Score_BIC"] = info["sklearn_model"].bic(data) general["Score_LogLikelihood"] = info["sklearn_model"].score(data) sklearn.model_selection.cross_val_score(info["sklearn_model"], data, cv=10) general = pd.DataFrame.from_dict(general, orient="index").T return individual, general
# ============================================================================= # Utils # ============================================================================= def _cluster_quality_sklearn(data, clustering, clusters): """Metrics from sklearn""" n_clusters = len(clusters) # Individual scores individual = {} if n_clusters == 1: individual["Clustering_Silhouette"] = np.full(len(clustering), np.nan) else: individual["Clustering_Silhouette"] = sklearn.metrics.silhouette_samples(data, clustering) # General clustering quality scores general = {"n_Clusters": n_clusters} if n_clusters == 1: general["Score_Silhouette"] = np.nan general["Score_Calinski"] = np.nan general["Score_Bouldin"] = np.nan else: general["Score_Silhouette"] = sklearn.metrics.silhouette_score(data, clustering) general["Score_Calinski"] = sklearn.metrics.calinski_harabasz_score(data, clustering) general["Score_Bouldin"] = sklearn.metrics.davies_bouldin_score(data, clustering) return individual, general def _cluster_quality_distance(data, clusters, to_dataframe=False): """Distance between samples and clusters""" distance = scipy.spatial.distance.cdist(data, clusters) if to_dataframe is True: distance = pd.DataFrame(distance).add_prefix("Distance_") return distance def _cluster_quality_sumsquares(data, clusters, clustering): """Sumsquares of the distance of each data point to its respective cluster""" distance = _cluster_quality_distance(data, clusters) min_distance = [] # distance from each sample to the centroid of its cluster for idx in range(len(data)): cluster_identity = clustering[idx] min_distance.append(distance[idx, cluster_identity]) min_distance_squared = [i**2 for i in min_distance] return np.sum(min_distance_squared) def _cluster_quality_dispersion(data, clustering, n_clusters=4): """Sumsquares of the distances between samples within each clusters. An error measure for a n_clusters cluster where the lower the better. Can be used to compare and find the optimal number of clusters. """ dispersion_state = np.full(n_clusters, np.nan) for i in range(n_clusters): idx = clustering == i data_state = data[idx, :] state_size = len(data_state) # number of samples in this cluster if state_size > 0: # pair-wise distance between members of the same cluster distance = scipy.spatial.distance.cdist(data_state, data_state) # sumsquares of distances dispersion_state[i] = 0.5 * np.nansum(distance**2) / state_size else: dispersion_state[i] = np.nan dispersion = np.nansum(dispersion_state) return dispersion def _cluster_quality_variance(data, clusters, clustering): """Variance explained by clustering""" sum_squares_within = _cluster_quality_sumsquares(data, clusters, clustering) sum_squares_total = np.sum(scipy.spatial.distance.pdist(data) ** 2) / data.shape[0] return (sum_squares_total - sum_squares_within) / sum_squares_total def _cluster_quality_gap(data, clusters, clustering, info, n_random=10, rng=None): """GAP statistic and modified GAP statistic by Mohajer (2011). The GAP statistic compares the total within intra-cluster variation for different values of k with their expected values under null reference distribution of the data. """ dispersion = _cluster_quality_sumsquares(data, clusters, clustering) mins, maxs = np.min(data, axis=0), np.max(data, axis=0) dispersion_random = np.full(n_random, np.nan) for i in range(n_random): # Random data random_data = rng.uniform(size=data.shape) # Rescale random m = (maxs - mins) / (np.max(random_data, axis=0) - np.min(random_data, axis=0)) b = mins - (m * np.min(random_data, axis=0)) random_data = np.array(m) * random_data + np.array(b) # Cluster random _, random_clusters, info = info["clustering_function"](random_data) random_activation = random_clusters.dot(random_data.T) random_clustering = np.argmax(np.abs(random_activation), axis=0) dispersion_random[i] = _cluster_quality_sumsquares( random_data, random_clusters, random_clustering ) # Compute GAP gap = np.mean(np.log(dispersion_random)) - np.log(dispersion) # Compute standard deviation sd_k = np.sqrt(np.mean((np.log(dispersion_random) - np.mean(np.log(dispersion_random))) ** 2.0)) s_k = np.sqrt(1.0 + 1.0 / n_random) * sd_k # Calculate Gap* statistic by Mohajer (2011) gap_star = np.mean(dispersion_random) - dispersion sd_k_star = np.sqrt(np.mean((dispersion_random - dispersion) ** 2.0)) s_k_star = np.sqrt(1.0 + 1.0 / n_random) * sd_k_star out = { "Score_GAP": gap, "Score_GAPmod": gap_star, "Score_GAP_sk": s_k, "Score_GAPmod_sk": s_k_star, } return out def _cluster_quality_crossvalidation(data, clusters, clustering): """Cross-validation index The original code by https://github.com/Frederic-vW/eeg_microstates/blob/master/eeg_microstates.py#L600 leads to an error when the denominator is 0. """ n_rows, n_cols = data.shape # n_sample, n_channel var = np.nansum(data**2) - np.nansum(np.nansum(clusters[clustering, :] * data, axis=1) ** 2) var /= n_rows * (n_cols - 1) denominator = (n_cols - len(clusters) - 1) ** 2 if np.abs(denominator) > 0: cv = var * (n_cols - 1) ** 2 / denominator else: # warnings.warn( # "Number of columns in data (" + str(n_cols) + ") is smaller " # "than the number of cluster (" + str(len(clusters)) + ") plus 1. " # "Returning the residual noise instead." # ) cv = np.nan # cv = var * (n_cols - 1)**2 / len(clusters) return cv # def _cluster_quality_gev(data, clusters, clustering, sd=None): # if sd is None: # sd = np.std(data, axis=1) # # # Normalize row-wise (across columns) # clusters /= np.sqrt(np.sum(clusters**2, axis=1, keepdims=True)) # activation = np.dot(data, clusters.T) # activation /= (data.shape[1] * np.outer(sd, np.std(clusters, axis=1))) # # gev = np.zeros(len(clusters)) # for k in range(len(clusters)): # idx = (clustering == k) # gev[k] = np.sum(sd[idx]**2 * activation[idx, k]**2) # gev_total = np.sum(gev) / np.sum(sd ** 2) # return gev_total def _cluster_quality_gev(data, clusters, clustering, sd=None, n_clusters=4): """Global Variance Explained (GEV)""" if sd is None: sd = np.nanstd(data, axis=1) map_corr = _correlate_vectors(data.T, clusters[clustering].T) gev_all = np.zeros(n_clusters) for state in range(n_clusters): idx = clustering == state gev_all[state] = np.nansum((sd[idx] * map_corr[idx]) ** 2) / np.nansum(sd**2) gev = np.nansum(gev_all) # gev = np.sum((sd * map_corr) ** 2) / np.sum(sd**2) return gev, gev_all def _correlate_vectors(A, B, axis=0): """Compute pairwise correlation of multiple pairs of vectors. Fast way to compute correlation of multiple pairs of vectors without computing all pairs as would with corr(A,B). Borrowed from Oli at Stack overflow. Note the resulting coefficients vary slightly from the ones obtained from corr due differences in the order of the calculations. (Differences are of a magnitude of 1e-9 to 1e-17 depending of the tested data). Parameters ---------- A : array The first collection of vectors of shape (n, m) B : array The second collection of vectors of shape (n, m) axis : int The axis that contains the elements of each vector. Defaults to 0. Returns ------- corr : array For each pair of vectors, the correlation between them with shape (m, ) """ An = A - np.nanmean(A, axis=axis) Bn = B - np.nanmean(B, axis=axis) An /= np.linalg.norm(An, axis=axis) Bn /= np.linalg.norm(Bn, axis=axis) return np.nansum(An * Bn, axis=axis)