Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ip,mask = ip.split('/')
mask = int(mask)
except ValueError:
mask = 32
try:
if 'rd' in tokens:
klass = MPLS
elif 'route-distinguisher' in tokens:
klass = MPLS
elif 'label' in tokens:
klass = MPLS
else:
klass = Prefix
# nexthop must be false and its str return nothing .. an empty string does that
update = Change(klass(afi=IP.toafi(ip),safi=IP.tosafi(ip),packed=IP.pton(ip),mask=mask,nexthop=None,action=OUT.ANNOUNCE),Attributes())
except ValueError:
self._error = self._str_route_error
if self.debug: raise Exception() # noqa
return False
if 'announce' not in scope[-1]:
scope[-1]['announce'] = []
scope[-1]['announce'].append(update)
return True
# packed and not pack() but does not matter atm, it is an IP not a NextHop
nexthop = change.nlri.nexthop.packed
change.nlri.mask = split
change.nlri = None
# generate the new routes
for _ in range(number):
# update ip to the next route, this recalculate the "ip" field of the Inet class
nlri = klass(afi,safi,pack_int(afi,ip,split),split,nexthop,OUT.ANNOUNCE,path_info)
if klass is MPLS:
nlri.labels = labels
nlri.rd = rd
# next ip
ip += increment
# save route
scope[-1]['announce'].append(Change(nlri,change.attributes))
return True
def _insert_l2vpn_vpls (self, scope, tokens=None):
try:
attributes = Attributes()
change = Change(VPLS(None,None,None,None,None),attributes)
except ValueError:
self._error = self._str_vpls_error
if self.debug: raise Exception() # noqa
return False
if 'announce' not in scope[-1]:
scope[-1]['announce'] = []
scope[-1]['announce'].append(change)
return True
def flow (tokeniser,afi,safi):
change = Change(
Flow(afi,safi,OUT.ANNOUNCE),
Attributes()
)
while True:
command = tokeniser()
if not command:
break
action = AnnounceFlow.action[command]
if action == 'nlri-add':
for adding in AnnounceFlow.known[command](tokeniser):
change.nlri.add(adding)
elif action == 'attribute-add':
def ip_unicast (tokeniser,afi,safi):
action = OUT.ANNOUNCE if tokeniser.announce else OUT.WITHDRAW
ipmask = prefix(tokeniser)
nlri = INET(afi, safi, action)
nlri.cidr = CIDR(ipmask.pack(),ipmask.mask)
change = Change(
nlri,
Attributes()
)
while True:
command = tokeniser()
if not command:
break
action = AnnouncePath.action.get(command,'')
if action == 'attribute-add':
change.attributes.add(AnnouncePath.known[command](tokeniser))
elif action == 'nlri-set':
change.nlri.assign(AnnouncePath.assign[command],AnnouncePath.known[command](tokeniser))
def ip_vpn (tokeniser,afi,safi):
action = OUT.ANNOUNCE if tokeniser.announce else OUT.WITHDRAW
ipmask = prefix(tokeniser)
nlri = IPVPN(afi, safi, action)
nlri.cidr = CIDR(ipmask.pack(),ipmask.mask)
change = Change(
nlri,
Attributes()
)
while True:
command = tokeniser()
if not command:
break
action = AnnounceVPN.action.get(command,'')
if action == 'attribute-add':
change.attributes.add(AnnounceVPN.known[command](tokeniser))
elif action == 'nlri-set':
change.nlri.assign(AnnounceVPN.assign[command],AnnounceVPN.known[command](tokeniser))
while not self._teardown:
for message in self.proto.read_message():
self.recv_timer.check_ka(message)
if send_ka() is not False:
# we need and will send a keepalive
while send_ka() is None:
yield ACTION.NOW
# Received update
if message.TYPE == Update.TYPE:
number += 1
self.logger.debug('<< UPDATE #%d' % number,self.id())
for nlri in message.nlris:
self.neighbor.rib.incoming.update_cache(Change(nlri,message.attributes))
self.logger.debug(LazyFormat(' UPDATE #%d nlri ' % number,nlri,str),self.id())
elif message.TYPE == RouteRefresh.TYPE:
if message.reserved == RouteRefresh.request:
self._resend_routes = SEND.REFRESH
send_families.append((message.afi,message.safi))
# SEND OPERATIONAL
if self.neighbor.operational:
if not operational:
new_operational = self.neighbor.messages.popleft() if self.neighbor.messages else None
if new_operational:
operational = self.proto.new_operational(new_operational,self.proto.negotiated)
if operational:
try: