Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def close(self):
    """Close the connection and reset its asynchat state.

    ``asynchat.async_chat.close`` is only invoked when ``self.socket`` is
    truthy; the buffers, ``_proto`` and the ``_closed`` flag are reset on
    every call.

    Returns:
        The result of ``self.set_active(False)`` when no handler is
        attached, otherwise ``None``.
    """
    # Sometimes the socket is not closed at once (possibly related to SSL
    # sockets); this is meant to prevent double callbacks in request_handler.
    # NOTE(review): nesting was reconstructed from the comments -- confirm
    # that the _fileno reset belongs under the socket check.
    if self.socket:
        # self.socket is still None when DNS was not found.
        asynchat.async_chat.close(self)
        self._fileno = None

    # Re-init asynchat state.
    self.ac_in_buffer = b''
    self.incoming = []
    self.producer_fifo.clear()
    self._proto = None
    self._closed = True

    if not self.handler:
        # Return to the pool.
        return self.set_active(False)

    if not self.errcode:
        # Disconnected intentionally.
        return
    # NOTE(review): no handling for the error case (truthy errcode) is
    # visible here -- confirm nothing was dropped from this method.
def close(self, deactive=1):
    """Close the channel, reset its asynchat buffers and log the closure.

    Args:
        deactive: Passed through unchanged to
            ``dbconnect.AsynDBConnect.close``.
    """
    asynchat.async_chat.close(self)

    # Re-init asynchat state.
    self.ac_in_buffer = b''
    self.incoming = []
    self.producer_fifo.clear()

    dbconnect.AsynDBConnect.close(self, deactive)

    # The "%s:%d" format needs self.address to be a 2-item tuple
    # (presumably (host, port) -- confirm against the caller).
    self.logger("[info] ..dbo %s:%d has been closed" % self.address)