Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import binascii
from struct import unpack
from exabgp.vendoring import six
from exabgp.util import concat_bytes_i
from exabgp.bgp.message.update.attribute.attribute import Attribute
# =====================================================================
# draft-ietf-idr-bgp-prefix-sid
# This Attribute may contain up to 3 TLVs
# Label-Index TLV ( type = 1 ) is mandatory for this attribute.
@Attribute.register()
class PrefixSid (Attribute):
ID = Attribute.CODE.BGP_PREFIX_SID
FLAG = FLAG = Attribute.Flag.TRANSITIVE | Attribute.Flag.OPTIONAL
CACHING = True
TLV = -1
# Registered subclasses we know how to decode
registered_srids = dict()
def __init__(self, sr_attrs, packed=None):
self.sr_attrs = sr_attrs
self._packed = self._attribute(packed if packed else concat_bytes_i(_.pack() for _ in sr_attrs))
@classmethod
def register(cls,srid=None,flag=None):
def register_srid (klass):
scode = klass.TLV if srid is None else srid
if scode in cls.registered_srids:
def flag_attribute_content (data):
flag = Attribute.Flag(ord(data[0]))
attr = Attribute.CODE(ord(data[1]))
if flag & Attribute.Flag.EXTENDED_LENGTH:
length = unpack('!H',data[2:4])[0]
return flag, attr, data[4:length+4]
else:
length = ord(data[2])
return flag, attr, data[3:length+3]
t = ordinal(data[0])
length = unpack('!H',data[1:3])[0]
v,data = data[3:length],data[length:]
yield TLV(t,v)
return TLVS(list(loop(data)))
def pack (self):
return concat_bytes_i(concat_bytes(character(tlv.type),pack('!H',len(tlv.value)+3),tlv.value) for tlv in self)
# ==================================================================== AIGP (26)
#
@Attribute.register()
class AIGP (Attribute):
ID = Attribute.CODE.AIGP
FLAG = Attribute.Flag.OPTIONAL
CACHING = True
TYPES = [1,]
__slots__ = ['aigp','_packed']
def __init__ (self, aigp, packed=None):
self.aigp = aigp
if packed:
self._packed = packed
else:
self._packed = self._attribute(aigp)
def __eq__ (self, other):
return \
self.ID == other.ID and \
from exabgp.protocol.ip import IPv4
from exabgp.bgp.message.update.attribute.attribute import Attribute
# ===================================================================
#
class ClusterID (IPv4):
def __init__ (self, ip):
IPv4.__init__(self,ip)
@Attribute.register()
class ClusterList (Attribute):
ID = Attribute.CODE.CLUSTER_LIST
FLAG = Attribute.Flag.OPTIONAL
CACHING = True
__slots__ = ['clusters','packed','_len']
def __init__ (self, clusters, packed=None):
self.clusters = clusters
self._packed = self._attribute(packed if packed else concat_bytes_i(_.pack() for _ in clusters))
self._len = len(clusters)*4
def __eq__ (self, other):
return \
self.ID == other.ID and \
self.FLAG == other.FLAG and \
self.clusters == other.clusters
"""
from exabgp.util import ordinal
from exabgp.bgp.message.update.attribute.attribute import Attribute
from exabgp.bgp.message.update.attribute.community.extended.community import ExtendedCommunity
from exabgp.bgp.message.update.attribute.community.initial.communities import Communities
from exabgp.bgp.message.notification import Notify
# ===================================================== ExtendedCommunities (16)
# https://www.iana.org/assignments/bgp-extended-communities
@Attribute.register()
class ExtendedCommunities (Communities):
ID = Attribute.CODE.EXTENDED_COMMUNITY
@staticmethod
def unpack (data, negotiated):
communities = ExtendedCommunities()
while data:
if data and len(data) < 8:
raise Notify(3,1,'could not decode extended community %s' % str([hex(ordinal(_)) for _ in data]))
communities.add(ExtendedCommunity.unpack(data[:8],negotiated))
data = data[8:]
return communities
def json (self):
return '{ "asn" : %d, "speaker" : "%d" }' % (self.asn,self.speaker)
@classmethod
def unpack (cls, data, negotiated):
if negotiated.asn4:
return cls(ASN.unpack(data[:4]),IPv4.unpack(data[-4:]))
return cls(ASN.unpack(data[:2]),IPv4.unpack(data[-4:]))
# ============================================================== AGGREGATOR (18)
#
@Attribute.register()
class Aggregator4 (Aggregator):
ID = Attribute.CODE.AS4_AGGREGATOR
if sys.version_info[0] < 3:
__slots__ = ['pack']
def pack (self, negotiated):
return self._attribute(self.asn.pack(True)+self.speaker.pack())
Created by Thomas Mangin on 2009-11-05.
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
License: 3-clause BSD. (See the COPYRIGHT file)
"""
from exabgp.util import character
from exabgp.util import ordinal
from exabgp.bgp.message.update.attribute.attribute import Attribute
# =================================================================== Origin (1)
@Attribute.register()
class Origin (Attribute):
ID = Attribute.CODE.ORIGIN
FLAG = Attribute.Flag.TRANSITIVE
CACHING = True
IGP = 0x00
EGP = 0x01
INCOMPLETE = 0x02
__slots__ = ['origin','_packed']
def __init__ (self, origin, packed=None):
self.origin = origin
self._packed = self._attribute(packed if packed else character(origin))
def __eq__ (self, other):
return \
self.ID == other.ID and \
logger.parser(LazyFormat("parsing flag %x type %02x (%s) len %02x %s" % (flag,int(aid),aid,length,'payload ' if length else ''),data[:length]))
# remove the PARTIAL bit before comparaison if the attribute is optional
if aid in Attribute.attributes_optional:
flag &= Attribute.Flag.MASK_PARTIAL & 0xFF
# flag &= ~Attribute.Flag.PARTIAL & 0xFF # cleaner than above (python use signed integer for ~)
# handle the attribute if we know it
if Attribute.registered(aid,flag):
self.add(Attribute.unpack(aid,flag,attribute,negotiated))
return self.parse(left,negotiated)
# XXX: FIXME: we could use a fallback function here like capability
# if we know the attribute but the flag is not what the RFC says. ignore it.
if aid in Attribute.attributes_known:
logger.parser('invalid flag for attribute %s (flag 0x%02X, aid 0x%02X)' % (Attribute.CODE.names.get(aid,'unset'),flag,aid))
return self.parse(left,negotiated)
# it is an unknown transitive attribute we need to pass on
if flag & Attribute.Flag.TRANSITIVE:
logger.parser('unknown transitive attribute (flag 0x%02X, aid 0x%02X)' % (flag,aid))
self.add(GenericAttribute(aid,flag | Attribute.Flag.PARTIAL,attribute),attribute)
return self.parse(left,negotiated)
# it is an unknown non-transitive attribute we can ignore.
logger.parser('ignoring unknown non-transitive attribute (flag 0x%02X, aid 0x%02X)' % (flag,aid))
return self.parse(left,negotiated)
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
License: 3-clause BSD. (See the COPYRIGHT file)
"""
from struct import pack
from struct import unpack
from exabgp.bgp.message.update.attribute.attribute import Attribute
# ========================================================= Local Preference (5)
#
@Attribute.register()
class LocalPreference (Attribute):
ID = Attribute.CODE.LOCAL_PREF
FLAG = Attribute.Flag.TRANSITIVE
CACHING = True
__slots__ = ['localpref','_packed']
def __init__ (self, localpref, packed=None):
self.localpref = localpref
self._packed = self._attribute(packed if packed is not None else pack('!L',localpref))
def __eq__ (self, other):
return \
self.ID == other.ID and \
self.FLAG == other.FLAG and \
self.localpref == other.localpref
def __ne__ (self, other):
from exabgp.bgp.message.update.attribute.attribute import Attribute
from exabgp.bgp.message.notification import Notify
# =================================================================== ASPath (2)
# only 2-4% of duplicated data therefore it is not worth to cache
@Attribute.register()
class ASPath (Attribute):
AS_SET = 0x01
AS_SEQUENCE = 0x02
AS_CONFED_SEQUENCE = 0x03
AS_CONFED_SET = 0x04
ASN4 = False
ID = Attribute.CODE.AS_PATH
FLAG = Attribute.Flag.TRANSITIVE
__slots__ = ['as_seq','as_set','as_cseq','as_cset','segments','_packed','index','_str','_json']
def __init__ (self, as_sequence, as_set, as_conf_sequence=None,as_conf_set=None,index=None):
self.as_seq = as_sequence
self.as_set = as_set
self.as_cseq = as_conf_sequence if as_conf_sequence is not None else []
self.as_cset = as_conf_set if as_conf_set is not None else []
self.segments = b''
self._packed = {True:b'',False:b''}
self.index = index # the original packed data, use for indexing
self._str = ''
self._json = {}
def __eq__ (self, other):