Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
>>> K5 = nx.complete_graph(5)
>>> A = nx.nx_pydot.to_pydot(K5)
>>> G = nx.nx_pydot.from_pydot(A) # return MultiGraph
# make a Graph instead of MultiGraph
>>> G = nx.Graph(nx.nx_pydot.from_pydot(A))
"""
if P.get_strict(None): # pydot bug: get_strict() shouldn't take argument
multiedges=False
else:
multiedges=True
if P.get_type()=='graph': # undirected
if multiedges:
N = nx.MultiGraph()
else:
N = nx.Graph()
else:
if multiedges:
N = nx.MultiDiGraph()
else:
N = nx.DiGraph()
# assign defaults
name=P.get_name().strip('"')
if name != '':
N.name = name
# add nodes, attributes to N.node_attr
for p in P.get_node_list():
n=p.get_name().strip('"')
def get_graph(res, directed=True):
"""
This function takes the result (subgraph) of a ipython-cypher query and builds a networkx graph from it
:param res: output from an ipython-cypher query
:param directed: Flag indicating if the resulting graph should be treated as directed or not
:return: networkx graph (MultiDiGraph or MultiGraph)
"""
if nx is None:
raise ImportError("Try installing NetworkX first.")
if directed:
graph = nx.MultiDiGraph()
else:
graph = nx.MultiGraph()
for item in res._results.graph:
for node in item['nodes']:
graph.add_node(node['id'], properties=node['properties'], labels=node['labels'], names=node['properties']['name'], description=node['properties']['description'])
for rel in item['relationships']:
graph.add_edge(rel['startNode'], rel['endNode'], id=rel['id'], properties=rel['properties'], type=rel['type'])
return graph
if dic_arc_diam[arc] in dic_LoopedDiam_costs.keys():
dic_arc_optimalDiameters[arc] = (2, dic_newDiam_oldDiam[dic_arc_diam[arc]])
else:
dic_arc_optimalDiameters[arc] = (1, dic_arc_diam[arc])
else:
dic_arc_optimalDiameters[arc] = (1, dic_arc_diam[arc])
print("Optimal Diameters: arc: (number of pipes, diameter)")
print(dic_arc_optimalDiameters)
# plot network with new diameters
print("Network with optimized diameters")
print("Looped pipes are indicated by two colored edges")
print("Thicker edge means larger diameter")
finalG = nx.MultiGraph()
for arc in dic_arc_optimalDiameters.keys():
if dic_arc_optimalDiameters[arc][0] == 1:
# we have a single not looped pipe
finalG.add_edge(arc[0], arc[1], color='black', weight=5 * dic_arc_optimalDiameters[arc][1])
else:
# we have a looped pipe
finalG.add_edge(arc[0], arc[1], color='r',
weight=10 * dic_arc_optimalDiameters[arc][1])
finalG.add_edge(arc[0], arc[1], color='b',
weight=5 * dic_arc_optimalDiameters[arc][1])
# pos = nx.circular_layout(finalG)
edges = finalG.edges()
colors = []
Returns:
"""
# Generate the required base DataFrame from raw Annotations
df = hashed_annotations_graph_process(group_pk)
# Numpy Arr of Unique Annotations via sanitized text
nd_arr = df.clean_text.unique()
# Unique Node labels (not using text as Identifier)
names = ['n' + str(x + 1) for x in range(len(nd_arr))]
# ID >> Cleantext lookup dictionary
nodes = pd.Series(names, index=nd_arr).to_dict()
# Start the Network off by adding all the unique Nodes (text annotations)
G = nx.MultiGraph()
G.add_nodes_from(names)
anns = []
for (doc, text, user), g_df in df.groupby(['document_pk', 'clean_text', 'username']):
anns.append({
'node': nodes[text],
'doc': int(doc),
'text': text,
'user': user
})
new_df = pd.DataFrame(anns)
if parallel:
edge_idx = 1
for (doc, user), g_df in new_df.groupby(['doc', 'user']):
for node1, node2 in itertools.combinations(list(g_df.node), 2):
if "graph" not in dct:
raise NetworkXError("input contains no graph")
graph = dct["graph"]
if isinstance(graph, list):
raise NetworkXError("input contains more than one graph")
return graph
tokens = tokenize()
graph = parse_graph()
directed = graph.pop("directed", False)
multigraph = graph.pop("multigraph", False)
if not multigraph:
G = nx.DiGraph() if directed else nx.Graph()
else:
G = nx.MultiDiGraph() if directed else nx.MultiGraph()
graph_attr = {k: v for k, v in graph.items() if k not in ("node", "edge")}
G.graph.update(graph_attr)
def pop_attr(dct, category, attr, i):
try:
return dct.pop(attr)
except KeyError:
outputs = (category, i, attr)
raise NetworkXError("%s #%d has no '%s' attribute" % outputs)
nodes = graph.get("node", [])
mapping = {}
node_labels = set()
for i, node in enumerate(nodes if isinstance(nodes, list) else [nodes]):
id = pop_attr(node, "node", "id", i)
if id in G:
def get_graph_for_vlan(vlan):
"""Builds a simple topology graph of the active netboxes in vlan.
Any netbox that seems to be down at the moment will not be included in
the graph.
:returns: A networkx.Graph object.
"""
swpvlan = SwPortVlan.objects.filter(vlan=vlan).select_related(
'interface', 'interface__netbox', 'interface__to_netbox',
'interface__to_interface')
graph = networkx.MultiGraph(name='graph for vlan %s' % vlan)
for swp in swpvlan:
source = swp.interface.netbox
source_ifc = swp.interface
target = swp.interface.to_netbox
target_ifc = swp.interface.to_interface
if target:
key = tuple(sorted(
(source_ifc.id, target_ifc.id if target_ifc else None)))
data = set([source_ifc, target_ifc])
graph.add_edge(source, target, key=key, data=data)
return graph
"""
This function takes the result (subgraph) of a ipython-cypher query and builds a networkx graph from it
:param res: output from an ipython-cypher query
:param directed: Flag indicating if the resulting graph should be treated as directed or not
:return: networkx graph (MultiDiGraph or MultiGraph)
"""
if not res:
#raise Exception("Empty graph. Cypher query input returned no results.")
raise CustomExceptions.EmptyCypherError("unkown query")
if nx is None:
raise ImportError("Try installing NetworkX first.")
if multigraph:
if directed:
graph = nx.MultiDiGraph()
else:
graph = nx.MultiGraph()
else:
if directed:
graph = nx.DiGraph()
else:
graph = nx.Graph()
for item in res._results.graph:
for node in item['nodes']:
graph.add_node(node['id'], properties=node['properties'], labels=node['labels'], names=node['properties']['id'], description=node['properties']['name'], id=node['properties']['id'])
for rel in item['relationships']:
graph.add_edge(rel['startNode'], rel['endNode'], id=rel['id'], properties=rel['properties'], type=rel['type'])
return graph
def load_pickle_graph(graph_file_name: str) -> MultiGraph:
open_fn = gzip.open if graph_file_name.endswith(".gz") else open
with open_fn(graph_file_name, "rb") as file:
graph = pickle.load(file)
# Check file content
if not isinstance(graph, MultiGraph):
raise ValueError(f"{graph_file_name} does not contain a nx.MultiGraph")
return graph
def make_graph(self, graph_xml, graphml_keys, defaults, G=None):
# set default graph type
edgedefault = graph_xml.get("edgedefault", None)
if G is None:
if edgedefault == 'directed':
G = nx.MultiDiGraph()
else:
G = nx.MultiGraph()
# set defaults for graph attributes
G.graph['node_default'] = {}
G.graph['edge_default'] = {}
for key_id, value in defaults.items():
key_for = graphml_keys[key_id]['for']
name = graphml_keys[key_id]['name']
python_type = graphml_keys[key_id]['type']
if key_for == 'node':
G.graph['node_default'].update({name: python_type(value)})
if key_for == 'edge':
G.graph['edge_default'].update({name: python_type(value)})
# hyperedges are not supported
hyperedge = graph_xml.find("{%s}hyperedge" % self.NS_GRAPHML)
if hyperedge is not None:
raise nx.NetworkXError("GraphML reader doesn't support hyperedges")
# add nodes
contains functions that are useful for image analysis
'''
from __future__ import division
import cv2
import numpy as np
import networkx as nx
from shapely import geometry
import curves
class MorphologicalGraph(nx.MultiGraph):
""" class that represents a morphological graph.
Note that a morphological graph generally might have parallel edges.
"""
def __init__(self, *args, **kwargs):
super(MorphologicalGraph, self).__init__(*args, **kwargs)
self._unique_node_id = 0 #< used to ensure unique nodes
def get_node_points(self):
""" returns the coordinates of all nodes """
return nx.get_node_attributes(self, 'coords').values()
def get_edge_curves(self):
""" returns a list of all edge curves """