Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fetch_price(zone_key, session=None, target_datetime=None,
logger=logging.getLogger(__name__)):
if target_datetime:
now = arrow.get(target_datetime, tz='Europe/Paris')
else:
now = arrow.now(tz='Europe/Paris')
r = session or requests.session()
formatted_from = now.shift(days=-1).format('DD/MM/YYYY')
formatted_to = now.format('DD/MM/YYYY')
url = 'http://www.rte-france.com/getEco2MixXml.php?type=donneesMarche&da' \
'teDeb={}&dateFin={}&mode=NORM'.format(formatted_from, formatted_to)
response = r.get(url)
obj = ET.fromstring(response.content)
datas = {}
for donnesMarche in obj:
if donnesMarche.tag != 'donneesMarche':
continue
start_date = arrow.get(arrow.get(donnesMarche.attrib['date']).datetime, 'Europe/Paris')
def parse_sw():
for i in range(0, 4):
# print i
now = arrow.now()
# print now
# print now.weekday()
week_day = now-timedelta(i)
day = week_day.format('YYYYMMDD')
sw = parse_sw_with_day(day)
if sw is not None:
return sw
def initialize_state(reddit_state):
"""
Sets up the reddit state
:param reddit_state: dictionary holding reddit settings
:return: none
"""
check_for_existence('time_to_save', reddit_state,
arrow.now().replace(hours=0))
check_for_existence('max_score', reddit_state, 0)
check_for_existence('gilded_skip', reddit_state, 0)
check_for_existence('whitelisted_comments', reddit_state, {})
check_for_existence('whitelisted_posts', reddit_state, {})
check_for_existence('scheduled_time', reddit_state, 0)
check_for_existence('refresh_token', reddit_state, '')
check_for_existence('reddit_username', reddit_state, '')
check_for_existence('reddit_password', reddit_state, '')
check_for_existence('reddit_client_id', reddit_state, '')
check_for_existence('reddit_client_secret', reddit_state, '')
reddit_state['scheduler_bool'] = 0
reddit_state['whitelist_window_open'] = 0
reddit_state['confirmation_window_open'] = 0
reddit_state.sync
def main():
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-sd', '--start_date', type=str,
default='1990-01-01', help='Start date parameter value - format YYYY-MM-DD')
parser.add_argument('-ed', '--end_date', type=str,
default=arrow.now().format('YYYY-MM-DD'), help='End date parameter - format YYYY-MM-DD')
parser.add_argument('-t', '--ticker', nargs='+',
help=' Set flag', required=True)
parser.add_argument(
'-s', '--source', help=' set source', required=True)
parser.add_argument('-a', '--attempt',
help='set max attempt to download', default=10)
parser.add_argument(
'-e', '--exist', help='check exist stock history file', default=False)
parser.add_argument('-p', '--prefix', help='add prefix in output name')
args = parser.parse_args()
# # fetch all data
prefix_name = ""
# make sure output folder is exist
if not os.path.isdir("../stockdatas"):
os.mkdir("../stockdatas")
def reschedule(bot, trigger):
"""Allows admins to set stream times on the fly
"""
args = trigger.group(2)
if (args):
pTime,flag = dateParser.parseDT(args, sourceTime=getStartOfDay()) # use beginning of today as the source day to ensure DT returned.
#Anything not explicitly dated will go to next occurance of that date
#so check if the date given was pushed more than 6 months in the future
#if it was assume the issuer ment the past. Will break setting dates manually with years.
pTime = arrow.get(pTime, defaultTz) # avoid python AWFUL datetimes.
if (pTime > arrow.now(defaultTz).replace(months=+6)):
pTime = pTime.replace(years=-1)
if (flag == 1):
# parsed as a date, so we can't really do anything with it. Just print the schedule for that day.
streams = getStreamsOnDay(pTime)
if (len(streams) > 0):
stream = streams[0];
tense = "should air"
if (stream.getEnd() < arrow.now()):
tense = "should have aired"
bot.say("@%s: The stream %s %s" % (trigger.nick, tense, stream.start.strftime("%b %d %I:%M %p %Z")))
else:
bot.say("@%s: No stream scheduled for %s" % (trigger.nick, pTime.strftime("%b %d")))
return
if (flag == 2):
def __time_point(self, days):
today = arrow.now()
point_date = today.shift(days=days)
point_date = datetime.datetime(
point_date.year, point_date.month, point_date.day
)
return point_date - datetime.timedelta(hours=9)
def get_post_from_url(post_url):
with SetupBrowserEnvironment() as browser:
now_datetime = arrow.now('US/Pacific')
try:
instagram_post = InstagramPost(browser, post_url)
instagram_post.extract_post_info()
post_stats = {
'username': instagram_post.username,
'post_url': post_url,
'likes': instagram_post.likes,
'views': instagram_post.views,
'caption': instagram_post.caption,
'checked_date': now_datetime.format('MM-DD-YYYY'),
'checked_time': now_datetime.format('hh:mm:ss A'),
'still_up': True
}
except NoInstaPostPageFound:
self.reset()
return
# get all instrument
for i in range(self.RETRY_TIME):
logging.info('TRY ({}) get instrument'.format(i))
instrument = self.trader.ReqQryInstrument()
if instrument is not False:
break
else:
logging.error('get instrument FAILED!')
self.reset()
return
# check whether today is tradingday
self.today = arrow.now().format('YYYYMMDD')
self.tradingday = self.trader.GetTradingDay().decode('gb2312')
logging.info('today: {}, tradingday: {}'.format(
self.today, self.tradingday
))
if self.today != self.tradingday:
logging.info('today({}) is not tradingday({})!'.format(
self.today, self.tradingday
))
self.reset()
return
# create market spi
for i in range(self.RETRY_TIME):
logging.info('TRY ({}) times market login'.format(i))
if self.marketLogin():
break
response = r.post(url, data=body, headers=headers)
root = ET.fromstring(response.content)
timestamp = root[0][0][0].text
key = str(int(float(timestamp[5:10])/ 1.307000))
url = 'https://demanda.ree.es/WSVisionaV01/wsDemanda30Service'
body = """
%s
%s
""" % (arrow.now(tz='Europe/Madrid').format('YYYY-MM-DD'), key)
response = r.post(url, data=body, headers=headers)
root = ET.fromstring(response.content)
data = None
for data in root[0][0][0].getchildren(): pass
parsed = {}
for item in data.getchildren():
parsed[item.tag] = item.text
obj = {
'countryCode': COUNTRY_CODE,
'datetime': arrow.get(arrow.get(parsed['timeStamp']).datetime,
'Europe/Madrid').datetime # We receive the time in local time
}
obj['consumption'] = {
def updateState(self, kwargs=None):
self.log('loading data from ical feed')
data = requests.get(self.feed).text
ical = ics.Calendar(data)
self.log('ical data loaded')
now = arrow.now()
future = arrow.now().replace(days=self.max_days)
events = [{
'name': e.name,
'location': e.location,
'begin': e.begin.isoformat(),
'end': e.end.isoformat(),
} for e in ical.events if now < e.begin < future][:self.max_events]
self.set_app_state(self.entity, {
'state': "",
'attributes': events
})