File: mockclient.py

package info (click to toggle)
python-mne 0.17%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 95,104 kB
  • sloc: python: 110,639; makefile: 222; sh: 15
file content (195 lines) | stat: -rw-r--r-- 6,441 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# Authors: Mainak Jas <mainak@neuro.hut.fi>
#          Denis Engemann <denis.engemann@gmail.com>
#          Alexandre Gramfort <alexandre.gramfort@telecom-paristech.fr>
#
# License: BSD (3-clause)

import copy
import numpy as np
from ..event import find_events


class MockRtClient(object):
    """Mock Realtime Client.

    Replays an already-loaded raw recording buffer by buffer, so that code
    written against a realtime client can be exercised without a live
    acquisition system.

    Parameters
    ----------
    raw : instance of Raw object
        The raw object which simulates the RtClient
    verbose : bool, str, int, or None
        If not None, override default verbose level (see :func:`mne.verbose`
        and :ref:`Logging documentation <tut_logging>` for more).
    """

    def __init__(self, raw, verbose=None):  # noqa: D102
        self.raw = raw
        # Deep copy so that changes made to the client's info do not leak
        # back into the raw object's own measurement info.
        self.info = copy.deepcopy(self.raw.info)
        self.verbose = verbose

        # Per-event-id bookkeeping used by get_event_data
        self._current = dict()  # index of the next occurrence to return
        self._last = dict()  # number of occurrences found for the event id

    def get_measurement_info(self):
        """Return the measurement info.

        Returns
        -------
        self.info : dict
            The measurement info.
        """
        return self.info

    def send_data(self, epochs, picks, tmin, tmax, buffer_size):
        """Read from raw object and send them to RtEpochs for processing.

        The samples between ``tmin`` and ``tmax`` are cut into consecutive
        buffers of ``buffer_size`` samples (the last one may be shorter) and
        each buffer is handed to ``epochs._process_raw_buffer``.

        Parameters
        ----------
        epochs : instance of RtEpochs
            The epochs object.
        picks : array-like of int
            Indices of channels.
        tmin : float
            Time instant to start receiving buffers.
        tmax : float
            Time instant to stop receiving buffers.
        buffer_size : int
            Size of each buffer in terms of number of samples.

        Raises
        ------
        ValueError
            If ``buffer_size`` is not strictly positive.
        """
        # this is important to emulate a thread, instead of automatically
        # or constantly sending data, we will invoke this explicitly to send
        # the next buffer
        if buffer_size <= 0:
            raise ValueError('buffer_size must be a positive number of '
                             'samples, got %s' % (buffer_size,))

        sfreq = self.info['sfreq']
        tmin_samp = int(round(sfreq * tmin))
        tmax_samp = int(round(sfreq * tmax))

        # Each buffer stops ``buffer_size`` samples after its own start,
        # clipped to ``tmax_samp``. Deriving the stop from the start keeps
        # the windows aligned when ``tmin`` is not zero.
        iter_times = [(start, min(start + buffer_size, tmax_samp))
                      for start in range(tmin_samp, tmax_samp, buffer_size)]

        # to undo the calibration done in _process_raw_buffer
        # (column vector, one row per picked channel; it does not depend on
        # the buffer, so it is computed once)
        chs = self.info['chs']
        cals = np.array([[chs[k]['range'] * chs[k]['cal']
                          for k in picks]]).T

        for start, stop in iter_times:
            # channels are picked in _append_epoch_to_queue. No need to pick
            # here
            data, _ = self.raw[:, start:stop]
            data[picks, :] = data[picks, :] / cals
            epochs._process_raw_buffer(data)

    def get_event_data(self, event_id, tmin, tmax, picks, stim_channel=None,
                       min_duration=0):
        """Simulate the data for a particular event-id.

        The epochs corresponding to a particular event-id are returned. The
        method remembers the epoch that was returned in the previous call and
        returns the next epoch in sequence. Once all epochs corresponding to
        an event-id have been exhausted, the method returns None.

        Parameters
        ----------
        event_id : int
            The id of the event to consider.
        tmin : float
            Start time before event.
        tmax : float
            End time after event.
        picks : array-like of int
            Indices of channels.
        stim_channel : None | string | list of string
            Name of the stim channel or all the stim channels
            affected by the trigger. If None, the config variables
            'MNE_STIM_CHANNEL', 'MNE_STIM_CHANNEL_1', 'MNE_STIM_CHANNEL_2',
            etc. are read. If these are not found, it will default to
            'STI 014'.
        min_duration : float
            The minimum duration of a change in the events channel required
            to consider it as an event (in seconds).

        Returns
        -------
        data : 2D array with shape [n_channels, n_times] | None
            The epochs that are being simulated, or None once every
            occurrence of ``event_id`` has been returned.
        """
        # Get the list of all events
        events = find_events(self.raw, stim_channel=stim_channel,
                             verbose=False, output='onset',
                             consecutive='increasing',
                             min_duration=min_duration)

        # Sample numbers of the occurrences of the requested event only
        event_samp = events[events[:, -1] == event_id, 0]

        # Only do this the first time for each event type
        if event_id not in self._current:
            self._current[event_id] = 0  # start at the first occurrence
            self._last[event_id] = len(event_samp)

        pointer = self._current[event_id]
        if pointer >= self._last[event_id]:
            # every occurrence has already been handed out
            return None

        # relative start and stop positions in samples; the +1 makes the
        # sample at tmax part of the returned slice
        tmin_samp = int(round(self.info['sfreq'] * tmin))
        tmax_samp = int(round(self.info['sfreq'] * tmax)) + 1

        # absolute start and stop positions in samples: event samples are
        # shifted by raw.first_samp before being used to index into raw
        ev_samp = event_samp[pointer]
        start = ev_samp + tmin_samp - self.raw.first_samp
        stop = ev_samp + tmax_samp - self.raw.first_samp

        self._current[event_id] = pointer + 1  # increment pointer

        data, _ = self.raw[picks, start:stop]
        return data

    # The following methods do nothing in this mock client, but they need
    # to be present for the emulation to work because RtEpochs expects
    # them to be there.

    def register_receive_callback(self, x):
        """Fake API boilerplate.

        Parameters
        ----------
        x : None
            Not used.
        """
        pass

    def start_receive_thread(self, x):
        """Fake API boilerplate.

        Parameters
        ----------
        x : None
            Not used.
        """
        pass

    def unregister_receive_callback(self, x):
        """Fake API boilerplate.

        Parameters
        ----------
        x : None
            Not used.
        """  # noqa: D401
        pass

    def _stop_receive_thread(self):
        """Fake API boilerplate."""
        pass