Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
anm = autonetkit.anm.NetworkModel(all_multigraph=all_multigraph)
g_in = anm.initialise_input(input_graph)
# autonetkit.update_vis(anm)
# set defaults
if not g_in.data.specified_int_names:
# if not specified then automatically assign interface names
g_in.data.specified_int_names = False
#import autonetkit.plugins.graph_product as graph_product
# graph_product.expand(g_in) # apply graph products if relevant
expand_fqdn = False
# TODO: make this set from config and also in the input file
if expand_fqdn and len(ank_utils.unique_attr(g_in, "asn")) > 1:
# Multiple ASNs set, use label format device.asn
anm.set_node_label(".", ['label', 'asn'])
g_in.update(g_in.routers(platform="junosphere"), syntax="junos")
g_in.update(g_in.routers(platform="dynagen"), syntax="ios")
g_in.update(g_in.routers(platform="netkit"), syntax="quagga")
# TODO: is this used?
g_in.update(g_in.servers(platform="netkit"), syntax="quagga")
# TODO: check this is needed
#autonetkit.ank.set_node_default(g_in, specified_int_names=None)
g_graphics = anm.add_overlay("graphics") # plotting data
g_graphics.add_nodes_from(g_in, retain=['x', 'y', 'device_type',
'label', 'device_subtype', 'asn'])
def build_vrf(anm):
"""Build VRF Overlay"""
g_in = anm['input']
g_layer3 = anm['layer3']
g_vrf = anm.add_overlay("vrf")
import autonetkit
autonetkit.ank.set_node_default(g_in, vrf=None)
if not any(True for n in g_in.routers() if n.vrf):
log.debug("No VRFs set")
return
g_vrf.add_nodes_from(g_in.routers(), retain=["vrf_role", "vrf"])
allocate_vrf_roles(g_vrf)
vrf_pre_process(anm)
def is_pe_ce_edge(edge):
if not(edge.src in g_vrf and edge.dst in g_vrf):
return False
src_vrf_role = g_vrf.node(edge.src).vrf_role
dst_vrf_role = g_vrf.node(edge.dst).vrf_role
def allocate(G_phy, G_bgp):
log.info("Allocating route reflectors")
graph_phy = G_phy._graph
for asn, devices in G_phy.groupby("asn").items():
routers = [d for d in devices if d.is_router]
router_ids = list(ank_utils.unwrap_nodes(routers))
mapping_id_to_device = dict(zip(router_ids, routers)) # to reverse lookup id back to device
subgraph_phy = graph_phy.subgraph(router_ids)
if len(subgraph_phy) == 1:
continue # single node in graph, no ibgp
betw_cen = nx.degree_centrality(subgraph_phy)
ordered = sorted(subgraph_phy.nodes(), key = lambda x: betw_cen[x], reverse = True)
rr_count = len(subgraph_phy)/4 or 1# Take top 20% to be route reflectors
route_reflectors = ordered[:rr_count] # most connected x%
log.debug("Chose route_reflectors %s" % route_reflectors)
rr_clients = ordered[rr_count:] # the other routers
route_reflectors = list(ank_utils.wrap_nodes(G_bgp, route_reflectors))
rr_clients = list(ank_utils.wrap_nodes(G_bgp, rr_clients))
def network_hostname(node):
network = node.Network
if network:
return "%s_%s" % (ank.name_folder_safe(network),
ank.name_folder_safe(node.label))
else:
return ank.name_folder_safe(node.label)
def compile(self):
log.info("Compiling Dynagen for %s" % self.host)
g_phy = self.anm['phy']
G_graphics = self.anm['graphics']
ios_compiler = IosClassicCompiler(self.nidb, self.anm)
for phy_node in g_phy.routers(host=self.host, syntax='ios'):
DmNode = self.nidb.node(phy_node)
graphics_node = G_graphics.node(phy_node)
DmNode.render.template = os.path.join("templates", "ios.mako")
DmNode.render.dst_folder = os.path.join(
"rendered", self.host, "dynagen", self.config_dir)
DmNode.render.dst_file = "%s.cfg" % ank.name_folder_safe(
phy_node.label)
# TODO: may want to normalise x/y
DmNode.x = graphics_node.x
DmNode.y = graphics_node.y
# Allocate edges
# assign interfaces
# Note this could take external data
int_ids = self.interface_ids()
for interface in DmNode.physical_interfaces():
interface.id = int_ids.next()
ios_compiler.compile(DmNode)
self.allocate_ports()
def assign_asn_to_interasn_cds(G_ip):
# TODO: remove this if no longer needed
# TODO: rename to assign_asn_to_cds as also does intra-asn cds
# TODO: make this a common function to ip4 and ip6
G_phy = G_ip.overlay('phy')
for broadcast_domain in G_ip.nodes('broadcast_domain'):
neigh_asn = list(ank_utils.neigh_attr(G_ip, broadcast_domain,
'asn', G_phy)) # asn of neighbors
if len(set(neigh_asn)) == 1:
asn = set(neigh_asn).pop() # asn of any neigh, as all same
else:
# allocate bc to asn with most neighbors in it
asn = ank_utils.most_frequent(neigh_asn)
broadcast_domain.asn = asn
return
# temporary fix for gh-90
asn = ank_utils.neigh_most_frequent(
g_l2_bc, node, 'asn', g_phy) # arbitrary choice
node['graphics'].set('asn', asn)
node.set('asn', asn) # need to use asn in IP overlay for aggregating subnets
# also allocate an ASN for virtual switches
vswitches = [n for n in g_l2_bc.nodes()
if n['layer2'].get('device_type') == "switch"
and n['layer2'].get('device_subtype') == "virtual"]
for node in vswitches:
# TODO: refactor neigh_most_frequent to allow fallthrough attributes
asns = [n['layer2'].get('asn') for n in node.neighbors()]
asns = [x for x in asns if x is not None]
asn = ank_utils.most_frequent(asns)
node.set('asn', asn) # need to use asn in IP overlay for aggregating subnets
# also mark as broadcast domain
from collections import defaultdict
coincident_nodes = defaultdict(list)
for node in split_created_nodes:
coincident_nodes[(node['graphics'].get('x'), node['graphics'].get('y'))].append(node)
coincident_nodes = {k: v for k, v in coincident_nodes.items()
if len(v) > 1} # trim out single node co-ordinates
import math
for _, val in coincident_nodes.items():
for index, item in enumerate(val):
index += 1
x_offset = 25 * math.floor(index / 2) * math.pow(-1, index)
y_offset = -1 * 25 * math.floor(index / 2) * math.pow(-1, index)
def build_layer3(anm):
""" l3_connectivity graph: switch nodes aggregated and exploded"""
g_in = anm['input']
gl2_conn = anm['layer2_conn']
g_l3 = anm.add_overlay("layer3")
g_l3.add_nodes_from(gl2_conn, retain=['label'])
g_l3.add_nodes_from(g_in.switches(), retain=['asn'])
g_l3.add_edges_from(gl2_conn.edges())
switches = g_l3.switches()
ank_utils.aggregate_nodes(g_l3, switches)
exploded_edges = ank_utils.explode_nodes(g_l3,
switches)
# also explode virtual switches
vswitches = [n for n in g_l3.nodes()
if n['layer2'].device_type == "switch"
and n['layer2'].device_subtype == "virtual"]
# explode each seperately?
for edge in exploded_edges:
edge.multipoint = True
edge.src_int.multipoint = True
edge.dst_int.multipoint = True