Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
ip,mask = ip.split('/')
mask = int(mask)
except ValueError:
mask = 32
try:
if 'rd' in tokens:
klass = MPLS
elif 'route-distinguisher' in tokens:
klass = MPLS
elif 'label' in tokens:
klass = MPLS
else:
klass = INET
# nexthop must be false and its str return nothing .. an empty string does that
update = Change(klass(afi=IP.toafi(ip),safi=IP.tosafi(ip),packed=IP.pton(ip),mask=mask,nexthop=None,action=OUT.ANNOUNCE),Attributes())
except ValueError:
return self.error.set(self.syntax)
if 'announce' not in self.scope.content[-1]:
self.scope.content[-1]['announce'] = []
self.scope.content[-1]['announce'].append(update)
return True
def prefix (tokeniser):
    """Read one token from ``tokeniser`` and turn it into an IPRange.

    The token is either ``address/mask`` or a bare address. Whenever it does
    not split into exactly two parts on '/', the whole token is kept as the
    address and the mask defaults to '128' if the token contains ':' and to
    '32' otherwise.

    The address family reported by ``IP.toafi`` is stored on the callable as
    ``tokeniser.afi`` before the range is created with ``IPRange.create``.
    """
    # XXX: could raise
    token = tokeniser()
    pieces = token.split('/')
    if len(pieces) == 2:
        address, length = pieces
    else:
        # no usable '/mask' part: fall back to a host mask
        address = token
        length = '128' if ':' in token else '32'

    tokeniser.afi = IP.toafi(address)
    return IPRange.create(address, length)
def pack (self):
    """Serialise this object and return the result of ``concat_bytes``.

    The pieces, in order: ``self.TLV`` as one unsigned byte, ``self.LENGTH``
    as a network-order unsigned 16-bit value, a zero byte, a zero 16-bit
    value, and ``IP.pton(self.v6sid)``.
    """
    # NOTE(review): `pack` called below is presumably struct.pack imported at
    # module level, not this method — confirm against the file's imports.
    pieces = (
        pack('!B', self.TLV),
        pack('!H', self.LENGTH),
        pack('!B', 0),
        pack('!H', 0),
        IP.pton(self.v6sid),
    )
    return concat_bytes(*pieces)
def attributes (tokeniser):
# Builds an announce NLRI from the last entry of tokeniser.tokens, then reads
# commands from the tokeniser until it sees 'nlri'.
# NOTE(review): this excerpt seems to stop part-way through the function; attr,
# labels and rd are initialised below but not used in the visible lines.
ipmask = prefix(lambda: tokeniser.tokens[-1])
# 'rd' / 'route-distinguisher' anywhere in the tokens selects an IPVPN NLRI,
# 'label' selects a Label NLRI, anything else a plain INET NLRI.
if 'rd' in tokeniser.tokens or 'route-distinguisher' in tokeniser.tokens:
nlri = IPVPN(IP.toafi(ipmask.top()),SAFI.mpls_vpn,OUT.ANNOUNCE)
elif 'label' in tokeniser.tokens:
nlri = Label(IP.toafi(ipmask.top()),SAFI.nlri_mpls,OUT.ANNOUNCE)
else:
nlri = INET(IP.toafi(ipmask.top()),IP.tosafi(ipmask.top()),OUT.ANNOUNCE)
nlri.cidr = CIDR(ipmask.pack(),ipmask.mask)
attr = Attributes()
labels = None
rd = None
# Read commands one at a time: a falsy command returns an empty list,
# 'nlri' leaves the loop.
while True:
command = tokeniser()
if not command:
return []
if command == 'nlri':
break
def mpls (tokeniser):
    """Parse a prefix from ``tokeniser`` and return it as an announce Change.

    The prefix is read with ``prefix()``, wrapped in an IPVPN NLRI whose
    afi/safi come from ``IP.toafi`` / ``IP.tosafi`` of the range's ``top()``
    address, and paired with an empty ``Attributes()`` collection.
    """
    ipmask = prefix(tokeniser)

    family = IP.toafi(ipmask.top())
    sub_family = IP.tosafi(ipmask.top())
    nlri = IPVPN(afi=family, safi=sub_family, action=OUT.ANNOUNCE)
    nlri.cidr = CIDR(ipmask.ton(), ipmask.mask)

    return Change(nlri, Attributes())
def __init__ (self, afi, safi, packed, mask, nexthop, action,path=None):
    """Initialise the NLRI and CIDR parts of this object.

    ``path`` defaults to ``PathInfo.NOPATH`` when it is None. A truthy
    ``nexthop`` is decoded with ``IP.unpack``; otherwise ``NoIP`` is stored.
    ``afi``/``safi`` are handed to ``NLRI.__init__`` and ``packed``/``mask``
    to ``CIDR.__init__``; ``action`` is stored last.
    """
    if path is None:
        self.path_info = PathInfo.NOPATH
    else:
        self.path_info = path

    if nexthop:
        self.nexthop = IP.unpack(nexthop)
    else:
        self.nexthop = NoIP

    NLRI.__init__(self, afi, safi)
    CIDR.__init__(self, packed, mask)
    self.action = action
# Fragment of a next-hop handler (the enclosing def is not part of this excerpt).
# It rejects a second next-hop on the most recent announced change, resolves the
# address from the next token and stores it on that change's NLRI.
if self.scope.content[-1]['announce'][-1].attributes.has(Attribute.CODE.NEXT_HOP):
return self.error.set(self.syntax)
try:
# NOTE(review): this comment used to read "next-hop self is unsupported", yet
# the branch below does handle 'self' — confirm which one is intended.
ip = tokens.pop(0)
if ip.lower() == 'self':
# 'self' resolves to the scope's 'local-address' when present, else to
# self._nexthopself; with neither available an error is reported.
if 'local-address' in self.scope.content[-1]:
la = self.scope.content[-1]['local-address']
elif self._nexthopself:
la = self._nexthopself
else:
return self.error.set('next-hop self can only be specified with a neighbor')
nh = IP.unpack(la.pack())
else:
nh = IP.create(ip)
change = self.scope.content[-1]['announce'][-1]
nlri = change.nlri
afi = nlri.afi
safi = nlri.safi
nlri.nexthop = nh
# Only IPv4 unicast/multicast also get a NEXT_HOP attribute added to the change.
if afi == AFI.ipv4 and safi in (SAFI.unicast,SAFI.multicast):
change.attributes.add(Attribute.unpack(NextHop.ID,NextHop.FLAG,nh.packed,None))
# NextHop(nh.ip,nh.packed) does not cache the result, using unpack does
# change.attributes.add(NextHop(nh.ip,nh.packed))
return True
# Any exception raised above (including an empty token list) is reported as a
# syntax error.
except Exception:
return self.error.set(self.syntax)
# Fragment that turns the configured MD5 key into bytes (enclosing def not shown).
# md5_base64 is True: the key must be base64; a decode failure raises MD5Error.
# NOTE(review): on Python 3 base64.b64decode reports bad input with
# binascii.Error (a ValueError subclass) rather than TypeError — confirm which
# interpreter this code targets, otherwise these handlers never fire.
if md5_base64 is True:
try:
md5_bytes = base64.b64decode(md5)
except TypeError:
raise MD5Error("Failed to decode base 64 encoded PSK")
# Auto mode: taken when md5_base64 is None and the pattern does not match, i.e.
# it finds no character outside a-f0-9 in md5.
elif md5_base64 is None and not re.match('.*[^a-f0-9].*', md5): # auto
# Try the key with two, one and no '=' padding characters; the first variant
# that decodes wins. The loop variable rebinds md5 itself.
options = [md5+'==', md5+'=', md5]
for md5 in options:
try:
md5_bytes = base64.b64decode(md5)
break
except TypeError:
pass
# __kernel_sockaddr_storage
# Fragment (enclosing def not shown) that packs ip/port into a sockaddr buffer
# padded out to SS_MAXSIZE (128 bytes).
n_af = IP.toaf(ip)
n_addr = IP.pton(ip)
n_port = socket.htons(port)
# pack 'x' is padding, so we want the struct
# Do not use '!' for the pack, the network (big) endian switch in
# struct.pack is fighting against inet_pton and htons (note the n)
if IP.toafi(ip) == AFI.ipv4:
# SS_MAXSIZE is 128 but addr_family, port and ipaddr (8 bytes total) are written independently of the padding
SS_MAXSIZE_PADDING = 128 - calcsize('HH4s') # 8
# The IPv4 branch passes socket.AF_INET directly; n_af is only used for IPv6.
sockaddr = pack('HH4s%dx' % SS_MAXSIZE_PADDING, socket.AF_INET, n_port, n_addr)
else:
# NOTE(review): the calcsize format ('HI16sI') is not the format packed below
# ('HHI16sI'); with native alignment both should come to 28 bytes because the
# 'I' after a single 'H' is aligned to 4 — confirm on the target platforms.
SS_MAXSIZE_PADDING = 128 - calcsize('HI16sI') # 28
SIN6_FLOWINFO = 0
SIN6_SCOPE_ID = 0
sockaddr = pack('HHI16sI%dx' % SS_MAXSIZE_PADDING, n_af, n_port, SIN6_FLOWINFO, n_addr, SIN6_SCOPE_ID)
def unpack (cls, data):
    """Build an instance from a packed IPv4 or IPv6 address.

    ``data`` must be exactly 4 bytes (IPv4) or 16 bytes (IPv6); it is decoded
    with ``IP.unpack`` and passed to ``cls`` as ``addr``.

    Raises:
        ValueError: if ``data`` has any other length. Without this check
            ``addr`` was never assigned and an UnboundLocalError escaped.
    """
    size = len(data)
    if size not in (4, 16):
        raise ValueError(
            'invalid packed address length %d, expected 4 (IPv4) or 16 (IPv6)' % size
        )
    # the length is already validated, so the whole buffer is the address
    return cls(addr=IP.unpack(data))
def __repr__ (self):
    """Return the representation produced by ``IP.__repr__`` for this object."""
    rendered = IP.__repr__(self)
    return rendered