Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
logger.debug(f"""Process attached file {v}""")
sp = v.filename.split("/")
fn = []
for p in sp:
if p not in ("", ".", ".."):
fn.append(secure_filename(p))
dest = path.join(tempdir, *fn)
if not path.isdir(path.dirname(dest)):
makedirs(path.dirname(dest))
logger.debug(f"""Save {v.filename} to {dest}""")
v.save(dest)
except Exception as err:
raise ValueError(f"""Failed to process attached file {v}, {err}""")
body = {}
for k, ls in iterlists(connexion.request.form):
logger.debug(f"""Process form parameter {k}""")
for v in ls:
try:
if not v:
continue
if k == "workflow_params":
job_file = path.join(tempdir, "job.json")
with open(job_file, "w") as f:
json.dump(json.loads(v), f, indent=4)
logger.debug(f"""Save job file to {job_file}""")
loader = Loader(load.jobloaderctx.copy())
job_order_object, _ = loader.resolve_ref(job_file, checklinks=False)
body[k] = job_order_object
else:
body[k] = v
except Exception as err:
def list_policies(detail=None):
request_inputs = anchore_engine.apis.do_request_prep(request, default_params={'detail': False})
user_auth = request_inputs['auth']
bodycontent = request_inputs['bodycontent']
params = request_inputs['params']
return_object = []
httpcode = 500
userId = request_inputs['userId']
try:
logger.debug('Listing policies')
client = internal_client_for(CatalogClient, request_inputs['userId'])
try:
policy_records = client.list_policies()
httpcode = 200
except Exception as err:
logger.warn("unable to get policy_records for user (" + str(userId) + ") - exception: " + str(err))
def get_release(release, with_csrf_header=False):
release = _get_release(release)
if not release:
return problem(404, "Not Found", "Release name: %s not found" % release)
headers = {"X-Data-Version": release["data_version"]}
if with_csrf_header:
headers.update(get_csrf_headers())
if request.args.get("pretty"):
indent = 4
separators = (",", ": ")
else:
indent = None
separators = None
# separators set manually due to https://bugs.python.org/issue16333 affecting Python 2
return Response(response=json.dumps(release["data"], indent=indent, separators=separators, sort_keys=True), mimetype="application/json", headers=headers)
def _get_filters(self):
query = get_input_dict()
where = [getattr(self.table.history, f) == query.get(f) for f in query]
where.append(self.table.history.data_version != null())
request = connexion.request
if hasattr(self.history_table, "channel"):
if request.args.get("channel"):
where.append(self.history_table.channel == request.args.get("channel"))
if hasattr(self.history_table, "product"):
where.append(self.history_table.product != null())
if request.args.get("product"):
where.append(self.history_table.product == request.args.get("product"))
if request.args.get("timestamp_from"):
where.append(self.history_table.timestamp >= int(request.args.get("timestamp_from")))
if request.args.get("timestamp_to"):
where.append(self.history_table.timestamp <= int(request.args.get("timestamp_to")))
return where
def save_attachment(self, attachment, location, exist_ok=False):
if path.isfile(location) and not exist_ok:
raise FileExistsError("[Errno 17] File exists: '" + location + "'")
data = connexion.request.files[attachment]
data.save(location)
def _post(self, transaction, changed_by):
if connexion.request.get_json().get("when", None) is None:
return problem(400, "Bad Request", "'when' cannot be set to null when scheduling a new change " "for a Rule")
if connexion.request.get_json():
change_type = connexion.request.get_json().get("change_type")
else:
change_type = connexion.request.values.get("change_type")
what = {}
delete_change_type_allowed_fields = ["telemetry_product", "telemetry_channel", "telemetry_uptake", "when", "rule_id", "data_version", "change_type"]
for field in connexion.request.get_json():
# TODO: currently UI passes extra rule model fields in change_type == 'delete' request body. Fix it and
# TODO: change the below operation from filter/pop to throw Error when extra fields are passed.
if (
field == "csrf_token"
or (change_type == "insert" and field in ["rule_id", "data_version"])
or (change_type == "delete" and field not in delete_change_type_allowed_fields)
):
continue
if field in ["rule_id", "data_version"]:
what[field] = int(connexion.request.get_json()[field])
else:
def list_imagetags():
try:
request_inputs = anchore_engine.apis.do_request_prep(request, default_params={})
user_auth = request_inputs['auth']
method = request_inputs['method']
bodycontent = request_inputs['bodycontent']
params = request_inputs['params']
return_object = {}
httpcode = 500
client = internal_client_for(CatalogClient, request_inputs['userId'])
return_object = client.get_imagetags()
httpcode = 200
except Exception as err:
httpcode = 500
def update_user(username, user): # noqa: E501
"""Updated user
This can only be done by the logged in user. # noqa: E501
:param username: name that need to be deleted
:type username: str
:param user: Updated user object
:type user: dict | bytes
:rtype: None
"""
if connexion.request.is_json:
user = User.from_dict(connexion.request.get_json()) # noqa: E501
return 'do some magic!'
def post_job(body):
"""
submit a new job
Submit a new job from a workflow definition.
:param body: Input binding for workflow.
:type body: dict | bytes
:rtype: Job
"""
if connexion.request.is_json:
body = JobDescription.from_dict(connexion.request.get_json())
with _job_store:
job_id = _job_store.create_job(
body.name, body.workflow, json.dumps(body.input))
job = _job_store.get_job(job_id)
return _internal_job_to_rest_job(job), 201