1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
|
# Authors: Manoj Kumar mks542@nyu.edu
# License: BSD 3 clause
import numpy as np
from scipy import optimize, sparse
from ..base import BaseEstimator, RegressorMixin
from .base import LinearModel
from ..utils import check_X_y
from ..utils import check_consistent_length
from ..utils import axis0_safe_slice
from ..utils.extmath import safe_sparse_dot
def _huber_loss_and_gradient(w, X, y, epsilon, alpha, sample_weight=None):
"""Returns the Huber loss and the gradient.
Parameters
----------
w : ndarray, shape (n_features + 1,) or (n_features + 2,)
Feature vector.
w[:n_features] gives the coefficients
w[-1] gives the scale factor and if the intercept is fit w[-2]
gives the intercept factor.
X : ndarray, shape (n_samples, n_features)
Input data.
y : ndarray, shape (n_samples,)
Target vector.
epsilon : float
Robustness of the Huber estimator.
alpha : float
Regularization parameter.
sample_weight : ndarray, shape (n_samples,), optional
Weight assigned to each sample.
Returns
-------
loss : float
Huber loss.
gradient : ndarray, shape (len(w))
Returns the derivative of the Huber loss with respect to each
coefficient, intercept and the scale as a vector.
"""
X_is_sparse = sparse.issparse(X)
_, n_features = X.shape
fit_intercept = (n_features + 2 == w.shape[0])
if fit_intercept:
intercept = w[-2]
sigma = w[-1]
w = w[:n_features]
n_samples = np.sum(sample_weight)
# Calculate the values where |y - X'w -c / sigma| > epsilon
# The values above this threshold are outliers.
linear_loss = y - safe_sparse_dot(X, w)
if fit_intercept:
linear_loss -= intercept
abs_linear_loss = np.abs(linear_loss)
outliers_mask = abs_linear_loss > epsilon * sigma
# Calculate the linear loss due to the outliers.
# This is equal to (2 * M * |y - X'w -c / sigma| - M**2) * sigma
outliers = abs_linear_loss[outliers_mask]
num_outliers = np.count_nonzero(outliers_mask)
n_non_outliers = X.shape[0] - num_outliers
# n_sq_outliers includes the weight give to the outliers while
# num_outliers is just the number of outliers.
outliers_sw = sample_weight[outliers_mask]
n_sw_outliers = np.sum(outliers_sw)
outlier_loss = (2. * epsilon * np.sum(outliers_sw * outliers) -
sigma * n_sw_outliers * epsilon ** 2)
# Calculate the quadratic loss due to the non-outliers.-
# This is equal to |(y - X'w - c)**2 / sigma**2| * sigma
non_outliers = linear_loss[~outliers_mask]
weighted_non_outliers = sample_weight[~outliers_mask] * non_outliers
weighted_loss = np.dot(weighted_non_outliers.T, non_outliers)
squared_loss = weighted_loss / sigma
if fit_intercept:
grad = np.zeros(n_features + 2)
else:
grad = np.zeros(n_features + 1)
# Gradient due to the squared loss.
X_non_outliers = -axis0_safe_slice(X, ~outliers_mask, n_non_outliers)
grad[:n_features] = (
2. / sigma * safe_sparse_dot(weighted_non_outliers, X_non_outliers))
# Gradient due to the linear loss.
signed_outliers = np.ones_like(outliers)
signed_outliers_mask = linear_loss[outliers_mask] < 0
signed_outliers[signed_outliers_mask] = -1.0
X_outliers = axis0_safe_slice(X, outliers_mask, num_outliers)
sw_outliers = sample_weight[outliers_mask] * signed_outliers
grad[:n_features] -= 2. * epsilon * (
safe_sparse_dot(sw_outliers, X_outliers))
# Gradient due to the penalty.
grad[:n_features] += alpha * 2. * w
# Gradient due to sigma.
grad[-1] = n_samples
grad[-1] -= n_sw_outliers * epsilon ** 2
grad[-1] -= squared_loss / sigma
# Gradient due to the intercept.
if fit_intercept:
grad[-2] = -2. * np.sum(weighted_non_outliers) / sigma
grad[-2] -= 2. * epsilon * np.sum(sw_outliers)
loss = n_samples * sigma + squared_loss + outlier_loss
loss += alpha * np.dot(w, w)
return loss, grad
class HuberRegressor(LinearModel, RegressorMixin, BaseEstimator):
"""Linear regression model that is robust to outliers.
The Huber Regressor optimizes the squared loss for the samples where
``|(y - X'w) / sigma| < epsilon`` and the absolute loss for the samples
where ``|(y - X'w) / sigma| > epsilon``, where w and sigma are parameters
to be optimized. The parameter sigma makes sure that if y is scaled up
or down by a certain factor, one does not need to rescale epsilon to
achieve the same robustness. Note that this does not take into account
the fact that the different features of X may be of different scales.
This makes sure that the loss function is not heavily influenced by the
outliers while not completely ignoring their effect.
Read more in the :ref:`User Guide <huber_regression>`
.. versionadded:: 0.18
Parameters
----------
epsilon : float, greater than 1.0, default 1.35
The parameter epsilon controls the number of samples that should be
classified as outliers. The smaller the epsilon, the more robust it is
to outliers.
max_iter : int, default 100
Maximum number of iterations that scipy.optimize.fmin_l_bfgs_b
should run for.
alpha : float, default 0.0001
Regularization parameter.
warm_start : bool, default False
This is useful if the stored attributes of a previously used model
has to be reused. If set to False, then the coefficients will
be rewritten for every call to fit.
See :term:`the Glossary <warm_start>`.
fit_intercept : bool, default True
Whether or not to fit the intercept. This can be set to False
if the data is already centered around the origin.
tol : float, default 1e-5
The iteration will stop when
``max{|proj g_i | i = 1, ..., n}`` <= ``tol``
where pg_i is the i-th component of the projected gradient.
Attributes
----------
coef_ : array, shape (n_features,)
Features got by optimizing the Huber loss.
intercept_ : float
Bias.
scale_ : float
The value by which ``|y - X'w - c|`` is scaled down.
n_iter_ : int
Number of iterations that fmin_l_bfgs_b has run for.
.. versionchanged:: 0.20
In SciPy <= 1.0.0 the number of lbfgs iterations may exceed
``max_iter``. ``n_iter_`` will now report at most ``max_iter``.
outliers_ : array, shape (n_samples,)
A boolean mask which is set to True where the samples are identified
as outliers.
Examples
--------
>>> import numpy as np
>>> from sklearn.linear_model import HuberRegressor, LinearRegression
>>> from sklearn.datasets import make_regression
>>> np.random.seed(0)
>>> X, y, coef = make_regression(
... n_samples=200, n_features=2, noise=4.0, coef=True, random_state=0)
>>> X[:4] = np.random.uniform(10, 20, (4, 2))
>>> y[:4] = np.random.uniform(10, 20, 4)
>>> huber = HuberRegressor().fit(X, y)
>>> huber.score(X, y) # doctest: +ELLIPSIS
-7.284608623514573
>>> huber.predict(X[:1,])
array([806.7200...])
>>> linear = LinearRegression().fit(X, y)
>>> print("True coefficients:", coef)
True coefficients: [20.4923... 34.1698...]
>>> print("Huber coefficients:", huber.coef_)
Huber coefficients: [17.7906... 31.0106...]
>>> print("Linear Regression coefficients:", linear.coef_)
Linear Regression coefficients: [-1.9221... 7.0226...]
References
----------
.. [1] Peter J. Huber, Elvezio M. Ronchetti, Robust Statistics
Concomitant scale estimates, pg 172
.. [2] Art B. Owen (2006), A robust hybrid of lasso and ridge regression.
http://statweb.stanford.edu/~owen/reports/hhu.pdf
"""
def __init__(self, epsilon=1.35, max_iter=100, alpha=0.0001,
warm_start=False, fit_intercept=True, tol=1e-05):
self.epsilon = epsilon
self.max_iter = max_iter
self.alpha = alpha
self.warm_start = warm_start
self.fit_intercept = fit_intercept
self.tol = tol
def fit(self, X, y, sample_weight=None):
"""Fit the model according to the given training data.
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training vector, where n_samples in the number of samples and
n_features is the number of features.
y : array-like, shape (n_samples,)
Target vector relative to X.
sample_weight : array-like, shape (n_samples,)
Weight given to each sample.
Returns
-------
self : object
"""
X, y = check_X_y(
X, y, copy=False, accept_sparse=['csr'], y_numeric=True)
if sample_weight is not None:
sample_weight = np.array(sample_weight)
check_consistent_length(y, sample_weight)
else:
sample_weight = np.ones_like(y)
if self.epsilon < 1.0:
raise ValueError(
"epsilon should be greater than or equal to 1.0, got %f"
% self.epsilon)
if self.warm_start and hasattr(self, 'coef_'):
parameters = np.concatenate(
(self.coef_, [self.intercept_, self.scale_]))
else:
if self.fit_intercept:
parameters = np.zeros(X.shape[1] + 2)
else:
parameters = np.zeros(X.shape[1] + 1)
# Make sure to initialize the scale parameter to a strictly
# positive value:
parameters[-1] = 1
# Sigma or the scale factor should be non-negative.
# Setting it to be zero might cause undefined bounds hence we set it
# to a value close to zero.
bounds = np.tile([-np.inf, np.inf], (parameters.shape[0], 1))
bounds[-1][0] = np.finfo(np.float64).eps * 10
parameters, f, dict_ = optimize.fmin_l_bfgs_b(
_huber_loss_and_gradient, parameters,
args=(X, y, self.epsilon, self.alpha, sample_weight),
maxiter=self.max_iter, pgtol=self.tol, bounds=bounds,
iprint=0)
if dict_['warnflag'] == 2:
raise ValueError("HuberRegressor convergence failed:"
" l-BFGS-b solver terminated with %s"
% dict_['task'].decode('ascii'))
# In scipy <= 1.0.0, nit may exceed maxiter.
# See https://github.com/scipy/scipy/issues/7854.
self.n_iter_ = min(dict_['nit'], self.max_iter)
self.scale_ = parameters[-1]
if self.fit_intercept:
self.intercept_ = parameters[-2]
else:
self.intercept_ = 0.0
self.coef_ = parameters[:X.shape[1]]
residual = np.abs(
y - safe_sparse_dot(X, self.coef_) - self.intercept_)
self.outliers_ = residual > self.scale_ * self.epsilon
return self
|