Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wait_for(self, predicate, timeout=None):
    '''
    Wait until ``predicate()`` returns a true value or ``timeout`` elapses.

    ``predicate`` is called once up front and again after every call to
    ``self.wait()``.  ``timeout`` is added to the current ``monotonic()``
    reading to form a deadline; ``None`` means keep waiting until the
    predicate holds.

    Returns the last value produced by ``predicate``; a falsy value
    means the deadline passed before the predicate became true.
    '''
    result = predicate()
    if result:
        return result

    endtime = None if timeout is None else monotonic() + timeout
    waittime = None  # stays None (no limit) when there is no deadline
    while not result:
        if endtime is not None:
            # Only wait for whatever is left of the overall budget.
            waittime = endtime - monotonic()
            if waittime <= 0:
                break
        self.wait(waittime)
        result = predicate()
    return result
def wait(object_list, timeout=None):  # noqa
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.

    A ``timeout`` of ``None`` waits without a deadline; a ``timeout`` of
    zero or less polls once without waiting.  A poll interrupted by
    ``EINTR`` is retried with whatever is left of the original timeout;
    any other ``OSError`` is re-raised.
    '''
    if timeout is not None:
        if timeout <= 0:
            return _poll(object_list, 0)
        deadline = monotonic() + timeout
    while True:
        try:
            return _poll(object_list, timeout)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
        if timeout is not None:
            # The deadline may already have passed while we were being
            # interrupted.  Never hand a negative timeout to _poll: clamp
            # to zero so the retry is a plain non-blocking poll, matching
            # the ``timeout <= 0`` handling above.
            timeout = max(deadline - monotonic(), 0)
def get(self, block=True, timeout=None):
    '''
    Remove and return one item from the queue.

    ``block``: when true, wait for an item; when false, check once.
    ``timeout``: with ``block`` true, bounds the total wait (acquiring
    the read lock plus waiting for data), tracked with ``monotonic()``.
    ``None`` together with ``block`` true waits without limit.

    Raises ``Empty`` when the read lock cannot be acquired or no data
    becomes available within the allowed time.
    '''
    if block and timeout is None:
        # Unbounded wait: hold the read lock only while receiving.
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()
    else:
        if block:
            deadline = monotonic() + timeout
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                # Acquiring the lock used up part of the budget; poll
                # only for what remains of it.
                timeout = deadline - monotonic()
                if timeout < 0 or not self._poll(timeout):
                    raise Empty
            elif not self._poll():
                raise Empty
            res = self._recv_bytes()
            self._sem.release()
        finally:
            self._rlock.release()
    # unserialize the data after having released the lock
    # NOTE(review): the payload is unpickled -- presumably the writer on
    # the other end is trusted; confirm against the producers.
    return ForkingPickler.loads(res)
Returns list of those objects in object_list which are ready/readable.
'''
if timeout is not None:
if timeout <= 0:
return _poll(object_list, 0)
else:
deadline = monotonic() + timeout
while True:
try:
return _poll(object_list, timeout)
except (OSError, IOError, socket.error) as e:
if e.errno != errno.EINTR:
raise
if timeout is not None:
timeout = deadline - monotonic()
def workloop(self, debug=debug, now=monotonic, pid=None):
pid = pid or os.getpid()
put = self.outq.put
inqW_fd = self.inqW_fd
synqW_fd = self.synqW_fd
maxtasks = self.maxtasks
max_memory_per_child = self.max_memory_per_child or 0
prepare_result = self.prepare_result
wait_for_job = self.wait_for_job
_wait_for_syn = self.wait_for_syn
def wait_for_syn(jid):
i = 0
while 1:
if i > 60:
error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return a deadline: the current ``monotonic()`` reading plus ``timeout``.

    ``timeout`` defaults to the module-level ``CONNECTION_TIMEOUT``.
    '''
    return monotonic() + timeout
try:
ready, task = poll(1.0)
except (IOError, EOFError) as exc:
debug('result handler got %r -- exiting', exc)
return
if ready:
if task is None:
debug('result handler ignoring extra sentinel')
continue
on_state_change(task)
try:
join_exited_workers(shutdown=True)
except WorkersJoined:
now = monotonic()
if not time_terminate:
time_terminate = now
else:
if now - time_terminate > 5.0:
debug('result handler exiting: timed out')
break
debug('result handler: all workers terminated, '
'timeout in %ss',
abs(min(now - time_terminate - 5.0, 0)))
if hasattr(outqueue, '_reader'):
debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
try:
def _init_timeout(timeout=CONNECTION_TIMEOUT):
    '''
    Return a deadline: the current ``monotonic()`` reading plus ``timeout``.

    ``timeout`` defaults to the module-level ``CONNECTION_TIMEOUT``.
    '''
    return monotonic() + timeout
def _check_timeout(t):
    '''
    Return True once the current ``monotonic()`` reading is past ``t``.

    ``t`` is a deadline on the ``monotonic()`` clock; a reading exactly
    equal to ``t`` does not yet count as timed out.
    '''
    return monotonic() > t