Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _route_origin (self,scope,tokens):
data = tokens.pop(0).lower()
if data == 'igp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.IGP))
return True
if data == 'egp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.EGP))
return True
if data == 'incomplete':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.INCOMPLETE))
return True
self._error = self._str_route_error
if self.debug: raise
return False
return True
except KeyError:
return False
if not Attributes.cache:
for attribute in AID._str:
Attributes.cache[attribute] = Cache()
# There can only be one, build it now :)
Attributes.cache[AID.ATOMIC_AGGREGATE][''] = AtomicAggregate()
IGP = Origin(Origin.IGP)
EGP = Origin(Origin.EGP)
INC = Origin(Origin.INCOMPLETE)
Attributes.cache[AID.ORIGIN][IGP.pack()] = IGP
Attributes.cache[AID.ORIGIN][EGP.pack()] = EGP
Attributes.cache[AID.ORIGIN][INC.pack()] = INC
def _route_origin (self,scope,tokens):
data = tokens.pop(0).lower()
if data == 'igp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.IGP))
return True
if data == 'egp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.EGP))
return True
if data == 'incomplete':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.INCOMPLETE))
return True
self._error = self._str_route_error
if self.debug: raise
return False
if self.hasmp:
if code not in (AID.MP_REACH_NLRI, AID.MP_UNREACH_NLRI):
self.cacheable = False
self.prefix = ''
else:
self.prefix += data[:offset+length]
data = data[offset:]
next = data[length:]
attribute = data[:length]
logger = Logger()
logger.parser(LazyFormat("parsing flag %x type %02x (%s) len %02x %s" % (flag,int(code),code,length,'payload ' if length else ''),od,data[:length]))
if code == AID.ORIGIN and flag.matches(Origin.FLAG):
# This if block should never be called anymore ...
if not self.add_from_cache(code,attribute):
self.add(Origin.unpack(attribute),attribute)
return self.factory(next)
# only 2-4% of duplicated data - is it worth to cache ?
if code == AID.AS_PATH and flag.matches(ASPath.FLAG):
if length:
# we store the AS4_PATH as AS_PATH, do not over-write
if not self.has(code):
if not self.add_from_cache(code,attribute):
self.add(self.__new_ASPath(attribute),attribute)
return self.factory(next)
if code == AID.AS4_PATH and flag.matches(AS4Path.FLAG):
if length:
def _route_origin (self,scope,tokens):
data = tokens.pop(0).lower()
if data == 'igp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.IGP))
return True
if data == 'egp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.EGP))
return True
if data == 'incomplete':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.INCOMPLETE))
return True
self._error = self._str_route_error
if self.debug: raise
return False
def _route_origin (self,scope,tokens):
data = tokens.pop(0).lower()
if data == 'igp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.IGP))
return True
if data == 'egp':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.EGP))
return True
if data == 'incomplete':
scope[-1]['routes'][-1].attributes.add(Origin(Origin.INCOMPLETE))
return True
self._error = self._str_route_error
if self.debug: raise
return False
def __len__ (self):
return len(self.pack())
def __str__ (self):
return 'MultiAttibutes(%s)' % ' '.join(str(_) for _ in self)
class Attributes (dict):
# we need this to not create an include loop !
nlriFactory = None
# A cache of parsed attributes
cache = {}
# A previously parsed object
cached = None
lookup = {
AID.ORIGIN : Origin, # 1
AID.AS_PATH : ASPath, # 2
# NextHop # 3
AID.MED : MED, # 4
AID.LOCAL_PREF : LocalPreference, # 5
AID.ATOMIC_AGGREGATE : AtomicAggregate, # 6
AID.AGGREGATOR : Aggregator, # 7
AID.COMMUNITY : Communities, # 8
AID.ORIGINATOR_ID : OriginatorID, # 9
AID.CLUSTER_LIST : ClusterList, # 10
AID.EXTENDED_COMMUNITY : ExtendedCommunities, # 16
AID.AS4_PATH : AS4Path, # 17
AID.AS4_AGGREGATOR : Aggregator, # 18
# AID.PMSI_TUNNEL : PMSI, # 22
AID.AIGP : AIGP, # 26
}
Attribute.CODE.ORIGIN: lambda l,r: Origin(Origin.IGP),
Attribute.CODE.AS_PATH: lambda l,r: ASPath([],[]) if l == r else ASPath([local_asn,],[]),