File: compensator.py

package info (click to toggle)
python-mne 0.8.6%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 87,892 kB
  • ctags: 6,639
  • sloc: python: 54,697; makefile: 165; sh: 15
file content (160 lines) | stat: -rw-r--r-- 5,199 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import numpy as np

from .constants import FIFF


def get_current_comp(info):
    """Get the current compensation in effect in the data
    """
    comp = None
    first_comp = -1
    for k, chan in enumerate(info['chs']):
        if chan['kind'] == FIFF.FIFFV_MEG_CH:
            comp = int(chan['coil_type']) >> 16
            if first_comp < 0:
                first_comp = comp
            elif comp != first_comp:
                raise ValueError('Compensation is not set equally on '
                                 'all MEG channels')
    return comp


def set_current_comp(info, comp):
    """Set the current compensation in effect in the data
    """
    comp_now = get_current_comp(info)
    for k, chan in enumerate(info['chs']):
        if chan['kind'] == FIFF.FIFFV_MEG_CH:
            rem = chan['coil_type'] - (comp_now << 16)
            chan['coil_type'] = int(rem + (comp << 16))


def _make_compensator(info, kind):
    """Auxiliary function for make_compensator
    """
    for k in range(len(info['comps'])):
        if info['comps'][k]['kind'] == kind:
            this_data = info['comps'][k]['data']

            #   Create the preselector
            presel = np.zeros((this_data['ncol'], info['nchan']))
            for col, col_name in enumerate(this_data['col_names']):
                ind = [k for k, ch in enumerate(info['ch_names'])
                       if ch == col_name]
                if len(ind) == 0:
                    raise ValueError('Channel %s is not available in '
                                     'data' % col_name)
                elif len(ind) > 1:
                    raise ValueError('Ambiguous channel %s' % col_name)
                presel[col, ind[0]] = 1.0

            #   Create the postselector
            postsel = np.zeros((info['nchan'], this_data['nrow']))
            for c, ch_name in enumerate(info['ch_names']):
                ind = [k for k, ch in enumerate(this_data['row_names'])
                       if ch == ch_name]
                if len(ind) > 1:
                    raise ValueError('Ambiguous channel %s' % ch_name)
                elif len(ind) == 1:
                    postsel[c, ind[0]] = 1.0
            this_comp = np.dot(postsel, np.dot(this_data['data'], presel))
            return this_comp

    raise ValueError('Desired compensation matrix (kind = %d) not'
                     ' found' % kind)


def make_compensator(info, from_, to, exclude_comp_chs=False):
    """Returns compensation matrix eg. for CTF system.

    Create a compensation matrix to bring the data from one compensation
    state to another.

    Parameters
    ----------
    info : dict
        The measurement info.
    from_ : int
        Compensation in the input data.
    to : int
        Desired compensation in the output.
    exclude_comp_chs : bool
        Exclude compensation channels from the output.

    Returns
    -------
    comp : array | None.
        The compensation matrix. Might be None if no compensation
        is needed (from == to).
    """
    if from_ == to:
        return None

    if from_ == 0:
        C1 = np.zeros((info['nchan'], info['nchan']))
    else:
        C1 = _make_compensator(info, from_)

    if to == 0:
        C2 = np.zeros((info['nchan'], info['nchan']))
    else:
        C2 = _make_compensator(info, to)

    #   s_orig = s_from + C1*s_from = (I + C1)*s_from
    #   s_to   = s_orig - C2*s_orig = (I - C2)*s_orig
    #   s_to   = (I - C2)*(I + C1)*s_from = (I + C1 - C2 - C2*C1)*s_from
    comp = np.eye(info['nchan']) + C1 - C2 - np.dot(C2, C1)

    if exclude_comp_chs:
        pick = [k for k, c in enumerate(info['chs'])
                if c['kind'] != FIFF.FIFFV_REF_MEG_CH]

        if len(pick) == 0:
            raise ValueError('Nothing remains after excluding the '
                             'compensation channels')

        comp = comp[pick, :]

    return comp


# @verbose
# def compensate_to(data, to, verbose=None):
#     """
#     %
#     % [newdata] = mne_compensate_to(data,to)
#     %
#     % Apply compensation to the data as desired
#     %
#     """
#
#     newdata = data.copy()
#     now = get_current_comp(newdata['info'])
#
#     #   Are we there already?
#     if now == to:
#         logger.info('Data are already compensated as desired')
#
#     #   Make the compensator and apply it to all data sets
#     comp = make_compensator(newdata['info'], now, to)
#     for k in range(len(newdata['evoked'])):
#         newdata['evoked'][k]['epochs'] = np.dot(comp,
#                                               newdata['evoked'][k]['epochs'])
#
#     #  Update the compensation info in the channel descriptors
#     newdata['info']['chs'] = set_current_comp(newdata['info']['chs'], to)
#     return newdata


# def set_current_comp(chs, value):
#     """Set the current compensation value in the channel info structures
#     """
#     new_chs = chs
#
#     lower_half = int('FFFF', 16) # hex2dec('FFFF')
#     for k in range(len(chs)):
#         if chs[k]['kind'] == FIFF.FIFFV_MEG_CH:
#             coil_type = float(chs[k]['coil_type']) & lower_half
#             new_chs[k]['coil_type'] = int(coil_type | (value << 16))
#
#     return new_chs