Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, code=pyrad.packet.DisconnectRequest, id=None,
             secret=None, authenticator=None, **attributes):
    """Initialise the packet, defaulting its code to DisconnectRequest.

    All arguments are forwarded unchanged to
    :obj:`pyrad.packet.Packet.__init__`.

    :param code: packet type code
    :param id: packet identification number
    :param secret: secret handed to the base constructor
    :param authenticator: authenticator handed to the base constructor
    :param attributes: extra keyword arguments, passed through as-is
    """
    base_init = pyrad.packet.Packet.__init__
    base_init(self, code, id, secret, authenticator, **attributes)
def CreatePacket(self, **args):
    """Build a new RADIUS packet bound to this object's dictionary.

    The packet is constructed with ``dict=self.dict``; every other
    keyword argument is handed to the packet constructor unchanged.

    :return: a new packet instance
    :rtype: pyrad.packet.Packet
    """
    packet_cls = packet.Packet
    return packet_cls(dict=self.dict, **args)
# Match an incoming datagram to a pending request and resolve that
# request's future with the decoded reply.
# NOTE(review): this snippet is cut off - the handler for the `try:` below
# is not visible here.
def datagram_received(self, data, addr):
try:
# Decode the payload using the client's dictionary; reply.id is the key
# used to find the request this reply belongs to.
reply = Packet(packet=data, dict=self.client.dict)
if reply and reply.id in self.pending_requests:
req = self.pending_requests[reply.id]
packet = req['packet']
# Give the reply the original request's dictionary and secret before
# asking the request to verify it.
reply.dict = packet.dict
reply.secret = packet.secret
if packet.VerifyReply(reply, data):
req['future'].set_result(reply)
# Remove the request from the pending map
del self.pending_requests[reply.id]
else:
# NOTE(review): the format string has four placeholders but only three
# arguments are supplied - the trailing %s has no value.
self.logger.warn('[%s:%d] Ignore invalid reply for id %d. %s', self.server, self.port, reply.id)
else:
# NOTE(review): %d is applied to `data`, the raw payload passed to
# Packet(packet=...) above - presumably bytes, which %d cannot format; confirm.
self.logger.warn('[%s:%d] Ignore invalid reply: %d', self.server, self.port, data)
# Handle a datagram on the server side: look up the sending host, decode the
# packet, reject response-type codes, then build and verify an AuthPacket
# when this endpoint is an auth server.
# NOTE(review): this snippet is cut off after the VerifyAuthRequest check;
# the rest of the method (including the handler for the second `try:`) is
# not visible here.
def datagram_received(self, data, addr):
self.logger.debug('[%s:%d] Received %d bytes from %s', self.ip, self.port, len(data), addr)
# Not used anywhere in the visible part of this method.
receive_date = datetime.utcnow()
# Resolve the sender by its address (addr[0]), falling back to a
# '0.0.0.0' entry when one is configured.
if addr[0] in self.hosts:
remote_host = self.hosts[addr[0]]
elif '0.0.0.0' in self.hosts:
# NOTE(review): this branch stores the entry's `.secret`, while the branch
# above stores the host entry itself and `remote_host.secret` is read
# further down - looks inconsistent; confirm.
remote_host = self.hosts['0.0.0.0'].secret
else:
self.logger.warn('[%s:%d] Drop package from unknown source %s', self.ip, self.port, addr)
return
# First decode pass: any exception while decoding is logged and the
# datagram is dropped.
try:
self.logger.debug('[%s:%d] Received from %s packet: %s', self.ip, self.port, addr, data.hex())
req = Packet(packet=data, dict=self.server.dict)
except Exception as exc:
self.logger.error('[%s:%d] Error on decode packet: %s', self.ip, self.port, exc)
return
try:
# The codes listed here are rejected as invalid for an incoming packet.
if req.code in (AccountingResponse, AccessAccept, AccessReject, CoANAK, CoAACK, DisconnectNAK, DisconnectACK):
raise ServerPacketError('Invalid response packet %d' % req.code)
elif self.server_type == ServerType.Auth:
if req.code != AccessRequest:
raise ServerPacketError('Received non-auth packet on auth port')
# Decode the same payload again as an AuthPacket, this time with the
# sender's secret attached.
req = AuthPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyAuthRequest():
# NOTE(review): fragment - the enclosing `def` is not visible in this snippet.
# Assign an id, encode the attributes, compute the authenticator and return
# the assembled packet bytes.
self.id = self.CreateID()
if self.message_authenticator:
self._refresh_message_authenticator()
attr = self._PktEncodeAttributes()
# Header is code (1 byte), id (1 byte) and total length (2 bytes, network
# order). The length is 20 + len(attr) because the result below is the
# 4-byte header plus the 16-byte MD5 digest plus the attributes.
header = struct.pack('!BBH', self.code, self.id, (20 + len(attr)))
# The authenticator is the MD5 digest of the header, 16 zero bytes, the
# encoded attributes and the secret.
self.authenticator = md5_constructor(header[0:4] + 16 * six.b('\x00') +
attr + self.secret).digest()
ans = header + self.authenticator + attr
return ans
class CoAPacket(Packet):
"""RADIUS CoA packets. This class is a specialization
of the generic :obj:`Packet` class for CoA packets.
"""
def __init__(self, code=CoARequest, id=None, secret=six.b(''),
authenticator=None, **attributes):
"""Constructor
:param dict: RADIUS dictionary
:type dict: pyrad.dictionary.Dictionary class
:param secret: secret needed to communicate with a RADIUS server
:type secret: string
:param id: packet identification number
:type id: integer (8 bits)
:param code: packet type code
:type code: integer (8bits)
def CreateReply(self, **attributes):
    """Build a reply packet for this packet.

    The reply reuses this packet's id, secret, authenticator and
    dictionary; any extra keyword arguments are handed to the
    :obj:`Packet` constructor unchanged.

    :return: the new reply packet
    """
    reply_cls = Packet
    return reply_cls(
        id=self.id,
        secret=self.secret,
        authenticator=self.authenticator,
        dict=self.dict,
        **attributes
    )
# NOTE(review): fragment - the enclosing `def` and the initial values of
# `buf`, `last` and `result` are not visible in this snippet.
# Consume `buf` 16 bytes at a time, XOR-ing each chunk with an MD5 digest and
# appending the outcome to `result`.
while buf:
# The digest for this chunk is MD5(secret + last); `last` is replaced below
# by the 16 bytes just produced, so each chunk depends on the previous one.
hash = md5_constructor(self.secret + last).digest()
if six.PY3:
# Under Python 3, indexing bytes yields ints, so XOR them directly.
for i in range(16):
result += bytes((hash[i] ^ buf[i],))
else:
# Otherwise the elements are one-character strings: go through ord()/chr().
for i in range(16):
result += chr(ord(hash[i]) ^ ord(buf[i]))
# NOTE(review): both loops index buf[0..15], so every chunk is presumably
# padded to 16 bytes before this loop - confirm against the caller.
last = result[-16:]
buf = buf[16:]
return result
class AuthPacket(Packet):
def __init__(self, code=AccessRequest, id=None, secret=six.b(''),
authenticator=None, **attributes):
"""Constructor
:param code: packet type code
:type code: integer (8bits)
:param id: packet identification number
:type id: integer (8 bits)
:param secret: secret needed to communicate with a RADIUS server
:type secret: string
:param dict: RADIUS dictionary
:type dict: pyrad.dictionary.Dictionary class
:param packet: raw packet to decode
:type packet: string
def CreatePacket(self, id, **args):
    """Create a packet with the given id and this object's dictionary and secret.

    :param id: packet identification number; ``0`` is accepted
    :param args: extra keyword arguments passed to :obj:`Packet`
    :return: the new packet
    :raises Exception: if ``id`` is ``None``
    """
    # Test for None explicitly: a truthiness check would also reject 0,
    # which is a legitimate value for an 8-bit packet identifier.
    if id is None:
        raise Exception('Missing mandatory packet id')
    return Packet(id=id, dict=self.dict, secret=self.secret, **args)
# NOTE(review): fragment - the enclosing `def` and the origin of `password`,
# `chapid` and `userpwd` are not visible in this snippet.
# Use the first value of the CHAP-Challenge attribute as the challenge.
challenge = self['CHAP-Challenge'][0]
# True when `password` equals the MD5 digest of chapid + userpwd + challenge.
return password == md5_constructor(chapid + userpwd + challenge).digest()
def VerifyAuthRequest(self):
    """Verify request authenticator.

    Recomputes the MD5 digest over the raw packet with bytes 4..19
    replaced by 16 zero bytes, followed by the secret, and compares it
    with the stored authenticator.

    :return: True if the authenticator matches (verification
             succeeded), False otherwise
    :rtype: boolean
    """
    import hmac

    assert self.raw_packet
    expected = md5_constructor(self.raw_packet[0:4] + 16 * six.b('\x00') +
                               self.raw_packet[20:] + self.secret).digest()
    try:
        # Constant-time comparison, so the time taken does not reveal how
        # many leading bytes of a forged authenticator were correct.
        return hmac.compare_digest(expected, self.authenticator)
    except TypeError:
        # compare_digest rejects unsupported operand types (e.g. None),
        # for which a plain == comparison simply evaluated to False.
        return False
class AcctPacket(Packet):
"""RADIUS accounting packets. This class is a specialization
of the generic :obj:`Packet` class for accounting packets.
"""
def __init__(self, code=AccountingRequest, id=None, secret=six.b(''),
authenticator=None, **attributes):
"""Constructor
:param dict: RADIUS dictionary
:type dict: pyrad.dictionary.Dictionary class
:param secret: secret needed to communicate with a RADIUS server
:type secret: string
:param id: packet identification number
:type id: integer (8 bits)
:param code: packet type code
:type code: integer (8bits)