cb_log(ev, init, 'Learning generic AIML interactions...')
core.learn(os.sep.join([ev.resource('aiml_files'), '*.aiml']))
cb_log(ev, init, 'Learning properties unique to the client...')
with open(ev.resource('properties.yml')) as prop_file:
    prop_data = yaml.safe_load(prop_file)
for prop_key in prop_data:
    prop_val = prop_data.get(prop_key)
    chatter_core.setBotPredicate(prop_key, prop_val)
cb_log(ev, init, 'Learning additional software details...')
version = ev.bot.info.get_version()
full_version = f'{version.major}.{version.minor}.{version.patch}'
if version.beta:
    full_version += ' Beta'
chatter_core.setBotPredicate('version', full_version)
birthday_date = arrow.get(datetime.date(2016, 8, 16))
age = (arrow.utcnow() - birthday_date).days // 365.25
chatter_core.setBotPredicate('age', str(int(age)))
chatter_core.setBotPredicate('birthday', birthday_date.format('MMMM DD'))
chatter_core.setBotPredicate('birthdate', birthday_date.format('MMMM DD, YYYY'))
cb_log(ev, init, 'Loaded Chatter Core.')
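# A minimal standalone sketch of the age/birthday arithmetic the snippet above
# feeds into setBotPredicate() (the 2016-08-16 birthdate comes from the snippet;
# the function name here is illustrative, not part of the original project):
import datetime

import arrow


def bot_birthday_predicates(birthdate=datetime.date(2016, 8, 16)):
    """Return the 'age', 'birthday' and 'birthdate' strings set above."""
    born = arrow.get(birthdate)
    years = int((arrow.utcnow() - born).days // 365.25)
    return {
        'age': str(years),
        'birthday': born.format('MMMM DD'),
        'birthdate': born.format('MMMM DD, YYYY'),
    }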
def test_publication_date(minimal_record_model, depid_pid, legacyjson_v1):
    """Test publication date."""
    for k in ['publication_date', 'embargo_date']:
        minimal_record_model[k] = arrow.utcnow().date() - timedelta(days=1)
        obj = legacyjson_v1.transform_record(
            depid_pid, minimal_record_model)['metadata']
        assert arrow.get(obj[k]).date() <= arrow.utcnow().date()
def get_start(days):
    if days:
        # "now" is really midnight tonight, so we really want tomorrow's date.
        # This makes the comparisons and math work so 1 day would mean today.
        now = arrow.utcnow() + datetime.timedelta(days=1)
        delta = datetime.timedelta(days=days)
        return now - delta
    else:
        # Hard-code when Betty started.
        return arrow.get(datetime.date(year=2014, month=7, day=8))
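# For reference, Arrow instances support arithmetic with datetime.timedelta
# directly, which is what get_start() relies on. A small usage sketch (the
# seven-day window is just an example value):
import datetime

import arrow

start = arrow.utcnow() + datetime.timedelta(days=1) - datetime.timedelta(days=7)
print(start.format('YYYY-MM-DD'))  # seven days back from "tomorrow"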
    either be
    A) creating new assays, in which case we're looking up line ID's, names, and assays
       counts to inform new assay naming
    B) merging data with existing assay, in which case we're looking up existing assay ID's
    """
    # only look up assays once per import
    if self._assays_by_pk:
        return
    self._creation_update = Update.load_update()
    assays = self.matched_assays
    lookup_ids = self.loa_pks
    start = arrow.utcnow()
    if self.matched_assays:
        # do a bulk query for the assays
        logger.info("Querying for assays..")
        lookup_dict = Assay.objects.in_bulk(lookup_ids)
        self._assays_by_pk = lookup_dict
        self._assays_by_loa_pk = lookup_dict
        end = arrow.utcnow()
        duration = _build_duration(start, end)
        logger.info(f"Done querying assays in {duration}")
        # recheck for time metadata on all the assays..if it exists consistently,
        # we'll use it to inform the import
        if self._use_assay_time_meta:
            for assay in self._assays_by_pk.values():
                if not assay.metadata_get(self.assay_time_mtype):
def determine_validity_years(end_date):
    """Given an end date determine how many years into the future that date is.

    :param end_date:
    :return: validity in years (1, 2 or 3)
    """
    now = arrow.utcnow()
    # shift() applies relative offsets; current arrow releases no longer
    # accept plural units such as years=+1 in replace()
    if end_date < now.shift(years=+1):
        return 1
    elif end_date < now.shift(years=+2):
        return 2
    elif end_date < now.shift(years=+3):
        return 3
    raise Exception("DigiCert issued certificates cannot exceed three"
                    " years in validity")
def _run_real_time(self, simulate_orders=True, user_id=None, auth_aliases=None):
    self.log.notice("Running live trading, simulating orders: {}".format(simulate_orders))
    if self.trading_info["DATA_FREQ"] != "minute":
        self.log.warn('"daily" data frequency is not supported in live mode, using "minute"')
        self.trading_info["DATA_FREQ"] = "minute"

    # start = arrow.get(self.trading_info["START"], 'YYYY-M-D')
    end = arrow.get(self.trading_info["END"])
    if end < arrow.utcnow().floor("minute"):
        self.log.warning(f"End date {end} is in the past, will use 30 minutes from now instead")
        end = arrow.utcnow().shift(minutes=+30)
        self.trading_info["END"] = end.format("YYYY-M-D-H-MM")

    # self.log.notice(f'Starting Strategy {start.humanize()} -- {start}')
    self.log.notice(f"Stopping strategy {end.humanize()} -- {end}")

    # catalyst loads state before init is called,
    # so we need to fetch state before the algorithm starts
    if outputs.load_state_from_storage(self):
        self.log.info("Resuming strategy with saved state")

    run_algorithm(
        capital_base=self.trading_info["CAPITAL_BASE"],
        initialize=self._init_func,
        handle_data=self._process_data,
        analyze=self._analyze,
        exchange_name=self.trading_info["EXCHANGE"],
def __init__(self, item):
    self.item = item
    self.created = arrow.utcnow()
def delete(self, url, **kwargs):
    kwargs = self._set_defaults(**kwargs)
    start_time = arrow.utcnow()
    try:
        return self._request_api.delete(url, **kwargs)
    finally:
        self._update_wait_time(start_time, arrow.utcnow())
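# _update_wait_time() itself is not part of the snippet; a plausible sketch of
# the pattern it implies (the attribute name is an assumption), measuring the
# elapsed request time from the two Arrow timestamps passed above:
def _update_wait_time(self, start_time, end_time):
    # subtracting two Arrow instances yields a datetime.timedelta
    self._last_request_seconds = (end_time - start_time).total_seconds()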
def _flush_cache(self):
    # drop the cache once the check interval has elapsed
    # (timestamp() is a method in current arrow releases)
    if arrow.utcnow().timestamp() > self._cache_check_next:
        self._cache = {}
        self._cache_check_next = arrow.utcnow().timestamp() + TOKEN_CACHE_DELAY
    :param guild_name:
    :type guild_name:
    :param incidents:
    :type incidents:
    :param modifier:
    :type modifier:
    :return:
    :rtype:
    """
    if not os.path.exists('cache'):
        os.makedirs('cache')
    file_name = f'{guild_name} Incidents.txt'
    with open(f'cache/{file_name}', 'w', encoding='utf-8') as export_file:
        info_lines = f'Server: {guild_name}\n'
        info_lines += f'Incidents: {len(incidents)} [{modifier}]\n'
        info_lines += f'Date: {arrow.utcnow().format("DD. MMMM YYYY HH:mm:ss")} UTC\n'
        info_lines += f'{"=" * 40}\n\n'
        export_file.write(info_lines)
        export_file.write('\n'.join([incident.to_text() for incident in incidents]))
    return file_name
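# The export header above relies on Arrow's format tokens; for reference, this
# is the kind of string that format() call produces (the value shown is only
# illustrative):
import arrow

print(arrow.utcnow().format("DD. MMMM YYYY HH:mm:ss"))  # e.g. "05. March 2024 14:30:12"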