File: normalize.py

package info (click to toggle)
orange3 3.40.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,908 kB
  • sloc: python: 162,745; ansic: 622; makefile: 322; sh: 93; cpp: 77
file content (83 lines) | stat: -rw-r--r-- 3,308 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import numpy as np

from Orange.data import Domain, ContinuousVariable
from Orange.statistics import basic_stats
from Orange.util import Reprable
from .preprocess import Normalize
from .transformation import Normalizer as Norm
__all__ = ["Normalizer"]


class Normalizer(Reprable):
    """Callable that rescales the continuous variables of a data table.

    Calling an instance on ``data`` computes per-variable statistics, builds
    a new domain in which each eligible continuous variable is replaced by a
    copy carrying a ``Norm`` compute value, and returns ``data`` transformed
    into that domain.

    Parameters
    ----------
    zero_based : bool
        Used only for span normalization. If true, ``Norm`` is built with
        the minimum as offset and ``1 / span`` as factor; otherwise with the
        midpoint of the range as offset and ``2 / span`` as factor.
    norm_type
        ``Normalize.NormalizeBySD`` or ``Normalize.NormalizeBySpan``. Any
        other value leaves the variables unchanged.
    transform_class : bool
        If true, class variables are normalized as well.
    center : bool
        Used only for SD normalization. If true, the mean is passed to
        ``Norm`` as the offset; otherwise the offset is 0.
    normalize_datetime : bool
        If false, time variables are left unchanged.
    """

    def __init__(self,
                 zero_based=True,
                 norm_type=Normalize.NormalizeBySD,
                 transform_class=False,
                 center=True,
                 normalize_datetime=False):
        self.zero_based = zero_based
        self.norm_type = norm_type
        self.transform_class = transform_class
        self.center = center
        self.normalize_datetime = normalize_datetime

    def __call__(self, data):
        """Return ``data`` transformed into a domain of normalized variables.

        Meta attributes are carried over unchanged; class variables are
        normalized only when ``transform_class`` is set.
        """
        stats = basic_stats.DomainBasicStats(data, compute_variance=True)
        domain = data.domain
        new_attrs = [self.normalize(stats[i], var)
                     for i, var in enumerate(domain.attributes)]

        new_class_vars = domain.class_vars
        if self.transform_class:
            # Class-variable statistics are indexed right after those of
            # the attributes.
            attr_len = len(domain.attributes)
            new_class_vars = [self.normalize(stats[i + attr_len], var)
                              for i, var in enumerate(domain.class_vars)]

        new_domain = Domain(new_attrs, new_class_vars, domain.metas)
        return data.transform(new_domain)

    def normalize(self, stats, var):
        """Return a normalized copy of ``var``, or ``var`` itself if skipped.

        Variables are returned unchanged when they are not continuous, when
        they are time variables and ``normalize_datetime`` is false, or when
        ``norm_type`` is neither of the two supported values.
        """
        if not var.is_continuous:
            return var
        if var.is_time and not self.normalize_datetime:
            return var
        if self.norm_type == Normalize.NormalizeBySD:
            return self.normalize_by_sd(stats, var)
        if self.norm_type == Normalize.NormalizeBySpan:
            return self.normalize_by_span(stats, var)
        return var

    def normalize_by_sd(self, stats, var: ContinuousVariable) -> ContinuousVariable:
        """Return a copy of ``var`` scaled by the inverse standard deviation.

        The compute value is ``Norm(var, offset, 1 / sd)`` where ``offset``
        is the mean when ``center`` is set and 0 otherwise (presumably
        ``Norm`` evaluates ``(x - offset) * factor`` -- confirm against
        ``.transformation.Normalizer``). A NaN mean is replaced by 0; a NaN
        or zero standard deviation is replaced by 1.
        """
        avg, sd = stats.mean, stats.var ** 0.5
        if np.isnan(avg):
            avg = 0
        if np.isnan(sd) or sd == 0:
            sd = 1
        offset = avg if self.center else 0
        compute_val = Norm(var, offset, 1 / sd)

        # log10 of an infinite deviation is infinite and cannot be converted
        # to int (OverflowError); leave the number of decimals untouched.
        if not np.isfinite(sd):
            return var.copy(compute_value=compute_val)

        # When dealing with integers, and multiplying by something smaller
        # than 1, the number of decimals should be decreased, but this
        # integer will likely turn into a float, which should have some
        # default number of decimals
        num_decimals = var.number_of_decimals + int(np.round(np.log10(sd)))
        num_decimals = max(num_decimals, 1)  # always keep at least one decimal

        return var.copy(compute_value=compute_val, number_of_decimals=num_decimals)

    def normalize_by_span(self, stats, var: ContinuousVariable) -> ContinuousVariable:
        """Return a copy of ``var`` scaled by the inverse of its value range.

        With ``zero_based`` the compute value is
        ``Norm(var, min, 1 / span)``; otherwise it is
        ``Norm(var, (max + min) / 2, 2 / span)``. A span below ``1e-15`` is
        replaced by 1 to avoid dividing by (nearly) zero. The number of
        decimals is adjusted only when the span is finite.
        """
        dma, dmi = stats.max, stats.min
        diff = dma - dmi
        if diff < 1e-15:
            diff = 1
        if self.zero_based:
            compute_val = Norm(var, dmi, 1 / diff)
        else:
            compute_val = Norm(var, (dma + dmi) / 2, 2 / diff)

        # A NaN or infinite span cannot be turned into an integer number of
        # decimals (int() raises ValueError / OverflowError), so keep the
        # variable's own setting in that case.
        if not np.isfinite(diff):
            return var.copy(compute_value=compute_val)

        num_decimals = var.number_of_decimals + int(np.ceil(np.log10(diff)))
        num_decimals = max(num_decimals, 0)  # num decimals can't be negative
        return var.copy(compute_value=compute_val, number_of_decimals=num_decimals)