File: tag.py

package info (click to toggle)
python-nixio 1.5.4%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,888 kB
  • sloc: python: 12,527; cpp: 832; makefile: 25
file content (413 lines) | stat: -rw-r--r-- 15,238 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# -*- coding: utf-8 -*-
# Copyright © 2014 - 2022, German Neuroinformatics Node (G-Node)
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted under the terms of the BSD License. See
# LICENSE file in the root of the Project.
import warnings

import numpy as np

from .entity import Entity
from .source_link_container import SourceLinkContainer
from .container import Container, LinkContainer

from .datatype import DataType
from .data_array import DataArray
from .data_view import DataView
from .feature import Feature
from .exceptions import (OutOfBounds, IncompatibleDimensions,
                         UninitializedEntity, InvalidUnit)
from .dimension_type import DimensionType
from .link_type import LinkType
from . import util
from .section import Section
from .dimensions import SliceMode


class FeatureContainer(Container):
    """
    The FeatureContainer has one minor difference from the regular Container:
    A Feature can be retrieved by the ID or name of the linked DataArray as
    well as the ID of the feature itself.
    """
    def __getitem__(self, item):
        try:
            return Container.__getitem__(self, item)
        except KeyError as exc:
            # item might be the ID of the referenced data; try it as well
            for feat in self:
                if feat.data.id == item or feat.data.name == item:
                    return feat
            raise exc

    def __contains__(self, item):
        if isinstance(item, Feature):
            item = item.id
        if not Container.__contains__(self, item):
            # check if it contains a Feature whose data matches 'item'
            for feat in self:
                if feat.data.id == item or feat.data.name == item:
                    return True
            return False
        return True


class BaseTag(Entity):
    """
    Base class for Tag and MultiTag
    """
    def __init__(self, nixfile, nixparent, h5group):
        super(BaseTag, self).__init__(nixfile, nixparent, h5group)
        self._sources = None
        self._references = None
        self._features = None

    @property
    def units(self):
        """
        Property containing the units of the tag. The tag must provide a
        unit for each dimension of the position or extent vector.
        This is a read-write property.

        :type: list of str
        """
        return tuple(u.decode() if isinstance(u, bytes) else u
                     for u in self._h5group.get_data("units"))

    @units.setter
    def units(self, units):
        if not units:
            if self._h5group.has_data("units"):
                del self._h5group["units"]
        else:
            sanitized = []
            for unit in units:
                util.check_attr_type(unit, str)
                unit = util.units.sanitizer(unit)
                sanitized.append(unit)

            dtype = DataType.String
            self._h5group.write_data("units", sanitized, dtype)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    def create_feature(self, data, link_type):
        """
        Create a new feature.

        :param data: The data array of this feature.
        :type data: nixio.DataArray
        :param link_type: The link type of this feature.
        :type link_type: nixio.LinkType

        :returns: The created feature object.
        :rtype: nixio.Feature
        """
        if isinstance(link_type, str):
            link_type = link_type.lower()
        link_type = LinkType(link_type)
        features = self._h5group.open_group("features")
        feat = Feature.create_new(self.file, self, features, data, link_type)
        return feat

    def _calc_data_slices(self, data, position, extent, stop_rule):
        refslice = list()
        if not self.units:
            units = [None] * len(data.dimensions)
        else:
            units = self.units

        for idx, dim in enumerate(data.dimensions):
            if idx < len(position):
                start_pos = position[idx]
                start_pos, scaling = self._scale_position(start_pos, units[idx], dim)
                if extent is not None and idx < len(extent):
                    stop_pos = extent[idx]
                    stop_pos *= scaling
                    stop_pos += start_pos
                    slice_mode = stop_rule if extent[idx] > 0.0 else SliceMode.Inclusive
                else:
                    stop_pos = start_pos
                    slice_mode = SliceMode.Inclusive
                range_indices = dim.range_indices(start_pos, stop_pos, slice_mode)
                refslice.append(range_indices if range_indices is None else slice(range_indices[0], range_indices[1] + 1))
            else:  # no position, we take the whole slice for this dimension
                start_index = 0
                stop_index = data.shape[idx]
                refslice.append(slice(start_index, stop_index))

        return tuple(refslice)

    @staticmethod
    def _slices_in_data(data, slices):
        if slices is None or not all(slices):
            return False
        dasize = data.data_extent
        stops = tuple(sl.stop for sl in slices)
        return np.all(np.less_equal(stops, dasize))

    @staticmethod
    def _scale_position(pos, unit, dim):
        dimtype = dim.dimension_type
        if dimtype == DimensionType.Set:
            dimunit = None
        else:
            dimunit = dim.unit
        scaling = 1.0
        if dimtype == DimensionType.Set:
            if unit and unit != "none":
                raise IncompatibleDimensions(
                    "Cannot apply a position with unit to a SetDimension",
                    "Tag._pos_to_idx"
                )
        else:
            if dimunit is None and unit is not None:
                raise IncompatibleDimensions(
                    "If a unit if given for the position the dimension must not be without a unit!",
                    "Tag._pos_to_idx"
                )
            elif dimunit is not None and unit is not None:
                try:
                    scaling = util.units.scaling(unit, dimunit)
                except InvalidUnit:
                    raise IncompatibleDimensions(
                        "Cannot scale Tag unit {} to match dimension unit {}".format(unit, dimunit),
                        "Tag._pos_to_idx"
                    )
        return pos * scaling, scaling

    @staticmethod
    def _pos_to_idx(pos, unit, dim, mode):
        dimtype = dim.dimension_type
        if dimtype == DimensionType.Set:
            dimunit = None
        else:
            dimunit = dim.unit
        scaling = 1.0
        if dimtype == DimensionType.Sample:
            if not dimunit and unit is not None:
                raise IncompatibleDimensions(
                    "Units of position and SampledDimension "
                    "must both be given!",
                    "Tag._pos_to_idx"
                )
            if dimunit and unit is not None:
                try:
                    scaling = util.units.scaling(unit, dimunit)
                except InvalidUnit:
                    raise IncompatibleDimensions(
                        "Cannot scale Tag unit {} to match dimension unit {}".format(unit, dimunit),
                        "Tag._pos_to_idx"
                    )
            index = dim.index_of(pos * scaling, mode)
        elif dimtype == DimensionType.Set:
            if unit and unit != "none":
                raise IncompatibleDimensions(
                    "Cannot apply a position with unit to a SetDimension",
                    "Tag._pos_to_idx"
                )
            index = dim.index_of(pos, mode)
        else:  # dimtype == DimensionType.Range:
            if dimunit and unit is not None:
                try:
                    scaling = util.units.scaling(unit, dimunit)
                except InvalidUnit:
                    raise IncompatibleDimensions(
                        "Cannot scale Tag unit {} to match dimension unit {}".format(unit, dimunit),
                        "Tag._pos_to_idx"
                    )
            index = dim.index_of(pos * scaling, mode)

        return int(index)

    @property
    def features(self):
        """
        A property containing all features. Features can be obtained
        via their index or their ID. Features can be deleted from the list.
        Adding new features is done using the create_feature method.
        This is a read only attribute.

        :type: Container of nixio.Feature.
        """
        if self._features is None:
            self._features = FeatureContainer("features", self.file,
                                              self, Feature)
        return self._features


class Tag(BaseTag):

    @classmethod
    def create_new(cls, nixfile, nixparent, h5parent, name, type_, position):
        newentity = super(Tag, cls).create_new(nixfile, nixparent, h5parent,
                                               name, type_)
        newentity.position = position
        return newentity

    @property
    def position(self):
        """
        The position defined by the tag. This is a read-write property.

        :type: list of float
        """
        return tuple(self._h5group.get_data("position"))

    @position.setter
    def position(self, pos):
        if pos is not None and not hasattr(pos, "__getitem__"):
            pos = [pos]
        if pos is None or len(pos) == 0:
            if self._h5group.has_data("position"):
                del self._h5group["position"]
        else:
            dtype = DataType.Double
            self._h5group.write_data("position", pos, dtype)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    @property
    def extent(self):
        """
        The extent defined by the tag. This is an optional read-write
        property and may be set to None.

        :type: list of float
        """
        return tuple(self._h5group.get_data("extent"))

    @extent.setter
    def extent(self, ext):
        if ext is not None and not hasattr(ext, "__getitem__"):
            ext = [ext]
        if ext is None or len(ext) == 0:
            if self._h5group.has_data("extent"):
                del self._h5group["extent"]
        else:
            dtype = DataType.Double
            self._h5group.write_data("extent", ext, dtype)
        if self.file.auto_update_timestamps:
            self.force_updated_at()

    def retrieve_data(self, refidx):
        msg = ("Call to deprecated method Tag.retrieve_data. "
               "Use Tag.tagged_data instead.")
        warnings.warn(msg, category=DeprecationWarning)
        return self.tagged_data(refidx)

    def tagged_data(self, refidx, stop_rule=SliceMode.Exclusive):
        references = self.references
        position = self.position
        extent = self.extent
        if len(references) == 0:
            raise OutOfBounds("There are no references in this tag!")

        if isinstance(refidx, int) and refidx >= len(references):
            raise OutOfBounds("Reference index out of bounds.")

        ref = references[refidx]
        if extent and len(position) != len(extent):
            raise IncompatibleDimensions(
                "Number of dimensions in position and extent "
                "do not match ", extent)

        slices = self._calc_data_slices(ref, self.position, self.extent, stop_rule)
        if not all(slices):
            return DataView(ref, slices)
        if not self._slices_in_data(ref, slices):
            raise OutOfBounds("References data slice out of the extent of the "
                              "DataArray!")
        return DataView(ref, slices)

    def retrieve_feature_data(self, featidx):
        msg = ("Call to deprecated method Tag.retrieve_feature_data. "
               "Use Tag.feature_data instead.")
        warnings.warn(msg, category=DeprecationWarning)
        return self.feature_data(featidx)

    def feature_data(self, featidx, stop_rule=SliceMode.Exclusive):
        if len(self.features) == 0:
            raise OutOfBounds("There are no features associated with this tag!")

        try:
            feat = self.features[featidx]
        except KeyError:
            feat = None
            for feature in self.features:
                if feature.data.name == featidx or feature.data.id == featidx:
                    feat = feature
                    break
            if feat is None:
                raise
        data = feat.data
        if data is None:
            raise UninitializedEntity()
        if feat.link_type == LinkType.Tagged:
            slices = self._calc_data_slices(data, self.position, self.extent, stop_rule)
            if not self._slices_in_data(data, slices):
                raise OutOfBounds("Requested data slice out of the extent "
                                  "of the Feature!")
            return DataView(data, slices)
        # For untagged and indexed return the full data
        fullslices = tuple(slice(0, stop) for stop in data.shape)
        return DataView(data, fullslices)

    @property
    def references(self):
        """
        A property containing all data arrays referenced by the tag. Referenced
        data arrays can be obtained by index or their id. References can be
        removed from the list, removing a referenced DataArray will not remove
        it from the file. New references can be added using the append method
        of the list.
        This is a read only attribute.

        :type: Container of nixio.DataArray
        """
        if self._references is None:
            self._references = LinkContainer("references", self, DataArray,
                                             self._parent.data_arrays)
        return self._references

    @property
    def sources(self):
        """
        A property containing all Sources referenced by the Tag. Sources
        can be obtained by index or their id. Sources can be removed from the
        list, but removing a referenced Source will not remove it from the
        file. New Sources can be added using the append method of the list.
        This is a read only attribute.
        """
        if self._sources is None:
            self._sources = SourceLinkContainer(self)
        return self._sources

    # metadata
    @property
    def metadata(self):
        """

        Associated metadata of the entity. Sections attached to the entity via
        this attribute can provide additional annotations. This is an optional
        read-write property, and can be None if no metadata is available.

        :type: nixio.Section
        """
        if "metadata" in self._h5group:
            return Section(self.file, None, self._h5group.open_group("metadata"))
        return None

    @metadata.setter
    def metadata(self, sect):
        if not isinstance(sect, Section):
            raise TypeError("{} is not of type Section".format(sect))
        self._h5group.create_link(sect, "metadata")

    @metadata.deleter
    def metadata(self):
        if "metadata" in self._h5group:
            self._h5group.delete("metadata")