File: xmeans_templates.py

package info (click to toggle)
python-pyclustering 0.10.1.2-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 11,128 kB
  • sloc: cpp: 38,888; python: 24,311; sh: 384; makefile: 105
file content (126 lines) | stat: -rwxr-xr-x 5,242 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""!

@brief Test templates for X-Means clustering module.

@authors Andrei Novikov (pyclustering@yandex.ru)
@date 2014-2020
@copyright BSD-3-Clause

"""

import numpy
import random

from pyclustering.cluster.xmeans import xmeans, splitting_type
from pyclustering.cluster.center_initializer import random_center_initializer
from pyclustering.samples.definitions import SIMPLE_SAMPLES
from pyclustering.utils import read_sample, distance_metric, type_metric

from pyclustering.tests.assertion import assertion


class XmeansTestTemplates:
    @staticmethod
    def templateLengthProcessData(input_sample, start_centers, expected_cluster_length, type_splitting, kmax, ccore, **kwargs):
        if isinstance(input_sample, str):
            sample = read_sample(input_sample)
        else:
            sample = input_sample

        xmeans_instance = xmeans(sample, start_centers, kmax, 0.025, type_splitting, ccore, **kwargs)
        xmeans_instance.process()
         
        clusters = xmeans_instance.get_clusters()
        centers = xmeans_instance.get_centers()
        wce = xmeans_instance.get_total_wce()
    
        obtained_cluster_sizes = [len(cluster) for cluster in clusters]

        assertion.eq(len(sample), sum(obtained_cluster_sizes))
        assertion.eq(len(clusters), len(centers))
        assertion.le(len(centers), kmax)

        expected_wce = 0.0
        metric = kwargs.get('metric', distance_metric(type_metric.EUCLIDEAN_SQUARE))
        for index_cluster in range(len(clusters)):
            for index_point in clusters[index_cluster]:
                expected_wce += metric(sample[index_point], centers[index_cluster])

        assertion.eq(expected_wce, wce)

        if expected_cluster_length is not None:
            assertion.eq(len(expected_cluster_length), len(centers))

            obtained_cluster_sizes.sort()
            expected_cluster_length.sort()
            
            assertion.eq(obtained_cluster_sizes, expected_cluster_length)


    @staticmethod
    def templatePredict(path_to_file, initial_centers, points, expected_amount, expected_closest_clusters, ccore, **kwargs):
        sample = read_sample(path_to_file)

        kmax = kwargs.get('kmax', 20)

        xmeans_instance = xmeans(sample, initial_centers, kmax, 0.025, splitting_type.BAYESIAN_INFORMATION_CRITERION, ccore)
        xmeans_instance.process()

        closest_clusters = xmeans_instance.predict(points)
        assertion.eq(expected_amount, len(xmeans_instance.get_clusters()))
        assertion.eq(len(expected_closest_clusters), len(closest_clusters))
        assertion.true(numpy.array_equal(numpy.array(expected_closest_clusters), closest_clusters))


    @staticmethod
    def templateClusterAllocationOneDimensionData(ccore_flag):
        input_data = [[0.0] for _ in range(10)] + [[5.0] for _ in range(10)] + [[10.0] for _ in range(10)] + [[15.0] for _ in range(10)]
            
        xmeans_instance = xmeans(input_data, [[0.5], [5.5], [10.5], [15.5]], 20, 0.025, splitting_type.BAYESIAN_INFORMATION_CRITERION, ccore_flag)
        xmeans_instance.process()
        
        clusters = xmeans_instance.get_clusters()
        centers = xmeans_instance.get_centers()

        assertion.eq(len(clusters), 4)
        assertion.eq(len(centers), len(clusters))
        
        assertion.le(len(clusters), 20)
        for cluster in clusters:
            assertion.eq(len(cluster), 10)


    @staticmethod
    def templateMaxAllocatedClusters(ccore_flag, amount_clusters, size_cluster, offset, kinitial, kmax):
        input_data = []
        for index in range(amount_clusters):
            for _ in range(size_cluster):
                input_data.append([random.random() * index * offset, random.random() * index * offset])
        
        initial_centers = random_center_initializer(input_data, kinitial).initialize()
        xmeans_instance = xmeans(input_data, initial_centers, kmax, 0.025, splitting_type.BAYESIAN_INFORMATION_CRITERION, ccore_flag)
        xmeans_instance.process()
        
        clusters = xmeans_instance.get_clusters()
        centers = xmeans_instance.get_centers()

        if len(clusters) != len(centers):
            print(input_data)
            print(initial_centers)

        assertion.ge(kmax, len(clusters))
        assertion.ge(kmax, len(centers))
        assertion.eq(len(clusters), len(centers))


    @staticmethod
    def random_state(ccore_flag, kinitial, kmax, random_state):
        data = read_sample(SIMPLE_SAMPLES.SAMPLE_SIMPLE5)

        initial_centers = random_center_initializer(data, kinitial, random_state=random_state).initialize()
        xmeans_instance1 = xmeans(data, initial_centers, kmax, ccore=ccore_flag, random_state=random_state).process()
        xmeans_instance2 = xmeans(data, initial_centers, kmax, ccore=ccore_flag, random_state=random_state).process()

        assertion.eq(xmeans_instance1.get_total_wce(), xmeans_instance2.get_total_wce())
        assertion.eq(xmeans_instance1.get_centers(), xmeans_instance2.get_centers())
        assertion.eq(xmeans_instance1.get_clusters(), xmeans_instance2.get_clusters())