Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_on_failure_acks_on_failure_or_timeout_disabled(self):
    """Check ``on_failure`` with ``acks_on_failure_or_timeout`` turned off.

    The option is disabled on both the app configuration and the task,
    while ``acks_late`` is enabled on the task.  After ``on_failure`` is
    fed a captured ``KeyError`` the request must report itself as
    acknowledged, and ``_on_reject`` must have been called with
    ``(req_logger, job.connection_errors, False)``.
    """
    self.app.conf.acks_on_failure_or_timeout = False
    try:
        job = self.xRequest()
        job.time_start = 1
        self.mytask.acks_late = True
        self.mytask.acks_on_failure_or_timeout = False
        try:
            raise KeyError('foo')
        except KeyError:
            # The exception info is captured while the KeyError is still
            # being handled, and handed to on_failure in the same scope.
            exc_info = ExceptionInfo()
            job.on_failure(exc_info)
        assert job.acknowledged is True
        job._on_reject.assert_called_with(
            req_logger, job.connection_errors, False)
    finally:
        # Restore the app-level option even when an assertion above fails,
        # so a failure here does not leave the modified setting behind.
        self.app.conf.acks_on_failure_or_timeout = True
def test_execute_jail_failure(self):
    """``jail`` on the raising task returns an ``ExceptionInfo``.

    The single positional argument ``4`` passed to the task must show up
    as the ``args`` of the wrapped exception.
    """
    app = self.app
    task_id = uuid()
    task_name = self.mytask_raising.name
    outcome = jail(app, task_id, task_name, [4], {})
    assert isinstance(outcome, ExceptionInfo)
    assert outcome.exception.args == (4,)
def test_mark_as_failure(self):
    """A failure stored through one backend is readable through another.

    Two backends are created with ``create_backend``; the first records a
    ``KeyError`` for a fresh task id, and the second is queried for the
    state, the result and the traceback of that same id.
    """
    writer = self.create_backend()
    reader = self.create_backend()
    task_id = uuid()
    try:
        raise KeyError('foo')
    except KeyError as exc:
        info = ExceptionInfo()
        writer.mark_as_failure(task_id, exc, traceback=info.traceback)
        assert reader.get_state(task_id) == states.FAILURE
        stored_result = reader.get_result(task_id)
        assert isinstance(stored_result, KeyError)
        assert reader.get_traceback(task_id) == info.traceback
def test_on_failure_propagates_MemoryError(self):
    """``on_failure`` must raise ``MemoryError`` when handed one.

    A ``MemoryError`` is raised and captured as an internal
    ``ExceptionInfo``; passing that to the request's ``on_failure`` is
    expected to raise ``MemoryError`` again.
    """
    captured = None
    try:
        raise MemoryError()
    except MemoryError:
        captured = ExceptionInfo(internal=True)
    assert captured is not None
    signature = self.add.s(2, 2)
    request = self.get_request(signature)
    with pytest.raises(MemoryError):
        request.on_failure(captured)
def test_worker_term_hard_handler_only_stop_MainProcess(self):
# Exercises the handlers returned by installing
# cd.install_worker_term_hard_handler while the current process carries
# the name 'OtherProcess'.
# NOTE(review): this snippet is truncated -- the final `self.psig(` call is
# never closed and the outer `try:` has no matching handler in view, so the
# second scenario and the cleanup that follows cannot be documented here.
process = current_process()
# Remember the original process name in `name` and rename the process.
name, process.name = process.name, 'OtherProcess'
try:
# First scenario: active_thread_count is patched to report 3.
with patch('celery.apps.worker.active_thread_count') as c:
c.return_value = 3
worker = self._Worker()
handlers = self.psig(
cd.install_worker_term_hard_handler, worker)
try:
# Invoke the SIGQUIT handler directly and expect the terminate flag set.
handlers['SIGQUIT']('SIGQUIT', object())
assert state.should_terminate
finally:
# Reset the flag on `state` regardless of the assertion outcome.
state.should_terminate = None
# Second scenario: active_thread_count is patched to report 1.
with patch('celery.apps.worker.active_thread_count') as c:
c.return_value = 1
worker = self._Worker()
handlers = self.psig(
def test_worker_term_hard_handler_only_stop_MainProcess(self):
# NOTE(review): this definition repeats, line for line, the definition of
# the same name that immediately precedes it, and is cut off at the same
# point (unclosed `self.psig(` call, outer `try:` without a handler).
# Presumably a duplicated excerpt -- confirm against the upstream file.
process = current_process()
# Remember the original process name in `name` and rename the process.
name, process.name = process.name, 'OtherProcess'
try:
# First scenario: active_thread_count is patched to report 3.
with patch('celery.apps.worker.active_thread_count') as c:
c.return_value = 3
worker = self._Worker()
handlers = self.psig(
cd.install_worker_term_hard_handler, worker)
try:
# Invoke the SIGQUIT handler directly and expect the terminate flag set.
handlers['SIGQUIT']('SIGQUIT', object())
assert state.should_terminate
finally:
# Reset the flag on `state` regardless of the assertion outcome.
state.should_terminate = None
# Second scenario: active_thread_count is patched to report 1.
with patch('celery.apps.worker.active_thread_count') as c:
c.return_value = 1
worker = self._Worker()
handlers = self.psig(
def test_set_pdeathsig(self):
    """After the parent process is terminated, ``"done"`` arrives on the queue.

    A ``Process`` running ``parent_task`` is started with a queue and the
    expected message.  The first item read from the queue is used as a
    pid for ``psutil.Process``; the started process is then terminated
    and the next queue item must equal the expected message.
    """
    expected = "done"
    channel = Queue()
    parent = Process(target=parent_task, args=(channel, expected))
    parent.start()
    reported_pid = channel.get(timeout=3)
    child = psutil.Process(reported_pid)
    try:
        parent.terminate()
        message = channel.get(timeout=3)
        assert message == expected
    finally:
        # Always terminate the process looked up via psutil, pass or fail.
        child.terminate()
def test_constants(self, name):
    """The attribute ``name`` of ``_winapi`` must exist and not be ``None``."""
    constant = getattr(_winapi, name)
    assert constant is not None