File: kmeans_templates.py

package info (click to toggle)
python-pyclustering 0.10.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 11,128 kB
  • sloc: cpp: 38,888; python: 24,311; sh: 384; makefile: 105
file content (153 lines) | stat: -rwxr-xr-x 5,743 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""!

@brief Test templates for K-Means clustering module.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""


from pyclustering.tests.assertion import assertion

from pyclustering.cluster.encoder import type_encoding, cluster_encoder
from pyclustering.cluster.kmeans import kmeans, kmeans_observer, kmeans_visualizer

from pyclustering.utils import read_sample
from pyclustering.utils.metric import distance_metric, type_metric

from random import random

import numpy


class KmeansTestTemplates:
    """!
    @brief Reusable, parameterised checks for the K-Means clustering implementation.

    @details Every template is a static method that builds a `kmeans` instance from its arguments, runs
              `process()` and verifies the outcome with the `assertion` helpers. A template signals
              failure through whatever exception those helpers (or the code under test) raise and
              returns nothing.

    """

    @staticmethod
    def templateLengthProcessData(data, start_centers, expected_cluster_length, ccore, **kwargs):
        """!
        @brief Runs K-Means and checks that clusters, centers and the reported total WCE are consistent.

        @details The following is verified after processing:
                  - the value returned by `get_total_wce()` equals the sum of `metric(point, center)` over
                    every point of every cluster;
                  - cluster sizes add up to the number of points in the sample;
                  - there are as many centers as clusters and every center has the dimension of the first
                    sample point;
                  - optionally, the sorted cluster sizes equal the sorted expected sizes.

                 When `itermax` is 0 the template instead expects the centers to equal `start_centers`,
                 an empty cluster list and a total WCE of 0.0.

        @param[in] data: Path (str) that is loaded with `read_sample`, or an already loaded sample.
        @param[in] start_centers: Initial centers handed to `kmeans`.
        @param[in] expected_cluster_length: Expected cluster sizes in any order, or None to skip the size
                    comparison. The argument is never modified.
        @param[in] ccore: Forwarded as the fourth positional argument of `kmeans` (presumably selects the
                    C/C++ core - confirm against the `kmeans` signature).
        @param[in] **kwargs: Optional `metric` (default `distance_metric(type_metric.EUCLIDEAN_SQUARE)`) and
                    `itermax` (default 200); both are forwarded to `kmeans`.

        """
        sample = read_sample(data) if isinstance(data, str) else data

        metric = kwargs.get('metric', distance_metric(type_metric.EUCLIDEAN_SQUARE))
        itermax = kwargs.get('itermax', 200)

        kmeans_instance = kmeans(sample, start_centers, 0.001, ccore, metric=metric, itermax=itermax)
        kmeans_instance.process()

        clusters = kmeans_instance.get_clusters()
        centers = kmeans_instance.get_centers()
        wce = kmeans_instance.get_total_wce()

        if itermax == 0:
            # No iteration was allowed: nothing may have been allocated or moved.
            assertion.eq(start_centers, centers)
            assertion.eq([], clusters)
            assertion.eq(0.0, wce)
            return

        # Accumulate in cluster order, point by point, because the result is compared with `wce` exactly.
        expected_wce = 0.0
        for index_cluster, cluster in enumerate(clusters):
            for index_point in cluster:
                expected_wce += metric(sample[index_point], centers[index_cluster])

        assertion.eq(expected_wce, wce)

        obtained_cluster_sizes = [len(cluster) for cluster in clusters]
        assertion.eq(len(sample), sum(obtained_cluster_sizes))

        assertion.eq(len(clusters), len(centers))
        dimension = len(sample[0])
        for center in centers:
            assertion.eq(dimension, len(center))

        if expected_cluster_length is not None:
            # Compare sorted copies: the caller's sequence must not be reordered as a side effect,
            # and `sorted` also accepts tuples and other iterables that have no `sort()` method.
            assertion.eq(sorted(obtained_cluster_sizes), sorted(expected_cluster_length))


    @staticmethod
    def templatePredict(path_to_file, initial_centers, points, expected_closest_clusters, ccore, **kwargs):
        """!
        @brief Trains K-Means on a sample file and checks the result of `predict()` for the given points.

        @param[in] path_to_file: Path of the sample, loaded with `read_sample`.
        @param[in] initial_centers: Initial centers handed to `kmeans`.
        @param[in] points: Points passed to `predict()`.
        @param[in] expected_closest_clusters: Expected output of `predict()`; it is compared by length and
                    then element-wise through `numpy.array_equal`.
        @param[in] ccore: Forwarded as the fourth positional argument of `kmeans` (presumably selects the
                    C/C++ core - confirm against the `kmeans` signature).
        @param[in] **kwargs: Optional `metric` (default `distance_metric(type_metric.EUCLIDEAN_SQUARE)`) and
                    `itermax` (default 200); both are forwarded to `kmeans`.

        """
        sample = read_sample(path_to_file)

        metric = kwargs.get('metric', distance_metric(type_metric.EUCLIDEAN_SQUARE))
        itermax = kwargs.get('itermax', 200)

        kmeans_instance = kmeans(sample, initial_centers, 0.001, ccore, metric=metric, itermax=itermax)
        kmeans_instance.process()

        closest_clusters = kmeans_instance.predict(points)
        assertion.eq(len(expected_closest_clusters), len(closest_clusters))
        assertion.true(numpy.array_equal(numpy.array(expected_closest_clusters), closest_clusters))


    @staticmethod
    def templateClusterAllocationOneDimensionData(ccore_flag):
        """!
        @brief Clusters four groups of random one-dimensional points and expects four clusters of ten points.

        @details Each group consists of ten values `random() + offset` for the offsets 0, 3, 5 and 8, and the
                  offsets themselves are used as initial centers.

        @param[in] ccore_flag: Forwarded as the fourth positional argument of `kmeans` (presumably selects
                    the C/C++ core - confirm against the `kmeans` signature).

        """
        group_offsets = (0.0, 3.0, 5.0, 8.0)
        points_per_group = 10

        # Groups are generated one after another, so points of the same group are adjacent in the sample.
        input_data = [[random() + offset] for offset in group_offsets for _ in range(points_per_group)]
        initial_centers = [[offset] for offset in group_offsets]

        kmeans_instance = kmeans(input_data, initial_centers, 0.025, ccore_flag)
        kmeans_instance.process()
        clusters = kmeans_instance.get_clusters()

        assertion.eq(len(group_offsets), len(clusters))
        for cluster in clusters:
            assertion.eq(points_per_group, len(cluster))


    @staticmethod
    def templateEncoderProcedures(filename, initial_centers, number_clusters, ccore_flag):
        """!
        @brief Checks that the K-Means result can be passed through every `cluster_encoder` encoding.

        @details The encoder is created from the encoding reported by `get_cluster_encoding()` and is then
                  switched to index labeling, object list separation and index list separation, in that
                  order. Finally the number of clusters is compared with `number_clusters`.

        @param[in] filename: Path of the sample, loaded with `read_sample`.
        @param[in] initial_centers: Initial centers handed to `kmeans`.
        @param[in] number_clusters: Expected amount of clusters.
        @param[in] ccore_flag: Forwarded as the fourth positional argument of `kmeans` (presumably selects
                    the C/C++ core - confirm against the `kmeans` signature).

        """
        sample = read_sample(filename)

        kmeans_instance = kmeans(sample, initial_centers, 0.025, ccore_flag)
        kmeans_instance.process()

        clusters = kmeans_instance.get_clusters()
        encoding = kmeans_instance.get_cluster_encoding()

        encoder = cluster_encoder(encoding, clusters, sample)
        # The conversions are chained on the same encoder, so their order is significant.
        for target_encoding in (type_encoding.CLUSTER_INDEX_LABELING,
                                type_encoding.CLUSTER_OBJECT_LIST_SEPARATION,
                                type_encoding.CLUSTER_INDEX_LIST_SEPARATION):
            encoder.set_encoding(target_encoding)

        assertion.eq(number_clusters, len(clusters))


    @staticmethod
    def templateCollectEvolution(filename, initial_centers, number_clusters, ccore_flag):
        """!
        @brief Runs K-Means with a `kmeans_observer` attached and checks the collected evolution.

        @details The observer must contain at least one step, and every step must provide at least one
                  center, at least one cluster, and centers whose dimension equals the dimension of the
                  first sample point.

        @param[in] filename: Path of the sample, loaded with `read_sample`.
        @param[in] initial_centers: Initial centers handed to `kmeans`.
        @param[in] number_clusters: Accepted for signature compatibility; it is not used by any check here.
        @param[in] ccore_flag: Forwarded as the fourth positional argument of `kmeans` (presumably selects
                    the C/C++ core - confirm against the `kmeans` signature).

        """
        sample = read_sample(filename)

        observer = kmeans_observer()
        kmeans_instance = kmeans(sample, initial_centers, 0.025, ccore_flag, observer=observer)
        kmeans_instance.process()

        dimension = len(sample[0])

        assertion.le(1, len(observer))
        for step in range(len(observer)):
            step_centers = observer.get_centers(step)
            assertion.le(1, len(step_centers))
            for center in step_centers:
                assertion.eq(dimension, len(center))

            assertion.le(1, len(observer.get_clusters(step)))


    @staticmethod
    def templateShowClusteringResultNoFailure(filename, initial_centers, ccore_flag):
        """!
        @brief Checks that `kmeans_visualizer.show_clusters` can be called with a K-Means result.

        @details No assertion is made on the output; the template succeeds when no exception is raised.

        @param[in] filename: Path of the sample, loaded with `read_sample`.
        @param[in] initial_centers: Initial centers handed to `kmeans` and to the visualizer.
        @param[in] ccore_flag: Forwarded as the fourth positional argument of `kmeans` (presumably selects
                    the C/C++ core - confirm against the `kmeans` signature).

        """
        sample = read_sample(filename)

        kmeans_instance = kmeans(sample, initial_centers, 0.025, ccore_flag)
        kmeans_instance.process()

        clusters = kmeans_instance.get_clusters()
        centers = kmeans_instance.get_centers()

        kmeans_visualizer.show_clusters(sample, clusters, centers, initial_centers)


    @staticmethod
    def templateAnimateClusteringResultNoFailure(filename, initial_centers, ccore_flag):
        """!
        @brief Checks that `kmeans_visualizer.animate_cluster_allocation` can be called with a filled observer.

        @details No assertion is made on the output; the template succeeds when no exception is raised.

        @param[in] filename: Path of the sample, loaded with `read_sample`.
        @param[in] initial_centers: Initial centers handed to `kmeans`.
        @param[in] ccore_flag: Forwarded as the fourth positional argument of `kmeans` (presumably selects
                    the C/C++ core - confirm against the `kmeans` signature).

        """
        sample = read_sample(filename)

        observer = kmeans_observer()
        kmeans_instance = kmeans(sample, initial_centers, 0.025, ccore_flag, observer=observer)
        kmeans_instance.process()

        kmeans_visualizer.animate_cluster_allocation(sample, observer)