Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
?inter rdf:rest*/rdf:first ?node.
{
BIND (brick:hasTag as ?p)
?node owl:onProperty ?p.
?node owl:hasValue ?o.
} UNION {
BIND (brick:measures as ?p)
?node owl:onProperty ?p.
?node owl:hasValue ?o.
}
}""")
class2tag = defaultdict(set)
for (cname, p, o, rest) in res:
cname = cname.split('#')[1]
o = o.split('#')[1]
if p == BRICK.hasTag:
class2tag[cname].add(o)
for cname, tagset in class2tag.items():
self.lookup[tuple(tagset)].add(cname)
qstr = "select ?inst where {\n"
for substance in substances:
qstr += f" ?inst brick:measures <{substance}> .\n"
qstr += "}"
for row in self.g.query(qstr):
inst = row[0]
self.g.add((inst, RDF.type, classname))
# find entities of the class and add the substances
qstr = f"""SELECT ?inst WHERE
{{ ?inst rdf:type/rdfs:subClassOf* <{classname}>
}}"""
for row in self.g.query(qstr):
inst = row[0]
for substance in substances:
self.g.add((inst, BRICK.measures, substance))
"""
Wrapper class and convenience methods for handling Brick models
and graphs.
Keyword Args:
load_brick (bool): if True, loads Brick ontology into graph
Returns:
graph (Graph): Graph object
"""
self.g = rdflib.Graph()
self.g.bind('rdf', ns.RDF)
self.g.bind('owl', ns.OWL)
self.g.bind('rdfs', ns.RDFS)
self.g.bind('skos', ns.SKOS)
self.g.bind('brick', ns.BRICK)
self.g.bind('tag', ns.TAG)
if load_brick:
# get ontology data from package
data = pkgutil.get_data(__name__, "ontologies/Brick.ttl").decode()
# wrap in StringIO to make it file-like
self.g.parse(source=io.StringIO(data), format='turtle')
} UNION {
BIND (brick:measures as ?p)
?node owl:onProperty ?p.
?node owl:hasValue ?o.
} UNION {
BIND (rdf:type as ?p)
?node owl:onProperty ?p.
?node owl:hasValue ?o.
}
}""")
self.tag_properties = defaultdict(list)
self.measures_properties = defaultdict(list)
self.grouped_properties = defaultdict(list)
for (classname, prop, obj, groupname) in res:
if prop == BRICK.hasTag:
self.tag_properties[classname].append(obj)
elif prop == BRICK.measures:
self.measures_properties[classname].append(obj)
self.grouped_properties[(classname, groupname)].append((prop, obj))
?node owl:onProperty ?p.
?node owl:hasValue ?o.
} UNION {
BIND (rdf:type as ?p)
?node owl:onProperty ?p.
?node owl:hasValue ?o.
}
}""")
self.tag_properties = defaultdict(list)
self.measures_properties = defaultdict(list)
self.grouped_properties = defaultdict(list)
for (classname, prop, obj, groupname) in res:
if prop == BRICK.hasTag:
self.tag_properties[classname].append(obj)
elif prop == BRICK.measures:
self.measures_properties[classname].append(obj)
self.grouped_properties[(classname, groupname)].append((prop, obj))
if set(tagset).intersection(self._point_tags):
if 'point' in tagset:
tagset.remove('point')
inferred_point_classes, leftover_points = \
self.most_likely_tagsets(tagset)
triples.append((self._BLDG[point_entity_id], A,
BRICK[inferred_point_classes[0]]))
infer_results.append(
(identifier, list(tagset), inferred_point_classes)
)
if len(inferred_equip_classes) > 0 and \
inferred_equip_classes[0] != 'Equipment':
triples.append((self._BLDG[equip_entity_id], A,
BRICK[inferred_equip_classes[0]]))
triples.append((self._BLDG[equip_entity_id], BRICK.hasPoint,
self._BLDG[point_entity_id]))
infer_results.append(
(identifier, list(tagset), inferred_equip_classes)
)
return triples, infer_results
def __init__(self):
"""
Creates a new OWLRL Inference session
"""
self.g = Graph(load_brick=True)
def __init__(self):
"""
Creates a new OWLRL Inference session
"""
self.g = Graph(load_brick=True)
def __init__(self):
"""
Creates new Tag Inference session
"""
self.g = Graph(load_brick=True)
# get ontology data from package
data = pkgutil.get_data(__name__, "ontologies/taglookup.pickle")
# TODO: move on from moving pickle to something more secure?
self.lookup = pickle.loads(data)
def infer_model(self, model):
"""
Produces the inferred Brick model from the given Haystack model
Args:
model (dict): a Haystack model
"""
entities = model['rows']
# index the entities by their ID field
entities = {e['id'].replace('"', ''): {'tags': e} for e in entities}
brickgraph = Graph(load_brick=True)
# marker tag pass
for entity_id, entity in entities.items():
marker_tags = {k for k, v in entity['tags'].items()
if v == 'm:' or v == 'M'}
for f in self._filters:
marker_tags = list(filter(f, marker_tags))
# translate tags
entity_tagset = list(map(lambda x: self._tagmap[x.lower()]
if x in self._tagmap else x, marker_tags))
# infer tags for single entity
triples, _ = self.infer_entity(entity_tagset, identifier=entity_id)
brickgraph.add(*triples)
# take a pass through for relationships
for entity_id, entity in entities.items():