Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_networkx_multidigraph_edge_attr(self):
print('\n---------- Multi-Digraph Edge Att Test Start -----------\n')
g = nx.MultiDiGraph()
g.add_node(1)
g.add_node(2)
g.add_node(3)
g.add_edge(1, 2)
g.add_edge(1, 2, attr_dict={'foo': 'bar'})
g.add_edge(1, 2)
g.add_edge(1, 3)
edges = g.edges(data=True, keys=True)
for edge in edges:
print(edge)
cyjs = util.from_networkx(g)
print(json.dumps(cyjs, indent=4))
edge = cyjs['elements']['edges'][0]
if ext.domain and ext.suffix:
l.append(domain)
row[2] = copy.deepcopy(l)
l = list()
for ip in row[3]:
try:
_ = ipaddress.ip_address(unicode(ip))
l.append(ip)
except:
pass
row[3] = copy.deepcopy(l)
# add the ips to the set of ips
ips = ips.union(set(row[1])).union(set(row[3]))
g = nx.MultiDiGraph()
# Add indicator to graph
## (Must account for the different types of indicators)
target_uri = "class=attribute&key={0}&value={1}".format('domain', row[0])
g.add_node(target_uri, {
'class': 'attribute',
'key': 'domain',
"value": row[0],
"start_time": dt,
"uri": target_uri
})
# Threat node
threat_uri = "class=attribute&key={0}&value={1}".format("malware", row[4])
g.add_node(threat_uri, {
def test_graph_is_built_from_grakn_as_expected(self):
g1 = nx.MultiDiGraph()
g1.add_node('x')
g2 = nx.MultiDiGraph()
g2.add_node('x')
g2.add_node('n')
g2.add_edge('x', 'n', type='has')
g3 = nx.MultiDiGraph()
g3.add_node('x')
g3.add_node('r')
g3.add_node('y')
g3.add_edge('r', 'x', type='child')
g3.add_edge('r', 'y', type='parent')
query_sampler_variable_graph_tuples = [('match $x isa person; get;', mock_sampler, g1),
('match $x isa person, has name $n; get;', mock_sampler, g2),
('match $x isa person; $r(child: $x, parent: $y); get;', mock_sampler, g3),
# TODO Add functionality for loading schema at a later date
# ('match $x sub person; $x sub $type; get;', g4),
# ('match $x sub $y; get;', g5),
]
with self._client.session(keyspace=self._keyspace) as session:
def get_graph(res, directed=True):
"""
This function takes the result (subgraph) of a ipython-cypher query and builds a networkx graph from it
:param res: output from an ipython-cypher query
:param directed: Flag indicating if the resulting graph should be treated as directed or not
:return: networkx graph (MultiDiGraph or MultiGraph)
"""
if nx is None:
raise ImportError("Try installing NetworkX first.")
if directed:
graph = nx.MultiDiGraph()
else:
graph = nx.MultiGraph()
for item in res._results.graph:
for node in item['nodes']:
graph.add_node(node['id'], properties=node['properties'], labels=node['labels'], names=node['properties']['name'], description=node['properties']['description'])
for rel in item['relationships']:
graph.add_edge(rel['startNode'], rel['endNode'], id=rel['id'], properties=rel['properties'], type=rel['type'])
return graph
def get_provenance_graph(self, start=None, filter_long_labels=True):
"""
Gets an nxgraph object corresponding to the provenance graph
Args:
start (nxgraph): starting graph to build from
filter_long_labels (bool): true truncates long labels to just the symbol name
Returns:
(nxgraph): graph representation of provenance
"""
graph = start or nx.MultiDiGraph()
label = "{}: {}".format(self.symbol.name, self.pretty_string())
if filter_long_labels and len(label) > 30:
label = "{}".format(self.symbol.name)
graph.add_node(
self, fillcolor="#43A1F8", fontcolor='white', label=label)
model = getattr(self.provenance, 'model', None)
source = getattr(self.provenance, 'source', None)
if model is not None:
model = "Model: {}".format(model)
graph.add_node(model, label=model, fillcolor='orange',
fontcolor='white', shape='rectangle')
graph.add_edge(model, self)
for model_input in self.provenance.inputs:
graph = model_input.get_provenance_graph(start=graph)
graph.add_edge(model_input, model)
elif source is not None:
def __init__(self, confFile):
"""Initialize transaction network.
:param confFile: Configuration (ini) file name
"""
self.g = nx.MultiDiGraph()
self.num_accounts = 0
self.degrees = dict()
self.hubs = list()
self.conf = ConfigParser()
self.conf.read(confFile)
self.seed = int(self.conf.get("General", "seed"))
np.random.seed(self.seed)
random.seed(self.seed)
self.factor = int(self.conf.get("Base", "edge_factor"))
self.prob = float(self.conf.get("Base", "triangle_prob"))
self.default_max_amount = parse_amount(self.conf.get("General", "default_max_amount"))
self.default_min_amount = parse_amount(self.conf.get("General", "default_min_amount"))
self.total_period = parse_int(self.conf.get("General", "total_period"))
elif not attr_dict:
logger.debug('No labels defined.')
else:
logger.debug('Checking guard.')
typed_attr = TypedDict()
typed_attr.set_types(self.graph._edge_label_types)
typed_attr.update(attr_dict)
ok = label_is_desired(typed_attr, with_attr_dict)
if ok:
logger.debug('Transition label matched desired label.')
transition = (u, v, dict(attr_dict))
found_transitions.append(transition)
return found_transitions
class LabeledDiGraph(nx.MultiDiGraph):
"""Directed multi-graph with constrained labeling.
Provides facilities to define labeling functions on
vertices and edges, with given co-domains,
so that labels are type-checked, not arbitrary.
Each state (or edge) is annotated with labels.
Before removing a value from a label type,
first make sure no state (or edge) is labeled with it.
Multiple edges with the same C{attr_dict} are not possible.
So the difference from C{networkx.MultiDiGraph} is that
the C{dict} of edges between u,v is a bijection.
Between two nodes either:
def to_nx(self, graph_id, directed=False, *args):
"""Convert the graph specified by its graph_id to networkx Directed Multi-graph"""
if (
False
): # graph_id in self.G_nx.keys(): # if self.G_nx[graph_id] already exists, just return it, otherwise evaluate it
return self.G_nx[graph_id]
else: # we always re-calculate self.G_nx, since directed argument can change
print("Converting the EPGM graph {} to NetworkX graph...".format(graph_id))
if not any([graph_id in g["id"] for g in self.G["graphs"]]):
raise Exception("Graph with id {} does not exist".format(graph_id))
self.G_nx[
graph_id
] = (
nx.MultiDiGraph()
) # create an empty directed graph that can store multiedges
# add nodes to self.G_nx[graph_id], together with their attributes stored in 'data':
self.G_nx[graph_id].add_nodes_from(
[
(v["id"], {**v["data"], **{"label": v["meta"].get("label", "")}})
for v in self.G["vertices"]
]
)
# add edges to self.G_nx[graph_id], together with their attributes stored in 'data':
# I have added the edge label in the edge data; sets the label to '' if the edges don't have a label
self.G_nx[graph_id].add_edges_from(
[
(
e["source"],
e["target"],
e["id"],
e = FancyArrowPatch(n1.center,n2.center,patchA=n1,patchB=n2,
arrowstyle='-|>',
connectionstyle='arc3,rad=%s'%rad,
mutation_scale=10.0,
lw=lw,
alpha=a,
color=ec)
seen[(u,v)]=rad
ax.add_patch(e)
return e
if __name__ == "__main__":
G=nx.MultiDiGraph([(1,2),(1,2),(2,3),(3,4),(2,4),
(1,2),(1,2),(1,2),(2,3),(3,4),(2,4)]
)
pos=nx.spring_layout(G)
ax=plt.gca()
draw_curvy_network(G,pos,ax,node_color='r', node_edge_color='r')
ax.autoscale()
plt.axis('equal')
plt.axis('off')
#plt.savefig("graph.pdf")
plt.show()
def multidigraph_from_flow_catalogue (fc_dict):
"""Transform the catalogue of the flows in a nx multidigraph"""
nx_flows = nx.MultiDiGraph()
for flow_id, (src, dst, flow_dict) in fc_dict.iteritems():
if 'out' in flow_dict and 'size' in flow_dict['out']:
nx_flows.add_edge(src, dst, flow_id, {'size':flow_dict['out']['size'], 'path':[]})
if 'in' in flow_dict and 'size' in flow_dict['in']:
nx_flows.add_edge(dst, src, flow_id, {'size':flow_dict['in']['size'], 'path':[]})
return nx_flows