Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _avail_index(self):
    """Return the lowest index in ``range(self._processes)`` that no
    member of ``self._pool`` currently uses.

    The pool must hold fewer workers than ``self._processes``; this is
    asserted before searching.
    """
    assert len(self._pool) < self._processes
    taken = {worker.index for worker in self._pool}
    free_slots = (
        slot for slot in range(self._processes) if slot not in taken
    )
    # First free slot wins, i.e. the smallest unused index.
    return next(free_slots)
def grow(self, n=1):
    """Raise the target process count by *n*.

    For each added slot the put-lock (when one is set) is grown as well.
    ``self.on_grow`` is called once, with *n*, after all slots were added.
    """
    for _ in range(n):
        self._processes += 1
        if not self._putlock:
            continue
        self._putlock.grow()
    self.on_grow(n)
# The worker may have published a result before being terminated,
# but we have no way to accurately tell if it did. So we wait for
# _lost_worker_timeout seconds before we mark the job with
# WorkerLostError.
for job in [job for job in list(self._cache.values())
if not job.ready() and job._worker_lost]:
now = now or monotonic()
lost_time, lost_ret = job._worker_lost
if now - lost_time > job._lost_worker_timeout:
self.mark_as_worker_lost(job, lost_ret)
if shutdown and not len(self._pool):
raise WorkersJoined()
cleaned, exitcodes = {}, {}
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
exitcode = worker.exitcode
popen = worker._popen
if popen is None or exitcode is not None:
# worker exited
debug('Supervisor: cleaning up worker %d', i)
if popen is not None:
worker.join()
debug('Supervisor: worked %d joined', i)
cleaned[worker.pid] = worker
exitcodes[worker.pid] = exitcode
if exitcode not in (EX_OK, EX_RECYCLE) and \
not getattr(worker, '_controlled_termination', False):
error(
'Process %r pid:%r exited with %r',
worker.name, worker.pid, human_status(exitcode),
def close_open_fds(keep=None):  # noqa
    """Close every file descriptor below the fd limit except those in *keep*.

    :param keep: optional iterable of objects that ``maybe_fileno`` can
        resolve to a descriptor number.  Entries for which
        ``maybe_fileno`` returns ``None`` are ignored.
    :raises OSError: if closing a descriptor fails for any reason other
        than ``EBADF``.
    """
    # Resolve each entry exactly once and keep the results in a set, so the
    # membership test inside the descriptor loop is O(1) instead of a list
    # scan repeated for every candidate descriptor.
    keep_fds = set()
    for entry in keep or ():
        fileno = maybe_fileno(entry)
        if fileno is not None:
            keep_fds.add(fileno)

    # Walk from the highest candidate descriptor down to 0.
    for fd in reversed(range(get_fdmax(default=2048))):
        if fd in keep_fds:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            # EBADF means the descriptor was not open; anything else is a
            # real failure and is propagated to the caller.
            if exc.errno != errno.EBADF:
                raise
def _ack(self, i, time_accepted, pid, *args):
    """Record that chunk *i* was accepted by worker *pid*.

    Every item index covered by the chunk (clamped to ``self._length``)
    is flagged as accepted and stamped with *pid* and *time_accepted*.
    If ``self.ready()`` is true afterwards, this job's entry is dropped
    from ``self._cache``.  Extra positional arguments are ignored.
    """
    chunk_begin = i * self._chunksize
    chunk_end = min((i + 1) * self._chunksize, self._length)
    for item in range(chunk_begin, chunk_end):
        self._accepted[item] = True
        self._worker_pid[item] = pid
        self._time_accepted[item] = time_accepted
    if not self.ready():
        return
    self._cache.pop(self._job, None)
def _ensure_messages_consumed(self, completed):
    """Poll until ``self.on_ready_counter.value`` reaches *completed*.

    Returns ``True`` as soon as the counter is at least *completed*.
    Returns ``False`` when no counter is set, or when the counter is still
    short after ``GUARANTEE_MESSAGE_CONSUMPTION_RETRY_LIMIT`` checks
    (a warning is logged in that case).  Between checks the call sleeps for
    ``GUARANTEE_MESSAGE_CONSUMPTION_RETRY_INTERVAL``.
    """
    if not self.on_ready_counter:
        return False

    for attempt in range(GUARANTEE_MESSAGE_CONSUMPTION_RETRY_LIMIT):
        if self.on_ready_counter.value < completed:
            # Not there yet: wait and check again.
            time.sleep(GUARANTEE_MESSAGE_CONSUMPTION_RETRY_INTERVAL)
            continue
        debug('ensured messages consumed after %d retries', attempt)
        return True

    warning('could not ensure all messages were consumed prior to '
            'exiting')
    return False
def _repopulate_pool(self, exitcodes):
    """Start new workers until the pool is back at ``self._processes``.

    Meant to run after exited workers have been reaped.  *exitcodes* is
    indexed by replacement position; an entry other than ``EX_OK`` /
    ``EX_RECYCLE``, or a missing entry (``IndexError``), advances
    ``self.restart_state``.  Stops early if the pool leaves the ``RUN``
    state.
    """
    shortfall = self._processes - len(self._pool)
    for position in range(shortfall):
        if self._state != RUN:
            return
        try:
            abnormal_exit = (
                bool(exitcodes)
                and exitcodes[position] not in (EX_OK, EX_RECYCLE)
            )
            if abnormal_exit:
                self.restart_state.step()
        except IndexError:
            self.restart_state.step()
        free_index = self._avail_index()
        self._create_worker_process(free_index)
        debug('added worker')