Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_release(release, with_csrf_header=False):
    """Serve a release's data as a JSON response.

    Looks the release up via ``_get_release``. A falsy result yields a 404
    problem response. Otherwise the ``data`` field is serialised with sorted
    keys and returned with an ``X-Data-Version`` header taken from the row's
    ``data_version``.

    :param release: release name passed to ``_get_release``.
    :param with_csrf_header: when truthy, the headers returned by
        ``get_csrf_headers()`` are merged into the response headers.
    :return: a ``Response`` carrying the JSON body, or the 404 problem.
    """
    row = _get_release(release)
    if not row:
        return problem(404, "Not Found", "Release name: %s not found" % release)

    response_headers = {"X-Data-Version": row["data_version"]}
    if with_csrf_header:
        response_headers.update(get_csrf_headers())

    # A truthy "pretty" request arg switches to indented output.
    # separators set manually due to https://bugs.python.org/issue16333 affecting Python 2
    if request.args.get("pretty"):
        indent, separators = 4, (",", ": ")
    else:
        indent, separators = None, None

    body = json.dumps(row["data"], indent=indent, separators=separators, sort_keys=True)
    return Response(response=body, mimetype="application/json", headers=response_headers)
def project_not_found_error(project_id):
    """Build the Connexion 404 problem returned when a project is missing.

    :param project_id: identifier interpolated into the problem detail.
    :return: the result of ``connexion.problem`` with status 404.
    """
    message = "Project '{pid}' not found".format(pid=project_id)
    return connexion.problem(status=404, title='Project not found', detail=message)
def _bad_request(detail):
    """Return a Connexion 400 "Bad Request" problem carrying ``detail``."""
    return connexion.problem(status=400, title='Bad Request', detail=detail)
def schedule_deletion(sc_emergency_shutoff, changed_by, transaction):
    """Post a scheduled "delete" change for an emergency shutoff.

    The ``csrf_token`` key, if present, is removed from
    ``sc_emergency_shutoff`` in place before anything else happens. Any
    ``change_type`` other than ``"delete"`` (including a missing one) is
    rejected with a 400 problem.

    :param sc_emergency_shutoff: mapping describing the scheduled change;
        mutated by this call (``csrf_token`` is dropped).
    :param changed_by: forwarded to ``ScheduledChangesView._post``.
    :param transaction: forwarded to ``ScheduledChangesView._post``.
    :return: the 400 problem, or whatever ``_post`` returns.
    """
    # The token is not part of the change itself, so drop it up front.
    sc_emergency_shutoff.pop("csrf_token", None)

    change_type = sc_emergency_shutoff.get("change_type")
    if change_type == "delete":
        shutoff_view = ScheduledChangesView("emergency_shutoff", dbo.emergencyShutoffs)
        return shutoff_view._post(sc_emergency_shutoff, transaction, changed_by, change_type)

    return problem(400, "Bad Request", "Invalid or missing change_type")
def get_rule(id_or_alias, with_csrf_headers=False):
    """Serve a single rule as a JSON response.

    :param id_or_alias: value handed to ``dbo.rules.getRule``.
    :param with_csrf_headers: when truthy, the headers returned by
        ``get_csrf_headers()`` are merged into the response headers.
    :return: a ``Response`` with the JSON-encoded rule and an
        ``X-Data-Version`` header, or a 404 problem when the lookup result
        is falsy.
    """
    found = dbo.rules.getRule(id_or_alias)
    if not found:
        return problem(
            status=404,
            title="Not Found",
            detail="Requested rule wasn't found",
            ext={"exception": "Requested rule does not exist"},
        )

    response_headers = {"X-Data-Version": found["data_version"]}
    if with_csrf_headers:
        response_headers.update(get_csrf_headers())

    payload = json.dumps(found)
    return Response(response=payload, mimetype="application/json", headers=response_headers)
def bouncer(endpoint, *args, **kwargs):
"""
Checks if the user making request is in the predefined list of
allowed users or match the defined username parttern.
"""
config = Configuration()
if not hasattr(connexion.request, 'user'):
logger.debug('User not found in the request',
extra={'allowed_users': config.allowed_users})
return connexion.problem(403, 'Forbidden',
'Anonymous access is not allowed '
'in this endpoint')
if config.allowed_users is not None:
logger.debug('Checking if user is allowed',
extra={'user': connexion.request.user,
'allowed_users': config.allowed_users})
if connexion.request.user not in config.allowed_users:
return connexion.problem(403, 'Forbidden',
'User is not allowed to access '
'this endpoint')
if config.allowed_user_pattern is not None:
logger.debug('Checking if user pattern is allowed',
extra={
'user': connexion.request.user,
def get_release_single_locale(release, platform, locale, with_csrf_header=False):
    """Serve one locale of a release as a JSON response.

    :param release: release name, used for both the locale lookup and the
        ``data_version`` lookup.
    :param platform: platform passed to ``dbo.releases.getLocale``.
    :param locale: locale passed to ``dbo.releases.getLocale``.
    :param with_csrf_header: when truthy, the headers returned by
        ``get_csrf_headers()`` are merged into the response headers.
    :return: a ``Response`` with the JSON-encoded locale data and an
        ``X-Data-Version`` header, or a 404 problem whose detail is the
        JSON-encoded ``KeyError`` args when ``getLocale`` raises ``KeyError``.
    """
    try:
        locale_blob = dbo.releases.getLocale(release, platform, locale)
    except KeyError as err:
        return problem(404, "Not Found", json.dumps(err.args))

    # The data version comes from the first row matching the release name.
    matching_releases = dbo.releases.getReleases(name=release)
    response_headers = {"X-Data-Version": matching_releases[0]["data_version"]}
    if with_csrf_header:
        response_headers.update(get_csrf_headers())

    payload = json.dumps(locale_blob)
    return Response(response=payload, mimetype="application/json", headers=response_headers)
def get_health():
    """Report health based on whether ``r.ping()`` succeeds.

    :return: the string ``"Healthy"`` when the ping raises nothing,
        otherwise a Connexion 503 problem.
    """
    try:
        r.ping()
    except Exception:
        # Any failure of the ping is reported as unhealthy.
        return connexion.problem(503, "Service Unavailable", "Unhealthy")
    return "Healthy"
def get_by_id(product, channel):
    """Serve the emergency shutoff for a product/channel pair as JSON.

    :param product: value matched against the ``product`` column.
    :param channel: value matched against the ``channel`` column.
    :return: a ``Response`` with the first matching row JSON-encoded and an
        ``X-Data-Version`` header, or a 404 problem when the select result
        is falsy.
    """
    criteria = {"product": product, "channel": channel}
    rows = dbo.emergencyShutoffs.select(where=criteria)
    if not rows:
        return problem(
            status=404,
            title="Not Found",
            detail="Requested emergency shutoff wasn't found",
            ext={"exception": "Requested shutoff does not exist"},
        )

    shutoff = rows[0]
    version_header = {"X-Data-Version": shutoff["data_version"]}
    return Response(response=json.dumps(shutoff), headers=version_header, mimetype="application/json")