-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdbscan.py
75 lines (59 loc) · 2.75 KB
/
dbscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from .base_unsupervized import BaseUnsupervized
import numpy as np
from scipy.spatial.distance import cdist
from itertools import compress
class DBSCAN(BaseUnsupervized):
''' DBSCAN clustering algorithm
Ref : Ester, M., H. P. Kriegel, J. Sander, and X. Xu, “A Density-Based Algorithm for Discovering Clusters in Large Spatial Databases with Noise”. In: Proceedings of the 2nd International Conference on Knowledge Discovery and Data Mining, Portland, OR, AAAI Press, pp. 226-231. 1996
Parameter
---------
eps : float,
Radius of the neighborhood of one point
min_pts : int,
Minimum number of points in the neighborhood of a point to be considered a core
metric : string,
Metric type for the computation of distance between the points
'''
def __init__(self,eps=2,min_pts=8,metric='minkowski'):
self.eps = eps
self.min_pts = min_pts
self.metric = metric
def fit_predict(self,X):
dist_all = cdist(X,X,metric=self.metric) # Dist between all points and all points
neighbors = np.where(dist_all < self.eps,1,0) # Neighborhood of points
n_neighbors = np.sum(neighbors,axis=0)
# Classify points as noise/core/border point
#is_noise = np.where(n_neighbors == 0,1,0)
is_core = np.where(n_neighbors >= self.min_pts,1,0) # Core points
# Boolean to indexes
set_neighbors_idx = [list(compress(range(len(bool_arr )), bool_arr )) for bool_arr in neighbors]
visited = np.zeros(X.shape[0]) # Points we have already seen
clusters = [] # list of clusters
def add_neighbhor(point):
''' Recursively add point's neibhors'''
current_cluster.append(point)
visited[point] = 1
for p in set_neighbors_idx[point] :
if not visited[p] :
add_neighbhor(p)
# Iter all points
for i in range(X.shape[0]) :
if not visited[i]:
visited[i] = 1
if is_core[i] : # Core points
current_cluster = [i]
for n in set_neighbors_idx[i] : # Add all neihgbhors
add_neighbhor(n)
clusters.append(current_cluster)
y_hat = -np.ones(X.shape[0]) # Noise points
for i in range(len(clusters)): # Clusters
y_hat[clusters[i]] = i
return y_hat
def fit(self,*args,**kwargs):
return self.fit_predict(X)
def predict(self,*args,**kwargs):
return self.fit_predict(X)
def score(self,X,y):
y_hat = self.predict(X)
acc = np.count_nonzero(np.array(y_hat)==np.array(y)) /len(y)
return acc