1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
|
# -*- coding: utf-8 -*-
# Copyright © 2014, German Neuroinformatics Node (G-Node)
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted under the terms of the BSD License. See
# LICENSE file in the root of the Project.
try:
from collections.abc import Sequence, Iterable
except ImportError:
from collections import Sequence, Iterable
from enum import Enum
from numbers import Number
import numpy as np
from .datatype import DataType
from .entity import Entity
from . import util
def ensure_str(s):
if isinstance(s, bytes):
return s.decode()
else:
return s
class OdmlType(Enum):
"""
OdmlType provides all types currently supported by the odML
data format. It provides additional information about the
nature of the values of an odML Property.
"""
Boolean = 'boolean'
Int = 'int'
Float = 'float'
String = 'string'
Text = 'text'
URL = 'url'
Person = 'person'
Datetime = 'datetime'
Date = 'date'
Time = 'time'
def __str__(self):
return self.value
def compatible(self, value):
"""
compatible returns True or False depending on whether a
passed value can be mapped to an OdmlType or not.
:param value: Any single value
:return: Boolean
"""
if (self in (self.String, self.Text, self.URL, self.Person) and
DataType.get_dtype(value) == DataType.String):
return True
elif (self == self.Boolean and
DataType.get_dtype(value) == DataType.Bool):
return True
elif (self == self.Float and
DataType.get_dtype(value) == DataType.Double):
return True
elif self == self.Int and DataType.get_dtype(value) == DataType.Int64:
return True
elif (self in (self.Time, self.Date, self.Datetime) and
DataType.get_dtype(value) == DataType.String):
# This might need some extra work, treating as String for now, but
# keeping it separated from other String values.
return True
return False
@classmethod
def get_odml_type(cls, dtype):
"""
get_odml_type returns the appropriate OdmlType
for a handed in nix value DataType.
:param dtype: nix DataType
:return: OdmlType
"""
if dtype in DataType.FloatTypes:
return cls.Float
elif dtype in DataType.IntTypes:
return cls.Int
elif dtype == DataType.String:
return cls.String
elif dtype == DataType.Bool:
return cls.Boolean
raise TypeError("No available OdmlType for type '%s'" % dtype)
class Property(Entity):
"""An odML Property"""
def __init__(self, nixfile, nixparent, h5dataset):
super(Property, self).__init__(nixfile, nixparent, h5dataset)
self._h5dataset = self._h5group
@classmethod
def create_new(cls, nixfile, nixparent, h5parent, name,
dtype, shape=None, oid=None):
if shape is None or shape[0] == 0:
shape = (8, )
util.check_entity_name(name)
dtype = cls._make_h5_dtype(dtype)
h5dataset = h5parent.create_dataset(name, shape=shape, dtype=dtype)
h5dataset.set_attr("name", name)
if not util.is_uuid(oid):
oid = util.create_id()
h5dataset.set_attr("entity_id", oid)
newentity = cls(nixfile, nixparent, h5dataset)
newentity.force_created_at()
newentity.force_updated_at()
return newentity
@property
def name(self):
return self._h5dataset.get_attr("name")
@property
def definition(self):
return self._h5dataset.get_attr("definition")
@definition.setter
def definition(self, d):
util.check_attr_type(d, str)
self._h5dataset.set_attr("definition", d)
@property
def unit(self):
return self._h5dataset.get_attr("unit")
@unit.setter
def unit(self, new):
if new:
new = util.units.sanitizer(new)
if new == "":
new = None
util.check_attr_type(new, str)
self._h5dataset.set_attr("unit", new)
@property
def uncertainty(self):
dataset = self._h5dataset
filever = tuple(dataset._parent.file.attrs["version"])
if filever < (1, 1, 1):
val = self._h5dataset.dataset[:]
uncertainty = val[0]["uncertainty"]
return uncertainty
return self._h5dataset.get_attr("uncertainty")
@uncertainty.setter
def uncertainty(self, uncertainty):
util.check_attr_type(uncertainty, Number)
uncertainty = float(uncertainty) if uncertainty is not None else None
self._h5dataset.set_attr("uncertainty", uncertainty)
@property
def reference(self):
dataset = self._h5dataset
filever = tuple(dataset._parent.file.attrs["version"])
if filever < (1, 1, 1):
val = self._h5dataset.dataset[:]
reference = val[0]["reference"]
return reference
return self._h5dataset.get_attr("reference")
@reference.setter
def reference(self, ref):
util.check_attr_type(ref, str)
self._h5dataset.set_attr("reference", ref)
@property
def dependency(self):
return self._h5dataset.get_attr("dependency")
@dependency.setter
def dependency(self, dep):
util.check_attr_type(dep, str)
self._h5dataset.set_attr("dependency", dep)
@property
def dependency_value(self):
return self._h5dataset.get_attr("dependency_value")
@dependency_value.setter
def dependency_value(self, depval):
util.check_attr_type(depval, str)
self._h5dataset.set_attr("dependency_value", depval)
@property
def value_origin(self):
return self._h5dataset.get_attr("value_origin")
@value_origin.setter
def value_origin(self, origin):
util.check_attr_type(origin, str)
self._h5dataset.set_attr("value_origin", origin)
@property
def odml_type(self):
otype = self._h5dataset.get_attr("odml_type")
if not otype:
return None
return OdmlType(otype)
@odml_type.setter
def odml_type(self, new_type):
"""
odml_type can only be set if the handed in new type is a valid
OdmlType and if it is compatible with the value data type of
the property.
:param new_type: OdmlType
"""
if not isinstance(new_type, OdmlType):
raise TypeError("'{}' is not a valid odml_type.".format(new_type))
if not new_type.compatible(self.values[0]):
raise TypeError("Type '{}' is incompatible "
"with property values".format(new_type))
self._h5dataset.set_attr("odml_type", str(new_type))
def _read_old_values(self):
val = self._h5dataset.dataset[:]
if len(val) > 0 and isinstance(val[0]["value"], bytes):
return tuple(ensure_str(v["value"]) for v in val)
return tuple(v["value"] for v in val)
@property
def values(self):
dataset = self._h5dataset
filever = tuple(dataset._parent.file.attrs["version"])
if filever < (1, 1, 1):
values = self._read_old_values()
return values
if not sum(dataset.shape):
return tuple()
data = dataset.read_data()
def data_to_value(dat):
if isinstance(dat, bytes):
dat = dat.decode()
return dat
values = tuple(map(data_to_value, data))
return values
@values.setter
def values(self, vals):
"""
Set the value of the property discarding any previous information.
:param vals: a single value or list of values.
"""
# Make sure boolean value 'False' gets through as well...
if vals is None or (isinstance(vals, (Sequence, Iterable)) and not len(vals)):
self.delete_values()
return
if not isinstance(vals, (Sequence, Iterable)) or isinstance(vals, str):
vals = [vals]
# Make sure all values are of the same data type
vtype = self._check_new_value_types(vals)
if vtype == DataType.String:
vals = [str(v) for v in vals]
self._h5dataset.shape = np.shape(vals)
data = np.array(vals, dtype=vtype)
self._h5dataset.write_data(data)
def extend_values(self, data):
"""
Extends values to existing data.
Suitable when new data is nested or original data is long.
"""
vtype = self._check_new_value_types(data)
arr = np.array(data, dtype=vtype).flatten('C')
dataset = self._h5dataset
src_len = len(self.values)
dlen = len(arr)
dataset.shape = (src_len + dlen,)
dataset.write_data(arr, slc=np.s_[src_len: src_len + dlen])
def _check_new_value_types(self, data):
if isinstance(data, (Sequence, Iterable)) and not isinstance(data, str):
single_val = data[0]
else:
single_val = data
data = [data]
def check_prop_consistent(vtype):
# Check if the new data has the same type as the existing property
# data
if vtype != self.data_type:
raise TypeError("New data type '{}' is inconsistent with the "
"Property's data type '{}'".format(
vtype, self.data_type))
def check_new_data_consistent(vtype):
# Check if each value in the new data has the same type
for val in data:
if DataType.get_dtype(val) != vtype:
raise TypeError("Array contains inconsistent values. "
"Only values of type '{}' can be "
"assigned".format(vtype))
if hasattr(data, "dtype"):
# numpy array: no need to scan values, arrays are consistent but
# check for 1D
vtype = data.dtype
check_prop_consistent(vtype)
else:
# Will raise an error, if the data type of the first value is not
# valid
vtype = DataType.get_dtype(single_val)
check_prop_consistent(vtype)
check_new_data_consistent(vtype)
return vtype
@property
def data_type(self):
return self._h5dataset.dtype
def delete_values(self):
self._h5dataset.shape = (0,)
@staticmethod
def _make_h5_dtype(valued_type):
str_ = util.vlen_str_dtype
if valued_type == DataType.String:
valued_type = str_
return valued_type
def __str__(self):
return "{}: {{name = {}}}".format(
type(self).__name__, self.name
)
def __repr__(self):
return self.__str__()
def pprint(self, indent=2, max_length=80, current_depth=-1):
"""
Pretty print method. Method is called in Section.pprint()
"""
property_spaces = ""
prefix = ""
if current_depth >= 0:
property_spaces = " " * ((current_depth + 2) * indent)
prefix = "|-"
if self.unit is None:
value_string = str(self.values)
else:
value_string = "{}{}".format(self.values, self.unit)
p_len = len(property_spaces) + len(self.name) + len(value_string)
if p_len >= max_length - 4:
split_len = int((max_length - len(property_spaces) +
len(self.name) - len(prefix)) / 2)
str1 = value_string[0: split_len]
str2 = value_string[-split_len:]
print(("{}{} {}: {} ... {}".format(property_spaces, prefix,
self.name, str1, str2)))
else:
print(("{}{} {}: {}".format(property_spaces, prefix, self.name,
value_string)))
|