Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
debug('timeout: sending TERM to %s', worker._name)
try:
if os.getpgid(worker.pid) == worker.pid:
debug("worker %s is a group leader. It is safe to kill (SIGTERM) the whole group", worker.pid)
os.killpg(os.getpgid(worker.pid), signal.SIGTERM)
else:
worker.terminate()
except OSError:
pass
else:
if worker._popen.wait(timeout=0.1):
return
debug('timeout: TERM timed-out, now sending KILL to %s', worker._name)
try:
if os.getpgid(worker.pid) == worker.pid:
debug("worker %s is a group leader. It is safe to kill (SIGKILL) the whole group", worker.pid)
os.killpg(os.getpgid(worker.pid), signal.SIGKILL)
else:
_kill(worker.pid, SIGKILL)
except OSError:
pass
def terminate(self):
    """Terminate the pool.

    Flags the pool state as ``TERMINATE``, stops the worker handler and
    then runs the ``_terminate`` callable stored on the instance.
    """
    debug('terminating pool')
    # The state is flipped first so it is already TERMINATE by the time
    # the worker handler is told to stop.
    self._state = TERMINATE
    self._worker_handler.terminate()
    self._terminate()
def _after_fork(self):
    """Reset per-process queue state and rebind the connection shortcuts.

    Recreates the ``_notempty`` condition and the ``_buffer`` deque,
    clears the feeder-thread bookkeeping and the closed flags, and caches
    the bound send/receive/poll methods of ``_writer`` and ``_reader``
    on the instance.
    """
    debug('Queue._after_fork()')

    # Fresh synchronisation primitive and an empty buffer.
    self._notempty = threading.Condition(threading.Lock())
    self._buffer = collections.deque()

    # No feeder thread exists yet.
    self._thread = None
    self._jointhread = None
    self._joincancelled = False
    self._closed = False
    self._close = None

    # Cache the connection methods in send/recv pairs.
    # NOTE(review): the original bound ``_writer.send`` to
    # ``_send_bytes`` and overwrote it two lines later (a dead store).
    # It is bound to ``_send`` here to mirror ``_recv``; ``_send_bytes``
    # ends up with the same value as before.
    self._send = self._writer.send
    self._recv = self._reader.recv
    self._send_bytes = self._writer.send_bytes
    self._recv_bytes = self._reader.recv_bytes
    self._poll = self._reader.poll
abs(min(now - time_terminate - 5.0, 0)))
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
try:
for i in range(10):
if not outqueue._reader.poll():
break
get()
except (IOError, EOFError):
pass
debug('result handler exiting: len(cache)=%s, thread._state=%s',
len(cache), self._state)
check_timeouts = self.check_timeouts
on_state_change = self.on_state_change
time_terminate = None
while cache and self._state != TERMINATE:
if check_timeouts is not None:
check_timeouts()
try:
ready, task = poll(1.0)
except (IOError, EOFError) as exc:
debug('result handler got %r -- exiting', exc)
return
if ready:
if task is None:
debug('result handler ignoring extra sentinel')
continue
on_state_change(task)
try:
join_exited_workers(shutdown=True)
except WorkersJoined:
now = monotonic()
if not time_terminate:
time_terminate = now
else:
if now - time_terminate > 5.0:
debug('result handler exiting: timed out')
break
debug('result handler: all workers terminated, '
'timeout in %ss',
abs(min(now - time_terminate - 5.0, 0)))
if ready:
if task is None:
debug('result handler ignoring extra sentinel')
continue
on_state_change(task)
try:
join_exited_workers(shutdown=True)
except WorkersJoined:
now = monotonic()
if not time_terminate:
time_terminate = now
else:
if now - time_terminate > 5.0:
debug('result handler exiting: timed out')
break
debug('result handler: all workers terminated, '
'timeout in %ss',
abs(min(now - time_terminate - 5.0, 0)))
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
try:
for i in range(10):
if not outqueue._reader.poll():
break
get()
except (IOError, EOFError):
def body(self):
    """Drive ``handle_timeouts`` for as long as the state is ``RUN``.

    Each value yielded by ``handle_timeouts`` is followed by a one-second
    sleep. A ``CoroStop`` raised while iterating ends the loop.
    """
    keep_going = True
    while keep_going and self._state == RUN:
        try:
            for _tick in self.handle_timeouts():
                # Pause between iterations so the loop does not spin.
                time.sleep(1.0)
        except CoroStop:
            keep_going = False
    debug('timeout handler exiting')
self.mark_as_worker_lost(job, lost_ret)
if shutdown and not len(self._pool):
raise WorkersJoined()
cleaned, exitcodes = {}, {}
for i in reversed(range(len(self._pool))):
worker = self._pool[i]
exitcode = worker.exitcode
popen = worker._popen
if popen is None or exitcode is not None:
# worker exited
debug('Supervisor: cleaning up worker %d', i)
if popen is not None:
worker.join()
debug('Supervisor: worked %d joined', i)
cleaned[worker.pid] = worker
exitcodes[worker.pid] = exitcode
if exitcode not in (EX_OK, EX_RECYCLE) and \
not getattr(worker, '_controlled_termination', False):
error(
'Process %r pid:%r exited with %r',
worker.name, worker.pid, human_status(exitcode),
exc_info=0,
)
self.process_flush_queues(worker)
del self._pool[i]
del self._poolctrl[worker.pid]
del self._on_ready_counters[worker.pid]
if cleaned:
all_pids = [w.pid for w in self._pool]
for job in list(self._cache.values()):
def _start(self):
    """Create the listener and launch the daemon thread running ``_serve``.

    Must only be called while no listener exists (asserted). Stores the
    listener, its address and the started thread on the instance.
    """
    from .connection import Listener

    assert self._listener is None
    util.debug('starting listener and thread for sending handles')

    # The listener is authenticated with the current process's authkey.
    listener = Listener(authkey=process.current_process().authkey)
    self._listener = listener
    self._address = listener.address

    # Daemon thread, so it does not keep the interpreter alive at exit.
    server_thread = threading.Thread(target=self._serve)
    server_thread.daemon = True
    server_thread.start()
    self._thread = server_thread