File: dbscan.py

package info (click to toggle)
python-pyclustering 0.10.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 11,128 kB
  • sloc: cpp: 38,888; python: 24,311; sh: 384; makefile: 105
file content (301 lines) | stat: -rwxr-xr-x 10,791 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
"""!

@brief Cluster analysis algorithm: DBSCAN.
@details Implementation based on paper @cite inproceedings::dbscan::1.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


from pyclustering.container.kdtree import kdtree_balanced

from pyclustering.cluster.encoder import type_encoding

from pyclustering.core.wrapper import ccore_library

import pyclustering.core.dbscan_wrapper as wrapper


class dbscan:
    """!
    @brief Class represents clustering algorithm DBSCAN.
    @details This DBSCAN algorithm is KD-tree optimized.
             
             By default C/C++ pyclustering library is used for processing that significantly increases performance.
    
    Clustering example where DBSCAN algorithm is used to process `Chainlink` data from `FCPS` collection:
    @code
        from pyclustering.cluster.dbscan import dbscan
        from pyclustering.cluster import cluster_visualizer
        from pyclustering.utils import read_sample
        from pyclustering.samples.definitions import FCPS_SAMPLES

        # Sample for cluster analysis.
        sample = read_sample(FCPS_SAMPLES.SAMPLE_CHAINLINK)

        # Create DBSCAN algorithm.
        dbscan_instance = dbscan(sample, 0.7, 3)

        # Start processing by DBSCAN.
        dbscan_instance.process()

        # Obtain results of clustering.
        clusters = dbscan_instance.get_clusters()
        noise = dbscan_instance.get_noise()

        # Visualize clustering results
        visualizer = cluster_visualizer()
        visualizer.append_clusters(clusters, sample)
        visualizer.append_cluster(noise, sample, marker='x')
        visualizer.show()
    @endcode
    
    """
    
    def __init__(self, data, eps, neighbors, ccore=True, **kwargs):
        """!
        @brief Constructor of clustering algorithm DBSCAN.
        
        @param[in] data (list): Input data that is presented as list of points or distance matrix (defined by parameter
                   'data_type', by default data is considered as a list of points).
        @param[in] eps (double): Connectivity radius between points, points may be connected if the distance between
                   them is within the radius.
        @param[in] neighbors (uint): Minimum number of neighbors within the radius that a point must have to start or
                   extend a cluster (in the Python implementation the point itself is not counted).
        @param[in] ccore (bool): If True then the C++ pyclustering library (CCORE) is used when it is available for
                   the current platform, otherwise the Python implementation is used.
        @param[in] **kwargs: Arbitrary keyword arguments (available arguments: 'data_type').

        <b>Keyword Args:</b><br>
            - data_type (string): Data type of input sample 'data' that is processed by the algorithm ('points', 'distance_matrix').

        @throw ValueError if input data is empty or if the connectivity radius is negative.
        
        """
        
        self.__pointer_data = data
        self.__kdtree = None
        self.__eps = eps
        # NOTE: despite the name this is the squared radius; the name is kept for compatibility.
        self.__sqrt_eps = eps * eps
        self.__neighbors = neighbors

        self.__visited = None
        self.__belong = None

        self.__data_type = kwargs.get('data_type', 'points')

        self.__clusters = []
        self.__noise = []

        self.__neighbor_searcher = None

        self.__initialize_ccore_state(ccore)

        self.__verify_arguments()


    def __getstate__(self):
        """!
        @brief Returns current state of the algorithm.
        @details It does not return internal temporal variables that are not visible for a user (KD-tree and
                  neighbor searcher).

        @return (tuple) Current state of the algorithm.

        """
        return (self.__pointer_data, self.__eps, self.__sqrt_eps, self.__neighbors, self.__visited, self.__belong,
                self.__data_type, self.__clusters, self.__noise, self.__ccore)


    def __setstate__(self, state):
        """!
        @brief Set current state of the algorithm.
        @details Set state method checks if C++ pyclustering is available for the current platform, as a result `ccore`
                  state might be different if state is moved between platforms.

        @param[in] state (tuple): State that was previously produced by `__getstate__`.

        """
        self.__pointer_data, self.__eps, self.__sqrt_eps, self.__neighbors, self.__visited, self.__belong, \
        self.__data_type, self.__clusters, self.__noise, self.__ccore = state

        # Temporal helpers are not part of the state: define them explicitly so that the restored
        # instance has the same set of attributes as a freshly constructed one.
        self.__kdtree = None
        self.__neighbor_searcher = None

        # The stored flag is re-evaluated: C++ is requested again and enabled only if it is workable here.
        self.__initialize_ccore_state(True)


    def process(self):
        """!
        @brief Performs cluster analysis in line with rules of DBSCAN algorithm.
        @details Results of a previous call are replaced, therefore the method can be called repeatedly.

        @return (dbscan) Returns itself (DBSCAN instance).

        @throw TypeError if unknown data type is specified (Python implementation).

        @see get_clusters()
        @see get_noise()
        
        """
        
        if self.__ccore is True:
            (self.__clusters, self.__noise) = wrapper.dbscan(self.__pointer_data, self.__eps, self.__neighbors, self.__data_type)
            return self

        amount_points = len(self.__pointer_data)

        if self.__data_type == 'points':
            self.__kdtree = kdtree_balanced(self.__pointer_data, range(amount_points))

        self.__visited = [False] * amount_points
        self.__belong = [False] * amount_points

        self.__neighbor_searcher = self.__create_neighbor_searcher(self.__data_type)

        # Results are collected locally and published at the end, so a repeated call does not
        # accumulate clusters and noise on top of the previous result.
        clusters = []
        for index_point in range(amount_points):
            if self.__visited[index_point] is False:
                cluster = self.__expand_cluster(index_point)
                if cluster is not None:
                    clusters.append(cluster)

        self.__clusters = clusters
        self.__noise = [index_point for index_point in range(amount_points) if self.__belong[index_point] is False]

        return self


    def get_clusters(self):
        """!
        @brief Returns allocated clusters.
        
        @remark Allocated clusters can be returned only after data processing (use method process()). Otherwise empty list is returned.
        
        @return (list) List of allocated clusters, each cluster contains indexes of objects in list of data.
        
        @see process()
        @see get_noise()
        
        """
        
        return self.__clusters


    def get_noise(self):
        """!
        @brief Returns allocated noise.
        
        @remark Allocated noise can be returned only after data processing (use method process() before). Otherwise empty list is returned.
        
        @return (list) List of indexes that are marked as a noise.
        
        @see process()
        @see get_clusters()
        
        """

        return self.__noise


    def get_cluster_encoding(self):
        """!
        @brief Returns clustering result representation type that indicate how clusters are encoded.
        
        @return (type_encoding) Clustering result representation.
        
        @see get_clusters()
        
        """
        
        return type_encoding.CLUSTER_INDEX_LIST_SEPARATION


    def __verify_arguments(self):
        """!
        @brief Verify input parameters for the algorithm and throw exception in case of incorrectness.

        @throw ValueError if input data is empty or if the connectivity radius is negative.

        """
        if len(self.__pointer_data) == 0:
            raise ValueError("Input data is empty (size: '%d')." % len(self.__pointer_data))

        if self.__eps < 0:
            # '%s' is used on purpose: '%d' truncates a fractional radius (for example, -0.5 is shown as 0).
            raise ValueError("Connectivity radius (current value: '%s') should be greater or equal to 0." % self.__eps)


    def __create_neighbor_searcher(self, data_type):
        """!
        @brief Returns neighbor searcher in line with data type.

        @param[in] data_type (string): Data type (points or distance matrix).

        @return (callable) Method that returns indexes of neighbors for the specified index of a point.

        @throw TypeError if unknown data type is specified.

        """
        if data_type == 'points':
            return self.__neighbor_indexes_points
        elif data_type == 'distance_matrix':
            return self.__neighbor_indexes_distance_matrix
        else:
            raise TypeError("Unknown type of data is specified '%s'" % data_type)


    def __expand_cluster(self, index_point):
        """!
        @brief Expands cluster from specified point in the input data space.
        @details The point is marked as visited. If it has enough neighbors, the cluster is grown through every
                  reachable point that also has enough neighbors; points that are reached but do not have enough
                  neighbors are included to the cluster without being expanded further.
        
        @param[in] index_point (uint): Index of a point from the data.

        @return (list) List of indexes that belong to the same cluster, or None if the point does not have enough
                 neighbors to start a cluster.
        
        """
        
        self.__visited[index_point] = True
        neighbors = self.__neighbor_searcher(index_point)

        if len(neighbors) < self.__neighbors:
            return None

        cluster = [index_point]
        self.__belong[index_point] = True

        # Mirror of the queue content (plus the seed point) that provides constant-time membership checks
        # instead of scanning the growing list.
        queued = set(neighbors)
        queued.add(index_point)

        # 'neighbors' is used as a queue and intentionally grows while it is being iterated.
        for index_neighbor in neighbors:
            if self.__visited[index_neighbor] is False:
                self.__visited[index_neighbor] = True

                next_neighbors = self.__neighbor_searcher(index_neighbor)

                if len(next_neighbors) >= self.__neighbors:
                    for index_candidate in next_neighbors:
                        if index_candidate not in queued:
                            queued.add(index_candidate)
                            neighbors.append(index_candidate)

            if self.__belong[index_neighbor] is False:
                cluster.append(index_neighbor)
                self.__belong[index_neighbor] = True

        return cluster


    def __neighbor_indexes_points(self, index_point):
        """!
        @brief Return neighbors of the specified object in case of sequence of points.
        @details KD-tree that is built by `process()` is used for the search; the point itself is excluded.

        @param[in] index_point (uint): Index of the point whose neighbors should be found.

        @return (list) List of indexes of neighbors in line with the connectivity radius.

        """
        kdnodes = self.__kdtree.find_nearest_dist_nodes(self.__pointer_data[index_point], self.__eps)
        return [node_tuple[1].payload for node_tuple in kdnodes if node_tuple[1].payload != index_point]


    def __neighbor_indexes_distance_matrix(self, index_point):
        """!
        @brief Return neighbors of the specified object in case of distance matrix.
        @details A point is a neighbor if its distance in the corresponding row of the matrix does not exceed the
                  connectivity radius; the point itself is excluded.

        @param[in] index_point (uint): Index of the point whose neighbors should be found.

        @return (list) List of indexes of neighbors in line with the connectivity radius.

        """
        distances = self.__pointer_data[index_point]
        return [index_neighbor for index_neighbor in range(len(distances))
                if ((distances[index_neighbor] <= self.__eps) and (index_neighbor != index_point))]


    def __initialize_ccore_state(self, ccore):
        """!
        @brief Initializes C++ pyclustering state.
        @details Check if it is requested and if it is available for the current platform. This information is used to
                  set status of C++ pyclustering library.

        @param[in] ccore (bool): True if C++ pyclustering library is requested for processing.

        """
        self.__ccore = ccore
        if self.__ccore:
            self.__ccore = ccore_library.workable()