Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
self.job = scheduler_job.Job(None, None, {"job_schedule": {}})
self.job_doc['job_schedule'])
self.start_session()
# if the job contains exec action and the scheduler passes the
# parameter --disable-exec job execution should fail
if self.contains_exec() and CONF.disable_exec:
LOG.info("Job {0} failed because it contains exec action "
"and exec actions are disabled by scheduler"
.format(self.id))
self.result = Job.FAIL_RESULT
self.finish()
return
for job_action in self.job_doc.get('job_actions', []):
if job_action.get('mandatory', False) or\
(result == Job.SUCCESS_RESULT):
action_result = self.execute_job_action(job_action)
if action_result == Job.FAIL_RESULT:
result = Job.FAIL_RESULT
if action_result == Job.ABORTED_RESULT:
result = Job.ABORTED_RESULT
else:
freezer_action = job_action.get('freezer_action', {})
action_name = freezer_action.get('action', '')
LOG.warning("skipping {0} action".
format(action_name))
self.result = result
self.finish()
def process_event(self, job_doc):
with self.scheduler.lock:
next_event = job_doc['job_schedule'].get('event', '')
while next_event:
if next_event == Job.STOP_EVENT:
if isinstance(self.state(), StopState):
LOG.info('JOB {0} event: STOP'.format(self.id))
next_event = self.state.stop(self, job_doc)
elif next_event == Job.START_EVENT:
LOG.info('JOB {0} event: START'.format(self.id))
next_event = self.state.start(self, job_doc)
elif next_event == Job.ABORT_EVENT:
LOG.info('JOB {0} event: ABORT'.format(self.id))
next_event = self.state.abort(self, job_doc)
elif next_event == Job.ABORTED_RESULT:
LOG.info('JOB {0} aborted.'.format(self.id))
break
def execute(self):
result = Job.SUCCESS_RESULT
with self.scheduler.lock:
LOG.info('job {0} running'.format(self.id))
self.state = RunningState
self.update_job_schedule_doc(status=Job.RUNNING_STATUS,
result="",
time_started=int(time.time()),
time_ended=Job.TIME_NULL)
self.scheduler.update_job_schedule(
self.id,
self.job_doc['job_schedule'])
self.start_session()
# if the job contains exec action and the scheduler passes the
# parameter --disable-exec job execution should fail
if self.contains_exec() and CONF.disable_exec:
LOG.info("Job {0} failed because it contains exec action "
"and exec actions are disabled by scheduler"
.format(self.id))
self.result = Job.FAIL_RESULT
self.finish()
return
for job_action in self.job_doc.get('job_actions', []):
def create(scheduler, executable, job_doc):
job = Job(scheduler, executable, job_doc)
if job.job_doc_status in ['running', 'scheduled']:
LOG.warning('Resetting {0} status from job {1}'
.format(job.job_doc_status, job.id))
if job.job_doc_status == 'stop' and not job.event:
LOG.info('Job {0} was stopped.'.format(job.id))
job.event = Job.STOP_EVENT
elif not job.event:
LOG.info('Autostart Job {0}'.format(job.id))
job.event = Job.START_EVENT
return job
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT
def unschedule(self):
try:
# already executing job are not present in the apscheduler list
self.scheduler.remove_job(job_id=self.id)
except Exception:
pass
self.event = Job.NO_EVENT
self.job_doc_status = Job.STOP_STATUS
self.state = StopState
def remove(job):
job.event = Job.REMOVE_EVENT
return Job.NO_EVENT
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT