"""
Density-based Clustering Algorithms
"""
import json

import numpy as np
import scipy as sp
from matplotlib import pyplot as plt
from scipy.spatial.distance import cdist
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
from sklearn.neighbors import NearestNeighbors
from yaml import Dumper
from yaml import dump as ydump

from .ml_cluster_base import ClusterBase, FitClusterBase
from ..io import NpEncoder
class DBCluster(ClusterBase):
    """
    Perform clustering using a density-based clustering algorithm.

    The actual clustering is delegated to a ``trainer`` object (e.g.
    :class:`sklearn_DB` or :class:`LAIO_DB`) implementing ``fit(dmatrix, rho)``
    and ``pack()``; this class records the resulting labels and summary
    statistics and can serialize them to JSON/YAML.
    """

    _pairwise = True

    def __init__(self, trainer):
        super().__init__()
        # the object that performs the actual clustering; must expose
        # .fit(dmatrix, rho), .pack() and .name
        self.trainer = trainer
        # cluster label of each sample (set by fit)
        self.labels = None
        # number of clusters found, excluding noise
        self.n_clusters = None
        # number of samples labelled as noise (-1)
        self.n_noise = None

    def fit(self, dmatrix, rho=None):
        """Fit the clustering model.

        Parameters
        ----------
        dmatrix: numpy array
            Either an NxN precomputed distance matrix or an Nxm coordinate
            matrix. A square input is scored with the "precomputed" metric.
        rho: numpy array, optional
            Sample densities, passed through to the trainer.
        """
        self.labels = self.trainer.fit(dmatrix, rho)
        # Number of clusters in labels, ignoring noise (-1) if present.
        self.n_clusters = len(set(self.labels)) - (1 if -1 in self.labels else 0)
        self.n_noise = list(self.labels).count(-1)
        print('Estimated number of clusters: %d' % self.n_clusters)
        print('Estimated number of noise points: %d' % self.n_noise)
        # NOTE(review): an Nxm coordinate matrix that happens to be square
        # (m == N) would be misread as precomputed here — callers beware.
        if np.shape(dmatrix)[0] == np.shape(dmatrix)[1]:
            silscore = silhouette_score(dmatrix, self.labels, metric="precomputed")
        else:
            silscore = silhouette_score(dmatrix, self.labels, metric="euclidean")
        print("Silhouette Coefficient: %0.3f" % silscore)

    def get_cluster_labels(self, index=None):
        """Return the labels of the samples at the given indices.

        ``index=None`` (or an empty sequence) returns all labels. The default
        was changed from a mutable ``[]`` (shared across calls) to ``None``;
        callers passing a list are unaffected.
        """
        if index is None or len(index) == 0:
            return self.labels
        return self.labels[index]

    def get_n_cluster(self):
        """Return the number of clusters found (noise excluded)."""
        return self.n_clusters

    def get_n_noise(self):
        """Return the number of samples labelled as noise."""
        return self.n_noise

    def pack(self):
        """Return all the info as a plain dict (trainer name/params, labels, counts)."""
        state = dict(trainer=self.trainer.name,
                     trainer_params=self.trainer.pack(),
                     labels=self.labels.tolist(),
                     n_clusters=self.n_clusters,
                     n_noise=self.n_noise)
        return state

    def save_state(self, filename, mode='json'):
        """Write pack() to ``<filename>-clustering-state.json`` or ``.yaml``.

        Any ``mode`` other than ``'yaml'`` falls through to JSON.
        """
        if mode == 'yaml':
            with open(filename + '-clustering-state.yaml', 'w') as yd:
                ydump(self.pack(), yd, sort_keys=True, Dumper=Dumper)
        else:
            with open(filename + '-clustering-state.json', 'w') as jd:
                json.dump(self.pack(), jd, sort_keys=True, cls=NpEncoder)
class sklearn_DB(FitClusterBase):
    """
    Thin wrapper around scikit-learn's DBSCAN.
    https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html

    eps : float, optional
        The maximum distance between two samples for one to be considered as in
        the neighborhood of the other. This is not a maximum bound on the
        distances of points within a cluster. This is the most important DBSCAN
        parameter to choose appropriately for your dataset and distance function.
    min_samples : int, optional
        The number of samples (or total weight) in a neighborhood for a point
        to be considered as a core point, the point itself included.
    metric : string, or callable
        The metric used when computing distances between instances in a feature
        array. A string or callable must be one of the options accepted by
        sklearn.metrics.pairwise_distances for its metric parameter. With
        "precomputed", X is assumed to be a (square) distance matrix; X may be
        sparse, in which case only "nonzero" elements count as neighbors.
    """

    _pairwise = True

    def __init__(self, eps=None, min_samples=None, metrictype='precomputed'):
        super().__init__()
        self.name='DBSCAN'
        # distance metric, e.g. 'euclidean' or 'precomputed'
        self.metric = metrictype
        # neighborhood radius
        self.eps = eps
        # minimum neighborhood population for a core point
        self.min_samples = min_samples
        # fitted DBSCAN estimator, populated by fit()
        self.db = None

    def fit(self, dmatrix, rho=None):
        """Run DBSCAN on the distance/feature matrix; return the label array."""
        clusterer = DBSCAN(eps=self.eps, min_samples=self.min_samples, metric=self.metric)
        self.db = clusterer.fit(dmatrix)
        return self.db.labels_

    def pack(self):
        """return all the info"""
        return self.db.get_params()
class LAIO_DB(FitClusterBase):
    r"""
    Clustering by fast search and find of density peaks, Rodriguez and Laio (2014).
    https://science.sciencemag.org/content/sci/344/6191/1492.full.pdf

    :math: \rho_i\,=\,\sum_j{\chi(d_{ij}-d_{cut})}
    :math: \delta_i\,=\,\min_{j:\rho_j>\rho_i}(d_{ij})
    """

    def __init__(self, distances=None, indices=None, dens_type=None, dc=None, percent=2.0):
        """
        Parameters
        ----------
        distances: numpy array
            array of distances between data points and their neighbours of shape (Nele, n_neigh_comp)
            where Nele is the number of data points and n_neigh_comp is the number of neighbours to consider.
        indices: numpy array
            array of shape (Nele, n_neigh_comp) where row i gives the indices of the neighbours for data point i
        dens_type: str
            The type of density to compute. Can be 'exp' (exponential) or None (linear)
        dc: float
            giving the cutoff distance beyond which data points don't contribute to the local density computation
            of another datapoint.
        percent: int
            Criterion for choosing dc, int from 0-100, typically chosen such that the average number of neighbours
            is 1-2% of the total number of points in the dataset. Default is 2%.
        """
        self.name='LAIO_DB'
        self.distances = distances
        self.indices = indices
        self.dens_type = dens_type
        self.dc = dc
        self.percent = percent
        # numpy array of shape (Nele,): densities of the data points, expressed
        # as the number of data points within the cutoff distance
        self.dens = None
        # numpy array of shape (Nele,): distance to the nearest data point with
        # a higher local density
        self.delta_to_neigh = None
        # numpy array of shape (Nele,): index of the nearest neighbour with
        # higher density than data point i
        self.ref_neigh = None
        # Unused currently
        self.decision_graph = None
        # Clusters must lie further than self.delta_cut (float) apart to be
        # designated as independent clusters
        self.delta_cut = None
        # A "cluster" with density below self.dens_cut (float) is discarded as noise
        self.dens_cut = None
        # numpy array of shape (Nele,): cluster index (int from 0 to N) of each
        # data point. Clusters are labelled by density: cluster 0 has the
        # highest density, cluster N the smallest.
        self.cluster = None
        # numpy array of shape (N,): indices of the N cluster centres
        self.center_indices = None
        # numpy array of shape (Nele,): halo points are -1, other points keep
        # their cluster index
        self.halo = None

    def get_dc(self, data):
        """
        Compute the cutoff distance given the data.

        Parameters
        ----------
        data: np.matrix
            The data array of shape (Nele, proj_dim) where N is the number of data points and proj_dim is the number of
            projected components of the kernel matrix.

        Returns
        -------
        self.dc: float
            the cutoff distance
        """
        Nele = data.shape[0]
        # The number of neighbours to consider is dictated by the percent
        # parameter provided (default is 2% of the total data points).
        n_neigh = int(self.percent/100.*Nele)
        # Compute with 4 times the recommended percentage of neighbours so that
        # the percent value supplied can be re-estimated (rper) from the data.
        n_neigh_comp = int(4.*self.percent/100.*Nele)
        neigh = NearestNeighbors(n_neighbors=n_neigh_comp).fit(data)
        self.distances, self.indices = neigh.kneighbors(data)
        # Initial dc: the distance to the "0.02*Nele"th neighbour, averaged over all points
        dc = np.average(self.distances[:, n_neigh])
        # local density of each data point
        dens = np.zeros(data.shape[0])
        tt = 1.0
        factor = 1.0
        # Iteratively rescale dc until the re-estimated percentage rper of
        # neighbours within dc is within 5% of self.percent.
        while tt > 0.05:
            dc = dc*factor
            for i in range(Nele):
                a = self.distances[i, :]
                dens[i] = len(a[(a <= dc)])
            rper = 100.*np.average(dens)/Nele
            tt = rper/self.percent - 1.
            if tt > 0.:
                factor = factor/1.001
            else:
                factor = factor*1.001
            tt = abs(tt)
        self.dc = dc

    def get_decision_graph(self, data, fplot=True):
        """
        Compute densities, delta distances and the dens/delta cutoffs, and
        optionally plot the decision graph.

        Parameters
        ----------
        data: numpy array of shape (Nele, proj_dim).
        fplot: Boolean indicating whether or not to plot the decision graph
        """
        Nele = data.shape[0]
        self.dens = np.zeros(Nele)
        if self.dc is None:
            self.get_dc(data)
        else:  # check compatibility between the provided cutoff distance and the provided distance matrix
            n_neigh_comp = int(10.*self.percent/100.*Nele)
            neigh = NearestNeighbors(n_neighbors=n_neigh_comp).fit(data)
            self.distances, self.indices = neigh.kneighbors(data)
            if self.dc > np.min(self.distances[:, n_neigh_comp - 1]):
                print("dc too big for being included within the", 10.*self.percent, "% of data, consider using a "
                      "small dc or augment the percent "
                      "parameter")
        for i in range(Nele):
            a = self.distances[i, :]
            self.dens[i] = len(a[(a <= self.dc)])
        if self.dens_type == 'exp':  # density is a Gaussian-weighted sum rather than a neighbour count
            for i in range(Nele):
                a = self.distances[i, :]/self.dc  # distances expressed as a fraction of the cutoff distance
                self.dens[i] = np.sum(np.exp(-a**2))
        # The densities computed by this class are not log densities. Log densities are returned by the KDE class.
        # self.dens_cut = 0.2 * np.mean(np.log(self.dens)) + 0.8 * np.min(np.log(self.dens))
        self.dens_cut = 0.2 * np.mean(self.dens) + 0.8 * np.min(self.dens)
        self.delta_cut = np.mean(self.distances)
        self.delta_to_neigh = np.zeros(data.shape[0])
        self.ref_neigh = np.zeros(data.shape[0], dtype='int')
        indices = np.arange(data.shape[0])
        imax = []  # indices of data points with no higher-density neighbour (global density maxima)
        for i in range(data.shape[0]):
            higher_dens_indices = indices[((self.dens > self.dens[i]) & (indices != i))]
            higher_dens_data = data[((self.dens > self.dens[i]) & (indices != i))]
            if higher_dens_data.shape[0] > 0:
                ds = np.transpose(cdist([np.transpose(data[i, :])], higher_dens_data))
                j = np.argmin(ds)
                self.ref_neigh[i] = higher_dens_indices[j]
                self.delta_to_neigh[i] = ds[j]
            else:
                self.delta_to_neigh[i] = -100.
                imax.append(i)
        # give the density maxima the largest delta so they show up as centres
        self.delta_to_neigh[imax] = np.max(self.delta_to_neigh)*1.05
        # Plot kernel density on x-axis and delta_to_neigh (distance to a
        # point of higher density) on the y-axis
        if fplot:
            plt.scatter(self.dens, self.delta_to_neigh)
            plt.plot([min(self.dens), max(self.dens)], [self.delta_cut, self.delta_cut], c='red')
            plt.plot([self.dens_cut, self.dens_cut], [np.min(self.distances), np.max(self.distances)], c='red')
            plt.title('Decision Graph')
            plt.xlabel('rho')
            plt.ylabel('delta')
            plt.show()

    def get_assignation(self, data):
        """
        Assign every data point to a cluster and mark halo points.

        Parameters
        ----------
        data: numpy array of shape (Nele, proj_dim) where proj_dim gives the number of dimensions the kernel matrix is
            projected to

        Returns: self.halo numpy array of shape (Nele, ) where halo points are designated as -1 and otherwise are
            assigned to their respective cluster centres.
        """
        ordered_by_dens = np.argsort(-self.dens)  # data points in descending order of density
        self.cluster = np.zeros(data.shape[0], dtype='int')
        indices = np.arange(data.shape[0])
        center_label = np.zeros(data.shape[0], dtype='int')
        ncluster = -1
        for i in range(data.shape[0]):  # trick to iterate through the ordered_by_dens array
            j = ordered_by_dens[i]
            if (self.dens[j] > self.dens_cut) & (self.delta_to_neigh[j] > self.delta_cut):
                # a new cluster centre: high density AND far from denser points
                ncluster = ncluster + 1
                self.cluster[j] = ncluster
                center_label[j] = ncluster
            else:
                # the assigned cluster is the cluster the closest neighbour of higher density belongs to
                self.cluster[j] = self.cluster[self.ref_neigh[j]]
                center_label[j] = -1
        self.center_indices = indices[(center_label != -1)]
        # border_points is a binary array: 1 if the data point has a neighbour
        # (within dc) assigned to a different cluster centre, 0 otherwise
        border_points = np.zeros(data.shape[0], dtype='int')
        self.halo = np.copy(self.cluster)
        for i in range(data.shape[0]):
            for j in self.indices[i, :][(self.distances[i, :] <= self.dc)]:
                if self.cluster[i] != self.cluster[j]:
                    border_points[i] = 1
        halo_cutoff = np.zeros(ncluster + 1)
        for i in range(ncluster + 1):
            # Extract the densities of a cluster's border points
            dens_of_border_points = self.dens[((border_points == 1) & (self.cluster == i))]
            if len(dens_of_border_points) == 0:  # a cluster without border points has no halo points
                halo_cutoff[i] = 0
            else:
                halo_cutoff[i] = np.max(dens_of_border_points)
        self.halo[indices[(self.dens < halo_cutoff[self.cluster])]] = -1
        return self.halo

    def fit(self, data, rho=None):
        """
        Compute the center labels.

        Parameters
        ----------
        data: numpy array of shape (Nele, proj_dim) where proj_dim is the number of components the kernel matrix has been
            projected to.
        rho: densities, default is None since this class computes the densities itself.

        Returns: cluster_labels: numpy array of shape (Nele,) giving the cluster (int from 0 to N) each data point
            belongs to. Halo points are designated as -1.
        """
        self.get_dc(data)
        self.get_decision_graph(data)
        cluster_labels = self.get_assignation(data)
        return cluster_labels

    def pack(self):
        """
        Return dictionary containing the cutoff distance, self.dc, for data points to contribute to the local density
        of another data point as well as self.dens_cut, the density threshold for defining a cluster.
        """
        state = dict(deltamin=self.dc, rhomin=self.dens_cut)
        return state
"""
Legacy Code below for old Fast Search and Find of Density Peaks class.
"""
[docs]class old_LAIO(FitClusterBase):
"""Laio Clustering scheme
Clustering by fast search and find of density peaks
https://science.sciencemag.org/content/sci/344/6191/1492.full.pdf
:math: \\rho_i\,=\,\\sum_j{\\chi(d_{ij}-d_{cut})}
i.e. the local density of data point x_i
:math: \\delta_i\,=\,\\min_{j:\\rho_j>\\rho_i}(d_{ij})
i.e. the minimum distance to a neighbour with higher density
A summary of laio clustering algorithm:
1. First do a kernel density estimation (rho_i) for each data point i
2. For each data point i, compute the distance (delta_i) between i and j,
j is the closet data point that has a density higher then i, i.e. rho(j) > rho(i).
3. Plot the decision graph, which is a scatter plot of (rho_i, delta_i)
4. Select cluster centers ({cl}), which are the outliers in the decision graph that fulfills:
i) rho({cl}) > rhomin
ii) delta({cl}) > delta_min
one needs to set the two parameters rhomin and delta_min.
5. After the cluster centers are determined, data points are assigned to the nearest cluster center.
one needs to set two parameters:
"""
_pairwise = True
def __init__(self, deltamin=-1, rhomin=-1):
"""
:param deltamin: the lower bound on the distance between two cluster centres
:param rhomin: The lower bound in kernel density, if the density of a cluster is lower
than this threshold, this "cluster" will be discarded as noise
"""
self.deltamin = deltamin
self.rhomin = rhomin
[docs] def fit(self, dmatrix, rho=None):
"""
Parameters
----------
dmatrix: The distance matrix of shape (Nele, Nele)
rho: The log densities of the points of shape (Nele,)
Returns
-------
"""
if rho is None:
raise ValueError('for fdb it is better to compute kernel density first')
delta, nneigh = self.estimate_delta(dmatrix, rho)
# if there's no input values for rhomin and deltamin, we use simple heuristics
if self.rhomin < 0:
self.rhomin = 0.2*np.mean(rho) + 0.8*np.min(rho)
if self.deltamin < 0:
self.deltamin = np.mean(delta)
#here we make the decision graph
#x axis (rho) is the kernel density for each data point
#y axis (delta) is the distance to the nearest higher density points
plt.scatter(rho, delta)
plt.plot([min(rho), max(rho)], [self.deltamin, self.deltamin], c='red')
plt.plot([self.rhomin, self.rhomin], [min(delta), max(delta)], c='red')
plt.xlabel('rho')
plt.ylabel('delta')
plt.show()
nclust = 0 # cluster index
cl = np.zeros(len(rho), dtype='int')-1 # numpy array of -1's
for i in range(len(rho)):
if rho[i] > self.rhomin and delta[i] > self.deltamin:
nclust += 1
cl[i] = nclust
# Assignment
ordrho = np.argsort(rho)[::-1] # Indices of data points in descending order of local density
rho_ord = rho[ordrho]
for i in range(len(rho)):
if cl[ordrho[i]] == -1:
cl[ordrho[i]] = cl[nneigh[ordrho[i]]]
return cl
[docs] def estimate_delta(self, dist, rho):
"""
For each data point i, compute the distance (delta_i) between i and j,
j is the closest data point that has a density higher then i, i.e. rho(j) > rho(i).
Parameters
----------
dist: distance matrix of shape (Nele, Nele)
rho: log densities for each data point array of shape (Nele,)
Returns: delta: numpy array of distances to nearest cluster centre for each datapoint.
nneight: numpy array giving the index of the nearest cluster centre.
-------
"""
delta = (rho*0.0).copy()
nneigh = np.ones(len(delta), dtype='int')
for i in range(len(rho)):
# for data i, find all points that have higher density
js = np.where(rho > rho[i])[0]
# if there's no j's that have higher density than i, we set delta_i to be a large distance
if len(js) == 0:
delta[i] = np.max(dist[i, :])
nneigh[i] = i
else:
# find the nearest j that has higher density then i
delta[i] = np.min(dist[i, js])
nneigh[i] = js[np.argmin(dist[i, js])]
return delta, nneigh
[docs] def pack(self):
'''return all the info'''
state = dict(deltamin=self.deltamin, rhomin=self.rhomin)
return state