Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def createGroup(self, obj_uuid=None):
self.initFile()
if self.readonly:
msg = "Unable to create group (Updates are not allowed)"
self.log.info(msg)
raise IOError(errno.EPERM, msg)
groups = self.dbGrp["{groups}"]
if not obj_uuid:
obj_uuid = str(uuid.uuid1())
newGroup = groups.create_group(obj_uuid)
# store reverse map as an attribute
addr = h5py.h5o.get_info(newGroup.id).addr
addrGrp = self.dbGrp["{addr}"]
addrGrp.attrs[str(addr)] = obj_uuid
#set timestamps
now = time.time()
self.setCreateTime(obj_uuid, timestamp=now)
self.setModifiedTime(obj_uuid, timestamp=now)
return obj_uuid
def createGroup(self, obj_uuid=None):
self.initFile()
if self.readonly:
msg = "Unable to create group (Updates are not allowed)"
self.log.info(msg)
raise IOError(errno.EPERM, msg)
groups = self.dbGrp["{groups}"]
if not obj_uuid:
obj_uuid = str(uuid.uuid1())
newGroup = groups.create_group(obj_uuid)
# store reverse map as an attribute
addr = h5py.h5o.get_info(newGroup.id).addr
addrGrp = self.dbGrp["{addr}"]
addrGrp.attrs[str(addr)] = obj_uuid
#set timestamps
now = time.time()
self.setCreateTime(obj_uuid, timestamp=now)
self.setModifiedTime(obj_uuid, timestamp=now)
return obj_uuid
try:
msg += ": " + ve.message
except AttributeError:
pass # no message
self.log.info(msg)
raise IOError(errno.EINVAL, msg) # assume this is due to invalid params
if newDataset:
dataset_id = newDataset.id
if dataset_id is None:
msg = 'Unexpected failure to create dataset'
self.log.error(msg)
raise IOError(errno.EIO, msg)
# store reverse map as an attribute
addr = h5py.h5o.get_info(dataset_id).addr
addrGrp = self.dbGrp["{addr}"]
addrGrp.attrs[str(addr)] = obj_uuid
# save creation props if any
if creation_props:
self.setDatasetCreationProps(obj_uuid, creation_props)
# set timestamp
now = time.time()
self.setCreateTime(obj_uuid, timestamp=now)
self.setModifiedTime(obj_uuid, timestamp=now)
item['id'] = obj_uuid
if self.update_timestamps:
item['ctime'] = self.getCreateTime(obj_uuid)
item['mtime'] = self.getModifiedTime(obj_uuid)
linkClass = 'UDLink' # user defined links
item['class'] = 'H5L_TYPE_USER_DEFINED'
if linkClass == 'SoftLink':
item['class'] = 'H5L_TYPE_SOFT'
item['h5path'] = linkObj.path
item['href'] = '#h5path(' + linkObj.path + ')'
elif linkClass == 'ExternalLink':
item['class'] = 'H5L_TYPE_EXTERNAL'
item['h5path'] = linkObj.path
item['file'] = linkObj.filename
item['href'] = '#h5path(' + linkObj.path + ')'
elif linkClass == 'HardLink':
# Hardlink doesn't have any properties itself, just get the linked
# object
obj = parent[link_name]
addr = h5py.h5o.get_info(obj.id).addr
item['class'] = 'H5L_TYPE_HARD'
item['id'] = self.getUUIDByAddress(addr)
class_name = obj.__class__.__name__
if class_name == 'Dataset':
item['href'] = 'datasets/' + item['id']
item['collection'] = 'datasets'
elif class_name == 'Group':
item['href'] = 'groups/' + item['id']
item['collection'] = 'groups'
elif class_name == 'Datatype':
item['href'] = 'datatypes/' + item['id']
item['collection'] = 'datatypes'
else:
self.log.warning("unexpected object type: " + item['type'])
return item
def getRegionReference(self, regionRef):
selectionEnums = { h5py.h5s.SEL_NONE: 'H5S_SEL_NONE',
h5py.h5s.SEL_ALL: 'H5S_SEL_ALL',
h5py.h5s.SEL_POINTS: 'H5S_SEL_POINTS',
h5py.h5s.SEL_HYPERSLABS: 'H5S_SEL_HYPERSLABS'
}
item = {}
objid = h5py.h5r.dereference(regionRef, self.f.file.file.id)
if objid:
item['id'] = self.getUUIDByAddress(h5py.h5o.get_info(objid).addr)
else:
log.info("region reference unable able to find item with objid: " + objid)
return item
sel = h5py.h5r.get_region(regionRef, objid)
select_type = sel.get_select_type()
if select_type not in selectionEnums:
msg = "Unexpected selection type: " + regionRef.typecode
self.log.error(msg)
raise IOError(errno.EIO, msg)
item['select_type'] = selectionEnums[select_type]
pointlist = None
if select_type == h5py.h5s.SEL_POINTS:
# retrieve a numpy array of selection points
points = sel.get_select_elem_pointlist()
pointlist = points.tolist()
def getRegionReference(self, regionRef):
selectionEnums = {h5py.h5s.SEL_NONE: 'H5S_SEL_NONE',
h5py.h5s.SEL_ALL: 'H5S_SEL_ALL',
h5py.h5s.SEL_POINTS: 'H5S_SEL_POINTS',
h5py.h5s.SEL_HYPERSLABS: 'H5S_SEL_HYPERSLABS'}
item = {}
objid = h5py.h5r.dereference(regionRef, self.f.file.file.id)
if objid:
item['id'] = self.getUUIDByAddress(h5py.h5o.get_info(objid).addr)
else:
self.log.info("region reference unable to find item with objid: " + objid)
return item
sel = h5py.h5r.get_region(regionRef, objid)
select_type = sel.get_select_type()
if select_type not in selectionEnums:
msg = "Unexpected selection type: " + regionRef.typecode
self.log.error(msg)
raise IOError(errno.EIO, msg)
item['select_type'] = selectionEnums[select_type]
pointlist = None
if select_type == h5py.h5s.SEL_POINTS:
# retrieve a numpy array of selection points
points = sel.get_select_elem_pointlist()
pointlist = points.tolist()
self.initFile()
self.log.info("getUUIDByPath: [" + path + "]")
if len(path) >= 6 and path[:6] == '__db__':
msg = "getUUIDByPath called with invalid path: [" + path + "]"
self.log.error(msg)
raise IOError(errno.EIO, msg)
if path == '/':
# just return the root UUID
root_uuid = self.dbGrp.attrs["rootUUID"]
if root_uuid and type(root_uuid) is not str:
# convert bytes to unicode
root_uuid = root_uuid.decode('utf-8')
return root_uuid
obj = self.f[path] # will throw KeyError if object doesn't exist
addr = h5py.h5o.get_info(obj.id).addr
obj_uuid = self.getUUIDByAddress(addr)
return obj_uuid
self.initFile()
self.log.info("getUUIDByPath: [" + path + "]")
if len(path) >= 6 and path[:6] == '__db__':
msg = "getUUIDByPath called with invalid path: [" + path + "]"
self.log.error(msg)
raise IOError(errno.EIO, msg)
if path == '/':
# just return the root UUID
root_uuid = self.dbGrp.attrs["rootUUID"]
if root_uuid and type(root_uuid) is not str:
# convert bytes to unicode
root_uuid = root_uuid.decode('utf-8')
return root_uuid
obj = self.f[path] # will throw KeyError if object doesn't exist
addr = h5py.h5o.get_info(obj.id).addr
obj_uuid = self.getUUIDByAddress(addr)
return obj_uuid
linkClass = 'UDLink' # user defined links
item['class'] = 'H5L_TYPE_USER_DEFINED'
if linkClass == 'SoftLink':
item['class'] = 'H5L_TYPE_SOFT'
item['h5path'] = linkObj.path
item['href'] = '#h5path(' + linkObj.path + ')'
elif linkClass == 'ExternalLink':
item['class'] = 'H5L_TYPE_EXTERNAL'
item['h5path'] = linkObj.path
item['file'] = linkObj.filename
item['href'] = '#h5path(' + linkObj.path + ')'
elif linkClass == 'HardLink':
# Hardlink doesn't have any properties itself, just get the linked
# object
obj = parent[link_name]
addr = h5py.h5o.get_info(obj.id).addr
item['class'] = 'H5L_TYPE_HARD'
item['id'] = self.getUUIDByAddress(addr)
class_name = obj.__class__.__name__
if class_name == 'Dataset':
item['href'] = 'datasets/' + item['id']
item['collection'] = 'datasets'
elif class_name == 'Group':
item['href'] = 'groups/' + item['id']
item['collection'] = 'groups'
elif class_name == 'Datatype':
item['href'] = 'datatypes/' + item['id']
item['collection'] = 'datatypes'
else:
self.log.warning("unexpected object type: " + item['type'])
return item
def refToList(self, data):
# todo - verify that data is a numpy.ndarray
out = None
if type(data) is h5py.h5r.Reference:
if bool(data):
grpref = self.f[data]
addr = h5py.h5o.get_info(grpref.id).addr
uuid = self.getUUIDByAddress(addr)
if self.getGroupObjByUuid(uuid):
out = "groups/" + uuid
elif self.getDatasetObjByUuid(uuid):
out = "datasets/" + uuid
elif self.getCommittedTypeObjByUuid(uuid):
out = "datatypes/" + uuid
else:
self.log.warning(
"uuid in region ref not found: [" + uuid + "]")
return None
else:
out = "null"
elif type(data) is h5py.h5r.RegionReference:
out = self.getRegionReference(data)
else: