Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_headers(headers):
header_dict = {}
# Get updated header
if 'HTTP_UPDATED' in headers:
try:
header_dict['updated'] = parse_datetime(
headers.pop('HTTP_UPDATED'))
except (Exception, ISO8601Error):
raise ParamError(
"Updated header was not a valid ISO8601 timestamp")
elif 'updated' in headers:
try:
header_dict['updated'] = parse_datetime(headers.pop('updated'))
except (Exception, ISO8601Error):
raise ParamError(
"Updated header was not a valid ISO8601 timestamp")
# Get content type header
header_dict['CONTENT_TYPE'] = headers.pop('CONTENT_TYPE', None)
if not header_dict['CONTENT_TYPE'] and 'Content-Type' in headers:
header_dict['CONTENT_TYPE'] = headers.pop('Content-Type')
# Could not exist with deletes
if header_dict['CONTENT_TYPE']:
# FireFox automatically adds ;charset=foo to the end of headers. This
# will strip it out
if ';' in header_dict['CONTENT_TYPE'] and 'boundary' not in header_dict['CONTENT_TYPE']:
header_dict['CONTENT_TYPE'] = header_dict['CONTENT_TYPE'].split(';')[
0]
def validate_substatement(self, substmt):
# Ensure incoming substmt is a dict and check allowed and required
# fields
self.check_if_dict(substmt, "SubStatement")
self.check_allowed_fields(sub_allowed_fields, substmt, "SubStatement")
self.check_required_fields(
sub_required_fields, substmt, "SubStatement")
# If timestamp is included, ensure a valid time can be parsed
if 'timestamp' in substmt:
timestamp = substmt['timestamp']
try:
parse_datetime(timestamp)
# Reject statements that don't comply with ISO 8601 offsets
if timestamp.endswith("-00") or timestamp.endswith("-0000") or timestamp.endswith("-00:00"):
self.return_error(
"Timestamp error - Substatement Timestamp Illegal offset (-00, -0000, or -00:00) %s" % timestamp)
except Exception as e:
self.return_error(
"Timestamp error - There was an error while parsing the date from %s -- Error: %s" % (timestamp, e.message))
# Can't next substmts in other substmts - if not supplied it is an
# Activity
if 'objectType' in substmt['object']:
if substmt['object']['objectType'] == 'SubStatement':
self.return_error(
"Cannot nest a SubStatement inside of another SubStatement")
validator = StatementValidator()
if 'agent' in req_dict['params']:
try:
agent = json.loads(req_dict['params']['agent'])
req_dict['params']['agent'] = agent
except Exception:
raise ParamError("agent param for agent profile is not valid")
validator.validate_agent(agent, "agent param for agent profile")
else:
err_msg = "Error -- agent_profile - method = %s, but agent parameter missing.." % req_dict[
'method']
raise ParamError(err_msg)
if 'since' in req_dict['params']:
try:
parse_datetime(req_dict['params']['since'])
except (Exception, ISO8601Error):
raise ParamError(
"Since parameter was not a valid ISO8601 timestamp")
# Extra validation if oauth
if req_dict['auth']['type'] == 'oauth':
validate_oauth_for_documents(req_dict, "agent profile")
return req_dict
implementation ignores all those restrictions and returns something when it is
able to find all necessary components.
In detail:
it does not check, whether only the last component has fractions.
it allows weeks specified with all other combinations
The alternative format does not support durations with years, months or days
set to 0.
"""
if not isinstance(datestring, basestring):
raise TypeError("Expecting a string %r" % datestring)
match = ISO8601_PERIOD_REGEX.match(datestring)
if not match:
# try alternative format:
if datestring.startswith("P"):
durdt = parse_datetime(datestring[1:])
if durdt.year != 0 or durdt.month != 0:
# create Duration
ret = Duration(days=durdt.day, seconds=durdt.second,
microseconds=durdt.microsecond,
minutes=durdt.minute, hours=durdt.hour,
months=durdt.month, years=durdt.year)
else: # FIXME: currently not possible in alternative format
# create timedelta
ret = timedelta(days=durdt.day, seconds=durdt.second,
microseconds=durdt.microsecond,
minutes=durdt.minute, hours=durdt.hour)
return ret
raise ISO8601Error("Unable to parse duration string %r" % datestring)
groups = match.groupdict()
for key, val in groups.items():
if key not in ('separator', 'sign'):
version_regex = re.compile("^1\.0(\.\d+)?$")
if not version_regex.match(stmt['version']):
self.return_error(
"%s is not a supported version" % stmt['version'])
else:
self.return_error("Version must be a string")
# If id included, make sure it is a valid UUID
if 'id' in stmt:
self.validate_uuid(stmt['id'], 'Statement id')
# If timestamp included, make sure a valid date can be parsed from it
if 'timestamp' in stmt:
timestamp = stmt['timestamp']
try:
parse_datetime(timestamp)
# Reject statements that don't comply with ISO 8601 offsets
if timestamp.endswith("-00") or timestamp.endswith("-0000") or timestamp.endswith("-00:00"):
self.return_error(
"Timestamp error - Statement Timestamp Illegal offset (-00, -0000, or -00:00) %s" % timestamp)
except Exception as e:
self.return_error(
"Timestamp error - There was an error while parsing the date from %s -- Error: %s" % (timestamp, e.message))
# If stored included, make sure a valid date can be parsed from it
if 'stored' in stmt:
stored = stmt['stored']
try:
parse_datetime(stored)
except Exception as e:
formats = ['exact', 'canonical', 'ids']
if 'format' in req_dict['params']:
if req_dict['params']['format'] not in formats:
raise ParamError("The format filter value (%s) was not one of the known values: %s" % (
req_dict['params']['format'], ','.join(formats)))
else:
req_dict['params']['format'] = 'exact'
# StatementId could be for voided statement as well
if 'statementId' in req_dict['params'] or 'voidedStatementId' in req_dict['params']:
req_dict['statementId'] = validate_statementId(req_dict)
if 'since' in req_dict['params']:
try:
parse_datetime(req_dict['params']['since'])
except (Exception, ISO8601Error):
raise ParamError(
"since parameter was not a valid ISO8601 timestamp")
if 'until' in req_dict['params']:
try:
parse_datetime(req_dict['params']['until'])
except (Exception, ISO8601Error):
raise ParamError(
"until parameter was not a valid ISO8601 timestamp")
if 'ascending' in req_dict['params']:
if req_dict['params']['ascending'].lower() == 'true':
req_dict['params']['ascending'] = True
elif req_dict['params']['ascending'].lower() == 'false':
req_dict['params']['ascending'] = False
formats = ['exact', 'canonical', 'ids']
if 'params' in req_dict and 'format' in req_dict['params']:
if req_dict['params']['format'] not in formats:
raise ParamError("The format filter value (%s) was not one of the known values: %s" % (
req_dict['params']['format'], ','.join(formats)))
else:
req_dict['params']['format'] = 'exact'
# StatementId could be for voided statement as well
if 'params' in req_dict and ('statementId' in req_dict['params'] or 'voidedStatementId' in req_dict['params']):
req_dict['statementId'] = validate_statementId(req_dict)
if 'since' in req_dict['params']:
try:
parse_datetime(req_dict['params']['since'])
except (Exception, ISO8601Error):
raise ParamError(
"Since parameter was not a valid ISO8601 timestamp")
if 'until' in req_dict['params']:
try:
parse_datetime(req_dict['params']['until'])
except (Exception, ISO8601Error):
raise ParamError(
"Until parameter was not a valid ISO8601 timestamp")
if 'ascending' in req_dict['params']:
if req_dict['params']['ascending'].lower() == 'true':
req_dict['params']['ascending'] = True
else:
req_dict['params']['ascending'] = False
def get_headers(headers):
header_dict = {}
# Get updated header
if 'HTTP_UPDATED' in headers:
try:
header_dict['updated'] = parse_datetime(
headers.pop('HTTP_UPDATED'))
except (Exception, ISO8601Error):
raise ParamError(
"Updated header was not a valid ISO8601 timestamp")
elif 'updated' in headers:
try:
header_dict['updated'] = parse_datetime(headers.pop('updated'))
except (Exception, ISO8601Error):
raise ParamError(
"Updated header was not a valid ISO8601 timestamp")
# Get content type header
header_dict['CONTENT_TYPE'] = headers.pop('CONTENT_TYPE', None)
if not header_dict['CONTENT_TYPE'] and 'Content-Type' in headers:
header_dict['CONTENT_TYPE'] = headers.pop('Content-Type')
# Could not exist with deletes