File: column.py
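"""Learner and model that predict the target directly from a single column.

`ColumnLearner` turns one column of the data into a predictor; with
`fit_regression=True` it also fits an intercept and a coefficient, using
logistic regression for discrete targets and linear regression for
continuous ones. `ColumnModel` performs the actual prediction.
"""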

from typing import Optional

import numpy as np

from Orange.data import Variable, DiscreteVariable, Domain, Table
from Orange.classification import LogisticRegressionLearner
from Orange.regression import LinearRegressionLearner
from Orange.modelling import Model, Learner

__all__ = ["ColumnLearner", "ColumnModel"]


def _check_column_combinations(
        class_var: Variable,
        column: Variable,
        fit_regression: bool):
    if class_var.is_continuous:
        if not column.is_continuous:
            raise ValueError(
                "Regression can only be used with numeric variables")
        return

    assert isinstance(class_var, DiscreteVariable)  # silence type warnings
    if column.is_continuous:
        if len(class_var.values) != 2:
            raise ValueError(
                "Numeric columns can only be used with binary class variables")
    else:
        assert isinstance(column, DiscreteVariable)
        if not valid_value_sets(class_var, column):
            raise ValueError(
                "Column contains values that are not in class variable")
    if fit_regression and not column.is_continuous:
        raise ValueError(
            "Intercept and coefficient are only allowed for continuous "
            "variables")


def valid_prob_range(values: np.ndarray) -> bool:
    """Check that all non-NaN values lie within [0, 1]."""
    return np.nanmin(values) >= 0 and np.nanmax(values) <= 1


def valid_value_sets(class_var: DiscreteVariable,
                     column_var: DiscreteVariable) -> bool:
    """Check that the column's values are a subset of the class's values."""


class ColumnLearner(Learner):
    def __init__(self,
                 class_var: Variable,
                 column: Variable,
                 fit_regression: bool = False):
        super().__init__()
        _check_column_combinations(class_var, column, fit_regression)
        self.class_var = class_var
        self.column = column
        self.fit_regression = fit_regression
        self.name = f"column '{column.name}'"

    def __fit_coefficients(self, data: Table):
        # Use Orange's learners rather than calling scikit-learn directly,
        # so that the parameters, and hence the results, match those of the
        # corresponding widgets.
        data1 = data.transform(Domain([self.column], self.class_var))
        if self.class_var.is_discrete:
            model = LogisticRegressionLearner()(data1)
            return model.intercept[0], model.coefficients[0][0]
        else:
            model = LinearRegressionLearner()(data1)
            return model.intercept, model.coefficients[0]

    def fit_storage(self, data: Table):
        if data.domain.class_var != self.class_var:
            raise ValueError("Class variable does not match the data")
        if not self.fit_regression:
            return ColumnModel(self.class_var, self.column)

        intercept, coefficient = self.__fit_coefficients(data)
        return ColumnModel(self.class_var, self.column, intercept, coefficient)
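
# A usage sketch (illustrative; names "p", "y" and the data are invented):
# the single feature "p" holds probabilities of the positive class; the
# model can pass them through unchanged or calibrate them with logistic
# regression via fit_regression=True.
#
#     from Orange.data import ContinuousVariable
#
#     p = ContinuousVariable("p")
#     y = DiscreteVariable("y", values=("no", "yes"))
#     rng = np.random.default_rng(0)
#     X = rng.uniform(size=(100, 1))
#     Y = (X[:, 0] > 0.5).astype(float)
#     data = Table.from_numpy(Domain([p], y), X, Y)
#
#     model = ColumnLearner(y, p)(data)                        # pass-through
#     fitted = ColumnLearner(y, p, fit_regression=True)(data)  # logistic fit
#     predictions = model(data)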


class ColumnModel(Model):
    def __init__(self,
                 class_var: Variable,
                 column: Variable,
                 intercept: Optional[float] = None,
                 coefficient: Optional[float] = None):
        super().__init__(Domain([column], class_var))

        _check_column_combinations(class_var, column, intercept is not None)
        # Intercept and coefficient come as a pair: both or neither
        if (intercept is None) != (coefficient is None):
            raise ValueError(
                "Intercept and coefficient must both be provided or absent")

        self.class_var = class_var
        self.column = column
        self.intercept = intercept
        self.coefficient = coefficient
        if (column.is_discrete and
                class_var.values[:len(column.values)] != column.values):
            # Column values are not a prefix of the class's values in the
            # same order: map the column's indices to the class's indices
            self.value_mapping = np.array([class_var.to_val(x)
                                           for x in column.values])
        else:
            self.value_mapping = None

        pars = f" ({intercept}, {coefficient})" if intercept is not None else ""
        self.name = f"column '{column.name}'{pars}"

    def predict_storage(self, data: Table):
        vals = data.get_column(self.column)
        if self.class_var.is_discrete:
            return self._predict_discrete(vals)
        else:
            return self._predict_continuous(vals)

    def _predict_discrete(self, vals):
        assert isinstance(self.class_var, DiscreteVariable)
        nclasses = len(self.class_var.values)
        proba = np.full((len(vals), nclasses), np.nan)
        rows = np.isfinite(vals)
        if self.column.is_discrete:
            mapped = vals[rows].astype(int)
            if self.value_mapping is not None:
                # Reindex the column's values to the class variable's indices
                mapped = self.value_mapping[mapped]
                vals = vals.copy()
                vals[rows] = mapped
            # One-hot probabilities: all the mass on the observed value
            proba[rows] = 0
            proba[rows, mapped] = 1
        else:
            if self.coefficient is None:
                # Without fitted parameters, the column itself must hold the
                # probability of the second (positive) class
                if not valid_prob_range(vals):
                    raise ValueError("Column values must be in [0, 1] range "
                                     "unless logistic function is applied")
                proba[rows, 1] = vals[rows]
            else:
                # Logistic function: 1 / (1 + exp(-(intercept + coef * x)))
                proba[rows, 1] = 1 / (
                    1 + np.exp(-self.intercept - self.coefficient * vals[rows]))

            proba[rows, 0] = 1 - proba[rows, 1]
            # Predict the positive class iff its probability exceeds 0.5
            vals = (proba[:, 1] > 0.5).astype(float)
            vals[~rows] = np.nan
        return vals, proba

    def _predict_continuous(self, vals):
        if self.coefficient is None:
            return vals
        else:
            return vals * self.coefficient + self.intercept

    def __str__(self):
        pars = f" ({self.intercept}, {self.coefficient})" \
            if self.intercept is not None else ""
        return f'ColumnModel {self.column.name}{pars}'
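
# Direct construction sketch (parameter values invented): a ColumnModel can
# also be built without fitting, by supplying logistic parameters directly;
# with "y", "p" and "data" as in the sketch above, predictions then apply
# 1 / (1 + exp(-(intercept + coefficient * p))).
#
#     model = ColumnModel(y, p, intercept=-4.0, coefficient=8.0)
#     labels, probabilities = model(data, ret=Model.ValueProbs)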