Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_process_queues(self):
    """Build the queue set used to talk to one pool process.

    Returns:
        A 3-tuple ``(inq, outq, synq)``.  ``synq`` is ``None`` unless
        ``self.synack`` is truthy, in which case it is a third queue
        created with a non-blocking writer.
    """
    # NOTE: O_NONBLOCK has to be applied when the pipe is created (on the
    # original fd); afterwards the flags cannot be changed until a
    # reader/writer actually exists on the other side.
    inbound = _SimpleQueue(wnonblock=True)
    outbound = _SimpleQueue(rnonblock=True)

    # Sanity-check the blocking mode of every pipe end.
    assert isblocking(inbound._reader)
    assert not isblocking(inbound._writer)
    assert not isblocking(outbound._reader)
    assert isblocking(outbound._writer)

    if not self.synack:
        return inbound, outbound, None

    ack = _SimpleQueue(wnonblock=True)
    assert isblocking(ack._reader)
    assert not isblocking(ack._writer)
    return inbound, outbound, ack
# receive jobs in the old buffer, so we need to reset the
# job._write_to and job._scheduled_for attributes used to recover
# message boundaries when processes exit.
infd = proc.inqW_fd
for job in values(cache):
if job._write_to and job._write_to.inqW_fd == infd:
job._write_to = proc
if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
job._scheduled_for = proc
fileno_to_outq[proc.outqR_fd] = proc
# maintain_pool is called whenever a process exits.
add_reader(
proc.sentinel, event_process_exit, hub, proc.sentinel,
)
assert not isblocking(proc.outq._reader)
# handle_result_event is called when the processes outqueue is
# readable.
add_reader(proc.outqR_fd, handle_result_event, proc.outqR_fd)
waiting_to_start.add(proc)
hub.call_later(
self._proc_alive_timeout, verify_process_alive, proc,
)
def create_process_queues(self):
    """Create the in/out queues (plus a syn queue when enabled).

    The result is always the tuple ``(inq, outq, synq)``; the last item
    is ``None`` when ``self.synack`` is falsy.
    """
    # NOTE: The non-blocking flag must be requested at pipe creation
    # time (on the original fd).  Once created, the flags cannot be
    # changed until an actual reader/writer is on the other side.
    in_queue = _SimpleQueue(wnonblock=True)
    out_queue = _SimpleQueue(rnonblock=True)

    # In-queue: blocking reader, non-blocking writer.
    assert isblocking(in_queue._reader)
    assert not isblocking(in_queue._writer)
    # Out-queue: non-blocking reader, blocking writer.
    assert not isblocking(out_queue._reader)
    assert isblocking(out_queue._writer)

    if self.synack:
        syn_queue = _SimpleQueue(wnonblock=True)
        # Syn-queue has the same blocking layout as the in-queue.
        assert isblocking(syn_queue._reader)
        assert not isblocking(syn_queue._writer)
    else:
        syn_queue = None

    return in_queue, out_queue, syn_queue
def create_process_queues(self):
    """Return a fresh ``(inq, outq, synq)`` tuple of queues.

    ``synq`` is only created when ``self.synack`` is truthy; otherwise
    the third element of the tuple is ``None``.
    """
    # NOTE: Request O_NONBLOCK when the pipe is created (the original
    # fd); the flags can't be changed later until there's an actual
    # reader/writer on the other side.
    inbound = _SimpleQueue(wnonblock=True)
    outbound = _SimpleQueue(rnonblock=True)

    # Verify each end got the blocking mode that was asked for.
    assert isblocking(inbound._reader)
    assert not isblocking(inbound._writer)
    assert not isblocking(outbound._reader)
    assert isblocking(outbound._writer)

    if not self.synack:
        return inbound, outbound, None

    ack = _SimpleQueue(wnonblock=True)
    assert isblocking(ack._reader)
    assert not isblocking(ack._writer)
    return inbound, outbound, ack
# If we got the same fd as a previous process then we'll also
# receive jobs in the old buffer, so we need to reset the
# job._write_to and job._scheduled_for attributes used to recover
# message boundaries when processes exit.
infd = proc.inqW_fd
for job in values(cache):
if job._write_to and job._write_to.inqW_fd == infd:
job._write_to = proc
if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
job._scheduled_for = proc
fileno_to_outq[proc.outqR_fd] = proc
# maintain_pool is called whenever a process exits.
self._track_child_process(proc, hub)
assert not isblocking(proc.outq._reader)
# handle_result_event is called when the processes outqueue is
# readable.
add_reader(proc.outqR_fd, handle_result_event, proc.outqR_fd)
waiting_to_start.add(proc)
hub.call_later(
self._proc_alive_timeout, verify_process_alive, ref(proc),
)
# Generator that reads the 4-byte header of a message from the
# non-blocking descriptor ``fd``, yielding whenever the read raises an
# OSError whose errno is in UNAVAIL so the caller can resume it later.
# NOTE(review): this snippet appears to be cut off after the header loop;
# ``add_reader``, ``callback``, ``unpack_from``, ``load`` and ``Br`` are
# not used in the visible part -- confirm against the full source.
def _recv_message(self, add_reader, fd, callback,
__read__=__read__, readcanbuf=readcanbuf,
BytesIO=BytesIO, unpack_from=unpack_from,
load=_pickle.load):
# Hr counts header bytes received so far; Br is only initialised here.
Hr = Br = 0
# Pick the receive buffer: a 4-byte bytearray exposed through a
# memoryview when ``readcanbuf`` is truthy, otherwise a BytesIO object.
if readcanbuf:
buf = bytearray(4)
bufv = memoryview(buf)
else:
buf = bufv = BytesIO()
# header
# The yield-on-unavailable loop below relies on fd being non-blocking.
assert not isblocking(fd)
# Keep reading until all 4 header bytes have arrived.
while Hr < 4:
try:
# Ask for the bytes still missing (4 - Hr); with ``readcanbuf`` the
# target is the unfilled tail of the buffer view.
# presumably __read__ returns the number of bytes read -- verify.
n = __read__(
fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
)
except OSError as exc:
# Errors whose errno is not in UNAVAIL are re-raised; otherwise
# suspend here and retry the read when resumed.
if get_errno(exc) not in UNAVAIL:
raise
yield
else:
# A zero-length read is treated as end of file: OSError if part of the
# header was already read, EOFError if nothing had been read yet.
if n == 0:
raise (OSError('End of file during message') if Hr
else EOFError())
Hr += n
def create_process_queues(self):
    """Create the in/out queues (plus a syn queue when enabled).

    The result is always the tuple ``(inq, outq, synq)``; the last item
    is ``None`` when ``self.synack`` is falsy.
    """
    # NOTE: The non-blocking flag must be requested at pipe creation
    # time (on the original fd).  Once created, the flags cannot be
    # changed until an actual reader/writer is on the other side.
    in_queue = _SimpleQueue(wnonblock=True)
    out_queue = _SimpleQueue(rnonblock=True)

    # In-queue: blocking reader, non-blocking writer.
    assert isblocking(in_queue._reader)
    assert not isblocking(in_queue._writer)
    # Out-queue: non-blocking reader, blocking writer.
    assert not isblocking(out_queue._reader)
    assert isblocking(out_queue._writer)

    if self.synack:
        syn_queue = _SimpleQueue(wnonblock=True)
        # Syn-queue has the same blocking layout as the in-queue.
        assert isblocking(syn_queue._reader)
        assert not isblocking(syn_queue._writer)
    else:
        syn_queue = None

    return in_queue, out_queue, syn_queue
"""Creates new in, out (and optionally syn) queues,
returned as a tuple."""
# NOTE: Pipes must be set O_NONBLOCK at creation time (the original
# fd), otherwise it will not be possible to change the flags until
# there is an actual reader/writer on the other side.
inq = _SimpleQueue(wnonblock=True)
outq = _SimpleQueue(rnonblock=True)
synq = None
assert isblocking(inq._reader)
assert not isblocking(inq._writer)
assert not isblocking(outq._reader)
assert isblocking(outq._writer)
if self.synack:
synq = _SimpleQueue(wnonblock=True)
assert isblocking(synq._reader)
assert not isblocking(synq._writer)
return inq, outq, synq
def create_process_queues(self):
    """Build the queue set used to talk to one pool process.

    Returns:
        A 3-tuple ``(inq, outq, synq)``.  ``synq`` is ``None`` unless
        ``self.synack`` is truthy, in which case it is a third queue
        created with a non-blocking writer.
    """
    # NOTE: O_NONBLOCK has to be applied when the pipe is created (on the
    # original fd); afterwards the flags cannot be changed until a
    # reader/writer actually exists on the other side.
    inbound = _SimpleQueue(wnonblock=True)
    outbound = _SimpleQueue(rnonblock=True)

    # Sanity-check the blocking mode of every pipe end.
    assert isblocking(inbound._reader)
    assert not isblocking(inbound._writer)
    assert not isblocking(outbound._reader)
    assert isblocking(outbound._writer)

    if not self.synack:
        return inbound, outbound, None

    ack = _SimpleQueue(wnonblock=True)
    assert isblocking(ack._reader)
    assert not isblocking(ack._writer)
    return inbound, outbound, ack