Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python
# encoding: utf-8
"""
flow.py
Created by Thomas Mangin on 2010-01-14.
Copyright (c) 2009-2015 Exa Networks. All rights reserved.
"""
import unittest
from exabgp.configuration.environment import environment
env = environment.setup('')
from exabgp.bgp.message.update.nlri.flow import *
from exabgp.protocol.ip.inet import *
from exabgp.bgp.message.update.attribute.community import *
class TestFlow (unittest.TestCase):
def setUp(self):
    """Prepare nothing: this test case sets up no per-test fixture."""
def test_rule (self):
components = {
'destination': Destination("192.0.2.0",24),
'source' : Source("10.1.2.0",24),
'anyport_1' : AnyPort(NumericOperator.EQ,25),
#!/usr/bin/env python
# encoding: utf-8
"""
protocol.py
Created by Thomas Mangin on 2009-08-27.
Copyright (c) 2009-2015 Exa Networks. All rights reserved.
"""
import unittest
from exabgp.configuration.environment import environment
env = environment.setup('')
from exabgp.bgp.message.open import Open
from exabgp.bgp.message.open import Capabilities
from exabgp.bgp.message.open import new_Open
from exabgp.bgp.message.notification import Notification
from exabgp.bgp.message.keepalive import KeepAlive
from exabgp.bgp.message.keepalive import new_KeepAlive
from exabgp.bgp.message.update import Update
from exabgp.bgp.message.update import Attributes
from exabgp.rib.table import Table
from exabgp.rib.delta import Delta
from exabgp.reactor.protocol import Protocol
from exabgp.bgp.neighbor import Neighbor
from StringIO import StringIO
def api_eor (self, command):
    """Parse the address family named in an API command.

    The command is normalised with ``formated`` and split on single
    spaces; the first two tokens are discarded and the remainder are
    treated as the arguments.

    Returns:
        ``Family(1,1)`` when no argument is given (presumably the
        IPv4 unicast default -- confirm against Family),
        ``Family(afi,safi)`` when exactly two recognised arguments are
        given, and ``False`` for any other argument count or when
        either argument resolves to the ``undefined`` value.
    """
    arguments = formated(command).split(' ')[2:]

    # No explicit family: fall back to the default one.
    if not arguments:
        return Family(1,1)

    # Anything other than exactly "<afi> <safi>" is rejected.
    if len(arguments) != 2:
        return False

    afi_name, safi_name = arguments

    afi = AFI.fromString(afi_name)
    if afi == AFI.undefined:
        return False

    # The SAFI is only looked up once the AFI is known to be valid.
    safi = SAFI.fromString(safi_name)
    if safi == SAFI.undefined:
        return False

    return Family(afi,safi)
def fromElements (cls, prefix, suffix):
    """Build an instance from the two halves of a textual route distinguisher.

    Args:
        prefix: string; either a dotted value (one byte is emitted per
            dot-separated part) or a decimal number.
        suffix: integer second half.

    The packed value always starts with a two-byte marker:
      * 0,1 -- dotted prefix, followed by the prefix bytes and the
        suffix as two bytes;
      * 0,0 -- prefix below 2**16 and suffix below 2**32, packed as
        '!H' then '!L';
      * 0,2 -- prefix below 2**32 and suffix below 2**16, packed as
        '!L' then '!H'.

    Raises:
        ValueError: for any ValueError raised while converting or
            range-checking; the message always names both halves.
    """
    limit_16 = 1 << 16
    limit_32 = 1 << 32

    try:
        if '.' in prefix:
            parts = [character(0),character(1)]
            for octet in prefix.split('.'):
                parts.append(character(int(octet)))
            parts.append(character(suffix >> 8))
            parts.append(character(suffix & 0xFF))
            packed = concat_bytes_i(parts)
        else:
            number = int(prefix)
            if number < limit_16 and suffix < limit_32:
                packed = character(0) + character(0) + pack('!H',number) + pack('!L',suffix)
            elif number < limit_32 and suffix < limit_16:
                packed = character(0) + character(2) + pack('!L',number) + pack('!H',suffix)
            else:
                # Caught just below and re-raised with the full "prefix:suffix" message.
                raise ValueError('invalid route-distinguisher %s' % number)
        return cls(packed)
    except ValueError:
        raise ValueError('invalid route-distinguisher %s:%s' % (prefix,suffix))
def create (cls,string,afi):
    """Build a netmask object from its textual form for an address family.

    Args:
        string: the mask as text; either a key of ``cls.codes`` or a
            string of decimal digits.
        afi: ``AFI.ipv4`` or ``AFI.ipv6``.

    For IPv4, a string present in ``cls.codes`` is translated through
    that table and returned immediately with a maximum of 32. For IPv6
    such a string is rejected.

    Raises:
        ValueError: if the address family is neither IPv4 nor IPv6, if
            a ``cls.codes`` entry is used with IPv6, or if the string
            is not made of digits.
    """
    if afi == AFI.ipv4:
        # Membership, not identity: the original `string is cls.codes`
        # compared the string with the table object itself, so this
        # branch could never run even though it indexes cls.codes[string].
        if string in cls.codes:
            klass = cls(cls.codes[string],32)
            klass.maximum = 32
            return klass
        maximum = 32
    elif afi == AFI.ipv6:
        if string in cls.codes:
            raise ValueError('IPv4 mask used with an IPv6 address')
        maximum = 128
    else:
        raise ValueError('invalid address family')

    if not string.isdigit():
        raise ValueError('invalid netmask %s' % string)

    value = long(string)
    # NOTE(review): the remainder of this function is not visible in this excerpt.
ip,mask = ip.split('/')
mask = int(mask)
except ValueError:
mask = 32
try:
if 'rd' in tokens:
klass = MPLS
elif 'route-distinguisher' in tokens:
klass = MPLS
elif 'label' in tokens:
klass = MPLS
else:
klass = INET
# nexthop must be false and its str return nothing .. an empty string does that
update = Change(klass(afi=IP.toafi(ip),safi=IP.tosafi(ip),packed=IP.pton(ip),mask=mask,nexthop=None,action=OUT.ANNOUNCE),Attributes())
except ValueError:
return self.error.set(self.syntax)
if 'announce' not in self.scope.content[-1]:
self.scope.content[-1]['announce'] = []
self.scope.content[-1]['announce'].append(update)
return True
def prefix (tokeniser):
    """Read one token and turn it into an IP range.

    Args:
        tokeniser: callable returning the next token, expected in
            "address" or "address/mask" form. Its ``afi`` attribute is
            set from the address as a side effect.

    When the token does not split into exactly two parts on '/', the
    whole token is used as the address and the mask defaults to '128'
    if the token contains ':' and to '32' otherwise.

    Returns:
        The result of ``IPRange.create(address, mask)``.
    """
    # XXX: could raise
    token = tokeniser()

    pieces = token.split('/')
    if len(pieces) == 2:
        address, length = pieces
    else:
        # No (or malformed) mask: keep the token untouched and pick a host mask.
        address = token
        length = '128' if ':' in token else '32'

    tokeniser.afi = IP.toafi(address)
    return IPRange.create(address,length)
self.peer = peer
self.neighbor = peer.neighbor
self.negotiated = Negotiated(self.neighbor)
self.connection = None
if self.neighbor.connect:
self.port = self.neighbor.connect
elif os.environ.get('exabgp.tcp.port','').isdigit():
self.port = int(os.environ.get('exabgp.tcp.port'))
elif os.environ.get('exabgp_tcp_port','').isdigit():
self.port = int(os.environ.get('exabgp_tcp_port'))
else:
self.port = 179
from exabgp.configuration.environment import environment
self.log_routes = peer.neighbor.adj_rib_in or environment.settings().log.routes
def _newASN (self, value):
    """Convert a textual AS number into an ASN object.

    Args:
        value: string holding either a plain integer or a dotted
            "high.low" pair, which is combined as (high << 16) + low.

    Returns:
        ASN built from the resulting integer.
    """
    # Plain form: no dot, so the string is the number itself.
    if not value.count('.'):
        return ASN(int(value))

    # Dotted form: only the first dot separates the two halves.
    upper, lower = value.split('.',1)
    return ASN((int(upper) << 16) + int(lower))
def _newASN (self,value):
    """Return the ASN described by a string.

    A value containing a dot is read as "high.low" (split on the first
    dot) and yields high * 65536 + low; any other value is converted
    with int() directly.
    """
    is_dotted = value.count('.')
    if is_dotted:
        high_part, low_part = value.split('.',1)
        return ASN(int(high_part) * 65536 + int(low_part))
    return ASN(int(value))