Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _HandleAuthPacket(self, pkt):
    """Validate a packet from the authentication port and dispatch it.

    The shared secret is attached through ``_AddSecret`` first; the packet
    is then handed to ``HandleAuthPacket`` only when its code is
    ``packet.AccessRequest``. Raising ServerPacketError signals the main
    loop to drop the packet and log the reason.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if the packet code is not AccessRequest
        (``_AddSecret`` may raise it as well)
    """
    self._AddSecret(pkt)

    # Happy path first: a genuine access request goes straight to the hook.
    if pkt.code == packet.AccessRequest:
        self.HandleAuthPacket(pkt)
        return

    raise ServerPacketError(
        'Received non-authentication packet on authentication port')
def _HandleCoaPacket(self, pkt):
    """Process a packet received on the coa port.

    If this packet should be dropped instead of processed a
    ServerPacketError exception should be raised. The main loop will
    drop the packet and log the reason.

    CoARequest packets are passed to ``HandleCoaPacket`` and
    DisconnectRequest packets to ``HandleDisconnectPacket``.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if the packet code is neither CoARequest
        nor DisconnectRequest (``_AddSecret`` may raise it as well)
    """
    # NOTE(review): the ``def`` line is missing from this excerpt; the name
    # and signature are inferred from the sibling _Handle*Packet methods --
    # confirm against the full file.

    # _AddSecret is the single place that resolves the shared secret,
    # including the '0.0.0.0' fallback entry. The secret must not be looked
    # up a second time by source address: that lookup raises KeyError for
    # senders that are only covered by the fallback entry, and KeyError is
    # not one of the exceptions the main loop handles.
    self._AddSecret(pkt)

    if pkt.code == packet.CoARequest:
        self.HandleCoaPacket(pkt)
    elif pkt.code == packet.DisconnectRequest:
        self.HandleDisconnectPacket(pkt)
    else:
        raise ServerPacketError('Received non-coa packet on coa port')
def Run(self):
    """Main loop.

    This method is the main loop for a RADIUS server. It waits
    for packets to arrive via the network and calls other methods
    to process them.

    A fresh poll object and fd map are created, ``_PrepareSockets`` is
    called, and the method then loops forever. ServerPacketError and
    packet.PacketError raised while processing input are logged and the
    offending packet is dropped; any other exception propagates.
    """
    # NOTE(review): the ``def`` line is missing from this excerpt; the
    # method name ``Run`` is assumed -- confirm against the full file.
    self._poll = select.poll()
    self._fdmap = {}
    self._PrepareSockets()

    while True:
        for (fd, event) in self._poll.poll():
            # poll() reports events as a bitmask, so POLLIN can be set
            # together with other flags; test the bit instead of comparing
            # the whole mask for equality.
            if not event & select.POLLIN:
                logger.error('Unexpected event in server main loop')
                continue

            try:
                fdo = self._fdmap[fd]
                self._ProcessInput(fdo)
            except ServerPacketError as err:
                logger.info('Dropping packet: %s', err)
            except packet.PacketError as err:
                logger.info('Received a broken packet: %s', err)
def _HandleAcctPacket(self, pkt):
    """Validate a packet from the accounting port and dispatch it.

    The shared secret is attached through ``_AddSecret`` first; the packet
    is then handed to ``HandleAcctPacket`` when its code is either
    ``packet.AccountingRequest`` or ``packet.AccountingResponse``. Raising
    ServerPacketError signals the main loop to drop the packet and log the
    reason.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if the packet code is not an accounting
        code (``_AddSecret`` may raise it as well)
    """
    self._AddSecret(pkt)

    accounting_codes = [packet.AccountingRequest, packet.AccountingResponse]
    if pkt.code in accounting_codes:
        self.HandleAcctPacket(pkt)
        return

    raise ServerPacketError(
        'Received non-accounting packet on accounting port')
def _AddSecret(self, pkt):
    """Attach the shared secret for the sending host to a received packet.

    The sender address (``pkt.source[0]``) is looked up in ``self.hosts``
    first; if it is absent, the ``'0.0.0.0'`` entry is used when present.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if neither the sender address nor
        ``'0.0.0.0'`` is a key of ``self.hosts``
    """
    sender = pkt.source[0]

    # Candidate keys in priority order: exact sender, then the fallback.
    for host_key in (sender, '0.0.0.0'):
        if host_key in self.hosts:
            pkt.secret = self.hosts[host_key].secret
            return

    raise ServerPacketError('Received packet from unknown host')
def _HandleProxyPacket(self, pkt):
    """Validate a packet received on the reply socket.

    The sender address must be a key of ``self.hosts`` (no ``'0.0.0.0'``
    fallback is applied here); its secret is attached to the packet. The
    packet code must then be one of AccessAccept, AccessReject or
    AccountingResponse. Raising :obj:`ServerPacketError` signals the main
    loop to drop the packet and log the reason.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if the sender is unknown or the packet is
        not one of the accepted response codes
    """
    sender = pkt.source[0]
    if sender not in self.hosts:
        raise ServerPacketError('Received packet from unknown host')
    pkt.secret = self.hosts[sender].secret

    response_codes = [
        packet.AccessAccept,
        packet.AccessReject,
        packet.AccountingResponse,
    ]
    if pkt.code not in response_codes:
        raise ServerPacketError('Received non-response on proxy socket')
return
try:
self.logger.debug('[%s:%d] Received from %s packet: %s', self.ip, self.port, addr, data.hex())
req = Packet(packet=data, dict=self.server.dict)
except Exception as exc:
self.logger.error('[%s:%d] Error on decode packet: %s', self.ip, self.port, exc)
return
try:
if req.code in (AccountingResponse, AccessAccept, AccessReject, CoANAK, CoAACK, DisconnectNAK, DisconnectACK):
raise ServerPacketError('Invalid response packet %d' % req.code)
elif self.server_type == ServerType.Auth:
if req.code != AccessRequest:
raise ServerPacketError('Received non-auth packet on auth port')
req = AuthPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyAuthRequest():
raise PacketError('Packet verification failed')
elif self.server_type == ServerType.Coa:
if req.code != DisconnectRequest and req.code != CoARequest:
raise ServerPacketError('Received non-coa packet on coa port')
req = CoAPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyCoARequest():
raise PacketError('Packet verification failed')