Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _newASN (self, value):
if value.count('.'):
high,low = value.split('.',1)
as_number = (int(high) << 16) + int(low)
else:
as_number = int(value)
return ASN(as_number)
def _newASN (self,value):
if value.count('.'):
high,low = value.split('.',1)
asn = (int(high) << 16) + int(low)
else:
asn = int(value)
return ASN(asn)
def asn (tokeniser, value=None):
value = tokeniser() if value is None else value
try:
if value.count('.'):
high,low = value.split('.',1)
as_number = (int(high) << 16) + int(low)
else:
as_number = int(value)
return ASN(as_number)
except ValueError:
raise ValueError('"%s" is an invalid ASN' % value)
def _newASN (self,value):
if value.count('.'):
high,low = value.split('.',1)
asn = (int(high) << 16) + int(low)
else:
asn = int(value)
return ASN(asn)
def _route_aggregator (self,scope,tokens):
try:
if tokens:
if tokens.pop(0) != '(':
raise ValueError('invalid aggregator syntax')
asn,address = tokens.pop(0).split(':')
if tokens.pop(0) != ')':
raise ValueError('invalid aggregator syntax')
local_as = ASN(asn)
local_address = RouterID(address)
else:
local_as = scope[-1]['local-as']
local_address = scope[-1]['local-address']
except (ValueError,IndexError):
self._error = self._str_route_error
if self.debug: raise
return False
except KeyError:
self._error = 'local-as and/or local-address missing from neighbor/group to make aggregator'
if self.debug: raise
return False
except ValueError:
self._error = self._str_route_error
if self.debug: raise
return False
def unpack (cls, data, negotiated):
if negotiated.asn4:
return cls(ASN.unpack(data[:4]),IPv4.unpack(data[-4:]))
return cls(ASN.unpack(data[:2]),IPv4.unpack(data[-4:]))
def unpack (data):
asn,target = unpack('!HL',data[2:8])
return TrafficRedirect(ASN(asn),target,data[:8])
def _flow_route_rate_limit (self,scope,tokens):
# README: We are setting the ASN as zero as that what Juniper (and Arbor) did when we created a local flow route
try:
speed = int(tokens[0])
if speed < 9600 and speed != 0:
self.logger.warning("rate-limiting flow under 9600 bytes per seconds may not work",'configuration')
if speed > 1000000000000:
speed = 1000000000000
self.logger.warning("rate-limiting changed for 1 000 000 000 000 bytes from %s" % tokens[0],'configuration')
scope[-1]['routes'][-1].add_action(to_FlowTrafficRate(ASN(0),speed))
return True
except ValueError:
self._error = self._str_route_error
if self.debug: raise
return False