Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def pull_out_fields(self, a_charge):
    """Extract amount, card, vendor, and timestamp from a charge regex match.

    Group 4 holds a date string such as ``"January 5, 2019 ... ET."``;
    its final word (after stripping a trailing period) names the
    timezone, which is attached to the parsed date.
    """
    amount = a_charge.group(1)
    card_used = a_charge.group(2)
    merchant = a_charge.group(3).strip()
    date_text = a_charge.group(4)
    zone_name = date_text.rstrip('.').rsplit(' ', 1)[1]
    stamped = arrow.get(date_text, 'MMMM DD, YYYY')
    stamped = stamped.replace(tzinfo=dateutil.tz.gettz(zone_name))
    return amount, card_used, merchant, stamped
def get_runtime(logc):
    """Return the elapsed runtime recorded in a crunchstat log.

    Opens ``crunchstat.txt`` via *logc*, parses the timestamp in the
    first whitespace-separated column of the first and last lines, and
    returns their difference (a ``datetime.timedelta``).
    """
    with logc.open("crunchstat.txt") as in_handle:
        first = in_handle.readline()
        tstart = arrow.get(first.split()[0])
        # Seed `last` with the first line: previously a single-line log
        # left `last` unbound and raised NameError below.
        last = first
        for line in in_handle:
            last = line
        tend = arrow.get(last.split()[0])
    return tend - tstart
'geothermal': 0.0,
'unknown': 0.0
},
'storage': {
'hydro': -10.0,
},
'source': 'mysource.com'
}
"""
if target_datetime:
raise NotImplementedError('This parser is not yet able to parse past dates')
# Define actual and previous day (for midnight data).
now = arrow.now(tz=tz_bo)
formatted_date = now.format('YYYY-MM-DD')
past_formatted_date = arrow.get(formatted_date, 'YYYY-MM-DD').shift(days=-1).format(
'YYYY-MM-DD')
# initial path for url to request
url_init = 'http://www.cndc.bo/media/archivos/graf/gene_hora/despacho_diario.php?fechag='
# Start with data for previous day in order to get midnight data.
url = url_init + past_formatted_date
r = session or requests.session()
response = r.get(url)
obj = webparser(response)
data_yesterday = fetch_hourly_production(zone_key, obj, past_formatted_date)
# Now get data for rest of today.
url = url_init + formatted_date
r = session or requests.session()
response = r.get(url)
def _parse_date_value(value):
    """Deserialize an ISO-8601 ``YYYY-MM-DD`` string into a ``datetime.date``."""
    parsed = arrow.get(value, "YYYY-MM-DD")
    return parsed.date()
def utc_strtime(self, utc_time):
    """Convert a UTC timestamp to a local-time string.

    Renders *utc_time* in the local timezone as ``YYYY-MM-DD HH:mm:ss``:
    the ISO 'T' separator is replaced with a space and any sub-second
    or offset suffix is truncated by the ``[:19]`` slice.
    """
    str_time = '%s' % arrow.get(utc_time).to('local')
    # Fix: the original first called str.replace without binding the
    # result — a no-op, since strings are immutable. Keep only the
    # effective, assigned call.
    str_time = str_time.replace('T', ' ')
    return str_time[:19]
def _build_link_path(symlinks_location, record, filename):
    """Build the symlink path."""
    # Use the first element when record['report_number'] is a non-empty
    # sequence; otherwise fall back to the value itself.
    # NOTE(review): if the field is an empty list, the fallback hands a
    # list to os.path.join (TypeError); if it is a non-empty string,
    # [0] yields just its first character. Confirm the field's type
    # (presumably a list of strings) against the record schema.
    report_number = record['report_number'][0] if \
        len(record['report_number']) > 0 else record['report_number']
    # Year component of the path comes from the record's 'date' field.
    year = arrow.get(record['date']).year
    return os.path.join(symlinks_location, record['type'],
                        record['category'], str(year), report_number,
                        filename)
def get_build_types(device, romtype, after, version):
roms = get_device(device)
roms = [x for x in roms if x['type'] == romtype]
for rom in roms:
rom['date'] = arrow.get(rom['date']).datetime
if after:
after = arrow.get(after).datetime
roms = [x for x in roms if x['date'] > after]
if version:
roms = [x for x in roms if x['version'] == version]
data = []
for rom in roms:
data.append({
"id": rom['sha256'],
"url": '{}{}'.format(app.config['DOWNLOAD_BASE_URL'], rom['filepath']),
"romtype": rom['type'],
"datetime": rom['datetime'],
"version": rom['version'],
"filename": rom['filename'],
"size": rom['size'],
})
return -2
elif fake_day <= arrow.get(0001, 1, 1, 4, 0):
return -1
elif fake_day <= arrow.get(0001, 1, 1, 6, 0):
return 0
elif fake_day <= arrow.get(0001, 1, 1, 8, 0):
return 1
elif fake_day <= arrow.get(0001, 1, 1, 10, 0):
return 2
elif fake_day <= arrow.get(0001, 1, 1, 12, 0):
return 3
elif fake_day <= arrow.get(0001, 1, 1, 14, 0):
return 4
elif fake_day <= arrow.get(0001, 1, 1, 16, 0):
return 5
elif fake_day <= arrow.get(0001, 1, 1, 18, 0):
return 6
elif fake_day <= arrow.get(0001, 1, 1, 20, 0):
return -5
elif fake_day <= arrow.get(0001, 1, 1, 22, 0):
return -4
elif fake_day < arrow.get(0001, 1, 1, 23, 59):
return -3
else:
raise ValueError("Problem with time: %s:%s" % hour, minute)
def fetch_solar_all(session, hours_in_the_past=2):
data_url = SOLAR_URL
r = session.post(data_url, {'day': _get_australian_date()})
data = r.json()
if data and 'output' in data and data['output']:
production_data = data['output']
first_timestamp = arrow.get(production_data[0]['ts'])
if (arrow.utcnow() - first_timestamp).total_seconds() >= (hours_in_the_past * 60 * 60):
return production_data
else:
production_data = []
# If we got here, we want to get more data.
# Requesting yesterday's data in the browser sometimes gives an HTTP 406 Unacceptable error,
# but it's always worked in the script so far. Could double check and adjust
# how many hours are fetched if it causes a problem in the future.
data_url = SOLAR_URL
r = session.post(data_url, {'day': _get_australian_date(days_in_past=1)})
data = r.json()
full_production_data = data['output'] + production_data