# NOTE(review): leftover web-scrape banner, not project code — kept only as a comment.
# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _decrease_parallel_environment_slots(self, slots_to_remove):
    """Shrink the grid engine parallel environment by the given number of slots."""
    Logger.info('Decrease number of parallel environment slots by %s.' % slots_to_remove)
    engine = self.grid_engine
    engine.decrease_parallel_environment_slots(slots_to_remove)
    Logger.info('Number of parallel environment slots was decreased.')
expired_jobs = [job for index, job in enumerate(pending_jobs) if now >= expiration_datetimes[index]]
if expired_jobs:
Logger.info('There are %s waiting jobs that are in queue for more than %s seconds. '
'Scaling up is required.' % (len(expired_jobs), self.scale_up_timeout.seconds))
if len(additional_hosts) < self.max_additional_hosts:
Logger.info('There are %s/%s additional child pipelines. Scaling up will be performed.' %
(len(additional_hosts), self.max_additional_hosts))
self.scale_up()
else:
Logger.info('There are %s/%s additional child pipelines. Scaling up is aborted.' %
(len(additional_hosts), self.max_additional_hosts))
else:
Logger.info('There are no waiting jobs that are in queue for more than %s seconds. '
'Scaling up is not required.' % self.scale_up_timeout.seconds)
else:
Logger.info('There are no waiting jobs.')
if self.latest_running_job:
Logger.info('Latest started job with id %s has started at %s.' %
(self.latest_running_job.id, self.latest_running_job.datetime))
if now >= self.latest_running_job.datetime + self.scale_down_timeout:
Logger.info('Latest job started more than %s seconds ago. Scaling down is required.' %
self.scale_down_timeout.seconds)
self._scale_down(running_jobs, additional_hosts)
else:
Logger.info('Latest job started less than %s seconds. '
'Scaling down is not required.' % self.scale_down_timeout.seconds)
else:
Logger.info('There are no previously running jobs. Scaling is skipped.')
Logger.info('Finish scaling step at %s.' % self.clock.now())
post_scale_additional_hosts = self.host_storage.load_hosts()
Logger.info('There are %s additional pipelines.' % len(post_scale_additional_hosts))
def _perform_command(self, action, msg, error_msg, skip_on_failure):
    """Run *action*, logging *msg* first.

    On RuntimeError the *error_msg* is logged; unless *skip_on_failure* is set,
    a new RuntimeError wrapping the original one is raised.
    """
    Logger.info(msg)
    try:
        action()
    except RuntimeError as e:
        Logger.warn(error_msg)
        if skip_on_failure:
            # Best-effort mode: failure is logged but swallowed.
            return
        raise RuntimeError(error_msg, e)
def _remove_host_from_grid_engine_configuration(self, host):
    """Delete the given additional worker host from the GE cluster configuration."""
    Logger.info('Remove additional worker with host=%s from GE cluster configuration.' % host)
    engine = self.grid_engine
    engine.delete_host(host)
    Logger.info('Additional worker with host=%s was removed from GE cluster configuration.' % host)
def _scale_down(self, running_jobs, additional_hosts):
    """Scale down a single inactive additional worker, if any exists.

    A host is considered inactive when none of the currently running jobs
    is scheduled on it. Only one host is stopped per scaling step; the host
    is removed from host storage only if the scale-down succeeded.

    :param running_jobs: Jobs currently running in the cluster.
    :param additional_hosts: Hosts of the additional (autoscaled) workers.
    """
    # Set comprehension instead of set([...]) — avoids building a throwaway list.
    active_hosts = {job.host for job in running_jobs}
    inactive_additional_hosts = [host for host in additional_hosts if host not in active_hosts]
    if not inactive_additional_hosts:
        Logger.info('There are no inactive additional child pipelines. Scaling down will not be performed.')
        return
    Logger.info('There are %s inactive additional child pipelines. '
                'Scaling down will be performed.' % len(inactive_additional_hosts))
    inactive_additional_host = inactive_additional_hosts[0]
    succeed = self.scale_down(inactive_additional_host)
    if succeed:
        self.host_storage.remove_host(inactive_additional_host)
def scale_down(self, child_host):
    """
    Stops the child pipeline running on the given host.
    :param child_host: Host of the child pipeline that should be stopped.
    :return: True if the worker was scaled down, False otherwise.
    """
    Logger.info('Start grid engine SCALING DOWN for %s host.' % child_host)
    # Actual stopping is delegated to the dedicated scale-down handler.
    result = self.scale_down_handler.scale_down(child_host)
    return result
def _increase_parallel_environment_slots(self, slots_to_append):
    """Grow the grid engine parallel environment by the given number of slots."""
    Logger.info('Increase number of parallel environment slots by %s.' % slots_to_append)
    engine = self.grid_engine
    engine.increase_parallel_environment_slots(slots_to_append)
    Logger.info('Number of parallel environment slots was increased.')
def _remove_host_from_default_hostfile(self, host):
    """Drop the line for the given host from the default hostfile."""
    Logger.info('Remove host %s from default hostfile.' % host)
    # Each host occupies its own line in the hostfile, so removing the line removes the host.
    self._remove_line_from_file(file=self.default_hostfile, line=host)
def _await_worker_initialization(self, run_id):
    """Poll the run until the additional worker reports it is initialized.

    Polls every polling_delay seconds for up to polling_timeout seconds
    (falling back to a fixed attempt count when polling_delay is zero).

    :param run_id: Run id of the additional worker pipeline.
    :raises ScalingError: If the worker has not initialized within the timeout.
    """
    Logger.info('Waiting for additional worker with run_id=%s to initialize.' % run_id)
    # int() truncation fixes an infinite loop: with true division a non-integral
    # timeout/delay ratio produced a float counter that, decremented by 1 each
    # iteration, never compared equal to 0. `> 0` additionally guards against
    # any negative starting value.
    attempts = int(self.polling_timeout / self.polling_delay) if self.polling_delay \
        else GridEngineScaleUpHandler._POLL_ATTEMPTS
    while attempts > 0:
        run = self.pipe.load_run(run_id)
        if run['initialized']:
            Logger.info('Additional worker with run_id=%s has initialized.' % run_id)
            return
        Logger.info('Additional worker with run_id=%s hasn\'t been initialized yet. Only %s attempts remain left.'
                    % (run_id, attempts))
        attempts -= 1
        time.sleep(self.polling_delay)
    error_msg = 'Additional worker hasn\'t been initialized after %s seconds.' % self.polling_timeout
    Logger.fail(error_msg)
    raise ScalingError(error_msg)