1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
|
# Authors: Eric Larson <larson.eric.d@gmail.com>
#
# License: BSD (3-clause)
import os
from os import path as op
import shutil
import warnings
import numpy as np
from nose.tools import assert_raises, assert_true, assert_false
from numpy.testing import assert_allclose, assert_array_equal, assert_equal
from mne import pick_types
from mne.tests.common import assert_dig_allclose
from mne.transforms import apply_trans
from mne.io import Raw, read_raw_ctf
from mne.io.tests.test_raw import _test_raw_reader
from mne.utils import _TempDir, run_tests_if_main, slow_test
from mne.datasets import testing, spm_face
ctf_dir = op.join(testing.data_path(download=False), 'CTF')
ctf_fname_continuous = 'testdata_ctf.ds'
ctf_fname_1_trial = 'testdata_ctf_short.ds'
ctf_fname_2_trials = 'testdata_ctf_pseudocontinuous.ds'
ctf_fname_discont = 'testdata_ctf_short_discontinuous.ds'
ctf_fname_somato = 'somMDYO-18av.ds'
ctf_fname_catch = 'catch-alp-good-f.ds'
block_sizes = {
ctf_fname_continuous: 12000,
ctf_fname_1_trial: 4801,
ctf_fname_2_trials: 12000,
ctf_fname_discont: 1201,
ctf_fname_somato: 313,
ctf_fname_catch: 2500,
}
single_trials = (
ctf_fname_continuous,
ctf_fname_1_trial,
)
ctf_fnames = tuple(sorted(block_sizes.keys()))
@slow_test
@testing.requires_testing_data
def test_read_ctf():
"""Test CTF reader"""
temp_dir = _TempDir()
out_fname = op.join(temp_dir, 'test_py_raw.fif')
# Create a dummy .eeg file so we can test our reading/application of it
os.mkdir(op.join(temp_dir, 'randpos'))
ctf_eeg_fname = op.join(temp_dir, 'randpos', ctf_fname_catch)
shutil.copytree(op.join(ctf_dir, ctf_fname_catch), ctf_eeg_fname)
with warnings.catch_warnings(record=True) as w: # reclassified ch
raw = _test_raw_reader(read_raw_ctf, directory=ctf_eeg_fname)
assert_true(all('MISC channel' in str(ww.message) for ww in w))
picks = pick_types(raw.info, meg=False, eeg=True)
pos = np.random.RandomState(42).randn(len(picks), 3)
fake_eeg_fname = op.join(ctf_eeg_fname, 'catch-alp-good-f.eeg')
# Create a bad file
with open(fake_eeg_fname, 'wb') as fid:
fid.write('foo\n'.encode('ascii'))
assert_raises(RuntimeError, read_raw_ctf, ctf_eeg_fname)
# Create a good file
with open(fake_eeg_fname, 'wb') as fid:
for ii, ch_num in enumerate(picks):
args = (str(ch_num + 1), raw.ch_names[ch_num],) + tuple(
'%0.5f' % x for x in 100 * pos[ii]) # convert to cm
fid.write(('\t'.join(args) + '\n').encode('ascii'))
pos_read_old = np.array([raw.info['chs'][p]['loc'][:3] for p in picks])
with warnings.catch_warnings(record=True) as w: # reclassified channel
raw = read_raw_ctf(ctf_eeg_fname) # read modified data
assert_true(all('MISC channel' in str(ww.message) for ww in w))
pos_read = np.array([raw.info['chs'][p]['loc'][:3] for p in picks])
assert_allclose(apply_trans(raw.info['ctf_head_t'], pos), pos_read,
rtol=1e-5, atol=1e-5)
assert_true((pos_read == pos_read_old).mean() < 0.1)
shutil.copy(op.join(ctf_dir, 'catch-alp-good-f.ds_randpos_raw.fif'),
op.join(temp_dir, 'randpos', 'catch-alp-good-f.ds_raw.fif'))
# Create a version with no hc, starting out *with* EEG pos (error)
os.mkdir(op.join(temp_dir, 'nohc'))
ctf_no_hc_fname = op.join(temp_dir, 'no_hc', ctf_fname_catch)
shutil.copytree(ctf_eeg_fname, ctf_no_hc_fname)
remove_base = op.join(ctf_no_hc_fname, op.basename(ctf_fname_catch[:-3]))
os.remove(remove_base + '.hc')
with warnings.catch_warnings(record=True): # no coord tr
assert_raises(RuntimeError, read_raw_ctf, ctf_no_hc_fname)
os.remove(remove_base + '.eeg')
shutil.copy(op.join(ctf_dir, 'catch-alp-good-f.ds_nohc_raw.fif'),
op.join(temp_dir, 'no_hc', 'catch-alp-good-f.ds_raw.fif'))
# All our files
use_fnames = [op.join(ctf_dir, c) for c in ctf_fnames]
for fname in use_fnames:
raw_c = Raw(fname + '_raw.fif', add_eeg_ref=False, preload=True)
with warnings.catch_warnings(record=True) as w: # reclassified ch
raw = read_raw_ctf(fname)
assert_true(all('MISC channel' in str(ww.message) for ww in w))
# check info match
assert_array_equal(raw.ch_names, raw_c.ch_names)
assert_allclose(raw.times, raw_c.times)
assert_allclose(raw._cals, raw_c._cals)
for key in ('version', 'usecs'):
assert_equal(raw.info['meas_id'][key], raw_c.info['meas_id'][key])
py_time = raw.info['meas_id']['secs']
c_time = raw_c.info['meas_id']['secs']
max_offset = 24 * 60 * 60 # probably overkill but covers timezone
assert_true(c_time - max_offset <= py_time <= c_time)
for t in ('dev_head_t', 'dev_ctf_t', 'ctf_head_t'):
assert_allclose(raw.info[t]['trans'], raw_c.info[t]['trans'],
rtol=1e-4, atol=1e-7)
for key in ('acq_pars', 'acq_stim', 'bads',
'ch_names', 'custom_ref_applied', 'description',
'events', 'experimenter', 'highpass', 'line_freq',
'lowpass', 'nchan', 'proj_id', 'proj_name',
'projs', 'sfreq', 'subject_info'):
assert_equal(raw.info[key], raw_c.info[key], key)
if op.basename(fname) not in single_trials:
# We don't force buffer size to be smaller like MNE-C
assert_equal(raw.info['buffer_size_sec'],
raw_c.info['buffer_size_sec'])
assert_equal(len(raw.info['comps']), len(raw_c.info['comps']))
for c1, c2 in zip(raw.info['comps'], raw_c.info['comps']):
for key in ('colcals', 'rowcals'):
assert_allclose(c1[key], c2[key])
assert_equal(c1['save_calibrated'], c2['save_calibrated'])
for key in ('row_names', 'col_names', 'nrow', 'ncol'):
assert_array_equal(c1['data'][key], c2['data'][key])
assert_allclose(c1['data']['data'], c2['data']['data'], atol=1e-7,
rtol=1e-5)
assert_allclose(raw.info['hpi_results'][0]['coord_trans']['trans'],
raw_c.info['hpi_results'][0]['coord_trans']['trans'],
rtol=1e-5, atol=1e-7)
assert_equal(len(raw.info['chs']), len(raw_c.info['chs']))
for ii, (c1, c2) in enumerate(zip(raw.info['chs'], raw_c.info['chs'])):
for key in ('kind', 'scanno', 'unit', 'ch_name', 'unit_mul',
'range', 'coord_frame', 'coil_type', 'logno'):
if c1['ch_name'] == 'RMSP' and \
'catch-alp-good-f' in fname and \
key in ('kind', 'unit', 'coord_frame', 'coil_type',
'logno'):
continue # XXX see below...
assert_equal(c1[key], c2[key], err_msg=key)
for key in ('cal',):
assert_allclose(c1[key], c2[key], atol=1e-6, rtol=1e-4,
err_msg='raw.info["chs"][%d][%s]' % (ii, key))
# XXX 2016/02/24: fixed bug with normal computation that used
# to exist, once mne-C tools are updated we should update our FIF
# conversion files, then the slices can go away (and the check
# can be combined with that for "cal")
for key in ('loc',):
if c1['ch_name'] == 'RMSP' and 'catch-alp-good-f' in fname:
continue
assert_allclose(c1[key][:3], c2[key][:3], atol=1e-6, rtol=1e-4,
err_msg='raw.info["chs"][%d][%s]' % (ii, key))
assert_allclose(c1[key][9:12], c2[key][9:12], atol=1e-6,
rtol=1e-4,
err_msg='raw.info["chs"][%d][%s]' % (ii, key))
if fname.endswith('catch-alp-good-f.ds'): # omit points from .pos file
raw.info['dig'] = raw.info['dig'][:-10]
assert_dig_allclose(raw.info, raw_c.info)
# check data match
raw_c.save(out_fname, overwrite=True, buffer_size_sec=1.)
raw_read = Raw(out_fname, add_eeg_ref=False)
# so let's check tricky cases based on sample boundaries
rng = np.random.RandomState(0)
pick_ch = rng.permutation(np.arange(len(raw.ch_names)))[:10]
bnd = int(round(raw.info['sfreq'] * raw.info['buffer_size_sec']))
assert_equal(bnd, raw._raw_extras[0]['block_size'])
assert_equal(bnd, block_sizes[op.basename(fname)])
slices = (slice(0, bnd), slice(bnd - 1, bnd), slice(3, bnd),
slice(3, 300), slice(None))
if len(raw.times) >= 2 * bnd: # at least two complete blocks
slices = slices + (slice(bnd, 2 * bnd), slice(bnd, bnd + 1),
slice(0, bnd + 100))
for sl_time in slices:
assert_allclose(raw[pick_ch, sl_time][0],
raw_c[pick_ch, sl_time][0])
assert_allclose(raw_read[pick_ch, sl_time][0],
raw_c[pick_ch, sl_time][0])
# all data / preload
with warnings.catch_warnings(record=True) as w: # reclassified ch
raw = read_raw_ctf(fname, preload=True)
assert_true(all('MISC channel' in str(ww.message) for ww in w))
assert_allclose(raw[:][0], raw_c[:][0])
raw.plot(show=False) # Test plotting with ref_meg channels.
assert_raises(ValueError, raw.plot, order='selection')
assert_raises(TypeError, read_raw_ctf, 1)
assert_raises(ValueError, read_raw_ctf, ctf_fname_continuous + 'foo.ds')
# test ignoring of system clock
read_raw_ctf(op.join(ctf_dir, ctf_fname_continuous), 'ignore')
assert_raises(ValueError, read_raw_ctf,
op.join(ctf_dir, ctf_fname_continuous), 'foo')
@spm_face.requires_spm_data
def test_read_spm_ctf():
"""Test CTF reader with omitted samples."""
data_path = spm_face.data_path()
raw_fname = op.join(data_path, 'MEG', 'spm',
'SPM_CTF_MEG_example_faces1_3D.ds')
raw = read_raw_ctf(raw_fname)
extras = raw._raw_extras[0]
assert_equal(extras['n_samp'], raw.n_times)
assert_false(extras['n_samp'] == extras['n_samp_tot'])
run_tests_if_main()
|