Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def reInit(self, bot):
self.bot = bot
triggers = bot.moduleHandler.mappedTriggers
self.command = triggers[self.commandStr].execute
self.cronStr = {
'cron': self.timeStr
}[self.type]
self.cron = croniter(self.cronStr, datetime.datetime.utcnow())
self.nextTime = self.cron.get_next(datetime.datetime)
def schedule(self, id_):
site = self.site_storage.cache[id_]
if 'schedule' not in site:
return
if id_ in self.waiting_calls:
call, schedule = self.waiting_calls[id_]
if schedule != site['schedule']:
self.cancel(id_)
self.schedule(id_)
return
if not site['schedule']:
return
try:
cron = croniter(site['schedule'])
except Exception:
if site.get('schedule_valid', True) is True:
self.site_storage.update(
{'_id': id_, 'schedule_valid': False}
)
logger.warning('CRON entry "{}" invalid for site {}'
.format(site['schedule'], site['url']))
else:
deadline = cron.get_next()
delay = deadline - time.time()
logger.debug('Scheduling {} in {:.1f} seconds'
.format(site['url'], delay))
call = self.ioloop.add_timeout(deadline,
self.start_crawl, id_=id_)
self.waiting_calls[id_] = call, site['schedule']
self.site_storage.update({
def previous_schedule(self, dttm):
if isinstance(self._schedule_interval, six.string_types):
cron = croniter(self._schedule_interval, dttm)
return cron.get_prev(datetime)
elif isinstance(self._schedule_interval, timedelta):
return dttm - self._schedule_interval
def next_natural_occurrence(self, dt=None):
dt = date_now() if dt is None else dt
return croniter(self._expression, dt).get_next(datetime)
def validate(self, value, context, template=None):
if not value:
return True
try:
croniter.croniter(value)
return True
except Exception as ex:
self._error_message = _(
'Invalid CRON expression: %s') % str(ex)
return False
def _its_time(self, cron_format):
"""Returns True if current time matches cron time spec."""
now = int(time.time())
itr = croniter(cron_format, now - 60)
nxt = itr.get_next()
return now / 60 * 60 == nxt
def get_next_schedule(base_datetime, schedule_type, schedule):
if schedule_type == ScheduleType.CRONTAB:
itr = croniter(schedule, base_datetime)
next_schedule = itr.get_next(datetime)
elif schedule_type == ScheduleType.INTERVAL:
count, unit_name = schedule
# count is the "number of units" and unit_name is the "unit name of interval"
# which is inverse from what rrule calls them
rule = rrule.rrule(
freq=SCHEDULE_INTERVAL_MAP[unit_name], interval=count, dtstart=base_datetime, count=2
)
if rule[0] > base_datetime:
next_schedule = rule[0]
else:
next_schedule = rule[1]
else:
raise NotImplementedError("unknown schedule_type")
return next_schedule
def validate(self):
# validate cron expression if specified
if TIME_CONSTRAINTS in self.spec:
tcs = self.alarm_properties[TIME_CONSTRAINTS]
for tc in tcs:
exp = tc.get(TC_START, '')
try:
croniter.croniter(exp)
except Exception as ex:
msg = _("Invalid cron expression specified for property "
"'%(property)s' (%(exp)s): %(ex)s"
) % {'property': TC_START, 'exp': exp,
'ex': six.text_type(ex)}
raise exc.InvalidSpec(message=msg)
tz = tc.get(TC_TIMEZONE, '')
try:
pytz.timezone(tz)
except Exception as ex:
msg = _("Invalid timezone value specified for property "
"'%(property)s' (%(tz)s): %(ex)s"
) % {'property': TC_TIMEZONE, 'tz': tz,
'ex': six.text_type(ex)}
raise exc.InvalidSpec(message=msg)