Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — the enclosing `netlist(nets)` def header is
# outside this chunk and indentation was lost in extraction; code below is
# kept byte-identical.
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
# Classify by address family; the mask is bounded by that family's maximum
# prefix length (32 for IPv4, 128 for IPv6).
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.append(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.append(ipNetwork)
return v4nets, v6nets
# Top-level driver: split the input into families, then merge each family's
# networks into the minimal CIDR list and print it.
v4nets, v6nets = netlist(nets)
# merge nets and print
for v4net in netaddr.cidr_merge(v4nets):
print(v4net)
for v6net in netaddr.cidr_merge(v6nets):
print(v6net)
The smaller the number, the more fine-grained splitting is.
Default is 5 bits.
:param convertoffset: offset to use when converting paths.
If you have topology that uses with nodeID of 0, set this to 1.
:return: a list of routes to be installed on the switches.
"""
# NOTE(review): fragment — the enclosing def and the opening of the docstring
# above are outside this chunk; indentation was lost in extraction.
routeList = []
for k in pptc:
numpaths = len(pptc[k])
if numpaths > 1:
# The complex case, need to compute a split between paths
assigned = self._computeSplit(k, pptc[k], blockbits, False)
for path in assigned:
# assigned[path] presumably holds (source, dest) prefix pairs —
# TODO confirm against _computeSplit.
sources, dests = zip(*assigned[path])
subsrcprefix = netaddr.cidr_merge(sources)
subdstprefix = netaddr.cidr_merge(dests)
# print path, subsrcprefix, subdstprefix
#TODO: test the correctness of this better
assert len(subsrcprefix) == len(subdstprefix)
# itertools.izip: this fragment targets Python 2.
for s, d in itertools.izip(subsrcprefix, subdstprefix):
routeList.append((convertPath(path, offset=convertoffset),
daylightGraph, str(s), str(d)))
else:
# Easy case, only one flow-carrying path
routeList.append((convertPath(pptc[k][0], offset=convertoffset),
daylightGraph,
k.srcprefix, k.dstprefix))
return routeList
# NOTE(review): top-level Python 2 script fragment ("print" statements);
# indentation was lost in extraction. Statements after exit(0) appear
# unreachable unless they belonged to another scope in the original file.
print "baska bahara kaldi"  # Turkish, roughly "left for another spring" (runtime output; do not alter)
exit(0)
# Parse the web-service responses (both are JSON strings).
result = json.loads(result)
resultip = json.loads(ips)
block = {}
global myid
# Build block: ip_inst_id -> list of CIDR strings for its IPv4 entries.
for node in resultip:
block2 = []
for node2 in resultip[node]:
myid = node2["ip_inst_id"]
if node2["ip_type"]==4:
if node2["ip_prefixlen"] ==None:
# No prefix length: expand the begin..end range and merge it into
# the minimal set of CIDR blocks.
ip_list = list(netaddr.iter_iprange(str(node2["ip_begin"]),str(node2["ip_end"])))
ip_list = netaddr.cidr_merge(ip_list)
for i in ip_list:
block2.append( str(i))
else:
# Prefix length present: render the "begin-end" range via IP().
ip_range = str(node2["ip_begin"]+"-"+node2["ip_end"])
block2.append( str(IP(ip_range)))
block[myid]=block2
print "taken all data from ws"
def generateAllPaths(self, pptc, optPaths, blockbits=5):
    """
    Generate all (path, source prefix, destination prefix) tuples to install.

    :param pptc: mapping of traffic class -> list of flow-carrying paths
    :param optPaths: optimal paths, used to filter *pptc* via filterPaths()
    :param blockbits: prefix-splitting granularity in bits; the smaller the
        number, the more fine-grained the splitting. Default is 5 bits.
    :return: a list of (path, source prefix, destination prefix) tuples
    """
    pathList = []
    self.filterPaths(pptc, optPaths)
    pptc = self.pptc
    for tc in pptc:
        numpath = len(pptc[tc])
        if numpath == 0:
            # Robustness: nothing to install for this traffic class
            # (previously this fell into the single-path branch and emitted
            # a bogus entry with an empty path list).
            continue
        if numpath == 1:
            # Easy case: one flow-carrying path, no split required.
            # BUGFIX: previously appended pptc[tc] (the whole list) instead
            # of the path itself — inconsistent with the multi-path branch
            # and with the analogous route-generation code in this file,
            # which uses pptc[k][0].
            pathList.append((pptc[tc][0], tc.srcIPPrefix, tc.dstIPPrefix))
        else:
            # Complex case: compute a traffic split between the paths.
            # `assigned` maps each path to its (source, dest) prefix pairs.
            assigned = self._computeSplit(tc, pptc[tc], blockbits, False)
            for path in assigned:
                sources, dests = zip(*assigned[path])
                subsrcprefix = netaddr.cidr_merge(sources)
                subdstprefix = netaddr.cidr_merge(dests)
                # TODO: test the correctness of this better
                assert len(subsrcprefix) == len(subdstprefix)
                # itertools.izip: this module targets Python 2.
                for s, d in itertools.izip(subsrcprefix, subdstprefix):
                    pathList.append((path, str(s), str(d)))
    return pathList
# NOTE(review): fragment — the enclosing `netlist(nets)` def header and the
# matching `v4nets = []` initialisation are outside this chunk; indentation
# was lost in extraction.
v6nets = []
for net in nets:
ipNetwork = netaddr.IPNetwork(net)
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
# Classify by family; mask bounded by the family's maximum prefix length.
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.append(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.append(ipNetwork)
return v4nets, v6nets
v4nets, v6nets = netlist(nets)
# print ips
# Enumerate every individual address inside each merged CIDR block
# (potentially enormous output for large/IPv6 networks).
for net in netaddr.cidr_merge(v4nets) + netaddr.cidr_merge(v6nets):
for ip in net:
print(ip)
# NOTE(review): fragment — the opening of this stats dict and the enclosing
# loop/function are outside this chunk; indentation was lost in extraction.
'total': 0,
'active': 0,
'reserved': 0,
'deprecated': 0,
'available': 0,
}
# All aggregates for this RIR/address family (Django ORM querysets).
aggregate_list = Aggregate.objects.filter(family=family, rir=rir)
for aggregate in aggregate_list:
queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(aggregate.prefix))
# Find all consumed space for each prefix status (we ignore containers for this purpose).
# cidr_merge collapses each status bucket into its minimal CIDR list.
active_prefixes = netaddr.cidr_merge(
[p.prefix for p in queryset.filter(status=PREFIX_STATUS_ACTIVE)]
)
reserved_prefixes = netaddr.cidr_merge(
[p.prefix for p in queryset.filter(status=PREFIX_STATUS_RESERVED)]
)
deprecated_prefixes = netaddr.cidr_merge(
[p.prefix for p in queryset.filter(status=PREFIX_STATUS_DEPRECATED)]
)
# Find all available prefixes by subtracting each of the existing prefix sets from the aggregate prefix.
available_prefixes = (
netaddr.IPSet([aggregate.prefix]) -
netaddr.IPSet(active_prefixes) -
netaddr.IPSet(reserved_prefixes) -
netaddr.IPSet(deprecated_prefixes)
)
# Add the size of each metric to the RIR total.
stats['total'] += int(aggregate.prefix.size / denominator)
`None`. Mutates the cls.OBJECT_STORE['cidr'] datastructure.
"""
# NOTE(review): fragment — the def header and the opening of the docstring
# above are outside this chunk; indentation was lost in extraction.
if not 'cidr' in cls.OBJECT_STORE:
return
# step 1
# Invert the cidr -> accounts mapping into account -> set of cidrs.
merged = defaultdict(set)
for cidr, accounts in cls.OBJECT_STORE['cidr'].items():
for account in accounts:
merged[account].add(cidr)
del cls.OBJECT_STORE['cidr']
# step 2
# Re-populate the store with each account's merged (minimal) CIDR list.
for account, cidrs in merged.items():
merged_cidrs = netaddr.cidr_merge(cidrs)
for cidr in merged_cidrs:
add(cls.OBJECT_STORE['cidr'], str(cidr), account)
# NOTE(review): fragment — the list comprehension that produced
# `mismatched_nodes` is cut at its start; indentation was lost in extraction.
if n.loopback and n.loopback not in loopback_block]
if len(mismatched_nodes):
log.warning("IPv4 loopbacks set on nodes %s are not in global "
"loopback allocation block %s"
% (sorted(mismatched_nodes), loopback_block))
# mismatch = [n for n in g_ipv4.l3devices() if n.loopback not in
# also need to form aggregated IP blocks (used for e.g. routing prefix
# advertisement)
loopback_blocks = {}
# Group devices per ASN, keep only routers, and merge their loopback
# addresses into the minimal set of CIDR blocks for that ASN.
for (asn, devices) in g_ipv4.groupby('asn').items():
routers = [d for d in devices if d.is_router()]
loopbacks = [r.loopback for r in routers]
loopback_blocks[asn] = netaddr.cidr_merge(loopbacks)
g_ipv4.data.loopback_blocks = loopback_blocks
# formatted = {key: [str(v) for v in val] for key, val in loopback_blocks.items()}
IPNetwork('173.124.0.0/18'),
IPNetwork('173.154.0.0/16'),
IPNetwork('193.99.144.85/32')]
'''
# NOTE(review): fragment — the def header and the opening of the docstring
# above are outside this chunk; indentation was lost in extraction.
networks = sorted([IPNetwork(net) for net in networks])
new_networks = []
# Pairwise pass: replace each adjacent pair with its spanning supernet when
# that supernet is no shorter than min_prefixlen; otherwise keep the pair.
for chunk in chunks(networks, 2):
if len(chunk) > 1:
spanning = netaddr.spanning_cidr(chunk)
if spanning.prefixlen >= min_prefixlen:
new_networks.append(spanning)
else:
new_networks.extend(chunk)
else:
# Odd trailing element: carry it through unchanged.
new_networks.append(chunk[0])
# Final pass collapses any now-adjacent/overlapping blocks.
merged = netaddr.cidr_merge(new_networks)
return merged