File: statemonitor.py

package info (click to toggle)
brian 2.9.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,872 kB
  • sloc: python: 51,820; cpp: 2,033; makefile: 108; sh: 72
file content (427 lines) | stat: -rw-r--r-- 16,978 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
import numbers
from collections.abc import Sequence

import numpy as np

from brian2.core.variables import Variables, get_dtype
from brian2.groups.group import CodeRunner, Group
from brian2.units.allunits import second
from brian2.units.fundamentalunits import Quantity
from brian2.utils.logger import get_logger

# Public names of this module (what ``from ... import *`` exposes)
__all__ = ["StateMonitor"]

# Logger obtained for this module's name; used below for warnings
logger = get_logger(__name__)


class StateMonitorView:
    """
    Access to the values recorded by a monitor for a selection of indices.

    The selection ``item`` is given in terms of indices of the monitored
    source. It is converted once, at construction time, into positions within
    the monitor's stored values (see `_calc_indices`).

    Parameters
    ----------
    monitor : `StateMonitor`
        The monitor whose recorded values are accessed.
    item : int or sequence/array of int
        The source index (or indices) to give access to.

    Raises
    ------
    IndexError
        If one of the requested indices has not been recorded.
    """

    def __init__(self, monitor, item):
        self.monitor = monitor
        self.item = item
        self.indices = self._calc_indices(item)
        # Has to be set last: `__getattr__` refuses to do anything before this
        # flag exists.
        self._group_attribute_access_active = True

    def __getattr__(self, item):
        """
        Return the recorded times (``t``/``t_``) or the recorded values of a
        variable restricted to the selected indices.

        A trailing underscore (``t_``, ``v_``) requests values without units.

        Raises
        ------
        AttributeError
            If ``item`` is neither ``t``/``t_`` nor a recorded variable.
        """
        # We do this because __setattr__ and __getattr__ are not active until
        # _group_attribute_access_active attribute is set, and if it is set,
        # then __getattr__ will not be called. Therefore, if getattr is called
        # with this name, it is because it hasn't been set yet and so this
        # method should raise an AttributeError to agree that it hasn't been
        # called yet.
        if item == "_group_attribute_access_active":
            raise AttributeError(item)
        if not hasattr(self, "_group_attribute_access_active"):
            raise AttributeError(item)

        mon = self.monitor
        if item == "t":
            return Quantity(mon.variables["t"].get_value(), dim=second.dim)
        if item == "t_":
            return mon.variables["t"].get_value()
        if item in mon.record_variables:
            var = mon.variables[item]
            # Stored values are transposed so that the selected indices
            # address the first axis.
            return Quantity(var.get_value().T[self.indices], dim=var.dim, copy=True)
        if item.endswith("_") and item[:-1] in mon.record_variables:
            return mon.variables[item[:-1]].get_value().T[self.indices].copy()
        raise AttributeError(f"Unknown attribute {item}")

    def _calc_indices(self, item):
        """
        Convert the neuron indices to indices into the stored values. For
        example, if neurons [0, 5, 10] have been recorded, [5, 10] is converted
        to [1, 2].

        Parameters
        ----------
        item : int or sequence/array of int
            Index or indices referring to the monitored source.

        Returns
        -------
        indices : int or array-like of int
            Position(s) within the stored values. If the monitor records
            everything, a non-scalar ``item`` is returned unchanged.

        Raises
        ------
        IndexError
            If a requested index has not been recorded.
        """
        dtype = get_dtype(item)
        recorded = self.monitor.record
        # scalar value
        if np.issubdtype(dtype, np.signedinteger) and not isinstance(item, np.ndarray):
            matches = np.nonzero(recorded == item)[0]
            if len(matches) == 0:
                raise IndexError(f"Index number {int(item)} has not been recorded")
            return matches[0]

        if self.monitor.record_all:
            return item

        positions = []
        for index in item:
            # A single scan per index: the first match (if any) is the
            # position of that index in the stored values.
            matches = np.nonzero(recorded == index)[0]
            if len(matches) == 0:
                raise IndexError(f"Index number {int(index)} has not been recorded")
            positions.append(matches[0])
        # Force an integer dtype: without it an empty selection would give a
        # float64 array, which NumPy refuses to use as an index.
        return np.array(positions, dtype=np.intp)

    def __repr__(self):
        return (
            f"<{type(self).__name__}, giving access to elements {self.item!r} "
            f"recorded by {self.monitor.name}>"
        )


class StateMonitor(Group, CodeRunner):
    """
    Record values of state variables during a run

    To extract recorded values after a run, use the ``t`` attribute for the
    array of times at which values were recorded, and variable name attribute
    for the values. The values will have shape ``(len(indices), len(t))``,
    where ``indices`` are the array indices which were recorded. When indexing
    the `StateMonitor` directly, the returned object can be used to get the
    recorded values for the specified indices, i.e. the indexing semantic
    refers to the indices in ``source``, not to the relative indices of the
    recorded values. For example, when recording only neurons with even numbers,
    `mon[[0, 2]].v` will return the values for neurons 0 and 2, whereas
    `mon.v[[0, 2]]` will return the values for the first and third *recorded*
    neurons, i.e. for neurons 0 and 4.

    Parameters
    ----------
    source : `Group`
        Which object to record values from.
    variables : str, sequence of str, True
        Which variables to record, or ``True`` to record all variables
        (note that this may use a great deal of memory).
    record : bool, sequence of ints
        Which indices to record, nothing is recorded for ``False``,
        everything is recorded for ``True`` (warning: may use a great deal of
        memory), or a specified subset of indices.
    dt : `Quantity`, optional
        The time step to be used for the monitor. Cannot be combined with
        the `clock` argument.
    clock : `Clock`, optional
        The update clock to be used. If neither a clock, nor the ``dt`` argument
        is specified, the clock of the `source` will be used.
    when : str, optional
        At which point during a time step the values should be recorded.
        Defaults to ``'start'``. See :ref:`scheduling` for possible values.
    order : int, optional
        The priority of this group for operations occurring at the same time
        step and in the same scheduling slot. Defaults to 0.
    name : str, optional
        A unique name for the object, otherwise will use
        ``source.name+'statemonitor_0'``, etc.
    codeobj_class : `CodeObject`, optional
        The `CodeObject` class to create.

    Examples
    --------

    Record all variables, first 5 indices::

        eqs = '''
        dV/dt = (2-V)/(10*ms) : 1
        '''
        threshold = 'V>1'
        reset = 'V = 0'
        G = NeuronGroup(100, eqs, threshold=threshold, reset=reset)
        G.V = rand(len(G))
        M = StateMonitor(G, True, record=range(5))
        run(100*ms)
        plot(M.t, M.V.T)
        show()

    Notes
    -----

    Since this monitor by default records in the ``'start'`` time slot,
    recordings of the membrane potential in integrate-and-fire models may look
    unexpected: the recorded membrane potential trace will never be above
    threshold in an integrate-and-fire model, because the reset statement will
    have been applied already. Set the ``when`` keyword to a different value if
    this is not what you want.

    Note that ``record=True`` only works in runtime mode for synaptic variables.
    This is because the actual array of indices has to be calculated and this is
    not possible in standalone mode, where the synapses have not been created
    yet at this stage. Consider using an explicit array of indices instead,
    i.e. something like ``record=np.arange(n_synapses)``.
    """

    invalidates_magic_network = False
    add_to_magic_network = True

    def __init__(
        self,
        source,
        variables,
        record,
        dt=None,
        clock=None,
        when="start",
        order=0,
        name="statemonitor*",
        codeobj_class=None,
    ):
        self.source = source
        # Make the monitor use the explicitly defined namespace of its source
        # group (if it exists)
        self.namespace = getattr(source, "namespace", None)
        self.codeobj_class = codeobj_class

        # By default, run on the clock of the source (the scheduling slot is
        # given by ``when``, which defaults to ``'start'``)
        if dt is None and clock is None:
            clock = source.clock

        # ``True`` means all variables of the source's equations, a single
        # name is wrapped into a list
        if variables is True:
            variables = source.equations.names
        elif isinstance(variables, str):
            variables = [variables]
        #: The variables to record
        self.record_variables = variables

        # record should always be an array of ints
        self.record_all = False
        if hasattr(record, "_indices"):
            # The ._indices method always returns absolute indices
            # If the source is already a subgroup of another group, we therefore
            # have to shift the indices to become relative to the subgroup
            record = record._indices() - getattr(source, "_offset", 0)
        if record is True:
            self.record_all = True
            try:
                record = np.arange(len(source), dtype=np.int32)
            except NotImplementedError as exc:
                # In standalone mode, this is not possible for synaptic
                # variables because the number of synapses is not defined yet
                raise NotImplementedError(
                    "Cannot determine the actual "
                    "indices to record for record=True. "
                    "This can occur for example in "
                    "standalone mode when trying to "
                    "record a synaptic variable. "
                    "Consider providing an explicit "
                    "array of indices for the record "
                    "argument."
                ) from exc
        elif record is False:
            record = np.array([], dtype=np.int32)
        elif isinstance(record, numbers.Number):
            record = np.array([record], dtype=np.int32)
        else:
            # atleast_1d: a 0-d array is treated like a single index instead
            # of failing on the ``len(record)`` calls below
            record = np.atleast_1d(np.asarray(record, dtype=np.int32))

        #: The array of recorded indices
        self.record = record
        self.n_indices = len(record)

        if not self.record_all:
            try:
                if len(record) and (
                    np.max(record) >= len(source) or np.min(record) < 0
                ):
                    # Check whether the values in record make sense
                    error_message = (
                        "The indices to record from contain values outside of the"
                        f" range [0, {len(source)-1}] allowed for the group"
                        f" '{source.name}'"
                    )
                    raise IndexError(error_message)
            except NotImplementedError:
                # ``len(source)`` is not available, the check has to be skipped
                logger.warn(
                    "Cannot check whether the indices to record from are valid. This"
                    " can happen in standalone mode when recording from synapses that"
                    " have been created with a connection pattern. You can avoid this"
                    " situation by using synaptic indices in the connect call.",
                    name_suffix="cannot_check_statemonitor_indices",
                )

        # Some dummy code so that code generation takes care of the indexing
        # and subexpressions
        code = "\n".join(f"_to_record_{v} = _source_{v}" for v in variables)

        CodeRunner.__init__(
            self,
            group=self,
            template="statemonitor",
            code=code,
            name=name,
            clock=clock,
            dt=dt,
            when=when,
            order=order,
            check_units=False,
        )

        self.add_dependency(source)

        # Setup variables
        self.variables = Variables(self)

        self.variables.add_dynamic_array(
            "t",
            size=0,
            dimensions=second.dim,
            constant=False,
            dtype=self._clock.variables["t"].dtype,
            read_only=True,
        )
        self.variables.add_array(
            "N", dtype=np.int32, size=1, scalar=True, read_only=True
        )
        self.variables.add_array(
            "_indices",
            size=len(self.record),
            dtype=self.record.dtype,
            constant=True,
            read_only=True,
            values=self.record,
        )
        self.variables.create_clock_variables(self._clock, prefix="_clock_")
        for varname in variables:
            var = source.variables[varname]
            if var.scalar and len(self.record) > 1:
                logger.warn(
                    f"Variable {varname} is a shared variable but it will be "
                    "recorded once for every target.",
                    once=True,
                )
            index = source.variables.indices[varname]
            # Reference to the variable in the source, read by the generated
            # code as ``_source_<name>``
            self.variables.add_reference(
                f"_source_{varname}", source, varname, index=index
            )
            if index not in ("_idx", "0") and index not in variables:
                self.variables.add_reference(index, source)
            # Storage for the recorded values: one row per time step, one
            # column per recorded index
            self.variables.add_dynamic_array(
                varname,
                size=(0, len(self.record)),
                resize_along_first=True,
                dimensions=var.dim,
                dtype=var.dtype,
                constant=False,
                read_only=True,
            )

        for varname in variables:
            var = self.source.variables[varname]
            self.variables.add_auxiliary_variable(
                f"_to_record_{varname}",
                dimensions=var.dim,
                dtype=var.dtype,
                scalar=var.scalar,
            )

        self.recorded_variables = {
            varname: self.variables[varname] for varname in variables
        }
        recorded_names = list(variables)

        self.needed_variables = recorded_names
        self.template_kwds = {"_recorded_variables": self.recorded_variables}
        self.written_readonly_vars = {
            self.variables[varname] for varname in self.record_variables
        }
        self._enable_group_attributes()

    def resize(self, new_size):
        """
        Resize the stored times and recorded values to ``new_size`` time steps.

        Sets the ``N`` variable to ``new_size``, resizes ``t`` to ``new_size``
        and every recorded variable to ``(new_size, n_indices)``.
        """
        self.variables["N"].set_value(new_size)
        self.variables["t"].resize(new_size)

        for var in self.recorded_variables.values():
            var.resize((new_size, self.n_indices))

    def reinit(self):
        """Not supported; always raises `NotImplementedError`."""
        raise NotImplementedError()

    def __getitem__(self, item):
        """
        Return a `StateMonitorView` for the given index/indices of the source.

        Parameters
        ----------
        item : int, sequence of int, or object with an ``_indices`` method
            The indices (referring to ``source``) to give access to.

        Raises
        ------
        TypeError
            If ``item`` is not an integer, a sequence of integers or an object
            providing ``_indices``.
        """
        dtype = get_dtype(item)
        if np.issubdtype(dtype, np.signedinteger):
            return StateMonitorView(self, item)
        elif isinstance(item, Sequence):
            index_array = np.array(item)
            if not np.issubdtype(index_array.dtype, np.signedinteger):
                raise TypeError("Index has to be an integer or a sequence of integers")
            return StateMonitorView(self, item)
        elif hasattr(item, "_indices"):
            # objects that support the indexing interface will return absolute
            # indices but here we need relative ones
            # TODO: How do we prevent the use of completely unrelated objects here?
            source_offset = getattr(self.source, "_offset", 0)
            return StateMonitorView(self, item._indices() - source_offset)
        else:
            raise TypeError(f"Cannot use object of type {type(item)} as an index")

    def __getattr__(self, item):
        """
        Return the recorded values of a variable.

        ``mon.v`` returns a copy of the values with units, ``mon.v_`` the
        transposed stored values without units. Everything else is delegated
        to `Group.__getattr__`.
        """
        # We do this because __setattr__ and __getattr__ are not active until
        # _group_attribute_access_active attribute is set, and if it is set,
        # then __getattr__ will not be called. Therefore, if getattr is called
        # with this name, it is because it hasn't been set yet and so this
        # method should raise an AttributeError to agree that it hasn't been
        # called yet.
        if item == "_group_attribute_access_active":
            raise AttributeError(item)
        if not hasattr(self, "_group_attribute_access_active"):
            raise AttributeError(item)
        if item in self.record_variables:
            var_dim = self.variables[item].dim
            return Quantity(self.variables[item].get_value().T, dim=var_dim, copy=True)
        elif item.endswith("_") and item[:-1] in self.record_variables:
            return self.variables[item[:-1]].get_value().T
        else:
            return Group.__getattr__(self, item)

    def __repr__(self):
        classname = self.__class__.__name__
        variables = repr(self.record_variables)
        return f"<{classname}, recording {variables} from '{self.source.name}'>"

    def record_single_timestep(self):
        """
        Records a single time step. Useful for recording the values at the end
        of the simulation -- otherwise a `StateMonitor` will not record the
        last simulated values since its ``when`` attribute defaults to
        ``'start'``, i.e. the last recording is at the *beginning* of the last
        time step.

        Notes
        -----
        This function will only work if the `StateMonitor` has been already run,
        but a run with a length of ``0*ms`` does suffice.

        Raises
        ------
        TypeError
            If the monitor has no code object yet (``codeobj`` is ``None``).

        Examples
        --------
        >>> from brian2 import *
        >>> G = NeuronGroup(1, 'dv/dt = -v/(5*ms) : 1')
        >>> G.v = 1
        >>> mon = StateMonitor(G, 'v', record=True)
        >>> run(0.5*ms)
        >>> print(np.array_str(mon.v[:], precision=3))
        [[ 1.     0.98   0.961  0.942  0.923]]
        >>> print(mon.t[:])
        [   0.  100.  200.  300.  400.] us
        >>> print(np.array_str(G.v[:], precision=3))  # last value had not been recorded
        [ 0.905]
        >>> mon.record_single_timestep()
        >>> print(mon.t[:])
        [   0.  100.  200.  300.  400.  500.] us
        >>> print(np.array_str(mon.v[:], precision=3))
        [[ 1.     0.98   0.961  0.942  0.923  0.905]]
        """
        if self.codeobj is None:
            raise TypeError(
                "Can only record a single time step after the "
                "network has been run once."
            )
        self.codeobj()