Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _detect_afi(ip):
    """Pick the address family for the textual address ``ip``.

    Returns ``AFI.ipv6`` when the text contains a colon and ``AFI.ipv4``
    in every other case; the address itself is not validated.
    """
    return AFI.ipv6 if ':' in ip else AFI.ipv4
from exabgp.bgp.message.update.nlri.nlri import NLRI
from exabgp.bgp.message.update.nlri.cidr import CIDR
from exabgp.bgp.message.update.nlri.label import Label
from exabgp.bgp.message.update.nlri.qualifier import RouteDistinguisher
from exabgp.bgp.message.update.nlri.qualifier import PathInfo
from exabgp.protocol.ip import IP
from exabgp.protocol.ip import NoNextHop
# ====================================================== IPVPN
# RFC 4364
# Decorated with NLRI.register for the (ipv4, mpls_vpn) and (ipv6, mpls_vpn) family pairs.
@NLRI.register(AFI.ipv4,SAFI.mpls_vpn)
@NLRI.register(AFI.ipv6,SAFI.mpls_vpn)
class IPVPN (Label):
# Label subclass that adds a single extra slot, 'rd'.
__slots__ = ['rd']
def __init__ (self, afi, safi, action=OUT.UNSET):
# Delegate to Label, then initialise rd to RouteDistinguisher.NORD
# (presumably the "no route distinguisher" sentinel - TODO confirm).
Label.__init__(self, afi, safi, action)
self.rd = RouteDistinguisher.NORD
def feedback (self, action):
# Return an error message when the action is OUT.ANNOUNCE but no next-hop is set,
# and an empty string otherwise.
if self.nexthop is None and action == OUT.ANNOUNCE:
return 'ip-vpn nlri next-hop missing'
return ''
@classmethod
def new (cls, afi, safi, packed, mask, labels, rd, nexthop=None, action=OUT.UNSET):
# Alternate constructor: create the instance and set its cidr from packed/mask.
# NOTE(review): labels, rd and nexthop are not used in the statements below and
# nothing is returned - confirm against the complete definition.
instance = cls(afi,safi,action)
instance.cidr = CIDR(packed, mask)
# NOTE(review): the staticmethod wrappers suggest these are class-body attributes;
# the enclosing class statement is not shown here - confirm.
FLAG = True
# Both right-hand sides call the outer-scope converter/decoder helpers and store
# the result as a static method under the same name.
converter = staticmethod(converter(Fragment.named))
decoder = staticmethod(decoder(ord,Fragment))
# draft-raszuk-idr-flow-spec-v6-01
class FlowFlowLabel(IOperationByteShortLong, NumericString, IPv6):
    """Flow component identified by ID 0x0D and named 'flow-label'.

    Carries the IPv6 marker base, so it is an IPv6-only component.
    """

    # Component identifier and its textual name.
    ID = 0x0D
    NAME = 'flow-label'

    # Text -> value conversion built from LabelValue; decoding uses _number.
    converter = staticmethod(converter(LabelValue))
    decoder = staticmethod(_number)
# ..........................................................
# Two lookup tables keyed by AFI, each starting with an empty dict for IPv4 and IPv6.
decode = {AFI.ipv4: {}, AFI.ipv6: {}}
factory = {AFI.ipv4: {}, AFI.ipv6: {}}
# Walk every name visible in the current scope and keep only IComponent subclasses.
for content in dir():
kls = globals().get(content,None)
# Skip anything whose type is not IComponent's own type (i.e. not a comparable class).
if not isinstance(kls,type(IComponent)):
continue
if not issubclass(kls,IComponent):
continue
# Skip classes whose ID is missing or falsy (None, 0).
_ID = getattr(kls,'ID',None)
if not _ID:
continue
# Collect the AFIs the class applies to; AFI.ipv4 is added for IPv4 subclasses.
_afis = []
if issubclass(kls,IPv4):
_afis.append(AFI.ipv4)
# NOTE(review): body of an unpack-style routine whose def line is not shown here;
# cls, afi, safi, data, action, addpath and nexthop are expected to come from it.
# When addpath is set, the first 4 bytes of data are taken as the path identifier
# and removed from data before further parsing.
if addpath:
path_identifier = PathInfo(None,None,data[:4])
data = data[4:]
length = 4
else:
path_identifier = None
length = 0
# NLRI._nlri returns six values; only mask, prefix and left are used below
# ('left' is presumably the unparsed remainder of data - TODO confirm).
labels,rd,mask,size,prefix,left = NLRI._nlri(afi,safi,data,action)
nlri = cls(afi,safi,prefix,mask,NextHop.unpack(nexthop),action)
if addpath:
nlri.path_info = path_identifier
# Return a (length, nlri) pair: the 4-byte path identifier (if any) plus the
# difference between the sizes of data and left.
return length + len(data) - len(left),nlri
# Call PathPrefix.register for each (AFI, SAFI) combination of IPv4/IPv6 with
# unicast/multicast; the SAFI is the outer loop, so both unicast pairs are
# registered before the multicast ones.
for safi in (SAFI.unicast, SAFI.multicast):
    for afi in (AFI.ipv4, AFI.ipv6):
        PathPrefix.register(afi, safi)
def unicast_v6(tokeniser):
    """Delegate to ``ip_unicast`` with the family fixed to IPv6 unicast.

    ``tokeniser`` is passed through untouched and the helper's result is
    returned as-is.
    """
    return ip_unicast(
        tokeniser,
        AFI.ipv6,
        SAFI.unicast,
    )
def toafi(ip):
    """Return the AFI matching the textual address ``ip``.

    Returns ``AFI.ipv6`` when the text contains ':' and ``AFI.ipv4`` when it
    contains '.' (and no ':').

    Raises:
        ValueError: if ``ip`` contains neither ':' nor '.'.
    """
    # The colon test takes precedence: an address such as ::FFFF:a.b.c.d is
    # IPv6 even though it also contains dots.
    has_colon = ':' in ip
    if not has_colon and '.' not in ip:
        raise ValueError('unrecognised ip address %s' % ip)
    return AFI.ipv6 if has_colon else AFI.ipv4
# NOTE(review): tail of a method whose def line is not shown here; 'routes' is
# expected to be defined earlier in it.
# Hand the routes to the scope under the 'routes' key, pop this object's name
# from the scope, and report success.
self.scope.extend('routes',routes)
self.scope.pop(self.name)
return True
class AnnounceIPv4(ParseAnnounce):
    """ParseAnnounce specialisation bound to the 'ipv4' name and AFI.ipv4."""

    afi = AFI.ipv4
    name = 'ipv4'

    def clear(self):
        """Always return True."""
        return True
class AnnounceIPv6(ParseAnnounce):
    """ParseAnnounce specialisation bound to the 'ipv6' name and AFI.ipv6."""

    afi = AFI.ipv6
    name = 'ipv6'

    def clear(self):
        """Always return True."""
        return True
class AnnounceL2VPN(ParseAnnounce):
    """ParseAnnounce specialisation bound to the 'l2vpn' name and AFI.l2vpn."""

    afi = AFI.l2vpn
    name = 'l2vpn'

    def clear(self):
        """Always return True."""
        return True
def has_label(self):
    """Tell whether this object's (afi, safi) family is one of the labelled ones.

    True for IPv4 with nlri_mpls or mpls_vpn, and for IPv6 with mpls_vpn;
    False for every other combination.
    """
    # NOTE(review): (ipv6, nlri_mpls) is reported as unlabelled here - confirm
    # that this asymmetry with IPv4 is intended.
    if self.afi == AFI.ipv4:
        return self.safi in (SAFI.nlri_mpls, SAFI.mpls_vpn)
    return self.afi == AFI.ipv6 and self.safi == SAFI.mpls_vpn
"""
from exabgp.protocol.family import AFI
from exabgp.protocol.family import SAFI
from exabgp.bgp.message.update.nlri.prefix import Prefix
from exabgp.bgp.message.update.nlri.nlri import NLRI
from exabgp.bgp.message.update.nlri.mpls import MPLS
from exabgp.bgp.message.update.nlri.vpls import VPLS
from exabgp.bgp.message.update.nlri.flow import Flow
# Table of (NLRI class, AFI, SAFI) registrations, applied in order below.
# NOTE(review): the unicast and multicast families are mapped to MPLS, exactly
# as in the explicit calls this table replaces - confirm that is intended.
for _nlri_class, _afi, _safi in (
    (MPLS, AFI.ipv4, SAFI.unicast),
    (MPLS, AFI.ipv6, SAFI.unicast),
    (MPLS, AFI.ipv4, SAFI.multicast),
    (MPLS, AFI.ipv6, SAFI.multicast),
    (MPLS, AFI.ipv4, SAFI.nlri_mpls),
    (MPLS, AFI.ipv6, SAFI.nlri_mpls),
    (MPLS, AFI.ipv4, SAFI.mpls_vpn),
    (MPLS, AFI.ipv6, SAFI.mpls_vpn),
    (VPLS, AFI.l2vpn, SAFI.vpls),
    (Flow, AFI.ipv4, SAFI.flow_ip),
    (Flow, AFI.ipv6, SAFI.flow_ip),
    (Flow, AFI.ipv4, SAFI.flow_vpn),
    (Flow, AFI.ipv6, SAFI.flow_vpn),
):
    NLRI.register_nlri(_nlri_class, _afi, _safi)
# +------------------------------+
# | Capability Code (1 octet) |
# +------------------------------+
# | Capability Length (1 octet) |
# +------------------------------+
# | Capability Value (variable) |
# +------------------------------+
class Capabilities (dict):
# dict subclass; membership of a capability is tested against its keys (see announced).
# (AFI, SAFI) pairs listed for ADD-PATH.
_ADD_PATH = [
(AFI.ipv4,SAFI.unicast),
(AFI.ipv6,SAFI.unicast),
(AFI.ipv4,SAFI.nlri_mpls),
(AFI.ipv4,SAFI.mpls_vpn),
(AFI.ipv6,SAFI.mpls_vpn),
]
# (AFI, SAFI, AFI) triples; every entry pairs an IPv4 family with AFI.ipv6
# (presumably the next-hop AFI, going by the attribute name - TODO confirm).
_NEXTHOP = [
(AFI.ipv4, SAFI.unicast, AFI.ipv6),
(AFI.ipv4, SAFI.multicast, AFI.ipv6),
(AFI.ipv4, SAFI.nlri_mpls, AFI.ipv6),
(AFI.ipv4, SAFI.mpls_vpn ,AFI.ipv6),
]
def announced (self, capability):
# True when capability is one of this dict's keys.
return capability in self
def __str__ (self):
# Collect str() of every stored value, visiting the keys in sorted order.
r = []
for key in sorted(self.keys()):
r.append(str(self[key]))