Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _key_values (name, data):
if len(data) < 2:
raise Notify(2,0,"Bad length for OPEN %s (<2) %s" % (name,Capability.hex(data)))
ld = ordinal(data[1])
boundary = ld+2
if len(data) < boundary:
raise Notify(2,0,"Bad length for OPEN %s (buffer underrun) %s" % (name,Capability.hex(data)))
key = ordinal(data[0])
value = data[2:boundary]
rest = data[boundary:]
return key,value,rest
def klass (cls, what):
if what in cls.registered_capability:
kls = cls.registered_capability[what]
kls.ID = what
return kls
if cls.unknown_capability:
return cls.unknown_capability
raise Notify (2,4,'can not handle capability %s' % what)
def unpack (cls,data,length):
if length != 4:
raise Notify(3,5, "Unable to decode attribute. Incorrect Size")
else:
b = BitArray(bytes=data)
colormask = b.unpack('uintbe:32')
return cls(colormask=colormask)
if ID not in (FlowDestination.ID,FlowSource.ID):
ordered_rules.append(chr(ID))
ordered_rules.append(''.join(rule.pack() for rule in rules))
components = ''.join(ordered_rules)
if self.safi == SAFI.flow_vpn:
components = self.rd.pack() + components
l = len(components)
if l < 0xF0:
data = "%s%s" % (chr(l),components)
elif l < 0x0FFF:
data = "%s%s" % (pack('!H',l | 0xF000),components)
else:
raise Notify(3,0,"rule too big for NLRI - how to handle this - does this work ?")
# data = "%s" % chr(0)
return data
raise Notify(3,11,'invalid AS Path type sent %d' % stype)
end = 2+(slen*length)
sdata = data[2:end]
data = data[end:]
asns = as_choice[stype]
for i in range(slen):
asn = unpack(upr,sdata[:length])[0]
asns.append(ASN(asn))
sdata = sdata[length:]
except IndexError:
raise Notify(3,11,'not enough data to decode AS_PATH or AS4_PATH')
except error: # struct
raise Notify(3,11,'not enough data to decode AS_PATH or AS4_PATH')
return klass(as_seq,as_set,backup)
else:
attributes = cls().parse(data,negotiated)
if Attribute.CODE.AS_PATH in attributes and Attribute.CODE.AS4_PATH in attributes:
attributes.merge_attributes()
if Attribute.CODE.MP_REACH_NLRI not in attributes and Attribute.CODE.MP_UNREACH_NLRI not in attributes:
cls.previous = data
cls.cached = attributes
else:
cls.previous = ''
cls.cache = None
return attributes
except IndexError:
raise Notify(3,2,data)
def unpack (cls,data,length):
v6sid = -1
if cls.LENGTH != length:
raise Notify(3,5, "Invalid TLV size. Should be {0} but {1} received".format(cls.LENGTH, length))
# RESERVED: 24 bit field for future use. MUST be clear on
# transmission an MUST be ignored at reception.
data = data[3:19]
v6sid = IP.unpack(data)
return cls(v6sid=v6sid,packed=data)
length = len(data)
len_withdrawn = unpack('!H',data[0:2])[0]
withdrawn = data[2:len_withdrawn+2]
if len(withdrawn) != len_withdrawn:
raise Notify(3,1,'invalid withdrawn routes length, not enough data available')
start_attributes = len_withdrawn+4
len_attributes = unpack('!H',data[len_withdrawn+2:start_attributes])[0]
start_announced = len_withdrawn+len_attributes+4
attributes = data[start_attributes:start_announced]
announced = data[start_announced:]
if len(attributes) != len_attributes:
raise Notify(3,1,'invalid total path attribute length, not enough data available')
if 2 + len_withdrawn + 2 + len_attributes + len(announced) != length:
raise Notify(3,1,'error in BGP message length, not enough data for the size announced')
return withdrawn,attributes,announced
what,bgp = ordinal(bgp[0]),bgp[1:]
if what not in decode.get(afi,{}):
raise Notify(3,10,'unknown flowspec component received for address family %d' % what)
seen.append(what)
if sorted(seen) != seen:
raise Notify(3,10,'components are not sent in the right order %s' % seen)
decoded = decode[afi][what]
klass = factory[afi][what]
if decoded == 'prefix':
adding,bgp = klass.make(bgp)
if not nlri.add(adding):
raise Notify(3,10,'components are incompatible (two sources, two destinations, mix ipv4/ipv6) %s' % seen)
else:
end = False
while not end:
byte,bgp = ordinal(bgp[0]),bgp[1:]
end = CommonOperator.eol(byte)
operator = CommonOperator.operator(byte)
length = CommonOperator.length(byte)
value,bgp = bgp[:length],bgp[length:]
adding = klass.decoder(value)
nlri.add(klass(operator,adding))
return nlri, bgp+over