Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_jobs(self):
    """Reload all persisted jobs from the jobs table into ``self.jobs``.

    Every row returned by selecting from ``self.jobs_t`` is converted to a
    dict and applied to a blank ``Job`` through ``__setstate__``. A row that
    cannot be restored is logged with ``logger.exception`` and skipped, so
    one bad row does not prevent the remaining jobs from loading.

    ``self.jobs`` is replaced with the list of successfully restored jobs.
    """
    jobs = []
    for row in self.engine.execute(select([self.jobs_t])):
        # Reset for every row: the handler below reads job_dict, and without
        # this it would be unbound (first row) or still hold the previous
        # row's data whenever building the dict itself is what failed.
        job_dict = {}
        try:
            # Create the instance without running Job.__init__; its state
            # comes entirely from the stored row.
            job = Job.__new__(Job)
            job_dict = dict(row.items())
            job.__setstate__(job_dict)
            jobs.append(job)
        except Exception:
            job_name = job_dict.get('name', '(unknown)')
            logger.exception('Unable to restore job "%s"', job_name)
    self.jobs = jobs
def _reconstitute_job(self, job_state):
    """Rebuild a ``Job`` from its pickled state and bind it to this store.

    ``job_state`` is unpickled and applied, via ``__setstate__``, to a
    ``Job`` instance created without calling ``__init__``. The job is then
    tagged with this store's scheduler and alias and returned.
    """
    # NOTE(review): pickle.loads can execute arbitrary code on a crafted
    # payload; presumably job_state only comes from this store's own
    # backend -- confirm.
    state = pickle.loads(job_state)

    restored = Job.__new__(Job)
    restored.__setstate__(state)
    restored._scheduler = self._scheduler
    restored._jobstore_alias = self._alias
    return restored
def construct_job(job: Job, scheduler, alias="beer_garden"):
    """Convert a Beergarden job to an APScheduler one.

    Args:
        job: The Beergarden job to convert. ``None`` is passed straight
            through.
        scheduler: Scheduler stored on the resulting job as ``_scheduler``.
        alias: Job store alias stored on the resulting job as
            ``_jobstore_alias``.

    Returns:
        The populated APScheduler job, or ``None`` when ``job`` is ``None``.
    """
    if job is None:
        return None

    trigger = construct_trigger(job.trigger_type, job.trigger)
    # A missing/falsy next_run_time is kept as None; otherwise it is
    # localized with ``utc`` before being handed to the APScheduler job.
    next_run_time = utc.localize(job.next_run_time) if job.next_run_time else None

    # Create the instance without running APJob.__init__ and inject its
    # state directly instead.
    ap_job = APJob.__new__(APJob)
    ap_job._scheduler = scheduler
    ap_job._jobstore_alias = alias
    ap_job.__setstate__(
        {
            "id": job.id,
            "func": "beer_garden.scheduler:run_job",
            "trigger": trigger,
            "executor": "default",
            "args": (),
            "kwargs": {"request_template": job.request_template, "job_id": job.id},
            "name": job.name,
            "misfire_grace_time": job.misfire_grace_time,
            "coalesce": job.coalesce,
            "max_instances": job.max_instances,
            "next_run_time": next_run_time,
        }
    )

    # Hand the converted job back; without this the function would always
    # return None and the conversion result would be discarded.
    return ap_job
def _reconstitute_job(self, job_state):
    """Rebuild a ``Job`` from an already-decoded state and bind it here.

    ``job_state`` is used as-is (no unpickling in this variant): it is
    applied through ``__setstate__`` to a ``Job`` instance created without
    calling ``__init__``. The job is then tagged with this store's
    scheduler and alias and returned.
    """
    rebuilt = Job.__new__(Job)
    rebuilt.__setstate__(job_state)
    rebuilt._scheduler = self._scheduler
    rebuilt._jobstore_alias = self._alias
    return rebuilt
def _reconstitute_job(self, job_state):
    """Rebuild a ``Job`` from its pickled state, recording this job store.

    The unpickled state gets a ``'jobstore'`` entry pointing at this store
    before it is applied, via ``__setstate__``, to a ``Job`` instance
    created without calling ``__init__``. The job is then tagged with this
    store's scheduler and alias and returned.
    """
    # NOTE(review): pickle.loads can execute arbitrary code on a crafted
    # payload; presumably job_state only comes from this store's own
    # backend -- confirm.
    state = pickle.loads(job_state)
    state['jobstore'] = self

    loaded = Job.__new__(Job)
    loaded.__setstate__(state)
    loaded._scheduler = self._scheduler  # pylint: disable=protected-access
    loaded._jobstore_alias = self._alias  # pylint: disable=protected-access
    return loaded
def _reconstitute_job(self, job_state):
    """Turn a pickled job state back into a ``Job`` owned by this store.

    After unpickling, the state's ``'jobstore'`` key is set to this store.
    The state is then applied through ``__setstate__`` to a ``Job`` instance
    created without calling ``__init__``, and the job is returned with this
    store's scheduler and alias attached.
    """
    # NOTE(review): pickle.loads can execute arbitrary code on a crafted
    # payload; presumably job_state only comes from this store's own
    # backend -- confirm.
    unpickled = pickle.loads(job_state)
    unpickled['jobstore'] = self

    revived = Job.__new__(Job)
    revived.__setstate__(unpickled)
    revived._scheduler = self._scheduler
    revived._jobstore_alias = self._alias
    return revived