File: rock.py

"""!

@brief Cluster analysis algorithm: ROCK
@details Implementation based on paper @cite inproceedings::rock::1.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


from pyclustering.cluster.encoder import type_encoding

from pyclustering.utils import euclidean_distance

from pyclustering.core.wrapper import ccore_library

import pyclustering.core.rock_wrapper as wrapper


class rock:
    """!
    @brief The class represents the clustering algorithm ROCK.

    Example:
    @code
        from pyclustering.cluster import cluster_visualizer
        from pyclustering.cluster.rock import rock
        from pyclustering.samples.definitions import FCPS_SAMPLES
        from pyclustering.utils import read_sample

        # Read sample for clustering from file.
        sample = read_sample(FCPS_SAMPLES.SAMPLE_HEPTA)

        # Create instance of ROCK algorithm for cluster analysis. Seven clusters should be allocated.
        rock_instance = rock(sample, 1.0, 7)

        # Run cluster analysis.
        rock_instance.process()

        # Obtain results of clustering.
        clusters = rock_instance.get_clusters()

        # Visualize clustering results.
        visualizer = cluster_visualizer()
        visualizer.append_clusters(clusters, sample)
        visualizer.show()
    @endcode
       
    """
    
    def __init__(self, data, eps, number_clusters, threshold=0.5, ccore=True):
        """!
        @brief Constructor of clustering algorithm ROCK.
        
        @param[in] data (list): Input data - list of points where each point is represented by a list of coordinates.
        @param[in] eps (double): Connectivity radius (similarity threshold) - points are considered neighbors if the distance between them does not exceed the connectivity radius.
        @param[in] number_clusters (uint): Defines the number of clusters that should be allocated from the input data set.
        @param[in] threshold (double): Value that defines the degree of normalization that influences the choice of clusters for merging during processing.
        @param[in] ccore (bool): Defines whether CCORE (C++ pyclustering library) should be used instead of the Python implementation.
        
        """
        
        self.__pointer_data = data
        self.__eps = eps
        self.__number_clusters = number_clusters
        self.__threshold = threshold
        
        self.__clusters = None
        
        self.__ccore = ccore
        if self.__ccore:
            self.__ccore = ccore_library.workable()

        self.__verify_arguments()

        # Exponent 1 + 2 * f(theta) of the ROCK goodness measure, where f(theta) = (1 - threshold) / (1 + threshold).
        self.__degree_normalization = 1.0 + 2.0 * ((1.0 - threshold) / (1.0 + threshold))

        self.__adjacency_matrix = None
        self.__create_adjacency_matrix()


    def process(self):
        """!
        @brief Performs cluster analysis in line with the rules of the ROCK algorithm.

        @return (rock) Returns itself (ROCK instance).
        
        @see get_clusters()
        
        """
        
        # TODO: (Not related to the specification, just an idea) The first iteration should be investigated. Euclidean distance should be used for clustering between two
        # points and the ROCK algorithm between clusters, because non-categorical samples are considered. However, more investigation is required.
        
        if self.__ccore is True:
            self.__clusters = wrapper.rock(self.__pointer_data, self.__eps, self.__number_clusters, self.__threshold)
        
        else:  
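            # Pure Python fallback: start from singleton clusters and agglomeratively merge the pair
            # with the highest goodness measure until the requested amount of clusters is obtained
            # or no linked pair remains.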
            self.__clusters = [[index] for index in range(len(self.__pointer_data))]
            
            while len(self.__clusters) > self.__number_clusters:
                indexes = self.__find_pair_clusters(self.__clusters)
                
                if indexes != [-1, -1]:
                    self.__clusters[indexes[0]] += self.__clusters[indexes[1]]
                    self.__clusters.pop(indexes[1])   # remove merged cluster.
                else:
                    break  # totally separated clusters have been allocated
        return self

    
    def get_clusters(self):
        """!
        @brief Returns the list of allocated clusters; each cluster contains indexes of objects from the input data.
        
        @return (list) List of allocated clusters; each cluster contains indexes of objects from the input data.
        
        @see process()
        
        """
        
        return self.__clusters


    def get_cluster_encoding(self):
        """!
        @brief Returns the clustering result representation type that indicates how clusters are encoded.
        
        @return (type_encoding) Clustering result representation.
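
        A per-point labeling can be obtained via the encoder module - a sketch, assuming
        pyclustering's cluster_encoder API:
        @code
            from pyclustering.cluster.encoder import cluster_encoder, type_encoding

            encoder = cluster_encoder(rock_instance.get_cluster_encoding(),
                                      rock_instance.get_clusters(), sample)
            encoder.set_encoding(type_encoding.CLUSTER_INDEX_LABELING)
            labels = encoder.get_clusters()   # one label per data point
        @endcode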
        
        @see get_clusters()
        
        """
        
        return type_encoding.CLUSTER_INDEX_LIST_SEPARATION


    def __find_pair_clusters(self, clusters):
        """!
        @brief Returns the pair of clusters that are the best candidates for merging in line with the goodness measure.
               The pair of clusters for which the goodness measure is maximum is the best pair to be merged.
               
        @param[in] clusters (list): List of clusters that have been allocated during processing, each cluster is represented by list of indexes of points from the input data set.
        
        @return (list) List that contains two indexes of clusters (from the list 'clusters') that should be merged at this step.
                It is equal to [-1, -1] when there are no links between the clusters.
        
        """
        
        maximum_goodness = 0.0
        cluster_indexes = [-1, -1]
        
        for i in range(0, len(clusters)):
            for j in range(i + 1, len(clusters)):
                goodness = self.__calculate_goodness(clusters[i], clusters[j])
                if goodness > maximum_goodness:
                    maximum_goodness = goodness
                    cluster_indexes = [i, j]
        
        return cluster_indexes


    def __calculate_links(self, cluster1, cluster2):
        """!
        @brief Returns the number of links between two clusters.
        @details A link between two objects (points) exists only if the distance between them does not exceed the connectivity radius.
        
        @param[in] cluster1 (list): The first cluster.
        @param[in] cluster2 (list): The second cluster.
        
        @return (uint) Number of links between two clusters.
        
        """
        
        number_links = 0
        
        for index1 in cluster1:
            for index2 in cluster2:
                number_links += self.__adjacency_matrix[index1][index2]
                
        return number_links
            

    def __create_adjacency_matrix(self):
        """!
        @brief Creates a 2D adjacency matrix (list of lists) where each element describes the existence of a link between two points (i.e. whether the points are neighbors).
        
        """
        
        size_data = len(self.__pointer_data)
        
        self.__adjacency_matrix = [[0 for i in range(size_data)] for j in range(size_data)]
        for i in range(0, size_data):
            for j in range(i + 1, size_data):
                distance = euclidean_distance(self.__pointer_data[i], self.__pointer_data[j])
                if distance <= self.__eps:
                    self.__adjacency_matrix[i][j] = 1
                    self.__adjacency_matrix[j][i] = 1
        
    

    def __calculate_goodness(self, cluster1, cluster2):
        """!
        @brief Calculates the 'goodness measure' coefficient between two clusters. The coefficient defines how suitable the clusters are for merging.
        
        @param[in] cluster1 (list): The first cluster.
        @param[in] cluster2 (list): The second cluster.
        
        @return (double) Goodness measure between two clusters.
        
        """
        
        number_links = self.__calculate_links(cluster1, cluster2)
        divisor = (len(cluster1) + len(cluster2)) ** self.__degree_normalization - len(cluster1) ** self.__degree_normalization - len(cluster2) ** self.__degree_normalization
        
        return number_links / divisor


    def __verify_arguments(self):
        """!
        @brief Verifies input parameters for the algorithm and raises an exception in case of incorrectness.

        """
        if len(self.__pointer_data) == 0:
            raise ValueError("Input data is empty (size: '%d')." % len(self.__pointer_data))

        if self.__eps < 0:
            raise ValueError("Connectivity radius (current value: '%f') should be greater than or equal to 0." % self.__eps)

        if self.__threshold < 0 or self.__threshold > 1:
            raise ValueError("Threshold (current value: '%f') should be in range [0, 1]." % self.__threshold)

        if (self.__number_clusters is not None) and (self.__number_clusters <= 0):
            raise ValueError("Amount of clusters (current value: '%d') should be greater than 0." %
                             self.__number_clusters)