1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
|
# Authors: Tal Linzen <linzen@nyu.edu>
# Teon Brooks <teon.brooks@gmail.com>
# Denis A. Engemann <denis.engemann@gmail.com>
# Jona Sassenhagen <jona.sassenhagen@gmail.com>
# Marijn van Vliet <w.m.vanvliet@gmail.com>
#
# License: BSD (3-clause)
from inspect import isgenerator
from collections import namedtuple
import numpy as np
from scipy import linalg, sparse
from ..externals.six import string_types
from ..source_estimate import SourceEstimate
from ..epochs import _BaseEpochs
from ..evoked import Evoked, EvokedArray
from ..utils import logger, _reject_data_segments, warn
from ..io.pick import pick_types, pick_info
def linear_regression(inst, design_matrix, names=None):
"""Fit Ordinary Least Squares regression (OLS)
Parameters
----------
inst : instance of Epochs | iterable of SourceEstimate
The data to be regressed. Contains all the trials, sensors, and time
points for the regression. For Source Estimates, accepts either a list
or a generator object.
design_matrix : ndarray, shape (n_observations, n_regressors)
The regressors to be used. Must be a 2d array with as many rows as
the first dimension of `data`. The first column of this matrix will
typically consist of ones (intercept column).
names : list-like | None
Optional parameter to name the regressors. If provided, the length must
correspond to the number of columns present in regressors
(including the intercept, if present).
Otherwise the default names are x0, x1, x2...xn for n regressors.
Returns
-------
results : dict of namedtuple
For each regressor (key) a namedtuple is provided with the
following attributes:
beta : regression coefficients
stderr : standard error of regression coefficients
t_val : t statistics (beta / stderr)
p_val : two-sided p-value of t statistic under the t distribution
mlog10_p_val : -log10 transformed p-value.
The tuple members are numpy arrays. The shape of each numpy array is
the shape of the data minus the first dimension; e.g., if the shape of
the original data was (n_observations, n_channels, n_timepoints),
then the shape of each of the arrays will be
(n_channels, n_timepoints).
"""
if names is None:
names = ['x%i' % i for i in range(design_matrix.shape[1])]
if isinstance(inst, _BaseEpochs):
picks = pick_types(inst.info, meg=True, eeg=True, ref_meg=True,
stim=False, eog=False, ecg=False,
emg=False, exclude=['bads'])
if [inst.ch_names[p] for p in picks] != inst.ch_names:
warn('Fitting linear model to non-data or bad channels. '
'Check picking')
msg = 'Fitting linear model to epochs'
data = inst.get_data()
out = EvokedArray(np.zeros(data.shape[1:]), inst.info, inst.tmin)
elif isgenerator(inst):
msg = 'Fitting linear model to source estimates (generator input)'
out = next(inst)
data = np.array([out.data] + [i.data for i in inst])
elif isinstance(inst, list) and isinstance(inst[0], SourceEstimate):
msg = 'Fitting linear model to source estimates (list input)'
out = inst[0]
data = np.array([i.data for i in inst])
else:
raise ValueError('Input must be epochs or iterable of source '
'estimates')
logger.info(msg + ', (%s targets, %s regressors)' %
(np.product(data.shape[1:]), len(names)))
lm_params = _fit_lm(data, design_matrix, names)
lm = namedtuple('lm', 'beta stderr t_val p_val mlog10_p_val')
lm_fits = {}
for name in names:
parameters = [p[name] for p in lm_params]
for ii, value in enumerate(parameters):
out_ = out.copy()
if isinstance(out_, SourceEstimate):
out_._data[:] = value
elif isinstance(out_, Evoked):
out_.data[:] = value
else:
raise RuntimeError('Invalid container.')
parameters[ii] = out_
lm_fits[name] = lm(*parameters)
logger.info('Done')
return lm_fits
def _fit_lm(data, design_matrix, names):
"""Aux function"""
from scipy import stats
n_samples = len(data)
n_features = np.product(data.shape[1:])
if design_matrix.ndim != 2:
raise ValueError('Design matrix must be a 2d array')
n_rows, n_predictors = design_matrix.shape
if n_samples != n_rows:
raise ValueError('Number of rows in design matrix must be equal '
'to number of observations')
if n_predictors != len(names):
raise ValueError('Number of regressor names must be equal to '
'number of column in design matrix')
y = np.reshape(data, (n_samples, n_features))
betas, resid_sum_squares, _, _ = linalg.lstsq(a=design_matrix, b=y)
df = n_rows - n_predictors
sqrt_noise_var = np.sqrt(resid_sum_squares / df).reshape(data.shape[1:])
design_invcov = linalg.inv(np.dot(design_matrix.T, design_matrix))
unscaled_stderrs = np.sqrt(np.diag(design_invcov))
tiny = np.finfo(np.float64).tiny
beta, stderr, t_val, p_val, mlog10_p_val = (dict() for _ in range(5))
for x, unscaled_stderr, predictor in zip(betas, unscaled_stderrs, names):
beta[predictor] = x.reshape(data.shape[1:])
stderr[predictor] = sqrt_noise_var * unscaled_stderr
p_val[predictor] = np.empty_like(stderr[predictor])
t_val[predictor] = np.empty_like(stderr[predictor])
stderr_pos = (stderr[predictor] > 0)
beta_pos = (beta[predictor] > 0)
t_val[predictor][stderr_pos] = (beta[predictor][stderr_pos] /
stderr[predictor][stderr_pos])
cdf = stats.t.cdf(np.abs(t_val[predictor][stderr_pos]), df)
p_val[predictor][stderr_pos] = np.clip((1. - cdf) * 2., tiny, 1.)
# degenerate cases
mask = (~stderr_pos & beta_pos)
t_val[predictor][mask] = np.inf * np.sign(beta[predictor][mask])
p_val[predictor][mask] = tiny
# could do NaN here, but hopefully this is safe enough
mask = (~stderr_pos & ~beta_pos)
t_val[predictor][mask] = 0
p_val[predictor][mask] = 1.
mlog10_p_val[predictor] = -np.log10(p_val[predictor])
return beta, stderr, t_val, p_val, mlog10_p_val
def linear_regression_raw(raw, events, event_id=None, tmin=-.1, tmax=1,
covariates=None, reject=None, flat=None, tstep=1.,
decim=1, picks=None, solver='cholesky'):
"""Estimate regression-based evoked potentials/fields by linear modelling
This models the full M/EEG time course, including correction for
overlapping potentials and allowing for continuous/scalar predictors.
Internally, this constructs a predictor matrix X of size
n_samples * (n_conds * window length), solving the linear system
``Y = bX`` and returning ``b`` as evoked-like time series split by
condition. See [1]_.
Parameters
----------
raw : instance of Raw
A raw object. Note: be very careful about data that is not
downsampled, as the resulting matrices can be enormous and easily
overload your computer. Typically, 100 Hz sampling rate is
appropriate - or using the decim keyword (see below).
events : ndarray of int, shape (n_events, 3)
An array where the first column corresponds to samples in raw
and the last to integer codes in event_id.
event_id : dict
As in Epochs; a dictionary where the values may be integers or
iterables of integers, corresponding to the 3rd column of
events, and the keys are condition names.
tmin : float | dict
If float, gives the lower limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (-.1) is
used.
tmax : float | dict
If float, gives the upper limit (in seconds) for the time window for
which all event types' effects are estimated. If a dict, can be used to
specify time windows for specific event types: keys correspond to keys
in event_id and/or covariates; for missing values, the default (1.) is
used.
covariates : dict-like | None
If dict-like (e.g., a pandas DataFrame), values have to be array-like
and of the same length as the columns in ```events```. Keys correspond
to additional event types/conditions to be estimated and are matched
with the time points given by the first column of ```events```. If
None, only binary events (from event_id) are used.
reject : None | dict
For cleaning raw data before the regression is performed: set up
rejection parameters based on peak-to-peak amplitude in continuously
selected subepochs. If None, no rejection is done.
If dict, keys are types ('grad' | 'mag' | 'eeg' | 'eog' | 'ecg')
and values are the maximal peak-to-peak values to select rejected
epochs, e.g.::
reject = dict(grad=4000e-12, # T / m (gradiometers)
mag=4e-11, # T (magnetometers)
eeg=40e-5, # V (EEG channels)
eog=250e-5 # V (EOG channels))
flat : None | dict
or cleaning raw data before the regression is performed: set up
rejection parameters based on flatness of the signal. If None, no
rejection is done. If a dict, keys are ('grad' | 'mag' |
'eeg' | 'eog' | 'ecg') and values are minimal peak-to-peak values to
select rejected epochs.
tstep : float
Length of windows for peak-to-peak detection for raw data cleaning.
decim : int
Decimate by choosing only a subsample of data points. Highly
recommended for data recorded at high sampling frequencies, as
otherwise huge intermediate matrices have to be created and inverted.
picks : None | list
List of indices of channels to be included. If None, defaults to all
MEG and EEG channels.
solver : str | function
Either a function which takes as its inputs the sparse predictor
matrix X and the observation matrix Y, and returns the coefficient
matrix b; or a string. If str, must be ``'cholesky'``, in which case
the solver used is ``linalg.solve(dot(X.T, X), dot(X.T, y))``.
Returns
-------
evokeds : dict
A dict where the keys correspond to conditions and the values are
Evoked objects with the ER[F/P]s. These can be used exactly like any
other Evoked object, including e.g. plotting or statistics.
References
----------
.. [1] Smith, N. J., & Kutas, M. (2015). Regression-based estimation of ERP
waveforms: II. Non-linear effects, overlap correction, and practical
considerations. Psychophysiology, 52(2), 169-189.
"""
if isinstance(solver, string_types):
if solver == 'cholesky':
def solver(X, y):
a = (X.T * X).toarray() # dot product of sparse matrices
return linalg.solve(a, X.T * y.T, sym_pos=True,
overwrite_a=True, overwrite_b=True).T
else:
raise ValueError("No such solver: {0}".format(solver))
# build data
data, info, events = _prepare_rerp_data(raw, events, picks=picks,
decim=decim)
# build predictors
X, conds, cond_length, tmin_s, tmax_s = _prepare_rerp_preds(
n_samples=data.shape[1], sfreq=info["sfreq"], events=events,
event_id=event_id, tmin=tmin, tmax=tmax, covariates=covariates)
# remove "empty" and contaminated data points
X, data = _clean_rerp_input(X, data, reject, flat, decim, info, tstep)
# solve linear system
coefs = solver(X, data)
# construct Evoked objects to be returned from output
evokeds = _make_evokeds(coefs, conds, cond_length, tmin_s, tmax_s, info)
return evokeds
def _prepare_rerp_data(raw, events, picks=None, decim=1):
"""Prepare events and data, primarily for `linear_regression_raw`. See
there for an explanation of parameters and output."""
if picks is None:
picks = pick_types(raw.info, meg=True, eeg=True, ref_meg=True)
info = pick_info(raw.info, picks)
decim = int(decim)
info["sfreq"] /= decim
data, times = raw[:]
data = data[picks, ::decim]
if len(set(events[:, 0])) < len(events[:, 0]):
raise ValueError("`events` contains duplicate time points. Make "
"sure all entries in the first column of `events` "
"are unique.")
events = events.copy()
events[:, 0] -= raw.first_samp
events[:, 0] //= decim
if len(set(events[:, 0])) < len(events[:, 0]):
raise ValueError("After decimating, `events` contains duplicate time "
"points. This means some events are too closely "
"spaced for the requested decimation factor. Choose "
"different events, drop close events, or choose a "
"different decimation factor.")
return data, info, events
def _prepare_rerp_preds(n_samples, sfreq, events, event_id=None, tmin=-.1,
tmax=1, covariates=None):
"""Build predictor matrix as well as metadata (e.g. condition time
windows), primarily for `linear_regression_raw`. See there for
an explanation of parameters and output."""
conds = list(event_id)
if covariates is not None:
conds += list(covariates)
# time windows (per event type) are converted to sample points from times
if isinstance(tmin, (float, int)):
tmin_s = dict((cond, int(tmin * sfreq)) for cond in conds)
else:
tmin_s = dict((cond, int(tmin.get(cond, -.1) * sfreq))
for cond in conds)
if isinstance(tmax, (float, int)):
tmax_s = dict(
(cond, int((tmax * sfreq) + 1.)) for cond in conds)
else:
tmax_s = dict((cond, int((tmax.get(cond, 1.) * sfreq) + 1))
for cond in conds)
# Construct predictor matrix
# We do this by creating one array per event type, shape (lags, samples)
# (where lags depends on tmin/tmax and can be different for different
# event types). Columns correspond to predictors, predictors correspond to
# time lags. Thus, each array is mostly sparse, with one diagonal of 1s
# per event (for binary predictors).
cond_length = dict()
xs = []
for cond in conds:
tmin_, tmax_ = tmin_s[cond], tmax_s[cond]
n_lags = int(tmax_ - tmin_) # width of matrix
if cond in event_id: # for binary predictors
ids = ([event_id[cond]]
if isinstance(event_id[cond], int)
else event_id[cond])
onsets = -(events[np.in1d(events[:, 2], ids), 0] + tmin_)
values = np.ones((len(onsets), n_lags))
else: # for predictors from covariates, e.g. continuous ones
covs = covariates[cond]
if len(covs) != len(events):
error = ("Condition {0} from ```covariates``` is "
"not the same length as ```events```").format(cond)
raise ValueError(error)
onsets = -(events[np.where(covs != 0), 0] + tmin_)[0]
v = np.asarray(covs)[np.nonzero(covs)].astype(float)
values = np.ones((len(onsets), n_lags)) * v[:, np.newaxis]
cond_length[cond] = len(onsets)
xs.append(sparse.dia_matrix((values, onsets),
shape=(n_samples, n_lags)))
return sparse.hstack(xs), conds, cond_length, tmin_s, tmax_s
def _clean_rerp_input(X, data, reject, flat, decim, info, tstep):
"""Remove empty and contaminated points from data and predictor matrices,
for `linear_regression_raw`. See there for an explanation of parameters."""
# find only those positions where at least one predictor isn't 0
has_val = np.unique(X.nonzero()[0])
# reject positions based on extreme steps in the data
if reject is not None:
_, inds = _reject_data_segments(data, reject, flat, decim=None,
info=info, tstep=tstep)
for t0, t1 in inds:
has_val = np.setdiff1d(has_val, range(t0, t1))
return X.tocsr()[has_val], data[:, has_val]
def _make_evokeds(coefs, conds, cond_length, tmin_s, tmax_s, info):
"""Create a dictionary of Evoked objects from a coefs matrix and condition
durations, primarily for `linear_regression_raw`. See there for an
explanation of parameters and output."""
evokeds = dict()
cumul = 0
for cond in conds:
tmin_, tmax_ = tmin_s[cond], tmax_s[cond]
evokeds[cond] = EvokedArray(
coefs[:, cumul:cumul + tmax_ - tmin_], info=info, comment=cond,
tmin=tmin_ / float(info["sfreq"]), nave=cond_length[cond],
kind='average') # nave and kind are technically incorrect
cumul += tmax_ - tmin_
return evokeds
|