Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_is_finished_task(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForkTask',
state=TASK_STATES['FREE'],
)
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
with patch('kobo.worker.taskmanager.os', fork=Mock(return_value=9999)) as os_mock:
tm.take_task(task_info)
os_mock.fork.assert_called_once()
with patch('kobo.worker.taskmanager.os', waitpid=Mock(return_value=(123, 0))) as os_mock:
self.assertTrue(tm.is_finished_task(t.id))
os_mock.waitpid.assert_called_once()
def test_shutdown_with_running_tasks(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForkTask',
state=TASK_STATES['FREE'],
)
self.assertEqual(t.state, TASK_STATES['FREE'])
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
with patch('kobo.worker.taskmanager.os', fork=Mock(return_value=9999)) as os_mock:
tm.take_task(task_info)
os_mock.fork.assert_called_once()
tm.update_tasks()
# reload task info
t = Task.objects.get(id=t.id)
self.assertEqual(t.state, TASK_STATES['OPEN'])
tm.shutdown()
# reload task info
t = Task.objects.get(id=t.id)
def test_is_finished_task_catch_os_error(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForkTask',
state=TASK_STATES['FREE'],
)
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
with patch('kobo.worker.taskmanager.os', fork=Mock(return_value=9999)) as os_mock:
tm.take_task(task_info)
os_mock.fork.assert_called_once()
err = OSError()
err.errno = errno.ECHILD
with patch('kobo.worker.taskmanager.os', waitpid=Mock(side_effect=err)) as os_mock:
self.assertFalse(tm.is_finished_task(t.id))
os_mock.waitpid.assert_called_once()
def test_run_task_mark_task_as_failed_if_generic_exception(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyExceptionTask',
state=TASK_STATES['OPEN'],
)
self.assertEqual(t.state, TASK_STATES['OPEN'])
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
tm.run_task(task_info)
t = Task.objects.get(id=t.id)
self.assertEqual(t.state, TASK_STATES['FAILED'])
def test_run_task_runs_fork_task(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForkTask',
state=TASK_STATES['OPEN'],
)
self.assertEqual(t.state, TASK_STATES['OPEN'])
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
tm.run_task(task_info)
t = Task.objects.get(id=t.id)
self.assertEqual(t.state, TASK_STATES['CLOSED'])
def test_get_next_task_dont_run_tasks_if_not_ready(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForegroundTask',
state=TASK_STATES['FREE'],
)
self.assertEqual(t.state, TASK_STATES['FREE'])
tm = TaskManager(conf={'worker': self._worker})
tm.worker_info['ready'] = False
tm.get_next_task()
# reload task info
t = Task.objects.get(id=t.id)
self.assertEqual(t.state, TASK_STATES['FREE'])
def test_get_next_task_runs_free_task(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForegroundTask',
state=TASK_STATES['FREE'],
)
self.assertEqual(t.state, TASK_STATES['FREE'])
tm = TaskManager(conf={'worker': self._worker})
tm.get_next_task()
# reload task info
t = Task.objects.get(id=t.id)
self.assertEqual(t.state, TASK_STATES['CLOSED'])
def test_is_finished_task_invalid_child_pid(self):
t = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForkTask',
state=TASK_STATES['FREE'],
)
tm = TaskManager(conf={'worker': self._worker})
task_info = t.export(False)
with patch('kobo.worker.taskmanager.os', fork=Mock(return_value=9999)) as os_mock:
tm.take_task(task_info)
os_mock.fork.assert_called_once()
with patch('kobo.worker.taskmanager.os', waitpid=Mock(return_value=(0, 0))) as os_mock:
self.assertFalse(tm.is_finished_task(t.id))
os_mock.waitpid.assert_called_once()
t3 = Task.objects.create(
worker=self._worker.worker,
arch=self._arch,
channel=self._channel,
owner=self._user,
method='DummyForegroundTask',
waiting=True,
parent=t2,
state=TASK_STATES['FREE'],
)
self.assertEqual(t1.state, TASK_STATES['OPEN'])
self.assertEqual(t2.state, TASK_STATES['ASSIGNED'])
self.assertEqual(t3.state, TASK_STATES['FREE'])
tm = TaskManager(conf={'worker': self._worker})
tm.lock()
tm.get_next_task()
# reload task info
t1 = Task.objects.get(id=t1.id)
self.assertEqual(t1.state, TASK_STATES['OPEN'])
t2 = Task.objects.get(id=t2.id)
self.assertEqual(t2.state, TASK_STATES['CLOSED'])
t3 = Task.objects.get(id=t3.id)
self.assertEqual(t3.state, TASK_STATES['FREE'])
def main_loop(conf, foreground=False):
"""infinite daemon loop"""
# define custom signal handlers
signal.signal(signal.SIGTERM, daemon_shutdown)
# initialize TaskManager
try:
log_file = conf.get("LOG_FILE", None)
logger = logging.Logger("TaskManager")
logger.setLevel(logging.DEBUG)
if log_file:
log_level = logging.getLevelName(conf.get("LOG_LEVEL", "DEBUG").upper())
kobo.log.add_rotating_file_logger(logger, log_file, log_level=log_level)
tm = TaskManager(conf=conf, logger=logger)
except Exception as ex:
raise
sys.stderr.write("Error initializing TaskManager: %s\n" % ex)
sys.exit(1)
if foreground and tm._logger is not None:
kobo.log.add_stderr_logger(tm._logger)
while 1:
try:
tm.log_debug(80 * '-')
# poll hub for new tasks
tm.hub._login()
tm.update_worker_info()
tm.update_tasks()
tm.get_next_task()