File: gmeans.py

"""!

@brief The module contains the G-Means algorithm and other related services.
@details Implementation based on paper @cite inproceedings::cluster::gmeans::1.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


import numpy
import scipy.stats

from pyclustering.core.gmeans_wrapper import gmeans as gmeans_wrapper
from pyclustering.core.wrapper import ccore_library

from pyclustering.cluster.center_initializer import kmeans_plusplus_initializer
from pyclustering.cluster.encoder import type_encoding
from pyclustering.cluster.kmeans import kmeans
from pyclustering.utils import distance_metric, type_metric


class gmeans:
    """!
    @brief Class implements G-Means clustering algorithm.
    @details The G-means algorithm starts with a small number of centers, and grows the number of centers.
              Each iteration of the G-Means algorithm splits into two those centers whose data appear not to come from a
              Gaussian distribution. G-means repeatedly makes decisions based on a statistical test for the data
              assigned to each center.

    Implementation based on the paper @cite inproceedings::cluster::gmeans::1.

    @image html gmeans_example_clustering.png "G-Means clustering results on most common data-sets."

    Example #1. In this example, G-Means starts the analysis from a single cluster.
    @code
        from pyclustering.cluster import cluster_visualizer
        from pyclustering.cluster.gmeans import gmeans
        from pyclustering.utils import read_sample
        from pyclustering.samples.definitions import FCPS_SAMPLES

        # Read sample 'Lsun' from file.
        sample = read_sample(FCPS_SAMPLES.SAMPLE_LSUN)

        # Create instance of G-Means algorithm. By default the algorithm starts the search from a single cluster.
        gmeans_instance = gmeans(sample).process()

        # Extract clustering results: clusters and their centers
        clusters = gmeans_instance.get_clusters()
        centers = gmeans_instance.get_centers()

        # Print total sum of metric errors
        print("Total WCE:", gmeans_instance.get_total_wce())

        # Visualize clustering results
        visualizer = cluster_visualizer()
        visualizer.append_clusters(clusters, sample)
        visualizer.show()
    @endcode

    Example #2. Sometimes G-Means may find a local optimum. The `repeat` value can be used to increase the probability
    of finding the global optimum. The argument `repeat` defines how many times K-Means clustering with K-Means++
    initialization should be run in order to find the optimal clusters.
    @code
        # Read sample 'Tetra' from file.
        sample = read_sample(FCPS_SAMPLES.SAMPLE_TETRA)

        # Create instance of G-Means algorithm. By default the algorithm starts the search from a single cluster.
        gmeans_instance = gmeans(sample, repeat=10).process()

        # Extract clustering results: clusters and their centers
        clusters = gmeans_instance.get_clusters()

        # Visualize clustering results
        visualizer = cluster_visualizer()
        visualizer.append_clusters(clusters, sample)
        visualizer.show()
    @endcode

    If labels are required instead of the default clustering result representation `CLUSTER_INDEX_LIST_SEPARATION`:
    @code
        from pyclustering.cluster.gmeans import gmeans
        from pyclustering.cluster.encoder import type_encoding, cluster_encoder
        from pyclustering.samples.definitions import SIMPLE_SAMPLES
        from pyclustering.utils import read_sample

        data = read_sample(SIMPLE_SAMPLES.SAMPLE_SIMPLE1)

        gmeans_instance = gmeans(data).process()
        clusters = gmeans_instance.get_clusters()

        # Change cluster representation from default to labeling.
        encoder = cluster_encoder(type_encoding.CLUSTER_INDEX_LIST_SEPARATION, clusters, data)
        encoder.set_encoding(type_encoding.CLUSTER_INDEX_LABELING)
        labels = encoder.get_clusters()

        print(labels)   # Display labels
    @endcode

    The output of the code above:
    @code
        [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
    @endcode

    """

    def __init__(self, data, k_init=1, ccore=True, **kwargs):
        """!
        @brief Initializes G-Means algorithm.

        @param[in] data (array_like): Input data that is presented as an array of points (objects), each point should be
                    represented by an array_like data structure.
        @param[in] k_init (uint): Initial amount of centers (by default the search starts from 1).
        @param[in] ccore (bool): Defines whether CCORE library (C/C++ part of the library) should be used instead of
                    Python code.
        @param[in] **kwargs: Arbitrary keyword arguments (available arguments: `tolerance`, `repeat`, `k_max`, `random_state`).

        <b>Keyword Args:</b><br>
            - tolerance (double): Stop condition for each K-Means iteration: if the maximum change of the cluster
               centers is less than the tolerance, then the algorithm stops processing.
            - repeat (uint): How many times K-Means should be run to improve parameters (by default is 3). Larger
               `repeat` values increase the probability of finding the global optimum.
            - k_max (uint): Maximum amount of clusters that might be allocated. The argument is considered as a stop
               condition. When the maximum amount is reached, the algorithm stops processing. By default the maximum
               amount of clusters is not restricted (`k_max` is -1).
            - random_state (int): Seed for random state (by default is `None`, the current system time is used).

        """
        self.__data = data
        self.__k_init = k_init

        self.__clusters = []
        self.__centers = []
        self.__total_wce = 0.0
        self.__ccore = ccore

        self.__tolerance = kwargs.get('tolerance', 0.001)
        self.__repeat = kwargs.get('repeat', 3)
        self.__k_max = kwargs.get('k_max', -1)
        self.__random_state = kwargs.get('random_state', None)

        if self.__ccore is True:
            self.__ccore = ccore_library.workable()

        self._verify_arguments()


    def process(self):
        """!
        @brief Performs cluster analysis in line with the rules of the G-Means algorithm.

        @return (gmeans) Returns itself (G-Means instance).

        @see get_clusters()
        @see get_centers()

        """
        if self.__ccore is True:
            return self._process_by_ccore()

        return self._process_by_python()


    def _process_by_ccore(self):
        """!
        @brief Performs cluster analysis using CCORE (C/C++ part of pyclustering library).

        """
        self.__clusters, self.__centers, self.__total_wce = gmeans_wrapper(self.__data, self.__k_init, self.__tolerance, self.__repeat, self.__k_max, self.__random_state)
        return self


    def _process_by_python(self):
        """!
        @brief Performs cluster analysis using Python.

        """
        self.__clusters, self.__centers, _ = self._search_optimal_parameters(self.__data, self.__k_init)

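        # Alternate between the statistical splitting step and K-Means refinement
        # until no center is split or the `k_max` stop condition is reached.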
        while self._run_condition():
            current_amount_clusters = len(self.__clusters)
            self._statistical_optimization()

            if current_amount_clusters == len(self.__centers):  # amount of centers the same - no need to continue.
                break

            self._perform_clustering()

        return self


    def predict(self, points):
        """!
        @brief Calculates the closest cluster to each point.

        @param[in] points (array_like): Points for which closest clusters are calculated.

        @return (array_like) Index of the closest cluster for each point. Returns an empty
                 collection if the 'process()' method was not called.

        """
        nppoints = numpy.array(points)
        if len(self.__clusters) == 0:
            return []

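        # The squared Euclidean metric in numpy mode computes the distances from
        # one point to all centers in a single vectorized call.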
        metric = distance_metric(type_metric.EUCLIDEAN_SQUARE, numpy_usage=True)

        npcenters = numpy.array(self.__centers)
        differences = numpy.zeros((len(nppoints), len(npcenters)))
        for index_point in range(len(nppoints)):
            differences[index_point] = metric(nppoints[index_point], npcenters)

        return numpy.argmin(differences, axis=1)


    def get_clusters(self):
        """!
        @brief Returns a list of allocated clusters; each cluster contains indexes of objects from the input data.

        @return (array_like) Allocated clusters.

        @see process()
        @see get_centers()

        """
        return self.__clusters


    def get_centers(self):
        """!
        @brief Returns list of centers of allocated clusters.

        @return (array_like) Allocated centers.

        @see process()
        @see get_clusters()

        """
        return self.__centers


    def get_total_wce(self):
        """!
        @brief Returns the sum of metric errors, which depends on the metric that was used for clustering (by default SSE - Sum of Squared Errors).
        @details The sum of metric errors is calculated using the distance between each point and the center of its cluster:
                 \f[error=\sum_{i=0}^{N}distance(x_{i}, center(x_{i}))\f]

        @see process()
        @see get_clusters()

        """

        return self.__total_wce


    def get_cluster_encoding(self):
        """!
        @brief Returns the clustering result representation type that indicates how clusters are encoded.

        @return (type_encoding) Clustering result representation.

        @see get_clusters()

        """

        return type_encoding.CLUSTER_INDEX_LIST_SEPARATION


    def _statistical_optimization(self):
        """!
        @brief Tries to split each cluster into two in order to find the optimal amount of clusters.

        """
        centers = []
        potential_amount_clusters = len(self.__clusters)
        for index in range(len(self.__clusters)):
            new_centers = self._split_and_search_optimal(self.__clusters[index])
            if (new_centers is None) or ((self.__k_max != -1) and (potential_amount_clusters >= self.__k_max)):
                centers.append(self.__centers[index])
            else:
                centers += new_centers
                potential_amount_clusters += 1

        self.__centers = centers


    def _split_and_search_optimal(self, cluster):
        """!
        @brief Splits the specified cluster into two by performing K-Means clustering and checks the correctness of
                the split using the Anderson-Darling test.

        @param[in] cluster (array_like): Cluster that should be analysed and optimized by splitting if required.

        @return (array_like) Two new centers if the two new clusters are considered more suitable.
                (None) If the current cluster is more suitable.
        """
        if len(cluster) == 1:
            return None

        points = [self.__data[index_point] for index_point in cluster]
        new_clusters, new_centers, _ = self._search_optimal_parameters(points, 2)

        if len(new_centers) > 1:
            accept_null_hypothesis = self._is_null_hypothesis(points, new_centers)
            if not accept_null_hypothesis:
                return new_centers  # If null hypothesis is rejected then use two new clusters

        return None


    def _is_null_hypothesis(self, data, centers):
        """!
        @brief Returns whether the H0 hypothesis is accepted using the Anderson-Darling test statistic.

        @param[in] data (array_like): N-dimensional data for statistical test.
        @param[in] centers (array_like): Two new allocated centers.

        @return (bool) True if the null hypothesis is acceptable.

        """
        v = numpy.subtract(centers[0], centers[1])
        points = self._project_data(data, v)

        estimation, critical, _ = scipy.stats.anderson(points, dist='norm')  # the Anderson-Darling test statistic

        # If the returned statistic is larger than these critical values then for the corresponding significance level,
        # the null hypothesis that the data come from the chosen distribution can be rejected.
        return estimation < critical[-1]  # False - not a gaussian distribution (reject H0)


    @staticmethod
    def _project_data(data, vector):
        """!
        @brief Transforms input data by projecting it onto the input vector using the formula:

        \f[
        x_{i}^{*}=\frac{\left \langle x_{i}, v \right \rangle}{\left \| v \right \|^{2}}.
        \f]

        @param[in] data (array_like): Input data that is represented by points.
        @param[in] vector (array_like): Input vector that is used for projection.

        @return (array_like) Transformed 1-dimensional data.

        """
        square_norm = numpy.sum(numpy.multiply(vector, vector))
        return numpy.divide(numpy.sum(numpy.multiply(data, vector), axis=1), square_norm)


    def _search_optimal_parameters(self, data, amount):
        """!
        @brief Performs cluster analysis for the specified data several times to find the optimal clustering result in
                terms of WCE.

        @param[in] data (array_like): Input data that should be clustered.
        @param[in] amount (uint): Amount of clusters that should be allocated.

        @return (tuple) Optimal clustering result: (clusters, centers, wce).

        """
        best_wce, best_clusters, best_centers = float('+inf'), [], []
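        # Note: with a fixed `random_state` every attempt produces the same initial
        # centers, so repeating only improves results when `random_state` is `None`.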
        for _ in range(self.__repeat):
            initial_centers = kmeans_plusplus_initializer(data, amount, random_state=self.__random_state).initialize()
            solver = kmeans(data, initial_centers, tolerance=self.__tolerance, ccore=False).process()

            candidate_wce = solver.get_total_wce()
            if candidate_wce < best_wce:
                best_wce = candidate_wce
                best_clusters = solver.get_clusters()
                best_centers = solver.get_centers()

            if len(initial_centers) == 1:
                break   # No need to rerun clustering for one initial center.

        return best_clusters, best_centers, best_wce


    def _perform_clustering(self):
        """!
        @brief Performs cluster analysis using the K-Means algorithm with the current centers as initial centers.

        """
        solver = kmeans(self.__data, self.__centers, tolerance=self.__tolerance, ccore=False).process()
        self.__clusters = solver.get_clusters()
        self.__centers = solver.get_centers()
        self.__total_wce = solver.get_total_wce()


    def _run_condition(self):
        """!
        @brief Defines whether the algorithm should continue processing or should stop.

        @return `True` if the algorithm should continue processing, otherwise `False`.

        """
        if (self.__k_max > 0) and (len(self.__clusters) >= self.__k_max):
            return False

        return True


    def _verify_arguments(self):
        """!
        @brief Verifies input parameters for the algorithm and throws an exception in case of incorrectness.

        """
        if len(self.__data) == 0:
            raise ValueError("Input data is empty (size: '%d')." % len(self.__data))

        if self.__k_init <= 0:
            raise ValueError("Initial amount of centers should be greater than 0 "
                             "(current value: '%d')." % self.__k_init)

        if self.__tolerance <= 0.0:
            raise ValueError("Tolerance should be greater than 0 (current value: '%f')." % self.__tolerance)

        if self.__repeat <= 0:
            raise ValueError("Amount of attempt to find optimal parameters should be greater than 0 "
                             "(current value: '%d')." % self.__repeat)

        if (self.__k_max != -1) and (self.__k_max <= 0):
            raise ValueError("Maximum amount of cluster that might be allocated should be greater than 0 or -1 if "
                             "the algorithm should be restricted in searching optimal number of clusters.")

        if (self.__k_max != -1) and (self.__k_max < self.__k_init):
            raise ValueError("Initial amount of clusters should be less than the maximum amount 'k_max'.")