#-------------------------------------------------------------------------------
#
# Define classes for (uni/multi)-variate kernel density estimation.
#
# Currently, only Gaussian kernels are implemented.
#
# Written by: Robert Kern
#
# Date: 2004-08-09
#
# Modified: 2005-02-10 by Robert Kern.
# Contributed to Scipy
# 2005-10-07 by Robert Kern.
# Some fixes to match the new scipy_core
#
# Copyright 2004-2005 by Enthought, Inc.
#
#-------------------------------------------------------------------------------
# Standard library imports.
import warnings
# Scipy imports.
from scipy import linalg, special
from numpy import atleast_2d, reshape, zeros, newaxis, dot, exp, pi, sqrt, \
ravel, power, atleast_1d, squeeze, sum, transpose
from numpy.random import randint, multivariate_normal
# Local imports.
import stats
import mvn
__all__ = ['gaussian_kde',
]
class gaussian_kde(object):
""" Representation of a kernel-density estimate using Gaussian kernels.
Parameters
----------
dataset : (# of dims, # of data)-array
datapoints to estimate from
Members
-------
d : int
number of dimensions
n : int
number of datapoints
Methods
-------
kde.evaluate(points) : array
evaluate the estimated pdf on a provided set of points
kde(points) : array
same as kde.evaluate(points)
kde.integrate_gaussian(mean, cov) : float
        multiply the pdf by a specified Gaussian and integrate over the whole domain
kde.integrate_box_1d(low, high) : float
integrate pdf (1D only) between two bounds
kde.integrate_box(low_bounds, high_bounds) : float
integrate pdf over a rectangular space between low_bounds and high_bounds
kde.integrate_kde(other_kde) : float
integrate two kernel density estimates multiplied together
Internal Methods
----------------
kde.covariance_factor() : float
computes the coefficient that multiplies the data covariance matrix to
obtain the kernel covariance matrix. Set this method to
kde.scotts_factor or kde.silverman_factor (or subclass to provide your
own). The default is scotts_factor.
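
    Example
    -------
    A minimal usage sketch (the data here is illustrative)::

        from numpy.random import randn
        dataset = randn(2, 100)           # 2 dimensions, 100 datapoints
        kde = gaussian_kde(dataset)
        densities = kde(dataset[:, :5])   # pdf evaluated at 5 points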
"""
def __init__(self, dataset):
self.dataset = atleast_2d(dataset)
self.d, self.n = self.dataset.shape
self._compute_covariance()
def evaluate(self, points):
"""Evaluate the estimated pdf on a set of points.
Parameters
----------
points : (# of dimensions, # of points)-array
Alternatively, a (# of dimensions,) vector can be passed in and
treated as a single point.
Returns
-------
values : (# of points,)-array
The values at each point.
Raises
------
        ValueError if the dimensionality of the input points is different from
        the dimensionality of the KDE.
"""
points = atleast_2d(points).astype(self.dataset.dtype)
d, m = points.shape
if d != self.d:
if d == 1 and m == self.d:
# points was passed in as a row vector
points = reshape(points, (self.d, 1))
m = 1
else:
msg = "points have dimension %s, dataset has dimension %s" % (d,
self.d)
raise ValueError(msg)
result = zeros((m,), points.dtype)
if m >= self.n:
# there are more points than data, so loop over data
for i in range(self.n):
diff = self.dataset[:,i,newaxis] - points
tdiff = dot(self.inv_cov, diff)
energy = sum(diff*tdiff,axis=0)/2.0
result += exp(-energy)
else:
# loop over points
for i in range(m):
diff = self.dataset - points[:,i,newaxis]
tdiff = dot(self.inv_cov, diff)
energy = sum(diff*tdiff,axis=0)/2.0
result[i] = sum(exp(-energy),axis=0)
result /= self._norm_factor
return result
__call__ = evaluate
def integrate_gaussian(self, mean, cov):
"""Multiply estimated density by a multivariate Gaussian and integrate
over the wholespace.
Parameters
----------
mean : vector
the mean of the Gaussian
cov : matrix
the covariance matrix of the Gaussian
Returns
-------
result : scalar
the value of the integral
Raises
------
        ValueError if the dimensionality of the mean or covariance of the
        input Gaussian differs from the KDE's dimensionality.
"""
mean = atleast_1d(squeeze(mean))
cov = atleast_2d(cov)
if mean.shape != (self.d,):
raise ValueError("mean does not have dimension %s" % self.d)
if cov.shape != (self.d, self.d):
raise ValueError("covariance does not have dimension %s" % self.d)
# make mean a column vector
mean = mean[:,newaxis]
sum_cov = self.covariance + cov
diff = self.dataset - mean
tdiff = dot(linalg.inv(sum_cov), diff)
energies = sum(diff*tdiff,axis=0)/2.0
result = sum(exp(-energies),axis=0)/sqrt(linalg.det(2*pi*sum_cov))/self.n
return result
def integrate_box_1d(self, low, high):
"""Computes the integral of a 1D pdf between two bounds.
Parameters
----------
low : scalar
lower bound of integration
high : scalar
upper bound of integration
Returns
-------
value : scalar
the result of the integral
Raises
------
ValueError if the KDE is over more than one dimension.
"""
if self.d != 1:
raise ValueError("integrate_box_1d() only handles 1D pdfs")
stdev = ravel(sqrt(self.covariance))[0]
normalized_low = ravel((low - self.dataset)/stdev)
normalized_high = ravel((high - self.dataset)/stdev)
value = stats.mean(special.ndtr(normalized_high) -
special.ndtr(normalized_low))
return value
def integrate_box(self, low_bounds, high_bounds, maxpts=None):
"""Computes the integral of a pdf over a rectangular interval.
Parameters
----------
low_bounds : vector
lower bounds of integration
high_bounds : vector
upper bounds of integration
        maxpts : int, optional
maximum number of points to use for integration
Returns
-------
value : scalar
the result of the integral
"""
if maxpts is not None:
extra_kwds = {'maxpts': maxpts}
else:
extra_kwds = {}
value, inform = mvn.mvnun(low_bounds, high_bounds, self.dataset,
self.covariance, **extra_kwds)
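        # A nonzero inform flag means mvnun did not reach the requested
        # accuracy within its point budget (1000 points per dimension by
        # default, hence the message below).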
if inform:
msg = ('an integral in mvn.mvnun requires more points than %s' %
(self.d*1000))
warnings.warn(msg)
return value
def integrate_kde(self, other):
"""Computes the integral of the product of this kernel density estimate
with another.
Parameters
----------
other : gaussian_kde instance
the other kde
Returns
-------
value : scalar
the result of the integral
Raises
------
ValueError if the KDEs have different dimensionality.
"""
if other.d != self.d:
raise ValueError("KDEs are not the same dimensionality")
# we want to iterate over the smallest number of points
if other.n < self.n:
small = other
large = self
else:
small = self
large = other
sum_cov = small.covariance + large.covariance
result = 0.0
for i in range(small.n):
mean = small.dataset[:,i,newaxis]
diff = large.dataset - mean
tdiff = dot(linalg.inv(sum_cov), diff)
energies = sum(diff*tdiff,axis=0)/2.0
result += sum(exp(-energies),axis=0)
result /= sqrt(linalg.det(2*pi*sum_cov))*large.n*small.n
return result
def resample(self, size=None):
"""Randomly sample a dataset from the estimated pdf.
Parameters
----------
size : int, optional
The number of samples to draw.
            If not provided, then the size is the same as the number of
            datapoints in the underlying dataset.
Returns
-------
dataset : (self.d, size)-array
sampled dataset
"""
if size is None:
size = self.n
norm = transpose(multivariate_normal(zeros((self.d,), float),
self.covariance, size=size))
indices = randint(0, self.n, size=size)
means = self.dataset[:,indices]
return means + norm
    def scotts_factor(self):
        """Scott's rule of thumb: n**(-1/(d+4))."""
        return power(self.n, -1./(self.d+4))

    def silverman_factor(self):
        """Silverman's rule of thumb: (n*(d+2)/4)**(-1/(d+4))."""
        return power(self.n*(self.d+2.0)/4.0, -1./(self.d+4))
# This can be replaced with silverman_factor if one wants to use Silverman's
# rule for choosing the bandwidth of the kernels.
covariance_factor = scotts_factor
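
    # A sketch of how one might select Silverman's rule by subclassing (the
    # subclass name is illustrative):
    #
    #     class silverman_gaussian_kde(gaussian_kde):
    #         covariance_factor = gaussian_kde.silverman_factor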
def _compute_covariance(self):
"""Computes the covariance matrix for each Gaussian kernel using
covariance_factor
"""
self.factor = self.covariance_factor()
self.covariance = atleast_2d(stats.cov(self.dataset, rowvar=1) *
self.factor * self.factor)
self.inv_cov = linalg.inv(self.covariance)
self._norm_factor = sqrt(linalg.det(2*pi*self.covariance)) * self.n
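

# A minimal, illustrative demonstration of the class on synthetic data.
if __name__ == '__main__':
    from numpy.random import randn
    data = randn(2, 200)                    # 2-D dataset with 200 points
    kde = gaussian_kde(data)
    print('bandwidth factor: %s' % kde.factor)
    print('density at origin: %s' % kde(zeros((2, 1))))
    print('resampled shape: %s x %s' % kde.resample(50).shape)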