Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def addReader(self, reader):
    """
    Add a FileDescriptor for notification of data available to read.

    Nothing happens when the descriptor number is already present in
    ``self._reads``.  Otherwise the read filter is switched on through
    ``self._updateRegistration`` and the descriptor number is recorded
    in ``self._reads``.

    reader: object whose ``fileno()`` result identifies the descriptor;
        it is stored in ``self._selectables`` the first time that
        descriptor number is seen.
    """
    fd = reader.fileno()
    if fd in self._reads:
        # Already being watched for reading.
        return
    if fd not in self._selectables:
        # Descriptor not seen before: add both filters in one go, with
        # only the read side enabled, and remember which object owns it.
        self._updateRegistration(
            fd, KQ_FILTER_READ, KQ_EV_ADD | KQ_EV_ENABLE)
        self._updateRegistration(
            fd, KQ_FILTER_WRITE, KQ_EV_ADD | KQ_EV_DISABLE)
        self._selectables[fd] = reader
    else:
        # Descriptor already in _selectables (presumably added earlier
        # by addWriter): the read filter only needs enabling.
        self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ENABLE)
    self._reads.add(fd)
def addReader(self, reader):
    """
    Add a FileDescriptor for notification of data available to read.

    Nothing happens when the descriptor number is already present in
    ``self._reads``.  Otherwise the read filter is switched on through
    ``self._updateRegistration`` and the descriptor number is recorded
    in ``self._reads``.

    reader: object whose ``fileno()`` result identifies the descriptor;
        it is stored in ``self._selectables`` the first time that
        descriptor number is seen.
    """
    # NOTE(review): an identical addReader definition appears immediately
    # before this one; if both live in the same scope this later one is
    # the binding that takes effect -- confirm the duplicate is unintended.
    fd = reader.fileno()
    if fd in self._reads:
        # Already being watched for reading.
        return
    if fd not in self._selectables:
        # Descriptor not seen before: add both filters in one go, with
        # only the read side enabled, and remember which object owns it.
        self._updateRegistration(
            fd, KQ_FILTER_READ, KQ_EV_ADD | KQ_EV_ENABLE)
        self._updateRegistration(
            fd, KQ_FILTER_WRITE, KQ_EV_ADD | KQ_EV_DISABLE)
        self._selectables[fd] = reader
    else:
        # Descriptor already in _selectables (presumably added earlier
        # by addWriter): the read filter only needs enabling.
        self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ENABLE)
    self._reads.add(fd)
def addWriter(self, writer):
    """
    Add a FileDescriptor for notification of data available to write.

    Nothing happens when the descriptor number is already present in
    ``self._writes``.  Otherwise the write filter is switched on through
    ``self._updateRegistration`` and the descriptor number is recorded
    in ``self._writes``.

    writer: object whose ``fileno()`` result identifies the descriptor;
        it is stored in ``self._selectables`` the first time that
        descriptor number is seen.
    """
    fd = writer.fileno()
    if fd in self._writes:
        # Already being watched for writing.
        return
    if fd not in self._selectables:
        # Descriptor not seen before: add both filters in one go, with
        # only the write side enabled, and remember which object owns it.
        self._updateRegistration(
            fd, KQ_FILTER_WRITE, KQ_EV_ADD | KQ_EV_ENABLE)
        self._updateRegistration(
            fd, KQ_FILTER_READ, KQ_EV_ADD | KQ_EV_DISABLE)
        self._selectables[fd] = writer
    else:
        # Descriptor already in _selectables (presumably added earlier
        # by addReader): the write filter only needs enabling.
        self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ENABLE)
    self._writes.add(fd)