Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _doWriteOrRead(self, selectable, fd, event):
    """
    Dispatch one event to C{selectable} and disconnect it on failure.

    @param selectable: the object whose C{doRead} / C{doWrite} is invoked.
    @param fd: the descriptor the event was delivered for; compared with
        C{selectable.fileno()} after the handler has run.
    @param event: an object exposing C{filter}, C{flags}, C{data} and
        C{fflags} attributes.

    Any truthy result (a handler's return value, a caught exception, or
    C{main.CONNECTION_LOST}) is passed to C{self._disconnectSelectable}.
    """
    evFilter = event.filter
    evFlags = event.flags
    evData = event.data
    evFflags = event.fflags

    reason = None
    wasReading = False

    if not (evFlags & KQ_EV_EOF and evData and evFflags):
        try:
            if evFilter == KQ_FILTER_READ:
                wasReading = True
                reason = selectable.doRead()
            if evFilter == KQ_FILTER_WRITE:
                wasReading = False
                reason = selectable.doWrite()
            # The handler may have changed the selectable's descriptor;
            # if it no longer matches, treat the connection as lost.
            if selectable.fileno() != fd:
                wasReading = False
                reason = main.CONNECTION_LOST
        except:
            # Deliberately broad: whatever the handlers raise is logged
            # and the exception instance becomes the disconnect reason.
            log.err()
            reason = sys.exc_info()[1]
    else:
        # EOF flag set together with non-zero data and fflags.
        reason = main.CONNECTION_LOST

    if reason:
        self._disconnectSelectable(selectable, reason, wasReading)
def removeWriter(self, writer):
    """
    Remove a Selectable for notification of data available to write.

    @param writer: the selectable to stop watching for writability. Its
        C{fileno()} is used as the lookup key; when that reports C{-1}
        the writer is located by identity among C{self._selectables}
        instead, and the key it was stored under is used.

    @return: C{None}. Nothing happens if the writer cannot be found or
        its descriptor is not in C{self._writes}.
    """
    fd = writer.fileno()
    if fd == -1:
        # The writer no longer reports a usable descriptor, so recover
        # the key it was registered under by scanning for the same
        # object.  C{items()} is used rather than C{iteritems()} so the
        # lookup works on both Python 2 and Python 3.
        for fd, fdes in self._selectables.items():
            if writer is fdes:
                break
        else:
            # Never registered (or already removed): nothing to do.
            return
    if fd in self._writes:
        self._writes.discard(fd)
        # Only forget the selectable entirely when it is not also
        # being watched for reads.
        if fd not in self._reads:
            del self._selectables[fd]
        self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DISABLE)