Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_from_format(text, fmt, expected, now):
    """Check that ``pendulum.from_format`` yields the expected ISO string.

    The clock is frozen with ``pendulum.test`` before parsing: at
    ``pendulum.datetime(2015, 11, 12)`` when ``now`` is ``None``, otherwise
    at whatever ``pendulum.parse(now)`` returns. ``text`` is then parsed
    with ``fmt`` and its ``isoformat()`` is compared against ``expected``.
    """
    frozen = pendulum.datetime(2015, 11, 12) if now is None else pendulum.parse(now)

    # Python 2.7 loses precision for "x" timestamps, so that combination
    # is deliberately left unchecked.
    skip_case = fmt == "x" and PY2
    if not skip_case:
        with pendulum.test(frozen):
            parsed = pendulum.from_format(text, fmt)
            assert parsed.isoformat() == expected
def test_from_format(text, fmt, expected):
    """Parse ``text`` with the 'alternative' formatter under a frozen clock.

    The clock is pinned to ``pendulum.create(2015, 11, 12)`` for the
    duration of the check, and the parsed value's ``isoformat()`` must
    equal ``expected``.
    """
    frozen = pendulum.create(2015, 11, 12)
    with pendulum.test(frozen):
        parsed = pendulum.from_format(text, fmt, formatter='alternative')
        actual = parsed.isoformat()
        assert actual == expected
# This will be the length of the time slice
timeString = filename.split(".")
return int(timeString[2]) - int(timeString[1])
elif filename.count(".") == 1:
# Unprocessed filename
splitFilename = filename.replace(".root", "").split("_")
timeString = "_".join(splitFilename[3:])
else:
# Processed filename
timeString = filename.split(".")[1]
# The timestamp format is the same for unprocessed and processed filenames, so once the
# timeString is extracted, we can handle both the same.
# Extract the time string and then convert it to a pendulum datetime object. Since it was
# recorded in Geneva, we create it in that timezone so we can convert it to UTC.
timeStamp = pendulum.from_format(timeString, "YYYY_MM_DD_HH_mm_ss", tz = "Europe/Zurich")
# This timestamp is unix time in UTC.
return int(timeStamp.timestamp())
def extract_datetime(path_to_dt, full_path, tz="UTC"):
    """Parse the date and time directory components that follow ``path_to_dt``.

    ``full_path`` must start with ``<path_to_dt>/<date>/<time>``, where
    ``<date>`` is three dash-separated digit groups and ``<time>`` is three
    colon-separated digit groups. Anything after the time is ignored.

    Args:
        path_to_dt: Leading part of the path, inserted into the regular
            expression as-is.
        full_path: Path to inspect; matching is anchored at its start.
        tz: Timezone passed through to ``pendulum.from_format``.

    Returns:
        The result of ``pendulum.from_format`` for ``"<date>T<time>"`` using
        the ``"YYYY-MM-DDTHH:mm:ss"`` format.

    Raises:
        ValueError: If ``full_path`` does not have the expected layout.
            Previously this surfaced as an ``AttributeError`` on ``None``.
    """
    # NOTE(review): path_to_dt is interpolated without re.escape, so regex
    # metacharacters in it are interpreted as pattern syntax - confirm that
    # callers rely on (or never trigger) this before escaping it.
    string_to_match = f"(?:{path_to_dt}/)([0-9]+-[0-9]+-[0-9]+)/([0-9]+:[0-9]+:[0-9]+)"
    match = re.match(string_to_match, full_path)
    if match is None:
        raise ValueError(
            f"{full_path!r} does not start with {path_to_dt!r} followed by "
            "'/<date>/<time>' components"
        )
    date_part, time_part = match.groups()
    return pendulum.from_format(
        f"{date_part}T{time_part}", "YYYY-MM-DDTHH:mm:ss", tz=tz
    )
def main(command, datetime=None):
    """Run the action registered under ``command`` and report how it went.

    Args:
        command: Key into ``ACTIONS`` selecting the callable to run. The
            callable is invoked as ``ACTIONS[command](ctx, datetime)``.
        datetime: Optional ``'YYYY-MM-DD'`` string that overrides the run
            date. When omitted, ``pendulum.now`` in the timezone from
            ``ctx.config["TIMEZONE"]`` is used.

    Returns:
        ``1`` when the date override cannot be parsed or the action raises,
        ``0`` when the action's outcome equals ``0``, and ``None`` otherwise.
    """
    ctx = Context()

    if datetime is None:
        datetime = pendulum.now(ctx.config["TIMEZONE"])
    else:
        try:
            datetime = pendulum.from_format(datetime, 'YYYY-MM-DD', tz=ctx.config["TIMEZONE"])
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit. Any ordinary failure is still reported here.
            ctx.logger("Failed to parse custom datetime argument")
            return 1
        ctx.logger(f"Running in date override mode with date {datetime.format('YYYY-MM-DD')}.")
        ctx.logger("This is an experimental feature and may generate incorrect data.")

    try:
        outcome = ACTIONS[command](ctx, datetime)
    except Exception as exc:
        # Top-level boundary: log the error and its traceback, then signal
        # failure through the return value instead of propagating.
        ctx.logger(exc)
        traceback.print_tb(exc.__traceback__)
        ctx.logger(f"{command} failed")
        return 1

    # Compare by value: ``outcome is 0`` relied on CPython's small-integer
    # caching and triggers a SyntaxWarning on Python 3.8+.
    if outcome == 0:
        ctx.logger(f"{command} completed successfully")
        return 0

    # NOTE(review): the original indentation was lost; ``return 0`` is read
    # as belonging to the success branch, so any other outcome falls through
    # and yields None - confirm against the upstream file.
    return None
def fetch_records(self, url: furl) -> Iterator[Tuple[str, Union[str, dict, bytes], pendulum.Pendulum]]:
    """Yield ``(id, record, date)`` for every award, one page at a time.

    Each page is fetched with ``self.requests.get(url.url)`` and its awards
    are read from ``response.award`` in the JSON body (an empty list when
    the key is absent). A page holding fewer than ``PAGE_SIZE`` awards ends
    the iteration; otherwise ``url.args['offset']`` is advanced by
    ``PAGE_SIZE`` and the next page is requested.
    """
    more_pages = True
    while more_pages:
        logger.info('Fetching %s', url.url)
        payload = self.requests.get(url.url).json()
        awards = payload['response'].get('award', [])

        for award in awards:
            award_id = award['id']
            award_date = pendulum.from_format(award['date'], '%m/%d/%Y')
            yield (award_id, award, award_date)

        # A short page is the last one; only move the offset when another
        # request will follow.
        more_pages = len(awards) >= PAGE_SIZE
        if more_pages:
            url.args['offset'] += PAGE_SIZE
# Convert a compact "YYYYMMDD" value: stringify the row, parse it with
# pendulum, re-render it as "YYYY-MM-DD", and pass that string to
# pd.to_datetime (pd is presumably pandas - confirm against the imports).
lambda row: pd.to_datetime(
pendulum.from_format(str(row), "YYYYMMDD").format("YYYY-MM-DD")
)
# --until takes a '%Y-%m-%d' date string, converted by pendulum.from_format.
parser.add_argument(
    '--until',
    type=lambda d: pendulum.from_format(d, '%Y-%m-%d'),
    help='Only consider works modified on or before this date',
)