Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_from_crontab(self, expr, expected_repr, timezone):
    """Check that a crontab expression yields a trigger with the expected repr.

    ``expr`` and ``timezone`` are handed to ``CronTrigger.from_crontab``; the
    ``repr`` of the resulting trigger must equal ``expected_repr``.
    """
    actual_repr = repr(CronTrigger.from_crontab(expr, timezone))
    assert actual_repr == expected_repr
def _schedule_routine(self, name, cronstring):
    """Schedule the routine ``name`` on the crontab expression ``cronstring``.

    The job calls ``self._run_routine`` with ``name`` as its only argument.
    The id of the job returned by the scheduler is recorded in
    ``self._routine_to_sched_id_map`` under ``name``.
    """
    # Parse the expression first so a bad cron string fails before the
    # scheduler is touched.
    cron_trigger = CronTrigger.from_crontab(cronstring)
    scheduled = self.scheduler.add_job(
        func=self._run_routine,
        args=[name],
        trigger=cron_trigger,
        name=name,
    )
    self._routine_to_sched_id_map[name] = scheduled.id
def kwargs(self) -> Tuple[dict, dict]:
default = {
"id": self.aps_job_id,
"func": controller.scheduler_job,
"replace_existing": True,
"args": [self.job.id, self.aps_job_id],
}
if self.scheduling_mode == "cron":
self.periodic = True
trigger = {"trigger": CronTrigger.from_crontab(self.crontab_expression)}
elif self.frequency:
self.periodic = True
frequency_in_seconds = (
int(self.frequency)
* {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}[
self.frequency_unit
]
)
trigger = {
"trigger": "interval",
"start_date": self.aps_date("start_date"),
"end_date": self.aps_date("end_date"),
"seconds": frequency_in_seconds,
}
else:
self.periodic = False
def _insert_periodics(self, scheduler):
    """Register the danger-zone detector on ``scheduler`` when configured.

    Nothing is scheduled when reading ``self.config.danger_period`` raises
    ``AttributeError`` or when the detector reports itself as disabled.
    Otherwise the detector is added to the ``'memory'`` jobstore with a cron
    trigger built from the configured period and ``self.config.tz``.
    """
    try:
        danger_period = self.config.danger_period
    except AttributeError:
        # No danger period available: there is nothing to schedule.
        pass
    else:
        detector = periodics.DangerZoneDetector(self)
        if detector.is_enabled(self):
            detector_name = reflection.get_class_name(detector)
            detector_doc = periodics.DangerZoneDetector.__doc__
            detector_trigger = cron.CronTrigger.from_crontab(
                danger_period,
                timezone=self.config.tz,
            )
            # The job id is derived from the name, period and description.
            id_pieces = [detector_name, danger_period, detector_doc]
            detector_id = utils.hash_pieces(id_pieces, max_len=8)
            job_title = "\n".join([detector_name, detector_doc])
            scheduler.add_job(
                detector,
                trigger=detector_trigger,
                jobstore='memory',
                name=job_title,
                id=detector_id,
                coalesce=True,
            )
def schedule_newsletter_job(newsletter_job_id, name='', func=None, remove_job=False, args=None, cron=None):
    """Add, re-schedule, or remove a newsletter job on ``NEWSLETTER_SCHED``.

    Args:
        newsletter_job_id: Scheduler job id of the newsletter.
        name: Newsletter name, used only in log messages.
        func: Callable to run; used only when a new job is added.
        remove_job: When true, remove the job if it exists and add nothing.
        args: Positional arguments for the job callable.
        cron: Crontab expression for the job's trigger.

    If a job with ``newsletter_job_id`` exists it is removed (``remove_job``
    true) or re-scheduled on ``cron``. If it does not exist and ``remove_job``
    is false, a new job is added. Otherwise nothing happens.
    """
    if NEWSLETTER_SCHED.get_job(newsletter_job_id):
        if remove_job:
            NEWSLETTER_SCHED.remove_job(newsletter_job_id)
            logger.info(u"Tautulli NewsletterHandler :: Removed scheduled newsletter: %s" % name)
        else:
            # Build the trigger first so an invalid cron expression fails
            # before the existing job is modified.
            new_trigger = CronTrigger.from_crontab(cron)
            # reschedule_job() only forwards extra keyword arguments to a
            # trigger constructor and ignores them when it is handed a trigger
            # instance, so the job's args have to be updated via modify_job().
            if args is not None:
                NEWSLETTER_SCHED.modify_job(newsletter_job_id, args=args)
            NEWSLETTER_SCHED.reschedule_job(newsletter_job_id, trigger=new_trigger)
            logger.info(u"Tautulli NewsletterHandler :: Re-scheduled newsletter: %s" % name)
    elif not remove_job:
        NEWSLETTER_SCHED.add_job(
            func, args=args, id=newsletter_job_id, trigger=CronTrigger.from_crontab(cron))
        logger.info(u"Tautulli NewsletterHandler :: Scheduled newsletter: %s" % name)
@scheduler.scheduled_job(trigger=CronTrigger.from_crontab(CRON_INTERVAL), id='sync_all_teams')
def sync_all_teams():
"""
Lookup teams in a GitHub org and synchronize all teams with LDAP
:return:
"""
pprint(f'Syncing all teams: {time.strftime("%A, %d. %B %Y %I:%M:%S %p")}')
with app.app_context() as ctx:
c = ctx.push()
gh = GitHubApp(c)
installations = gh.app_client.app_installations
for i in installations():
client = gh.app_installation(installation_id=i.id)
org = client.organization(i.account['login'])
for team in org.teams():
sync_team(
client=client,
def kwargs(self) -> Tuple[dict, dict]:
default = {
"id": self.aps_job_id,
"func": scheduler_job,
"replace_existing": True,
"args": [self.job.id, self.aps_job_id],
}
if self.scheduling_mode == "cron":
self.periodic = True
trigger = {"trigger": CronTrigger.from_crontab(self.crontab_expression)}
elif self.frequency:
self.periodic = True
frequency_in_seconds = (
int(self.frequency)
* {"seconds": 1, "minutes": 60, "hours": 3600, "days": 86400}[
self.frequency_unit
]
)
trigger = {
"trigger": "interval",
"start_date": self.aps_date("start_date"),
"end_date": self.aps_date("end_date"),
"seconds": frequency_in_seconds,
}
else:
self.periodic = False
def main(args):
    """Run a blocking scheduler that fires ``print_current_date.send`` by cron.

    The job uses the crontab expression ``"* * * * *"``. A
    ``KeyboardInterrupt`` raised while the scheduler is running shuts the
    scheduler down. ``args`` is not used.

    Returns:
        Always ``0``.
    """
    blocking_scheduler = BlockingScheduler()
    every_minute = CronTrigger.from_crontab("* * * * *")
    blocking_scheduler.add_job(print_current_date.send, every_minute)

    try:
        blocking_scheduler.start()
    except KeyboardInterrupt:
        blocking_scheduler.shutdown()

    return 0