1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
|
# -*- coding: utf-8 -*-
# Copyright © 2016, German Neuroinformatics Node (G-Node)
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted under the terms of the BSD License. See
# LICENSE file in the root of the Project.
import h5py
import numpy as np
from .h5dataset import H5DataSet
from ..datatype import DataType
from .. import util
class H5Group:
    """
    Lazy wrapper around an h5py Group identified by a parent and a name.

    The underlying HDF5 group is only bound at construction time if it
    already exists in the parent or if ``create`` is set. Methods that need
    to write call ``_create_h5obj`` first; read-only methods treat a missing
    group as an empty container.
    """

    def __init__(self, parent, name, create=False):
        """
        :param parent: the containing object; must support ``name in parent``
                       and ``parent[name]`` (and expose ``id`` when the group
                       has to be created)
        :param name: the name of this group inside the parent
        :param create: create the group in the parent if it does not exist
        """
        self._parent = parent
        self.name = name
        self.group = None
        if create or name in self._parent:
            self._create_h5obj()
        self.h5obj = self.group

    def _create_h5obj(self):
        """
        Binds ``self.group`` to the group called ``self.name`` in the parent,
        creating it first if it does not exist.
        """
        if self.name in self._parent:
            self.group = self._parent[self.name]
            return
        # The low level interface is used so the group can be created with
        # link creation order tracked and indexed; get_by_pos iterates by
        # creation order.
        gcpl = h5py.h5p.create(h5py.h5p.GROUP_CREATE)
        flags = h5py.h5p.CRT_ORDER_TRACKED | h5py.h5p.CRT_ORDER_INDEXED
        gcpl.set_link_creation_order(flags)
        name = self.name.encode("utf-8")
        gid = h5py.h5g.create(self._parent.id, name, gcpl=gcpl)
        self.group = h5py.Group(gid)

    @property
    def group(self):
        """
        The underlying group object, or None if it does not exist in the
        parent. An unbound group is looked up in the parent on each access.
        """
        if self._group is None and self.name in self._parent:
            self._group = self._parent[self.name]
        return self._group

    @group.setter
    def group(self, grp):
        self._group = grp

    def create_link(self, target, name):
        """
        Links the group of ``target`` into this group under ``name``,
        replacing any existing entry of that name. Creates this group if
        necessary.

        :param target: object whose ``_h5group.group`` is the link target
        :param name: the name of the link inside this group
        """
        self._create_h5obj()
        if name in self.group:
            del self.group[name]
        self.group[name] = target._h5group.group

    @classmethod
    def create_from_h5obj(cls, h5obj):
        """
        Wraps an existing h5py object.

        :param h5obj: an h5py.Group or h5py.Dataset
        :return: an instance of this class for a Group, an H5DataSet for a
                 Dataset
        :raises ValueError: if h5obj is neither a Group nor a Dataset
        """
        parent = h5obj.parent
        name = h5obj.name.split("/")[-1]
        if isinstance(h5obj, h5py.Group):
            return cls(parent, name)
        if isinstance(h5obj, h5py.Dataset):
            return H5DataSet(parent, name)
        raise ValueError("Invalid object: "
                         "{} must be either h5py.Group or h5py.Dataset."
                         .format(h5obj))

    def open_group(self, name, create=False):
        """
        Returns a new H5Group with the given name contained in the current
        group. If the current group does not exist in the file,
        it is automatically created.

        :param name: the name of the group
        :param create: creates the child group in the file if it does not exist
        :return: a new H5Group object
        """
        self._create_h5obj()
        return H5Group(self.group, name, create)

    def create_dataset(self, name, shape, dtype, compression=False):
        """
        Creates a dataset object under the current group with a given name,
        shape, and type. Creates the current group if necessary.

        :param name: the name of the dataset
        :param shape: tuple representing the shape of the dataset
        :param dtype: the type of the data for this dataset (DataType)
        :param compression: whether to compress the data (default: False)
        :return: a new H5DataSet object
        """
        self._create_h5obj()
        return H5DataSet(self.group, name, dtype, shape, compression)

    def get_dataset(self, name):
        """
        Returns a contained H5DataSet object.

        :param name: name of the dataset
        :return: H5DataSet object
        :raises KeyError: if the group does not exist or has no entry with
                          the given name
        """
        if self.group is None or name not in self.group:
            raise KeyError("No DataSet named {} found.".format(name))
        return H5DataSet.create_from_h5obj(self.group[name])

    def write_data(self, name, data, dtype=None, compression=False):
        """
        Writes the data to a Dataset contained in the group with the
        given name. Creates the Dataset if necessary.

        :param name: name of the Dataset object
        :param data: the data to write
        :param dtype: optionally specify the data type, otherwise it will be
                      automatically determined by the data
        :param compression: whether to compress the data (default: False)
        """
        shape = np.shape(data)
        if self.has_data(name):
            dset = self.get_dataset(name)
            # Resize the existing dataset to fit the new data
            dset.shape = shape
        else:
            if dtype is None:
                # The type is derived from the first element, so data must
                # be indexable and non-empty when no dtype is given.
                dtype = DataType.get_dtype(data[0])
            dset = self.create_dataset(name, shape, dtype, compression)
        dset.write_data(data)

    def get_data(self, name):
        """
        Returns the data contained in the dataset identified by 'name', or an
        empty list if a dataset of that name does not exist in the Group
        (including the case where the Group itself does not exist).

        :param name: The name of the dataset
        :return: The full contents of the dataset (``dset[:]``), or an empty
                 list
        """
        if self.group is None or name not in self.group:
            return []
        dset = self.group[name]
        # TODO: Error if dset is Group?
        return dset[:]

    def has_data(self, name):
        """
        Return True if the Group contains a Dataset object with the given name.

        :param name: name of Dataset
        :return: True if Dataset exists in Group, False if it does not exist,
                 exists and is not a Dataset, or the Group itself does not
                 exist
        """
        if self.group is None:
            return False
        return self.group.get(name, getclass=True) is h5py.Dataset

    def has_by_id(self, id_or_name):
        """
        Checks whether the group has a child matching the given id or name.

        :param id_or_name: an entity id (matched against the "entity_id"
                           attribute of each child) or a child name
        :return: True if a matching child exists, False otherwise
        """
        if not self.group:
            return False
        if util.is_uuid(id_or_name):
            return any(item.get_attr("entity_id") == id_or_name
                       for item in self)
        return id_or_name in self.group

    def get_by_id_or_name(self, id_or_name):
        """
        Returns the child matching the given id or name.

        :raises KeyError: if no matching child exists
        """
        if util.is_uuid(id_or_name):
            return self.get_by_id(id_or_name)
        return self.get_by_name(id_or_name)

    def get_by_name(self, name):
        """
        Returns the child with the given name, wrapped via create_from_h5obj.

        :raises KeyError: if no child of that name exists
        """
        if self.group and name in self.group:
            return self.create_from_h5obj(self.group[name])
        raise KeyError("Item not found '{}'".format(name))

    def get_by_id(self, id_):
        """
        Returns the first child whose "entity_id" attribute equals id_.

        :raises KeyError: if no matching child exists
        """
        if self.group:
            for item in self:
                if item.get_attr("entity_id") == id_:
                    return item
        raise KeyError("Item not found '{}'".format(id_))

    def get_by_pos(self, pos):
        """
        Returns the child at the given position in link creation order.

        :param pos: index of the child
        :raises IndexError: if the group does not exist
        """
        if not self.group:
            raise IndexError

        # Using low level interface to specify iteration order. The callback
        # returns the link name, which stops the iteration at index pos.
        name, _ = self.group.id.links.iterate(lambda n: n,
                                              idx_type=h5py.h5.INDEX_CRT_ORDER,
                                              order=h5py.h5.ITER_INC,
                                              idx=pos)
        return self.get_by_name(name)

    def delete(self, id_or_name, delete_if_empty=True):
        """
        Deletes the child HDF5 group that matches the given name or id.

        :param id_or_name: name or entity id of the child to delete
        :param delete_if_empty: also remove this group from its parent if it
                                is left empty and lies more than one level
                                below the root
        :raises ValueError: if the child could not be deleted
        """
        if util.is_uuid(id_or_name):
            name = self.get_by_id_or_name(id_or_name).name
        else:
            name = id_or_name
        try:
            del self.group[name]
        except Exception as exc:
            raise ValueError("Error deleting {} ".format(name)) from exc
        # Delete if empty and non-root container
        groupdepth = len(self.group.name.split("/")) - 1
        if delete_if_empty and not len(self.group) and groupdepth > 1:
            del self.parent.group[self.name]
            # Drop the cached reference to the removed group
            self.group = None

    def delete_all(self, eid):
        """
        Deletes all references to a given list of objects, identified by their
        entity_id, below the current object. Does nothing if the group does
        not exist.

        :param eid: collection of entity ids to delete
        """
        if self.group is None:
            return

        # Use visit_items to traverse groups and check their children.
        # visit_items visits each item only once, so instead of checking
        # whether each item is the one we're searching for, we check whether
        # it *contains* the one we're searching for
        # We delete the child as soon as we find it; this doesn't cause
        # iteration issues since it's deleted before descending into the
        # children of the current group
        def delete_by_id(_, obj):
            if not isinstance(obj, h5py.Group):
                return
            grp = self.create_from_h5obj(obj)
            # Collect the names first so that the group is not modified
            # while its children are being iterated.
            doomed = [child.name for child in grp
                      if child.get_attr("entity_id") in eid]
            for childname in doomed:
                del grp[childname]

        self.group.visititems(delete_by_id)

    def set_attr(self, name, value):
        """
        Sets an attribute on the group, creating the group if necessary.
        A value of None removes the attribute if it is present.

        :param name: attribute name
        :param value: attribute value, or None to delete
        """
        self._create_h5obj()
        if value is None:
            if name in self.group.attrs:
                del self.group.attrs[name]
            return
        if isinstance(value, np.str_):
            # Store numpy strings as plain Python strings
            value = str(value)
        self.group.attrs[name] = value

    def get_attr(self, name):
        """
        Returns the value of an attribute, decoding bytes values to str.

        :param name: attribute name
        :return: the attribute value, or None if the group or the attribute
                 does not exist
        """
        if self.group is None:
            return None
        attr = self.group.attrs.get(name)
        if isinstance(attr, bytes):
            attr = attr.decode()
        return attr

    def find_children(self, filtr=None, limit=None):
        """
        Collects descendants of this group.

        :param filtr: optional predicate called with each wrapped descendant;
                      only descendants for which it returns a true value are
                      kept
        :param limit: optional maximum depth (number of path components
                      relative to this group) of descendants to include
        :return: list of wrapped descendants; empty if the group does not
                 exist
        """
        result = []
        if self.group is None:
            return result

        def match(name, obj):
            curdepth = len(name.split("/"))
            if limit is not None and curdepth > limit:
                return None
            h5grp = H5Group.create_from_h5obj(obj)
            if filtr is None or filtr(h5grp):
                result.append(h5grp)
            # Returning None keeps visititems going
            return None

        self.group.visititems(match)
        return result

    def copy(self, source, dest, name=None, cls=None, shallow=False,
             keep_id=True):
        """
        Copies ``source`` into the child group ``cls`` of ``dest`` under the
        given name and sets the "name" attribute of the copy.

        :param source: the object to copy, passed on to the h5py copy call
        :param dest: H5Group in which the container group ``cls`` is opened
                     (and created if missing)
        :param name: the name of the copy inside the container group
        :param cls: the name of the container group inside ``dest``
        :param shallow: passed on to the h5py copy call
        :param keep_id: if False, the copy and every descendant that has an
                        "entity_id" attribute get a newly created id
        :return: the h5py object of the copy
        """
        grp = self.group
        dest.open_group(cls, create=True)
        dest_grp = dest.group[cls]
        grp.copy(source=source, dest=dest_grp, name=name, shallow=shallow)
        grp = dest_grp[name]
        grp.attrs["name"] = name
        if not keep_id:
            def change_id(_, igrp):
                if "entity_id" in igrp.attrs:
                    id_ = util.create_id()
                    igrp.attrs.modify("entity_id", np.bytes_(id_))

            # visititems does not visit the copy itself, so its id is
            # replaced separately
            id_ = util.create_id()
            grp.attrs.modify("entity_id", np.bytes_(id_))
            grp.visititems(change_id)
        return grp

    @property
    def parent(self):
        """The parent object, wrapped via create_from_h5obj."""
        return self.create_from_h5obj(self._parent)

    def __iter__(self):
        """Yields each child wrapped via create_from_h5obj."""
        if not len(self):
            return
        for grp in self.group.values():
            yield self.create_from_h5obj(grp)

    def __contains__(self, item):
        if self.group is None:
            return False
        return item in self.group

    def __len__(self):
        if self.group is None:
            return 0
        return len(self.group)

    def __delitem__(self, key):
        del self.group[key]

    def __str__(self):
        grp = self.group
        # Fall back to the stored name while the group does not exist
        path = self.name if grp is None else grp.name
        return "<H5Group object: {}>".format(path)
|