Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def unpack(cls, data):
    """Build an instance from the packed bytes in ``data``.

    Layout as consumed here: bytes 0-7 are handed to
    ``RouteDistinguisher.unpack``, bytes 8-17 to ``ESI.unpack``, byte 18
    carries the IP length in bits, and the address starts at byte 19.

    Raises:
        Notify: (3, 5) when the IP length byte is neither 32 nor 128.
    """
    distinguisher = RouteDistinguisher.unpack(data[:8])
    segment = ESI.unpack(data[8:18])

    ip_bits = ordinal(data[18])
    if ip_bits != 32 and ip_bits != 128:
        raise Notify(3,5,"IP length field is given as %d in current Segment, expecting 32 (IPv4) or 128 (IPv6) bits" % ip_bits)

    # The length field counts bits; the slice below needs a byte count.
    ip_end = 19 + ip_bits // 8
    address = IP.unpack(data[19:ip_end])

    # The raw bytes are passed along unchanged as the last argument.
    return cls(distinguisher, segment, address, data)
def unpack(cls, data, length):
    """Decode an OSPF forwarding address from ``data``.

    ``data`` must be exactly 4 bytes (IPv4) or 16 bytes (IPv6); the
    ``length`` argument is accepted but not consulted by this method.

    Raises:
        Notify: (3, 5) when ``data`` has any other size.
    """
    size = len(data)
    if size not in (4, 16):
        raise Notify(3,5, "Error parsing OSPF Forwarding Address. Wrong size")

    # Both address families are decoded by the same call; only the
    # number of bytes handed over differs.
    return cls(addr=IP.unpack(data[:size]))
# Parse the next-hop value for the most recently announced change in the
# current scope (self.scope.content[-1]['announce'][-1]).
#
# tokens: list whose first element is the next-hop, either the word "self"
#         (any case) or a value handed to IP.create(); it is popped here.
# name, command: not used in the lines visible here.
#
# Returns True on success; on rejection returns whatever self.error.set()
# returns.
#
# NOTE(review): the `try:` below has no `except`/`finally` clause in this
# excerpt, so the definition appears truncated - confirm against the full file.
def next_hop (self, name, command, tokens):
# a next-hop attribute may only be given once per change
if self.scope.content[-1]['announce'][-1].attributes.has(Attribute.CODE.NEXT_HOP):
return self.error.set(self.syntax)
try:
# NOTE(review): this comment used to read "next-hop self is unsupported",
# yet the branch below does resolve 'self' - confirm which one is stale
ip = tokens.pop(0)
if ip.lower() == 'self':
# resolve "self": prefer the scope's local-address, then the
# fallback stored on this object, otherwise reject the command
if 'local-address' in self.scope.content[-1]:
la = self.scope.content[-1]['local-address']
elif self._nexthopself:
la = self._nexthopself
else:
return self.error.set('next-hop self can only be specified with a neighbor')
# rebuild an IP object from the packed form of the local address
nh = IP.unpack(la.pack())
else:
nh = IP.create(ip)
change = self.scope.content[-1]['announce'][-1]
nlri = change.nlri
afi = nlri.afi
safi = nlri.safi
# the next-hop is always recorded on the NLRI itself
nlri.nexthop = nh
# only IPv4 unicast/multicast additionally get a NextHop attribute
if afi == AFI.ipv4 and safi in (SAFI.unicast,SAFI.multicast):
change.attributes.add(Attribute.unpack(NextHop.ID,NextHop.FLAG,nh.packed,None))
# NextHop(nh.ip,nh.packed) does not cache the result, using unpack does
# change.attributes.add(NextHop(nh.ip,nh.packed))
return True
def unpack(cls, data, length):
    """Decode the ``terid`` address carried in ``data``.

    ``data`` must be exactly 4 bytes (IPv4) or 16 bytes (IPv6); the
    ``length`` argument is accepted but not consulted by this method.

    Returns:
        ``cls(terid=...)`` built from the decoded address.

    Raises:
        Notify: (3, 5) when ``data`` is neither 4 nor 16 bytes long.
    """
    size = len(data)
    if size not in (4, 16):
        # Without this guard neither branch assigned ``terid`` and the
        # method died with UnboundLocalError instead of reporting a
        # malformed attribute.
        raise Notify(3, 5, "Error parsing terid address. Wrong size")
    return cls(terid=IP.unpack(data[:size]))
def __init__(self, afi=AFI.ipv4, safi=SAFI.flow_ip, nexthop=None, rd=None):
    """Initialise the NLRI with an empty rule set and an ANNOUNCE action.

    Args:
        afi: forwarded to ``NLRI.__init__``.
        safi: forwarded to ``NLRI.__init__``.
        nexthop: value decoded with ``IP.unpack`` when truthy; any
            falsy value leaves the next-hop as ``NoIP``.
        rd: stored unchanged on the instance.
    """
    NLRI.__init__(self, afi, safi)

    self.rules = {}
    self.action = OUT.ANNOUNCE

    if nexthop:
        self.nexthop = IP.unpack(nexthop)
    else:
        self.nexthop = NoIP

    self.rd = rd
    # One value is drawn from the module-level ``unique`` source for
    # every instance created.
    self.unique = unique.next()
def unpack(cls, data):
    """Decode the interface address carried in ``data``.

    ``data`` must be exactly 4 bytes (IPv4) or 16 bytes (IPv6).

    Returns:
        ``cls(iface_addr=...)`` built from the decoded address.

    Raises:
        Notify: (3, 5) when ``data`` is neither 4 nor 16 bytes long.
    """
    size = len(data)
    if size not in (4, 16):
        # Any other size used to skip both branches, leaving ``addr``
        # unbound and crashing with UnboundLocalError.
        raise Notify(3, 5, "Error parsing interface address. Wrong size")
    return cls(iface_addr=IP.unpack(data[:size]))
# Decode one node descriptor sub-TLV from the front of `data`.
#
# data: buffer starting with a 4-byte header (type, length) followed by the value.
# igp:  protocol identifier; only compared against 3 and 6 in the lines visible here.
#
# On the paths visible here, returns a 2-tuple: (cls instance, bytes of
# `data` left after this sub-TLV).
# Raises Exception when the type is not a key of NODE_TLVS.
#
# NOTE(review): the excerpt ends on a dangling "LAN pseudonode" comment, so
# further branches appear to be cut off; as shown, every other case falls
# through without an explicit return - confirm against the full file.
def unpack (cls, data, igp):
# header is read with format '!HH' (presumably struct.unpack: two
# network-order unsigned 16-bit fields - confirm the import)
dtype, dlength = unpack('!HH',data[0:4])
if dtype not in NODE_TLVS.keys():
raise Exception("Unknown Node Descriptor Sub-TLV")
# OSPF Area-ID
if dtype == 514:
# value is decoded as an IP; `packed` keeps header + value bytes
return cls(
node_id=IP.unpack(data[4: 4 + dlength]),
dtype=dtype,
packed=data[:4+dlength]
), data[4 + dlength:]
# IGP Router-ID: The TLV size in combination with the protocol
# identifier enables the decoder to determine the type
# of the node: sec 3.2.1.4.
elif dtype == 515:
# OSPFv{2,3} non-pseudonode
if (igp == 3 or igp == 6) and dlength == 4:
# dlength is 4 on this path, so the literal 4 + 4 offsets below
# match 4 + dlength
r_id = IP.unpack(data[4: 4 + 4])
return cls(
node_id=r_id,
dtype=dtype,
packed=data[:4+dlength]
), data[4 + 4:]
# OSPFv{2,3} LAN pseudonode
def unpack(cls, data):
    """Decode a 4-byte (IPv4) or 16-byte (IPv6) interface address.

    Returns:
        ``cls(iface_addr=...)`` wrapping the address decoded by
        ``IP.unpack``.

    Raises:
        Notify: (3, 5) when ``data`` has any size other than 4 or 16.
    """
    if len(data) == 4:
        # IPv4 address
        addr = IP.unpack(data[:4])
    elif len(data) == 16:
        # IPv6
        addr = IP.unpack(data[:16])
    else:
        # This branch was missing: an unexpected size left ``addr``
        # unbound and surfaced as UnboundLocalError rather than a
        # protocol-level error.
        raise Notify(3, 5, "Error parsing interface address. Wrong size")
    return cls(iface_addr=addr)