File: agglomerative.py

package info (click to toggle)
python-pyclustering 0.10.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 11,128 kB
  • sloc: cpp: 38,888; python: 24,311; sh: 384; makefile: 105
file content (371 lines) | stat: -rwxr-xr-x 14,116 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
"""!

@brief Cluster analysis algorithm: agglomerative algorithm.
@details Implementation based on paper @cite book::algorithms_for_clustering_data.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


from enum import IntEnum

from pyclustering.cluster.encoder import type_encoding

from pyclustering.utils import euclidean_distance_square

from pyclustering.core.wrapper import ccore_library

import pyclustering.core.agglomerative_wrapper as wrapper


class type_link(IntEnum):
    """!
    @brief Kinds of link that define how the distance between two clusters is measured.
    @details Members are integer-valued (IntEnum), so they compare equal to their plain integer codes.

    """

    ## Single link (SLINK method): clusters are as far apart as their two nearest objects.
    SINGLE_LINK = 0

    ## Complete link (CLINK method): clusters are as far apart as their two farthest objects.
    COMPLETE_LINK = 1

    ## Average link: the link is the average distance between objects of the two clusters.
    AVERAGE_LINK = 2

    ## Centroid link: the link is the distance between the centers of the two clusters.
    CENTROID_LINK = 3


class agglomerative:
    """!
    @brief Class represents agglomerative algorithm for cluster analysis.
    @details Agglomerative algorithm considers each data point (object) as a separate cluster at the beginning and
              step by step finds the best pair of clusters for merge until required amount of clusters is obtained.
    
    Example of agglomerative algorithm where centroid link is used:
    @code
        from pyclustering.cluster.agglomerative import agglomerative, type_link
        from pyclustering.cluster import cluster_visualizer
        from pyclustering.samples.definitions import FCPS_SAMPLES
        from pyclustering.utils import read_sample

        # Sample for cluster analysis (represented by list)
        sample = read_sample(FCPS_SAMPLES.SAMPLE_TARGET)

        # Create object that uses python code only
        agglomerative_instance = agglomerative(sample, 6, type_link.CENTROID_LINK, ccore=False)

        # Cluster analysis
        agglomerative_instance.process()

        # Obtain results of clustering
        clusters = agglomerative_instance.get_clusters()

        # Visualize clustering results
        visualizer = cluster_visualizer()
        visualizer.append_clusters(clusters, sample)
        visualizer.show()
    @endcode
    
    There is example of clustering 'LSUN' sample:
    @code
        from pyclustering.cluster.agglomerative import agglomerative, type_link
        from pyclustering.cluster import cluster_visualizer
        from pyclustering.samples.definitions import FCPS_SAMPLES
        from pyclustering.utils import read_sample

        # sample Lsun for cluster analysis
        lsun_sample = read_sample(FCPS_SAMPLES.SAMPLE_LSUN)

        # create instance of the algorithm that will use ccore library (the last argument)
        agglomerative_instance = agglomerative(lsun_sample, 3, type_link.SINGLE_LINK, True)

        # start processing
        agglomerative_instance.process()

        # get result and visualize it
        lsun_clusters = agglomerative_instance.get_clusters()
        visualizer = cluster_visualizer()
        visualizer.append_clusters(lsun_clusters, lsun_sample)
        visualizer.show()
    @endcode
    
    Example of agglomerative clustering using different links:
    @image html agglomerative_lsun_clustering_single_link.png
    
    """
    
    def __init__(self, data, number_clusters, link = None, ccore = True):
        """!
        @brief Constructor of agglomerative hierarchical algorithm.
        
        @param[in] data (list): Input data that is presented as a list of points (objects), each point should be represented by list, for example
                    [[0.1, 0.2], [0.4, 0.5], [1.3, 0.9]].
        @param[in] number_clusters (uint): Number of clusters that should be allocated.
        @param[in] link (type_link): Link type that is used for calculation similarity between objects and clusters, if it is not specified centroid link will be used by default.
        @param[in] ccore (bool): Defines should be CCORE (C++ pyclustering library) used instead of Python code or not (by default it is 'True').
                    CCORE is used only if the library reports itself as workable, otherwise Python code is used.
        
        @exception ValueError Input data is empty or amount of clusters is not greater than 0.
        
        """  
        
        self.__pointer_data = data
        self.__number_clusters = number_clusters
        self.__similarity = link

        self.__verify_arguments()

        if self.__similarity is None:
            self.__similarity = type_link.CENTROID_LINK
        
        self.__clusters = []
        self.__centers = []    # centers of clusters, rebuilt by process(); read only by the centroid link

        self.__ccore = ccore
        if self.__ccore:
            # CCORE was requested - use it only when the library is really available.
            self.__ccore = ccore_library.workable()

    
    def process(self):
        """!
        @brief Performs cluster analysis in line with rules of agglomerative algorithm and similarity.
        @details The method can be called repeatedly: every call of the Python implementation starts again from
                  clusters that consist of a single object.

        @return (agglomerative) Returns itself (Agglomerative instance).

        @see get_clusters()
        
        """
        
        if self.__ccore is True:
            self.__clusters = wrapper.agglomerative_algorithm(self.__pointer_data, self.__number_clusters, self.__similarity)
            return self

        # Each object forms its own cluster at the beginning.
        self.__clusters = [[index] for index in range(len(self.__pointer_data))]

        # Centers are shrunk by every centroid merge, so they have to be rebuilt together with the clusters,
        # otherwise a repeated call would work with centers that do not correspond to the clusters.
        # list() is used instead of .copy() so that any sequence of points gets a container that supports pop().
        self.__centers = list(self.__pointer_data)

        while len(self.__clusters) > self.__number_clusters:
            self.__merge_similar_clusters()

        return self

    
    def get_clusters(self):
        """!
        @brief Returns list of allocated clusters, each cluster contains indexes of objects in list of data.
        
        @remark The list is empty until process() is called.
        
        @return (list) List of allocated clusters, each cluster contains indexes of objects in list of data.
        
        @see process()
        
        """
        
        return self.__clusters
    
    
    def get_cluster_encoding(self):
        """!
        @brief Returns clustering result representation type that indicate how clusters are encoded.
        
        @return (type_encoding) Clustering result representation.
        
        @see get_clusters()
        
        """
        
        return type_encoding.CLUSTER_INDEX_LIST_SEPARATION


    def __merge_similar_clusters(self):
        """!
        @brief Merges the most similar clusters in line with link type.
        
        @exception NameError Link type is not one of the supported 'type_link' values.
        
        """
        
        if self.__similarity == type_link.AVERAGE_LINK:
            self.__merge_by_average_link()
        
        elif self.__similarity == type_link.CENTROID_LINK:
            self.__merge_by_centroid_link()
        
        elif self.__similarity == type_link.COMPLETE_LINK:
            self.__merge_by_complete_link()
        
        elif self.__similarity == type_link.SINGLE_LINK:
            self.__merge_by_single_link()
        
        else:
            raise NameError('Not supported similarity is used')
    
    
    def __find_closest_pair(self, amount, distance_calculator):
        """!
        @brief Finds the pair of indexes with the smallest distance between them.
        @details Pairs are checked in ascending order of indexes and only a strictly smaller distance replaces
                  the current candidate, therefore the first pair wins in case of equal distances.
        
        @param[in] amount (uint): Amount of items (clusters or centers) among which the pair is searched.
        @param[in] distance_calculator (callable): Function that takes two indexes and returns distance between corresponding items.
        
        @return (tuple) Indexes (index1, index2) of the closest pair, where index1 < index2.
        
        @exception ValueError There is no pair whose distance is less than infinity.
        
        """
        
        minimum_distance = float('Inf')
        closest_pair = None
        
        for index1 in range(amount):
            for index2 in range(index1 + 1, amount):
                candidate_distance = distance_calculator(index1, index2)
                
                if candidate_distance < minimum_distance:
                    minimum_distance = candidate_distance
                    closest_pair = (index1, index2)
        
        if closest_pair is None:
            # Nothing compared as smaller than infinity: fewer than two items, or distances that are NaN/infinite.
            raise ValueError("Impossible to find a pair of clusters for merging (less than two clusters or "
                             "distances between clusters are not finite).")
        
        return closest_pair
    
    
    def __join_clusters(self, index_cluster1, index_cluster2):
        """!
        @brief Appends objects of the second cluster to the first one and removes the second cluster.
        
        @param[in] index_cluster1 (uint): Index of the cluster that absorbs objects.
        @param[in] index_cluster2 (uint): Index of the cluster that is merged and removed.
        
        """
        
        self.__clusters[index_cluster1] += self.__clusters[index_cluster2]
        self.__clusters.pop(index_cluster2)   # remove merged cluster.
    
    
    def __merge_by_average_link(self):
        """!
        @brief Merges the most similar clusters in line with average link type.
        
        """
        
        index1, index2 = self.__find_closest_pair(len(self.__clusters), self.__calculate_average_distance)
        self.__join_clusters(index1, index2)
    
    
    def __calculate_average_distance(self, index_cluster1, index_cluster2):
        """!
        @brief Calculates average-link distance between two specified clusters.
        
        @param[in] index_cluster1 (uint): Index of the first cluster.
        @param[in] index_cluster2 (uint): Index of the second cluster.
        
        @return (double) Sum of 'euclidean_distance_square' over all pairs of objects from the two clusters divided by
                 the total amount of objects in both clusters.
        
        """
        
        cluster1 = self.__clusters[index_cluster1]
        cluster2 = self.__clusters[index_cluster2]
        
        # Accumulate distance between every object of the first cluster and every object of the second one.
        total_distance = 0.0
        for index_object1 in cluster1:
            for index_object2 in cluster2:
                total_distance += euclidean_distance_square(self.__pointer_data[index_object1], self.__pointer_data[index_object2])
        
        # NOTE(review): the sum contains len(cluster1) * len(cluster2) terms but is normalized by the sum of the
        # sizes. It is kept as is to preserve existing clustering results - confirm whether it is intended.
        return total_distance / (len(cluster1) + len(cluster2))
        
    
    def __merge_by_centroid_link(self):
        """!
        @brief Merges the most similar clusters in line with centroid link type.
        
        """
        
        index1, index2 = self.__find_closest_pair(len(self.__centers), self.__calculate_centroid_distance)
        
        self.__clusters[index1] += self.__clusters[index2]
        self.__centers[index1] = self.__calculate_center(self.__clusters[index1])
         
        self.__clusters.pop(index2)   # remove merged cluster.
        self.__centers.pop(index2)    # remove merged center.
    
    
    def __calculate_centroid_distance(self, index_center1, index_center2):
        """!
        @brief Calculates distance between centers of two clusters.
        
        @param[in] index_center1 (uint): Index of the first center.
        @param[in] index_center2 (uint): Index of the second center.
        
        @return (double) Value of 'euclidean_distance_square' for the two centers.
        
        """
        
        return euclidean_distance_square(self.__centers[index_center1], self.__centers[index_center2])
    
    
    def __merge_by_complete_link(self):
        """!
        @brief Merges the most similar clusters in line with complete link type.
        
        """
        
        index1, index2 = self.__find_closest_pair(len(self.__clusters), self.__calculate_farthest_distance)
        self.__join_clusters(index1, index2)
    
    
    def __calculate_farthest_distance(self, index_cluster1, index_cluster2):
        """!
        @brief Finds two farthest objects in two specified clusters and returns distance between them.
        
        @param[in] index_cluster1 (uint): Index of the first cluster.
        @param[in] index_cluster2 (uint): Index of the second cluster.
        
        @return (double) The largest value of 'euclidean_distance_square' among pairs of objects from the two clusters.
        
        """
        candidate_maximum_distance = 0.0
        for index_object1 in self.__clusters[index_cluster1]:
            for index_object2 in self.__clusters[index_cluster2]:
                distance = euclidean_distance_square(self.__pointer_data[index_object1], self.__pointer_data[index_object2])
                
                if distance > candidate_maximum_distance:
                    candidate_maximum_distance = distance
    
        return candidate_maximum_distance
    
    
    def __merge_by_single_link(self):
        """!
        @brief Merges the most similar clusters in line with single link type.
        
        """
        
        index1, index2 = self.__find_closest_pair(len(self.__clusters), self.__calculate_nearest_distance)
        self.__join_clusters(index1, index2)
    
    
    def __calculate_nearest_distance(self, index_cluster1, index_cluster2):
        """!
        @brief Finds two nearest objects in two specified clusters and returns distance between them.
        
        @param[in] index_cluster1 (uint): Index of the first cluster.
        @param[in] index_cluster2 (uint): Index of the second cluster.
        
        @return (double) The smallest value of 'euclidean_distance_square' among pairs of objects from the two clusters.
        
        """
        candidate_minimum_distance = float('Inf')
        
        for index_object1 in self.__clusters[index_cluster1]:
            for index_object2 in self.__clusters[index_cluster2]:
                distance = euclidean_distance_square(self.__pointer_data[index_object1], self.__pointer_data[index_object2])
                if distance < candidate_minimum_distance:
                    candidate_minimum_distance = distance
        
        return candidate_minimum_distance
    
    
    def __calculate_center(self, cluster):
        """!
        @brief Calculates new center.
        
        @param[in] cluster (list): Indexes of objects that form the cluster, it should contain at least one index.
        
        @return (list) New value of the center of the specified cluster.
        
        """
         
        dimension = len(self.__pointer_data[cluster[0]])
        center = [0] * dimension
        
        # Coordinates are accumulated one by one in the order of objects in the cluster and averaged afterwards.
        for index_point in cluster:
            point = self.__pointer_data[index_point]
            for index_dimension in range(dimension):
                center[index_dimension] += point[index_dimension]
         
        for index_dimension in range(dimension):
            center[index_dimension] /= len(cluster)
             
        return center


    def __verify_arguments(self):
        """!
        @brief Verify input parameters for the algorithm and throw exception in case of incorrectness.

        @exception ValueError Input data is empty or amount of clusters is not greater than 0.

        """
        # len() is checked explicitly instead of truthiness, so that only the size of the container matters.
        if len(self.__pointer_data) == 0:
            raise ValueError("Input data is empty (size: '%d')." % len(self.__pointer_data))

        if self.__number_clusters <= 0:
            raise ValueError("Amount of cluster (current value: '%d') for allocation should be greater than 0." %
                             self.__number_clusters)